当前位置:   article > 正文

springcloud集成rocketMQ

springcloud集成rocketmq

1、简单测试rocketMQ

1、引入依赖

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-spring-boot-starter</artifactId>
  4. <version>2.0.2</version>
  5. </dependency>

2、消息生产者类

  1. package com.lx.business.mq;
  2. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  3. import org.apache.rocketmq.client.producer.SendResult;
  4. import org.apache.rocketmq.common.message.Message;
  5. public class RocketMqSendTest {
  6. //发送消息
  7. public static void main(String[] args) throws Exception {
  8. //1. 创建消息生产者, 指定生产者所属的组名
  9. DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
  10. //2. 指定Nameserver地址
  11. producer.setNamesrvAddr("127.0.0.1:9876");
  12. //3. 启动生产者
  13. producer.start();
  14. //4. 创建消息对象,指定主题、标签和消息体
  15. Message msg = new Message("myTopic", "myTag", ("RocketMQ Message").getBytes());
  16. //5. 发送消息
  17. SendResult sendResult = producer.send(msg, 10000);
  18. System.out.println(sendResult);
  19. //6. 关闭生产者
  20. producer.shutdown();
  21. }
  22. }

3、消费者类

  1. package com.lx.business.mq;
  2. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  4. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  5. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  6. import org.apache.rocketmq.client.exception.MQClientException;
  7. import org.apache.rocketmq.common.message.MessageExt;
  8. import java.util.List;
  9. public class RocketMqReceiveTest {
  10. public static void main(String[] args) throws MQClientException {
  11. //1. 创建消息消费者, 指定消费者所属的组名
  12. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group");
  13. //2. 指定Nameserver地址
  14. consumer.setNamesrvAddr("127.0.0.1:9876");
  15. //3. 指定消费者订阅的主题和标签
  16. consumer.subscribe("myTopic", "*");
  17. //4. 设置回调函数,编写处理消息的方法
  18. consumer.registerMessageListener(new MessageListenerConcurrently() {
  19. @Override
  20. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
  21. msgs,
  22. ConsumeConcurrentlyContext
  23. context) {
  24. System.out.println("Receive New Messages: " + msgs);
  25. //返回消费状态
  26. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  27. }
  28. });
  29. //5. 启动消息消费者
  30. consumer.start();
  31. System.out.println("Consumer Started.");
  32. }
  33. }

4、先启动消费者,一直监听

5、再启动生产者

2、服务间调用

1、pom文件

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-spring-boot-starter</artifactId>
  4. <version>2.0.2</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.rocketmq</groupId>
  8. <artifactId>rocketmq-client</artifactId>
  9. <version>4.4.0</version>
  10. </dependency>

2、yml 配置

  # RocketMQ settings for the producer service.
  rocketmq:
    name-server: 127.0.0.1:9876  # address of the RocketMQ name server
    producer:
      group: sale-order          # producer group

3、java测试类、生产者类

  1. package com.lx.business.mq;
  2. import com.lx.business.feign.StockFeignService;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.rocketmq.spring.core.RocketMQTemplate;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.web.bind.annotation.GetMapping;
  7. import org.springframework.web.bind.annotation.RequestParam;
  8. import org.springframework.web.bind.annotation.RestController;
  9. @RestController
  10. @Slf4j
  11. public class ContractController {
  12. @Autowired
  13. private StockFeignService stockFeignService;
  14. @Autowired
  15. private RocketMQTemplate rocketMQTemplate;
  16. @GetMapping(value = "/feignTest")
  17. public String feignTest(@RequestParam String id){
  18. rocketMQTemplate.convertAndSend("order-topic", byStockId);
  19. return byStockId;
  20. }
  21. }

4、消费者服务 pom

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-spring-boot-starter</artifactId>
  4. <version>2.0.2</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.rocketmq</groupId>
  8. <artifactId>rocketmq-client</artifactId>
  9. <version>4.4.0</version>
  10. </dependency>

