当前位置:   article > 正文

springboot集成阿里云rocketMQ代码示例_aliyun.openservices.ons.api.product

aliyun.openservices.ons.api.product

集成目标:完成生产者发送消息,消费者接收消息的整个流程

集成步骤:

    1、引入jar包依赖

  1. <!--rocketMq消息队列-->
  2. <dependency>
  3. <groupId>org.apache.rocketmq</groupId>
  4. <artifactId>rocketmq-client</artifactId>
  5. <version>4.3.0</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>com.aliyun.openservices</groupId>
  9. <artifactId>ons-client</artifactId>
  10. <version>1.8.4.Final</version>
  11. </dependency>

2、初始化生产者连接

  1. package com.gaozhen.webservicedemo.config;
  2. import com.aliyun.openservices.ons.api.ONSFactory;
  3. import com.aliyun.openservices.ons.api.Producer;
  4. import com.aliyun.openservices.ons.api.PropertyKeyConst;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.stereotype.Component;
  7. import javax.annotation.PostConstruct;
  8. import java.util.Properties;
  9. @Component
  10. public class RocketMqProducerConfiguration {
  11. @Value("GID_sgcc_1")
  12. private String producerGroupName;
  13. @Value("172.16.205.55:9876")
  14. private String namesrvAddr;
  15. @Value("36Rl3QPMNNXJifNC")
  16. private String accessKey;
  17. @Value("ENpAJPWOnKcSdKcXNkw5XVPGNMTYk0")
  18. private String secretKey;
  19. private static Producer producer;
  20. @PostConstruct
  21. public void init() {
  22. // producer 实例配置初始化
  23. Properties properties = new Properties();
  24. //您在控制台创建的Producer ID
  25. // properties.setProperty(PropertyKeyConst.ProducerId,RocketMqConfig.producerGroupName);
  26. properties.setProperty(PropertyKeyConst.ProducerId,producerGroupName);
  27. // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
  28. // properties.setProperty(PropertyKeyConst.AccessKey, RocketMqConfig.accessKey);
  29. properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
  30. // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
  31. //properties.setProperty(PropertyKeyConst.SecretKey, RocketMqConfig.secretKey);
  32. properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
  33. //设置发送超时时间,单位毫秒
  34. properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
  35. // 设置 TCP 接入域名(此处以公共云生产环境为例),设置 TCP 接入域名,进入 MQ 控制台的消费者管理页面,在左侧操作栏单击获取接入点获取
  36. // properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, RocketMqConfig.namesrvAddr);
  37. properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr);
  38. producer = ONSFactory.createProducer(properties);
  39. //在发送消息前,初始化调用start方法来启动Producer,只需调用一次即可,当项目关闭时,自动shutdown
  40. producer.start();
  41. }
  42. /**
  43. * 初始化生产者
  44. * @return
  45. */
  46. public Producer getProducer(){
  47. return producer;
  48. }
  49. }

3、使用初始化的生产者producer发送消息massage

  1. package com.gaozhen.webservicedemo.controller;
  2. import com.aliyun.openservices.ons.api.Message;
  3. import com.aliyun.openservices.ons.api.SendResult;
  4. import com.gaozhen.webservicedemo.config.RocketMqProducerConfiguration;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.beans.factory.annotation.Value;
  7. import org.springframework.web.bind.annotation.GetMapping;
  8. import org.springframework.web.bind.annotation.RestController;
  9. import java.util.Date;
  10. @RestController
  11. public class TestController {
  12. @Autowired
  13. private RocketMqProducerConfiguration rocketMqProducerConfiguration;
  14. @GetMapping("/sendMsg")
  15. public String sendMsg(){
  16. String toTopic = "topic_sx";
  17. String tag = "tag1";
  18. Message msg = new Message(toTopic, tag, "topic_sx,tag1发送的信息".getBytes());
  19. try {
  20. SendResult result = rocketMqProducerConfiguration.getProducer().send(msg);
  21. if(result!=null){
  22. System.out.println(new Date() + " Send mq message success. Topic is:"+ toTopic + " messageId is: " + result.getMessageId());
  23. } else {
  24. //logger.warn(".sendResult is null.........");
  25. System.out.println(".sendResult is null.........");
  26. }
  27. return "发送Mq消息成功";
  28. } catch (Exception e) {
  29. e.printStackTrace();
  30. return "发送Mq消息失败:"+ e.getMessage();
  31. }
  32. }
  33. }

