当前位置:   article > 正文

RocketMQ使用_rocketmq最大连接数

rocketmq最大连接数

1、加入MQ的依赖文件


  1. <!--RocketMQ依赖-->
  2. <dependency>
  3. <groupId>com.alibaba.rocketmq</groupId>
  4. <artifactId>rocketmq-client</artifactId>
  5. <version>3.2.6</version>
  6. </dependency>

2、配置MQ文件

  1. #数据库连接池配置
  2. spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
  3. spring.datasource.platform=mysql
  4. #配置数据源 用户名 密码 驱动
  5. spring.datasource.url=jdbc:mysql://localhost:3306/asia
  6. spring.datasource.username=root
  7. spring.datasource.password=123456
  8. spring.datasource.driver-class-name=com.mysql.jdbc.Driver
  9. #Mybatis配置
  10. #扫描的实体的包
  11. mybatis.typeAliasesPackage=com.asia.bean
  12. #扫描的配置文件地址
  13. mybatis.mapperLocations=classpath:mapper/*.xml
  14. #mybatis全局配置文件的地址
  15. mybatis.configLocation=classpath:mybatis-config.xml
  16. #项目启动端口
  17. server.port=8088
  18. #配置kafka
  19. #zk地址
  20. kafka.consumer.zookeeper.connect=127.0.0.1:2181
  21. #消费者服务提供配置
  22. kafka.consumer.servers=127.0.0.1:9092
  23. #是否自动提交
  24. kafka.consumer.enable.auto.commit=false
  25. #超时时间
  26. kafka.consumer.session.timeout=6000
  27. #自动提交的间隔
  28. kafka.consumer.auto.commit.interval=100
  29. #实时生产,实时消费,不会从头开始消费,earliest,为从头开始消费latest
  30. kafka.consumer.auto.offset.reset=earliest
  31. #配置topics
  32. kafka.consumer.topic=liutopic
  33. kafka.consumer.group.id=liutopic
  34. #设置消费的线程数
  35. kafka.consumer.concurrency=10
  36. #消息提供者地址
  37. kafka.producer.servers=127.0.0.1:9092
  38. #发送失败的消息,再次发送的次数
  39. kafka.producer.retries=0
  40. #批量发送消息的数量,每次批量发送消息的数量,produce积累到一定数据,一次发送
  41. kafka.producer.batch.size=4096
  42. #该参数指定了生产者在发送批次之前等待更多消息加入批次的时间
  43. kafka.producer.linger=1
  44. #批量数据的缓冲区,produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
  45. kafka.producer.buffer.memory=40960
  46. #redis配置
  47. ##项目启动端口
  48. #server.port=8088
  49. #应用上下文
  50. server.context-path = /SpringBootStudy
  51. #redis相关配置
  52. # Redis数据库索引(默认为0
  53. spring.redis.database=0
  54. # Redis服务器地址
  55. spring.redis.host=127.0.0.1
  56. # Redis服务器连接端口
  57. spring.redis.port=6379
  58. # Redis服务器连接密码(默认为空)
  59. #spring.redis.password=
  60. # 连接池最大连接数(使用负值表示没有限制)
  61. spring.redis.pool.max-active=200
  62. # 连接池最大阻塞等待时间(使用负值表示没有限制)
  63. spring.redis.pool.max-wait=-1
  64. # 连接池中的最大空闲连接
  65. spring.redis.pool.max-idle=10
  66. # 连接池中的最小空闲连接
  67. spring.redis.pool.min-idle=0
  68. # 连接超时时间(毫秒)
  69. spring.redis.timeout=1000
  70. #redis集群配置
  71. cache.clusterNodes:127.0.0.1:6380,127.0.0.1:6381,127.0.0.1:6382,127.0.0.1:6383,127.0.0.1:6384,127.0.0.1:6385
  72. ##连接超时时间,设置为5
  73. cache.commandTimeout:5000
  74. #RocketMQ配置
  75. ###producer
  76. #该应用是否启用生产者
  77. rocketmq.producer.isOnOff=on
  78. #发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
  79. rocketmq.producer.groupName=SpringBootStudy
  80. #mq的nameserver地址
  81. rocketmq.producer.namesrvAddr=127.0.0.1:9876
  82. #消息最大长度 默认1024*4(4M)
  83. rocketmq.producer.maxMessageSize=4096
  84. #发送消息超时时间,默认3000
  85. rocketmq.producer.sendMsgTimeout=3000
  86. #发送消息失败重试次数,默认2
  87. rocketmq.producer.retryTimesWhenSendFailed=2
  88. ###consumer
  89. ##该应用是否启用消费者
  90. rocketmq.consumer.isOnOff=on
  91. rocketmq.consumer.groupName=SpringBootStudy
  92. #mq的nameserver地址
  93. rocketmq.consumer.namesrvAddr=127.0.0.1:9876
  94. #主题需要再服务器上创建好,否则发送的时候,会报错找不到,
  95. #启动nameSer和Broket(mqbroker -n 127.0.0.1:9876),执行下面的命令来创建topic(在bin目录下),10911为启动Broket日志中显示
  96. #mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t TEST_TOPIC
  97. rocketmq.consumer.topics=TEST_TOPIC
  98. rocketmq.consumer.consumeThreadMin=20
  99. rocketmq.consumer.consumeThreadMax=64
  100. #设置一次消费消息的条数,默认为1
  101. rocketmq.consumer.consumeMessageBatchMaxSize=3

3、创建生产者


  1. package com.asia.web.RocketMq;
  2. import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. @Configuration
  9. public class MqProducer {
  10. Logger logger = LoggerFactory .getLogger(this.getClass());
  11. @Value("${rocketmq.producer.namesrvAddr}")
  12. private String nameService;
  13. @Value("${rocketmq.producer.groupName}")
  14. private String producerGroupName;
  15. @Bean
  16. public DefaultMQProducer getMqProducer(){
  17. //其他的配置属性没有设置,,均为默认
  18. try {
  19. DefaultMQProducer producer = new DefaultMQProducer(producerGroupName);
  20. producer.setNamesrvAddr(nameService);
  21. producer.start();
  22. logger.info("-----生产者开始生产-----");
  23. return producer;
  24. }catch (Exception e){
  25. logger.error("-----获取MQ生产者异常",e);
  26. }
  27. return null;
  28. }
  29. }

4、创建消费者


  1. package com.asia.web.RocketMq;
  2. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
  3. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
  4. import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
  5. import org.apache.kafka.clients.consumer.ConsumerConfig;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.beans.factory.annotation.Value;
  10. import org.springframework.context.annotation.Bean;
  11. import org.springframework.context.annotation.Configuration;
  12. @Configuration
  13. public class MqConsumer {
  14. Logger logger = LoggerFactory.getLogger(this.getClass());
  15. @Value("${rocketmq.producer.namesrvAddr}")
  16. private String nameService;
  17. @Value("${rocketmq.consumer.groupName}")
  18. private String comsumerGroupName;
  19. @Value("${rocketmq.consumer.topics}")
  20. private String topic;
  21. @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
  22. private int consumerMessMaxSize;
  23. @Autowired
  24. private MyMqListener myMqListener;
  25. @Bean
  26. public DefaultMQPushConsumer getMqConsumer(){
  27. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(comsumerGroupName);
  28. try {
  29. consumer.setNamesrvAddr(nameService);
  30. consumer.setConsumeMessageBatchMaxSize(consumerMessMaxSize);
  31. //订阅主题,表示订阅该主题下的所有tag
  32. consumer.subscribe(topic, "*");
  33. // 如果是格式:topic~tag1||tag2||tag3;topic2~*;等等,则需要循环
  34. // consumer.subscribe("topic", "tag1||tag2||tag3");// * 代表订阅topic下的所有消息
  35. // String[] topics = topic.split(";");
  36. // for(String tp:topics){
  37. // String[] topicAndTag = tp.split("~");
  38. // consumer.subscribe(topicAndTag[0],topicAndTag[1]);
  39. // }
  40. //默认是集群模式的消费类型
  41. consumer.setMessageModel(MessageModel.CLUSTERING);
  42. /**
  43. * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
  44. * 如果非第一次启动,那么按照上次消费的位置继续消费CONSUME_FROM_LAST_OFFSET
  45. */
  46. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  47. //监听类
  48. consumer.registerMessageListener(myMqListener);
  49. // 启动
  50. consumer.start();
  51. logger.info("-----消费者启动-------");
  52. }catch (Exception e){
  53. logger.error("-----获取MQ消费者异常------");
  54. }
  55. return consumer;
  56. }
  57. }

