赞
踩
集成目标:完成生产者发送消息,消费者接收消息的整个流程
集成步骤:
1、引入jar包依赖
- <!--rocketMq消息队列-->
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-client</artifactId>
- <version>4.3.0</version>
- </dependency>
- <dependency>
- <groupId>com.aliyun.openservices</groupId>
- <artifactId>ons-client</artifactId>
- <version>1.8.4.Final</version>
- </dependency>
2、初始化生产者连接
- package com.gaozhen.webservicedemo.config;
-
- import com.aliyun.openservices.ons.api.ONSFactory;
- import com.aliyun.openservices.ons.api.Producer;
- import com.aliyun.openservices.ons.api.PropertyKeyConst;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.PostConstruct;
- import java.util.Properties;
-
- @Component
- public class RocketMqProducerConfiguration {
- @Value("GID_sgcc_1")
- private String producerGroupName;
-
- @Value("172.16.205.55:9876")
- private String namesrvAddr;
-
- @Value("36Rl3QPMNNXJifNC")
- private String accessKey;
-
- @Value("ENpAJPWOnKcSdKcXNkw5XVPGNMTYk0")
- private String secretKey;
-
- private static Producer producer;
-
- @PostConstruct
- public void init() {
- // producer 实例配置初始化
- Properties properties = new Properties();
- //您在控制台创建的Producer ID
- // properties.setProperty(PropertyKeyConst.ProducerId,RocketMqConfig.producerGroupName);
- properties.setProperty(PropertyKeyConst.ProducerId,producerGroupName);
- // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
- // properties.setProperty(PropertyKeyConst.AccessKey, RocketMqConfig.accessKey);
- properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
- // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
- //properties.setProperty(PropertyKeyConst.SecretKey, RocketMqConfig.secretKey);
- properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
- //设置发送超时时间,单位毫秒
- properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
- // 设置 TCP 接入域名(此处以公共云生产环境为例),设置 TCP 接入域名,进入 MQ 控制台的消费者管理页面,在左侧操作栏单击获取接入点获取
- // properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, RocketMqConfig.namesrvAddr);
- properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr);
- producer = ONSFactory.createProducer(properties);
- //在发送消息前,初始化调用start方法来启动Producer,只需调用一次即可,当项目关闭时,自动shutdown
- producer.start();
- }
-
- /**
- * 初始化生产者
- * @return
- */
- public Producer getProducer(){
- return producer;
- }
-
-
-
- }
3、使用初始化的生产者producer发送消息massage
- package com.gaozhen.webservicedemo.controller;
-
- import com.aliyun.openservices.ons.api.Message;
- import com.aliyun.openservices.ons.api.SendResult;
- import com.gaozhen.webservicedemo.config.RocketMqProducerConfiguration;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import java.util.Date;
-
- @RestController
- public class TestController {
-
- @Autowired
- private RocketMqProducerConfiguration rocketMqProducerConfiguration;
-
- @GetMapping("/sendMsg")
- public String sendMsg(){
-
- String toTopic = "topic_sx";
- String tag = "tag1";
- Message msg = new Message(toTopic, tag, "topic_sx,tag1发送的信息".getBytes());
- try {
- SendResult result = rocketMqProducerConfiguration.getProducer().send(msg);
- if(result!=null){
- System.out.println(new Date() + " Send mq message success. Topic is:"+ toTopic + " messageId is: " + result.getMessageId());
- } else {
- //logger.warn(".sendResult is null.........");
- System.out.println(".sendResult is null.........");
- }
- return "发送Mq消息成功";
- } catch (Exception e) {
- e.printStackTrace();
- return "发送Mq消息失败:"+ e.getMessage();
- }
- }
-
- }
4、初始化消费者监听listener
- package com.gaozhen.webservicedemo.config;
-
- import com.aliyun.openservices.ons.api.*;
- import com.aliyun.openservices.ons.api.PropertyKeyConst;
- import com.gaozhen.webservicedemo.service.RocketMqListener;
- import com.gaozhen.webservicedemo.util.UUIDUtil;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.PostConstruct;
- import java.util.Properties;
-
- @Component
- public class RocketMqConsumerConfiguration {
- @Autowired
- RocketMqListener rocketMqListener;
-
- @Value("GID_sgcc_1")
- private String consumerGroupName;
-
- @Value("172.16.205.55:9876")
- private String namesrvAddr;
-
- @Value("36Rl3QPMNNXJifNC")
- private String accessKey;
-
- @Value("ENpAJPWOnKcSdKcXNkw5XVPGNMTYk0")
- private String secretKey;
-
- public static final String tag = "tag1";
-
- private static Consumer consumer;
-
- @PostConstruct
- public void init() {
- // consumer 实例配置初始化
- Properties properties = new Properties();
- //您在控制台创建的consumer ID
- //properties.setProperty(PropertyKeyConst.ConsumerId, RocketMqConfig.consumerGroupName);
- properties.setProperty(PropertyKeyConst.ConsumerId, consumerGroupName);
- // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
- //properties.setProperty(PropertyKeyConst.AccessKey, RocketMqConfig.accessKey);
- properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
- // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
- //properties.setProperty(PropertyKeyConst.SecretKey, RocketMqConfig.secretKey);
- properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
- //设置发送超时时间,单位毫秒
- properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
- // 设置 TCP 接入域名(此处以公共云生产环境为例),设置 TCP 接入域名,进入 MQ 控制台的消费者管理页面,在左侧操作栏单击获取接入点获取
- //properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, RocketMqConfig.namesrvAddr);
- properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr);
- properties.setProperty(PropertyKeyConst.InstanceName, UUIDUtil.getUUID32());
- consumer = ONSFactory.createConsumer(properties);
- //------------------------------订阅topic-------------------------------------------------
- consumer.subscribe("topic_sx",tag, rocketMqListener);//监听第一个topic,new对应的监听器
- // 在发送消息前,必须调用start方法来启动consumer,只需调用一次即可,当项目关闭时,自动shutdown
- consumer.start();
- System.out.println("ConsumerConfig start success.");
- }
-
- /**
- * 初始化消费者
- * @return
- */
- public Consumer getconsumer(){
- return consumer;
- }
- }
5、其中的rocketMqListener实现MessageListener的自定义接收消息的监听类
- package com.gaozhen.webservicedemo.service;
-
- import com.aliyun.openservices.ons.api.Action;
- import com.aliyun.openservices.ons.api.ConsumeContext;
- import com.aliyun.openservices.ons.api.Message;
- import com.aliyun.openservices.ons.api.MessageListener;
- import org.springframework.stereotype.Service;
- @Service
- public class RocketMqListener implements MessageListener {
-
- @Override
- public Action consume(Message message, ConsumeContext consumeContext) {
- try {
- System.out.println("MessageListener.consume ok:" + message);
- byte[] body = message.getBody();
- String messageBody = new String(body);// 获取到接收的消息,由于接收到的是byte数组,所以需要转换成字符串
- System.out.println("收到发送的信息: " + messageBody);
-
- } catch (Exception e) {
- System.out.println("MessageListener.consume error:" + e.getMessage() );
- }
- System.out.println("MessageListener.Receive message");
- // 如果想测试消息重投的功能,可以将Action.CommitMessage 替换成Action.ReconsumeLater
- return Action.CommitMessage;
- }
-
- }
最后,当访问sendMsg接口,生产者讲发送一个条消息到制定的topic和tag中去,消费者也必须用相同的topic和tag来接收,其中topic和tag可以理解为消息的一级标题和二级标签,如果不清楚tag可以用通配符“*”或者null来接收全部topic的消息,groupid可以一致也可以不一致,具体三者的区别和用法,我将另外写一篇文章重点介绍
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。