赞
踩
1、引入依赖
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-spring-boot-starter</artifactId>
- <version>2.0.2</version>
- </dependency>
2、消息生产者类
- package com.lx.business.mq;
-
- import org.apache.rocketmq.client.producer.DefaultMQProducer;
- import org.apache.rocketmq.client.producer.SendResult;
- import org.apache.rocketmq.common.message.Message;
-
- public class RocketMqSendTest {
-
- //发送消息
- public static void main(String[] args) throws Exception {
- //1. 创建消息生产者, 指定生产者所属的组名
- DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
- //2. 指定Nameserver地址
- producer.setNamesrvAddr("127.0.0.1:9876");
- //3. 启动生产者
- producer.start();
- //4. 创建消息对象,指定主题、标签和消息体
- Message msg = new Message("myTopic", "myTag", ("RocketMQ Message").getBytes());
-
- //5. 发送消息
- SendResult sendResult = producer.send(msg, 10000);
- System.out.println(sendResult);
- //6. 关闭生产者
- producer.shutdown();
- }
- }
3、消费者类
- package com.lx.business.mq;
-
- import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
- import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.common.message.MessageExt;
-
- import java.util.List;
-
- public class RocketMqReceiveTest {
-
- public static void main(String[] args) throws MQClientException {
- //1. 创建消息消费者, 指定消费者所属的组名
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group");
- //2. 指定Nameserver地址
- consumer.setNamesrvAddr("127.0.0.1:9876");
- //3. 指定消费者订阅的主题和标签
- consumer.subscribe("myTopic", "*");
- //4. 设置回调函数,编写处理消息的方法
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
- msgs,
- ConsumeConcurrentlyContext
- context) {
- System.out.println("Receive New Messages: " + msgs);
- //返回消费状态
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
- //5. 启动消息消费者
- consumer.start();
- System.out.println("Consumer Started.");
- }
- }
4、先启动消费者,一直监听
5、再启动消费者
1、pom文件
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-spring-boot-starter</artifactId>
- <version>2.0.2</version>
- </dependency>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-client</artifactId>
- <version>4.4.0</version>
- </dependency>
2、yml 配置
- rocketmq:
- name-server: 127.0.0.1:9876 #rocketMQ服务的地址
- producer:
- group: sale-order # 生产者组
3、java测试类、生产者类
- package com.lx.business.mq;
-
- import com.lx.business.feign.StockFeignService;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.rocketmq.spring.core.RocketMQTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RequestParam;
- import org.springframework.web.bind.annotation.RestController;
-
- @RestController
- @Slf4j
- public class ContractController {
-
- @Autowired
- private StockFeignService stockFeignService;
-
- @Autowired
- private RocketMQTemplate rocketMQTemplate;
-
- @GetMapping(value = "/feignTest")
- public String feignTest(@RequestParam String id){
- rocketMQTemplate.convertAndSend("order-topic", byStockId);
- return byStockId;
- }
- }
4、消费者服务 pom
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-spring-boot-starter</artifactId>
- <version>2.0.2</version>
- </dependency>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-client</artifactId>
- <version>4.4.0</version>
- </dependency>
5、yml配置
- rocketmq:
- name-server: 127.0.0.1:9876
6、消费者测试类
- package com.lx.business.mq;
-
- import com.alibaba.fastjson.JSON;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
- import org.apache.rocketmq.spring.core.RocketMQListener;
- import org.springframework.stereotype.Service;
-
- @Slf4j
- @Service
- @RocketMQMessageListener(consumerGroup = "shop-user", topic = "order-topic")
- public class TestRocketMQListener implements RocketMQListener<String> {
-
- @Override
- public void onMessage(String order) {
- log.info("收到一个消息", JSON.toJSONString(order));
- }
-
- }
- package com.lx.business.mq;
-
- import com.lx.business.feign.StockFeignService;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.rocketmq.client.producer.SendResult;
- import org.apache.rocketmq.client.producer.SendStatus;
- import org.apache.rocketmq.spring.core.RocketMQTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RequestParam;
- import org.springframework.web.bind.annotation.RestController;
-
- @RestController
- @Slf4j
- public class ContractController {
-
- @Autowired
- private StockFeignService stockFeignService;
-
- @Autowired
- private RocketMQTemplate rocketMQTemplate;
-
- @GetMapping(value = "/feignTest")
- public String feignTest(@RequestParam String id){
- String byStockId = stockFeignService.getByStockId(id);
- rocketMQTemplate.convertAndSend("order-topic", byStockId);
- return byStockId;
- }
-
- @GetMapping(value = "/mqTest1")
- public String mqTest1(@RequestParam String id){
- String byStockId = stockFeignService.getByStockId(id);
- // 同步消息
- SendResult sendResult = rocketMQTemplate.syncSend("sale-topic", byStockId);
- SendStatus sendStatus = sendResult.getSendStatus();
- return sendStatus.toString();
- }
-
-
- }
消费者端
- package com.lx.business.mq;
-
- import com.alibaba.fastjson.JSON;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
- import org.apache.rocketmq.spring.core.RocketMQListener;
- import org.springframework.stereotype.Service;
-
- @Slf4j
- @Service
- @RocketMQMessageListener(consumerGroup = "shop-user2", topic = "sale-topic")
- public class TestRocketMQListener2 implements RocketMQListener<String> {
-
- @Override
- public void onMessage(String message) {
- log.info("收到一个消息", JSON.toJSONString(message));
- }
-
- }
消息生产者使用Test 单元测试,
-
- //异步消息
- @Test
- public void testAsyncSend() throws InterruptedException {
- //参数一: topic, 如果想添加tag 可以使用"topic:tag"的写法
- //参数二: 消息内容
- //参数三: 回调函数, 处理返回结果
- rocketMQTemplate.asyncSend("sale-topic", "这是一条异步消息", new
- SendCallback() {
- @Override
- public void onSuccess(SendResult sendResult) {
- System.out.println(sendResult);
- }
-
- @Override
- public void onException(Throwable throwable) {
- System.out.println(throwable);
- }
- });
- //让线程不要终止
- Thread.sleep(30000000);
- }
-
- //单向消息
- @Test
- public void testOneWay() {
- rocketMQTemplate.sendOneWay("sale-topic", "这是一条单向消息");
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。