赞
踩
<!--RocketMQ依赖-->
<dependency>
    <groupId>com.alibaba.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>3.2.6</version>
</dependency>
2、配置MQ文件
# Database connection-pool configuration (Druid data source)
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.platform=mysql
# Data source: URL, username, password, driver
spring.datasource.url=jdbc:mysql://localhost:3306/asia
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
# MyBatis configuration
# Package scanned for entity (type-alias) classes
mybatis.typeAliasesPackage=com.asia.bean
# Location of the mapper XML files
mybatis.mapperLocations=classpath:mapper/*.xml
# Location of the MyBatis global configuration file
mybatis.configLocation=classpath:mybatis-config.xml
# Application HTTP port
server.port=8088

# Kafka configuration
# ZooKeeper address
kafka.consumer.zookeeper.connect=127.0.0.1:2181
# Consumer bootstrap servers
kafka.consumer.servers=127.0.0.1:9092
# Whether offsets are committed automatically
kafka.consumer.enable.auto.commit=false
# Session timeout (ms)
kafka.consumer.session.timeout=6000
# Auto-commit interval (ms)
kafka.consumer.auto.commit.interval=100
# Where to start when there is no committed offset:
# earliest = consume from the beginning; latest = only newly produced messages
kafka.consumer.auto.offset.reset=earliest
# Topic(s) to consume
kafka.consumer.topic=liutopic

kafka.consumer.group.id=liutopic
# Number of consumer threads
kafka.consumer.concurrency=10

# Producer bootstrap servers
kafka.producer.servers=127.0.0.1:9092
# Number of resend attempts for messages that failed to send
kafka.producer.retries=0
# Batch size: the producer accumulates records and sends them in one batch
kafka.producer.batch.size=4096
# How long the producer waits for more records to join a batch before sending
kafka.producer.linger=1
# Producer buffer size; data is flushed once buffer.memory is reached
kafka.producer.buffer.memory=40960


# Redis configuration
## Application HTTP port
#server.port=8088
# Application context path
server.context-path = /SpringBootStudy
# Redis settings
# Redis database index (default 0)
spring.redis.database=0
# Redis server host
spring.redis.host=127.0.0.1
# Redis server port
spring.redis.port=6379
# Redis password (empty by default)
#spring.redis.password=
# Maximum number of pooled connections (negative value = no limit)
spring.redis.pool.max-active=200
# Maximum blocking wait time for a connection (negative value = no limit)
spring.redis.pool.max-wait=-1
# Maximum idle connections in the pool
spring.redis.pool.max-idle=10
# Minimum idle connections in the pool
spring.redis.pool.min-idle=0
# Connection timeout (ms)
spring.redis.timeout=1000

# Redis cluster configuration
cache.clusterNodes:127.0.0.1:6380,127.0.0.1:6381,127.0.0.1:6382,127.0.0.1:6383,127.0.0.1:6384,127.0.0.1:6385
## Command timeout, set to 5 seconds
cache.commandTimeout:5000

# RocketMQ configuration
### Producer
# Whether the producer is enabled in this application
rocketmq.producer.isOnOff=on
# Messages of the same kind should share one group name; must be unique.
# If unset, RocketMQ uses ip@pid (pid = JVM name) as the identifier.
rocketmq.producer.groupName=SpringBootStudy
# Name-server address
rocketmq.producer.namesrvAddr=127.0.0.1:9876
# Maximum message size; original comment said "default 1024*4 (4M)"
# NOTE(review): 4096 bytes is 4K, not 4M — confirm the intended limit
rocketmq.producer.maxMessageSize=4096
# Send timeout (ms), default 3000
rocketmq.producer.sendMsgTimeout=3000
# Retry count for failed sends, default 2
rocketmq.producer.retryTimesWhenSendFailed=2

### Consumer
## Whether the consumer is enabled in this application
rocketmq.consumer.isOnOff=on
rocketmq.consumer.groupName=SpringBootStudy
# Name-server address
rocketmq.consumer.namesrvAddr=127.0.0.1:9876
# The topic must exist on the broker beforehand, otherwise sends fail with "topic not found".
# Start the name server and broker (mqbroker -n 127.0.0.1:9876), then create the topic
# from the bin directory (10911 is the port shown in the broker startup log):
# mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t TEST_TOPIC
rocketmq.consumer.topics=TEST_TOPIC
rocketmq.consumer.consumeThreadMin=20
rocketmq.consumer.consumeThreadMax=64
# Number of messages handed to the listener per call, default 1
rocketmq.consumer.consumeMessageBatchMaxSize=3

- package com.asia.web.RocketMq;
-
- import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class MqProducer {
-
- Logger logger = LoggerFactory .getLogger(this.getClass());
-
- @Value("${rocketmq.producer.namesrvAddr}")
- private String nameService;
- @Value("${rocketmq.producer.groupName}")
- private String producerGroupName;
-
-
- @Bean
- public DefaultMQProducer getMqProducer(){
- //其他的配置属性没有设置,,均为默认
- try {
- DefaultMQProducer producer = new DefaultMQProducer(producerGroupName);
- producer.setNamesrvAddr(nameService);
- producer.start();
- logger.info("-----生产者开始生产-----");
- return producer;
-
- }catch (Exception e){
- logger.error("-----获取MQ生产者异常",e);
- }
- return null;
- }
-
- }

- package com.asia.web.RocketMq;
-
- import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
- import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
- import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class MqConsumer {
-
- Logger logger = LoggerFactory.getLogger(this.getClass());
-
- @Value("${rocketmq.producer.namesrvAddr}")
- private String nameService;
- @Value("${rocketmq.consumer.groupName}")
- private String comsumerGroupName;
- @Value("${rocketmq.consumer.topics}")
- private String topic;
- @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
- private int consumerMessMaxSize;
- @Autowired
- private MyMqListener myMqListener;
-
- @Bean
- public DefaultMQPushConsumer getMqConsumer(){
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(comsumerGroupName);
- try {
- consumer.setNamesrvAddr(nameService);
- consumer.setConsumeMessageBatchMaxSize(consumerMessMaxSize);
- //订阅主题,表示订阅该主题下的所有tag
- consumer.subscribe(topic, "*");
- // 如果是格式:topic~tag1||tag2||tag3;topic2~*;等等,则需要循环
- // consumer.subscribe("topic", "tag1||tag2||tag3");// * 代表订阅topic下的所有消息
- // String[] topics = topic.split(";");
- // for(String tp:topics){
- // String[] topicAndTag = tp.split("~");
- // consumer.subscribe(topicAndTag[0],topicAndTag[1]);
- // }
- //默认是集群模式的消费类型
- consumer.setMessageModel(MessageModel.CLUSTERING);
- /**
- * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
- * 如果非第一次启动,那么按照上次消费的位置继续消费CONSUME_FROM_LAST_OFFSET
- */
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
- //监听类
- consumer.registerMessageListener(myMqListener);
- // 启动
- consumer.start();
- logger.info("-----消费者启动-------");
- }catch (Exception e){
- logger.error("-----获取MQ消费者异常------");
- }
- return consumer;
- }
- }