5、yml配置

  # RocketMQ settings for the consumer service.
  rocketmq:
    name-server: 127.0.0.1:9876

6、消费者测试类

  1. package com.lx.business.mq;
  2. import com.alibaba.fastjson.JSON;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
  5. import org.apache.rocketmq.spring.core.RocketMQListener;
  6. import org.springframework.stereotype.Service;
  7. @Slf4j
  8. @Service
  9. @RocketMQMessageListener(consumerGroup = "shop-user", topic = "order-topic")
  10. public class TestRocketMQListener implements RocketMQListener<String> {
  11. @Override
  12. public void onMessage(String order) {
  13. log.info("收到一个消息", JSON.toJSONString(order));
  14. }
  15. }

同步消息发送  生产者端

  1. package com.lx.business.mq;
  2. import com.lx.business.feign.StockFeignService;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.rocketmq.client.producer.SendResult;
  5. import org.apache.rocketmq.client.producer.SendStatus;
  6. import org.apache.rocketmq.spring.core.RocketMQTemplate;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.web.bind.annotation.GetMapping;
  9. import org.springframework.web.bind.annotation.RequestParam;
  10. import org.springframework.web.bind.annotation.RestController;
  11. @RestController
  12. @Slf4j
  13. public class ContractController {
  14. @Autowired
  15. private StockFeignService stockFeignService;
  16. @Autowired
  17. private RocketMQTemplate rocketMQTemplate;
  18. @GetMapping(value = "/feignTest")
  19. public String feignTest(@RequestParam String id){
  20. String byStockId = stockFeignService.getByStockId(id);
  21. rocketMQTemplate.convertAndSend("order-topic", byStockId);
  22. return byStockId;
  23. }
  24. @GetMapping(value = "/mqTest1")
  25. public String mqTest1(@RequestParam String id){
  26. String byStockId = stockFeignService.getByStockId(id);
  27. // 同步消息
  28. SendResult sendResult = rocketMQTemplate.syncSend("sale-topic", byStockId);
  29. SendStatus sendStatus = sendResult.getSendStatus();
  30. return sendStatus.toString();
  31. }
  32. }

消费者端

  1. package com.lx.business.mq;
  2. import com.alibaba.fastjson.JSON;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
  5. import org.apache.rocketmq.spring.core.RocketMQListener;
  6. import org.springframework.stereotype.Service;
  7. @Slf4j
  8. @Service
  9. @RocketMQMessageListener(consumerGroup = "shop-user2", topic = "sale-topic")
  10. public class TestRocketMQListener2 implements RocketMQListener<String> {
  11. @Override
  12. public void onMessage(String message) {
  13. log.info("收到一个消息", JSON.toJSONString(message));
  14. }
  15. }

异步消息和单向消息:消费者共用,生产者不同

消息生产者使用 @Test 单元测试:

  1. //异步消息
  2. @Test
  3. public void testAsyncSend() throws InterruptedException {
  4. //参数一: topic, 如果想添加tag 可以使用"topic:tag"的写法
  5. //参数二: 消息内容
  6. //参数三: 回调函数, 处理返回结果
  7. rocketMQTemplate.asyncSend("sale-topic", "这是一条异步消息", new
  8. SendCallback() {
  9. @Override
  10. public void onSuccess(SendResult sendResult) {
  11. System.out.println(sendResult);
  12. }
  13. @Override
  14. public void onException(Throwable throwable) {
  15. System.out.println(throwable);
  16. }
  17. });
  18. //让线程不要终止
  19. Thread.sleep(30000000);
  20. }
  21. //单向消息
  22. @Test
  23. public void testOneWay() {
  24. rocketMQTemplate.sendOneWay("sale-topic", "这是一条单向消息");
  25. }

 

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/713035
推荐阅读
相关标签
  

闽ICP备14008679号