当前位置:   article > 正文

Springboot 整合 RocketMQ_springboot集成mq

springboot集成mq

一、RocketMQ整体结构

  • Producer:消息的发送者;举例:发信者。
  • Consumer:消息接受者;举例:收信者。
  • Broker:暂存和传输消息;举例:邮局。
  • Nameserver:管理broker;举例:各个邮局的管理机构
  • Topic:区分消息的种类;一个发送者可以发送一个或多个Topic;一个消息的接收者可以订阅一个或多个Topic消息。
  • Message Queue:相当于是Topic的分区;用于并行发送和接收消息。

二、在Linux服务器环境上搭建RocketMQ服务端

1、下载安装包

官网地址:http://rocketmq.apache.org/dowloading/releases/

2、解压压缩包,重命名文件夹为rocketmq (本人下载的 4.6.1版本)

  1. unzip rocketmq-all-4.6.1-bin-release .zip
  2. mv rocketmq-all-4.6.1-bin-release rocketmq

3、启动NameServer:

nohup sh mqnamesrv &

4、启动broker:

nohup sh mqbroker -n 192.168.12.250:9876  autoCreateTopicEnable=true  &

【注】autoCreateTopicEnable=true 表示可以动态创建topic

三、springboot项目中添加配置

1、pom.xml文件中引入依赖

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-client</artifactId>
  4. <version>4.2.0</version>
  5. </dependency>

2、配置文件application.yml中添加配置

  1. #RocketMQ:
  2. rocketmq:
  3. producer:
  4. isOnOff: on
  5. groupName: group1
  6. namesrvAddr: 192.168.12.250:9876
  7. topic: TRAFFIC_EVENT
  8. #tag: test
  9. maxMessageSize: 100
  10. sendMsgTimeout: 1000
  11. consumer:
  12. namesrvAddr: 192.168.12.250:9876
  13. groupName: group1
  14. topic: TRAFFIC_EVENT
  15. #tag: test
  16. consumeThreadMin: 20
  17. consumerThreadMax: 64

3、生产者

方式一、同步发送
Producer 向 broker 发送消息,阻塞当前线程等待 broker 响应 发送结果。

方式二、异步发送
Producer 首先构建一个向 broker 发送消息的任务,把该任务提交给线程池,等执行完该任务时,回调用户自定义的回调函数,执行处理结果。

方式三、Oneway 发送
Oneway 方式只负责发送请求,不等待应答,Producer 只负责把请求发出去,而不处理响应结果。

  1. // 1、同步发送
  2. SendResult sendResult = producer.send(msg);
  3. //2、异步发送
  4. producer.send(msg, new SendCallback() {
  5. @Override
  6. public void onSuccess(SendResult sendResult) {
  7. }
  8. @Override
  9. public void onException(Throwable e) {
  10. }
  11. });
  12. //3、 Oneway发送
  13. producer.sendOneway(msg);

4、消费者

方式一:PushConsumer
       push方式里,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。
缺点:实时性高,但增加服务端负载,消费端能力不同,如果push的速度过快,消费端会出现很多问题

方式二:PullConsumer
        pull方式里,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。
缺点:消费者从server端拉消息,主动权在消费端,可控性好,但是时间间隔不好设置,间隔太短,则空请求会多,浪费资源,间隔太长,则消息不能及时处理
 

推荐使用PushConsumer,直接上代码!

  1. @Component
  2. @Slf4j
  3. public class MQPushConsumer implements MessageListenerConcurrently {
  4. private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
  5. @Value("${rocketmq.consumer.namesrvAddr}")
  6. private String rocketmqNameServer;
  7. @Value("${rocketmq.consumer.topic}")
  8. private String topic;
  9. /**
  10. * 初始化
  11. */
  12. @PostConstruct //java5中引入,指在项目启动的时候执行这个方法
  13. public void start() {
  14. try {
  15. consumer.setNamesrvAddr(rocketmqNameServer);
  16. consumer.setVipChannelEnabled(false);
  17. //从消费队列头开始消费
  18. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  19. //consumer.setMessageModel(MessageModel.CLUSTERING);
  20. //订阅主题
  21. consumer.subscribe("TRAFFIC_EVENT","*");
  22. consumer.registerMessageListener(this);
  23. consumer.start();
  24. log.info("[启动日志]:MQ消费者已启动");
  25. } catch (MQClientException e) {
  26. e.printStackTrace();
  27. }
  28. }
  29. @Override
  30. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
  31. int index=0;
  32. try {
  33. for (;index<list.size();index++) {
  34. MessageExt msg = list.get(index);
  35. String messageBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
  36. log.info("MQ:消费者接受新消息:{}{}{}{}{}", msg.getMsgId(), msg.getTopic(), msg.getTags(), msg.getKeys(), messageBody);
  37. //拿到json通过websocket发给前端
  38. if (!StringUtils.isEmpty(messageBody) && MyWebSocket.getOnlineCount() >0) {
  39. MyWebSocket.sendInfo(messageBody);
  40. log.info("[webSocket发送]" + messageBody);
  41. }
  42. }
  43. } catch (Exception e) {
  44. log.error(e.getMessage(),e);
  45. }finally {
  46. if (index < list.size()) {
  47. consumeConcurrentlyContext.setAckIndex(index+1);
  48. }
  49. }
  50. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  51. }
  52. @PreDestroy //在关闭spring容器后释放一些资源*
  53. public void stop() {
  54. if (null != consumer) {
  55. consumer.shutdown();
  56. log.error("MQ: 关闭消费者");
  57. }
  58. }
  59. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/434689
推荐阅读
相关标签
  

闽ICP备14008679号