package com.asia.web.RocketMq;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * Concurrent RocketMQ message listener: logs each received message body and
 * its redelivery count.
 */
@Component
public class MyMqListener implements MessageListenerConcurrently {

    private static final Logger logger = LoggerFactory.getLogger(MyMqListener.class);

    /**
     * Consumes one batch of messages.
     *
     * <p>Two fixes over the original:
     * <ul>
     *   <li>Iterates over the whole list instead of {@code list.get(0)} only —
     *       the consumer config sets {@code consumeMessageBatchMaxSize=3}, so
     *       the original silently acknowledged messages 2..n unprocessed.</li>
     *   <li>Returns {@code RECONSUME_LATER} on failure — the original returned
     *       {@code CONSUME_SUCCESS} from the catch block, acknowledging failed
     *       messages and losing them permanently.</li>
     * </ul>
     *
     * @param list batch of messages (size bounded by consumeMessageBatchMaxSize)
     * @param consumeConcurrentlyContext consumption context (unused)
     * @return CONSUME_SUCCESS when every message was handled, RECONSUME_LATER otherwise
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        try {
            logger.info("-------开始监听------");
            for (MessageExt mess : list) {
                // Decode with an explicit charset; the producer sends
                // String.getBytes() output, so the platform default only works
                // when both sides agree — UTF-8 makes that explicit.
                String message = new String(mess.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                // Number of times this message has been redelivered.
                int count = mess.getReconsumeTimes();
                logger.info("---重复消费的次数=" + count);
                logger.info("------消费的消息为:" + message);
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            logger.error("-----消费异常----", e);
            // Ask the broker to redeliver instead of acknowledging a failure.
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
}

package com.asia.web.Controller;

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.common.message.Message;
import com.asia.bean.UserDO;
import com.asia.web.cache.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;

/**
 * Demo REST endpoint that serializes two sample users to JSON and publishes
 * them to the TEST_TOPIC RocketMQ topic.
 *
 * <p>RocketMQ recipe (similar to Kafka, which has a more distributed design):
 * 0. add the client dependency;
 * 1. configure name-server address, producer/consumer group names and the
 *    topic (topics are usually pre-created on the server, otherwise sends fail);
 * 2. create the producer and consumer — the consumer needs a listener class;
 * 3. the listener implements the MQ listener interface;
 * 4. consume.
 */
@RestController
public class RocketMqController {

    private static final Logger logger = LoggerFactory.getLogger(RocketMqController.class);

    /** Started producer bean provided by MqProducer configuration. */
    @Autowired
    private DefaultMQProducer producer;

    /**
     * Sends a JSON list of two sample users to TEST_TOPIC.
     *
     * @return "success" when the send call completed, "" when it threw
     *         (kept for backward compatibility with existing callers)
     */
    @RequestMapping(value = "/senMqmess")
    public String testMqSend() {
        try {
            List<UserDO> list = new ArrayList<>();
            UserDO userDO = new UserDO();
            UserDO userDO1 = new UserDO();
            userDO.setUserNm("dog");
            // Original used 0001L/0002L — leading-zero literals are octal in
            // Java (same value here, but a trap for e.g. 0010L == 8).
            userDO.setUserId(1L);
            userDO1.setUserNm("大黄");
            userDO1.setUserId(2L);
            list.add(userDO);
            list.add(userDO1);
            String jsonStr = JsonUtils.convertObject2Json(list);
            Message message = new Message("TEST_TOPIC", jsonStr.getBytes());
            producer.send(message);
            return "success";
        } catch (Exception e) {
            logger.error("-----发送MQ消息异常", e);
        }
        return "";
    }
}

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。