4、初始化消费者监听listener

  1. package com.gaozhen.webservicedemo.config;
  2. import com.aliyun.openservices.ons.api.*;
  3. import com.aliyun.openservices.ons.api.PropertyKeyConst;
  4. import com.gaozhen.webservicedemo.service.RocketMqListener;
  5. import com.gaozhen.webservicedemo.util.UUIDUtil;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.beans.factory.annotation.Value;
  8. import org.springframework.stereotype.Component;
  9. import javax.annotation.PostConstruct;
  10. import java.util.Properties;
  11. @Component
  12. public class RocketMqConsumerConfiguration {
  13. @Autowired
  14. RocketMqListener rocketMqListener;
  15. @Value("GID_sgcc_1")
  16. private String consumerGroupName;
  17. @Value("172.16.205.55:9876")
  18. private String namesrvAddr;
  19. @Value("36Rl3QPMNNXJifNC")
  20. private String accessKey;
  21. @Value("ENpAJPWOnKcSdKcXNkw5XVPGNMTYk0")
  22. private String secretKey;
  23. public static final String tag = "tag1";
  24. private static Consumer consumer;
  25. @PostConstruct
  26. public void init() {
  27. // consumer 实例配置初始化
  28. Properties properties = new Properties();
  29. //您在控制台创建的consumer ID
  30. //properties.setProperty(PropertyKeyConst.ConsumerId, RocketMqConfig.consumerGroupName);
  31. properties.setProperty(PropertyKeyConst.ConsumerId, consumerGroupName);
  32. // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
  33. //properties.setProperty(PropertyKeyConst.AccessKey, RocketMqConfig.accessKey);
  34. properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
  35. // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
  36. //properties.setProperty(PropertyKeyConst.SecretKey, RocketMqConfig.secretKey);
  37. properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
  38. //设置发送超时时间,单位毫秒
  39. properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
  40. // 设置 TCP 接入域名(此处以公共云生产环境为例),设置 TCP 接入域名,进入 MQ 控制台的消费者管理页面,在左侧操作栏单击获取接入点获取
  41. //properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, RocketMqConfig.namesrvAddr);
  42. properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr);
  43. properties.setProperty(PropertyKeyConst.InstanceName, UUIDUtil.getUUID32());
  44. consumer = ONSFactory.createConsumer(properties);
  45. //------------------------------订阅topic-------------------------------------------------
  46. consumer.subscribe("topic_sx",tag, rocketMqListener);//监听第一个topic,new对应的监听器
  47. // 在发送消息前,必须调用start方法来启动consumer,只需调用一次即可,当项目关闭时,自动shutdown
  48. consumer.start();
  49. System.out.println("ConsumerConfig start success.");
  50. }
  51. /**
  52. * 初始化消费者
  53. * @return
  54. */
  55. public Consumer getconsumer(){
  56. return consumer;
  57. }
  58. }

 

5、其中的rocketMqListener实现MessageListener的自定义接收消息的监听类

  1. package com.gaozhen.webservicedemo.service;
  2. import com.aliyun.openservices.ons.api.Action;
  3. import com.aliyun.openservices.ons.api.ConsumeContext;
  4. import com.aliyun.openservices.ons.api.Message;
  5. import com.aliyun.openservices.ons.api.MessageListener;
  6. import org.springframework.stereotype.Service;
  7. @Service
  8. public class RocketMqListener implements MessageListener {
  9. @Override
  10. public Action consume(Message message, ConsumeContext consumeContext) {
  11. try {
  12. System.out.println("MessageListener.consume ok:" + message);
  13. byte[] body = message.getBody();
  14. String messageBody = new String(body);// 获取到接收的消息,由于接收到的是byte数组,所以需要转换成字符串
  15. System.out.println("收到发送的信息: " + messageBody);
  16. } catch (Exception e) {
  17. System.out.println("MessageListener.consume error:" + e.getMessage() );
  18. }
  19. System.out.println("MessageListener.Receive message");
  20. // 如果想测试消息重投的功能,可以将Action.CommitMessage 替换成Action.ReconsumeLater
  21. return Action.CommitMessage;
  22. }
  23. }

最后,当访问sendMsg接口,生产者讲发送一个条消息到制定的topic和tag中去,消费者也必须用相同的topic和tag来接收,其中topic和tag可以理解为消息的一级标题和二级标签,如果不清楚tag可以用通配符“*”或者null来接收全部topic的消息,groupid可以一致也可以不一致,具体三者的区别和用法,我将另外写一篇文章重点介绍

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/386600
推荐阅读
相关标签
  

闽ICP备14008679号