5、创建监听类


  1. package com.asia.web.RocketMq;
  2. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  3. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  4. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  5. import com.alibaba.rocketmq.common.message.MessageExt;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import org.springframework.boot.CommandLineRunner;
  9. import org.springframework.stereotype.Component;
  10. import java.util.List;
  11. @Component
  12. public class MyMqListener implements MessageListenerConcurrently {
  13. Logger logger = LoggerFactory.getLogger(this.getClass());
  14. // 默认list里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
  15. @Override
  16. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
  17. try{
  18. //虽然是个List,但是MQ每次都是一条一条的消费
  19. // for(MessageExt mess : list){
  20. logger.info("-------开始监听------");
  21. MessageExt mess = list.get(0);
  22. String message = new String(mess.getBody());
  23. String topic = mess.getTopic();
  24. String tag = mess.getTags();
  25. //重复消费的次数,直到返回成功
  26. int count = mess.getReconsumeTimes();
  27. logger.info("---重复消费的次数="+count);
  28. logger.info("------消费的消息为:"+message);
  29. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  30. // }
  31. }catch (Exception e){
  32. logger.error("-----消费异常----");
  33. }
  34. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  35. }
  36. }

6、测试MQ

  1. package com.asia.web.Controller;
  2. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  3. import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
  4. import com.alibaba.rocketmq.common.message.Message;
  5. import com.asia.bean.UserDO;
  6. import com.asia.web.RocketMq.MqProducer;
  7. import com.asia.web.cache.JsonUtils;
  8. import kafka.message.MessageWriter;
  9. import org.slf4j.Logger;
  10. import org.slf4j.LoggerFactory;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.web.bind.annotation.RequestMapping;
  13. import org.springframework.web.bind.annotation.RestController;
  14. import java.util.ArrayList;
  15. import java.util.List;
  16. @RestController
  17. public class RocketMqController {
  18. Logger logger = LoggerFactory.getLogger(this.getClass());
  19. /**
  20. * (和KAFKA很像,但是KAFKA的分布式结构,性能更好)
  21. * 0、添加依赖文件
  22. * 1、配置文件(服务地址、提供者和消费者的组名,主题(主题一般都是服务器先创建好,否则向该主题发送的时候会报错))
  23. * 2、创建提供者和消费者,消费者需要创建监听类、
  24. * 3、创建监听类需要实现MQ监听类接口
  25. * 4、进行消费
  26. */
  27. @Autowired
  28. private DefaultMQProducer producer;
  29. @RequestMapping(value = "/senMqmess")
  30. public String testMqSend() {
  31. try {
  32. List<UserDO> list = new ArrayList();
  33. UserDO userDO = new UserDO();
  34. UserDO userDO1 = new UserDO();
  35. userDO.setUserNm("dog");
  36. userDO.setUserId(0001L);
  37. userDO1.setUserNm("大黄");
  38. userDO1.setUserId(0002L);
  39. list.add(userDO);
  40. list.add(userDO1);
  41. String jsonStr = JsonUtils.convertObject2Json(list);
  42. Message message = new Message("TEST_TOPIC",jsonStr.getBytes());
  43. producer.send(message);
  44. return "success";
  45. } catch (Exception e) {
  46. logger.error("-----发送MQ消息异常", e);
  47. }
  48. return "";
  49. }
  50. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/484977
推荐阅读
相关标签
  

闽ICP备14008679号