当前位置:   article > 正文

工作中常用的RabbitMQ实践_rabbittemplate convertandsend routingkey传空

rabbittemplate convertandsend routingkey传空

目录

1.前置知识

准备工作

2.导入依赖

3.生产者

4.消费者

5.验证

验证Direct

验证Fanout

验证Topic

6.ACK消息确认补充


1.前置知识

rabbitmq有五种工作模式;按照有无交换机分为两大类

无交换机的:简单队列(一对一,单生产单消费)、工作队列(工作队列有轮训分发和公平分发两种模式)

有交换机:发布-订阅、路由模式、主题模式

准备工作

安装rabbitmq,并成功启动

2.导入依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

3.生产者

生产端项目结构:
 

逻辑:生产者只对交换机进行生产,至于队列绑定等放在消费端进行执行

BusinessConfig

定义了三个不同类型的交换机

direct类型:(当生产者往该交换机发送消息时,他必须指定固定的routingkey,当routingkey值为空,他也会匹配routingkey为空的队列)

fanout类型:(当生产者往该交换机发送消息时,他所绑定的队列都会收到消息,routingkey即使写了也会忽略,一般为空字符串)

Topic类型:(当生产者往该交换机发送消息时,他并不像direct指定固定的routingkey,可以进行模糊匹配,当该routingkey为空时,他会匹配routingkey为空的队列)

  1. package com.zsp.quartz.queue;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.context.annotation.Configuration;
  4. /**
  5. * @Author: ZhangSP
  6. * @Date: 2023/12/7 14:05
  7. */
  8. public class BusinessConfig {
  9. // 声明direct交换机
  10. public static final String EXCHANGE_DIRECT= "exchange_direct_inform";
  11. // 声明fanout交换机
  12. public static final String EXCHANGE_FANOUT= "exchange_fanout_inform";
  13. // 声明topic交换机
  14. public static final String EXCHANGE_TOPIC= "exchange_topic_inform";
  15. }

TestProducer

生产消息

  1. package com.zsp.quartz.queue;
  2. import com.alibaba.fastjson.JSON;
  3. import com.zsp.quartz.entity.User;
  4. import org.junit.Test;
  5. import org.junit.runner.RunWith;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.boot.test.context.SpringBootTest;
  9. import org.springframework.test.context.junit4.SpringRunner;
  10. @SpringBootTest
  11. @RunWith(SpringRunner.class)
  12. public class TestProducer {
  13. @Autowired
  14. RabbitTemplate rabbitTemplate;
  15. @Test
  16. public void Producer_topics_springbootTest() {
  17. //使用rabbitTemplate发送消息
  18. String message = "";
  19. User user = new User();
  20. user.setName("张三");
  21. user.setEmail("anjduahsd");
  22. message = JSON.toJSONString(user);
  23. // direct
  24. rabbitTemplate.convertAndSend(BusinessConfig.EXCHANGE_DIRECT,"",message);
  25. // fanout
  26. rabbitTemplate.convertAndSend(BusinessConfig.EXCHANGE_FANOUT,"",message);
  27. // topic
  28. rabbitTemplate.convertAndSend(BusinessConfig.EXCHANGE_TOPIC,"",message);
  29. }
  30. }

4.消费者

消费者目录结构:

BusinessConfig内容解析:

①定义交换机类型

②配置交换机与队列的绑定关系

③通过容器工厂声明队列

  1. package com.zsp.consumer.queue;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.amqp.rabbit.connection.Connection;
  6. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.beans.factory.annotation.Qualifier;
  9. import org.springframework.context.annotation.Configuration;
  10. import javax.annotation.PostConstruct;
  11. /**
  12. * @Author: ZhangSP
  13. * @Date: 2023/12/7 14:05
  14. */
  15. @Slf4j
  16. @Configuration
  17. public class BusinessConfig {
  18. // 声明direct
  19. public static final String EXCHANGE_DIRECT= "exchange_direct_inform";
  20. public static final String QUEUE_DIRECT_EMAIL = "queue_direct_inform_email";
  21. public static final String QUEUE_DIRECT_SMS = "queue_direct_inform_sms";
  22. public void BindDirectEmail(Channel channel) {
  23. try {
  24. channel.exchangeDeclare(EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT.getType(), true);
  25. channel.queueDeclare(QUEUE_DIRECT_EMAIL, true, false, false, null);
  26. channel.queueBind(QUEUE_DIRECT_EMAIL, EXCHANGE_DIRECT, "");
  27. } catch (Exception e) {
  28. log.error("声明Direct->email队列时失败", e);
  29. }
  30. }
  31. public void BindDirectSms(Channel channel) {
  32. try {
  33. channel.exchangeDeclare(EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT.getType(), true);
  34. channel.queueDeclare(QUEUE_DIRECT_SMS, true, false, false, null);
  35. channel.queueBind(QUEUE_DIRECT_SMS, EXCHANGE_DIRECT, "123");
  36. } catch (Exception e) {
  37. log.error("声明Direct->sms失败", e);
  38. }
  39. }
  40. // 声明fanout
  41. public static final String EXCHANGE_FANOUT= "exchange_fanout_inform";
  42. public static final String QUEUE_FANOUT_EMAIL = "queue_fanout_inform_email";
  43. public static final String QUEUE_FANOUT_SMS = "queue_fanout_inform_sms";
  44. public void BindFanoutEmail(Channel channel) {
  45. try {
  46. channel.exchangeDeclare(EXCHANGE_FANOUT, BuiltinExchangeType.FANOUT.getType(), true);
  47. channel.queueDeclare(QUEUE_FANOUT_EMAIL, true, false, false, null);
  48. channel.queueBind(QUEUE_FANOUT_EMAIL, EXCHANGE_FANOUT, "");
  49. } catch (Exception e) {
  50. log.error("声明Fanout->email队列时失败", e);
  51. }
  52. }
  53. public void BindFanoutSms(Channel channel) {
  54. try {
  55. channel.exchangeDeclare(EXCHANGE_FANOUT, BuiltinExchangeType.FANOUT.getType(), true);
  56. channel.queueDeclare(QUEUE_FANOUT_SMS, true, false, false, null);
  57. channel.queueBind(QUEUE_FANOUT_SMS, EXCHANGE_FANOUT,"");
  58. } catch (Exception e) {
  59. log.error("声明Fanout->sms失败", e);
  60. }
  61. }
  62. // 声明topic
  63. public static final String EXCHANGE_TOPIC= "exchange_topic_inform";
  64. public static final String QUEUE_TOPIC_EMAIL = "queue_topic_inform_email";
  65. public static final String QUEUE_TOPIC_SMS = "queue_topic_inform_sms";
  66. public static final String ROUTINGKEY_EMAIL="inform.#.email.#";
  67. public static final String ROUTINGKEY_SMS="inform.#.sms.#";
  68. public void BindTopicEmail(Channel channel) {
  69. try {
  70. channel.exchangeDeclare(EXCHANGE_TOPIC, BuiltinExchangeType.TOPIC.getType(),true);
  71. channel.queueDeclare(QUEUE_TOPIC_EMAIL, true, false, false, null);
  72. channel.queueBind(QUEUE_TOPIC_EMAIL, EXCHANGE_TOPIC, ROUTINGKEY_EMAIL);
  73. } catch (Exception e) {
  74. log.error("声明Topic->email队列时失败", e);
  75. }
  76. }
  77. public void BindTopicSms(Channel channel) {
  78. try {
  79. channel.exchangeDeclare(EXCHANGE_TOPIC, BuiltinExchangeType.TOPIC.getType(),true);
  80. channel.queueDeclare(QUEUE_TOPIC_SMS, true, false, false, null);
  81. channel.queueBind(QUEUE_TOPIC_SMS, EXCHANGE_TOPIC,"");
  82. } catch (Exception e) {
  83. log.error("声明Topic->sms失败", e);
  84. }
  85. }
  86. // 声明队列
  87. @Autowired
  88. @Qualifier(value = "zspConnectionFactory")
  89. private ConnectionFactory connectionFactory;
  90. @PostConstruct
  91. public void shengmingQueue() {
  92. try {
  93. Connection connection = connectionFactory.createConnection();
  94. Channel channel = connection.createChannel(false);
  95. BindDirectEmail(channel);
  96. BindDirectSms(channel);
  97. BindFanoutEmail(channel);
  98. BindFanoutSms(channel);
  99. BindTopicEmail(channel);
  100. BindTopicSms(channel);
  101. } catch (Exception e) {
  102. log.error("业务实例声明绑定队列报错:",e);
  103. }
  104. }
  105. }

RabbitFactory内容解析:

①创建自定义连接工厂

②通过@Qualifier准确注入连接工厂,创建个性化容器工厂

  1. package com.zsp.consumer.queue;
  2. import org.springframework.amqp.rabbit.annotation.EnableRabbit;
  3. import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
  4. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  5. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  6. import org.springframework.beans.factory.annotation.Qualifier;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. @Configuration
  10. @EnableRabbit
  11. public class RabbitFactory {
  12. @Bean("zspConnectionFactory")
  13. public ConnectionFactory connectionFactory() {
  14. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  15. // 设置RabbitMQ的连接信息,如主机名、端口号、用户名和密码等
  16. connectionFactory.setHost("localhost");
  17. connectionFactory.setPort(5672);
  18. connectionFactory.setUsername("root");
  19. connectionFactory.setPassword("root");
  20. return connectionFactory;
  21. }
  22. @Bean("rabbitListenerContainerFactory")
  23. public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(@Qualifier("zspConnectionFactory") ConnectionFactory connectionFactory) {
  24. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  25. factory.setConnectionFactory(connectionFactory);
  26. factory.setConcurrentConsumers(5);
  27. factory.setMaxConcurrentConsumers(10);
  28. return factory;
  29. }
  30. }

ReceiveHandler内容解析:

监听绑定的队列消息

  1. package com.zsp.consumer.queue;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.rabbitmq.client.Channel;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. @Component
  8. public class ReceiveHandler {
  9. //监听自定义的Direct队列
  10. @RabbitListener(queues = BusinessConfig.QUEUE_DIRECT_SMS, containerFactory = "rabbitListenerContainerFactory")
  11. public void directSMS(String msg, Message message, Channel channel) {
  12. JSONObject jsonObject = JSONObject.parseObject(msg);
  13. System.out.println("Direct队列->sms队列" + jsonObject);
  14. }
  15. @RabbitListener(queues = BusinessConfig.QUEUE_DIRECT_EMAIL, containerFactory = "rabbitListenerContainerFactory")
  16. public void directEmail(String msg, Message message, Channel channel) {
  17. JSONObject jsonObject = JSONObject.parseObject(msg);
  18. System.out.println("Direct队列->email队列" + jsonObject);
  19. }
  20. //监听自定义的Fanout队列
  21. @RabbitListener(queues = BusinessConfig.QUEUE_FANOUT_SMS, containerFactory = "rabbitListenerContainerFactory")
  22. public void FanoutSMS(String msg, Message message, Channel channel) {
  23. JSONObject jsonObject = JSONObject.parseObject(msg);
  24. System.out.println("Fanout队列->sms队列" + jsonObject);
  25. }
  26. @RabbitListener(queues = BusinessConfig.QUEUE_FANOUT_EMAIL, containerFactory = "rabbitListenerContainerFactory")
  27. public void FanoutEmail(String msg, Message message, Channel channel) {
  28. JSONObject jsonObject = JSONObject.parseObject(msg);
  29. System.out.println("Fanout队列->email队列" + jsonObject);
  30. }
  31. //监听自定义的Topic队列
  32. @RabbitListener(queues = BusinessConfig.QUEUE_TOPIC_SMS, containerFactory = "rabbitListenerContainerFactory")
  33. public void TopicSMS(String msg, Message message, Channel channel) {
  34. JSONObject jsonObject = JSONObject.parseObject(msg);
  35. System.out.println("Topic队列->sms队列" + jsonObject);
  36. }
  37. @RabbitListener(queues = BusinessConfig.QUEUE_TOPIC_EMAIL, containerFactory = "rabbitListenerContainerFactory")
  38. public void TopicEmail(String msg, Message message, Channel channel) {
  39. JSONObject jsonObject = JSONObject.parseObject(msg);
  40. System.out.println("Topic队列->email队列" + jsonObject);
  41. }
  42. }

5.验证

先启动消费者端,然后执行TestProducer

验证Direct

1.向routingkey为空的队列发消息

我们在消费者端配置了routingkey为空的队列,叫做 QUEUE_DIRECT_EMAIL

因此会打印出下面这条记录

2.向routingkey为123的队列发消息

我们在消费者端配置了routingkey为123的队列,叫做 QUEUE_DIRECT_SMS

因此会打出下面这条记录

验证Fanout

谁跟我绑定了,我都发

验证Topic

模糊匹配routingkey

匹配sms队列

会把下面这个打印出来

需要注意的是如果我们没有自定义容器工厂的话,这个containerFactory可以不写
简单理解就是实例,也就是rabbitmq服务地址是在哪里,实例包括了域名、端口、账号、密码等。


6.ACK消息确认补充

消费者监听队列时增加消息确认ack,改动两个地方
1.自定义容器工厂,增加手动确认机制

RabbitFactory

factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

  1. package com.zsp.consumer.queue;
  2. import org.springframework.amqp.core.AcknowledgeMode;
  3. import org.springframework.amqp.rabbit.annotation.EnableRabbit;
  4. import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
  5. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  6. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  7. import org.springframework.beans.factory.annotation.Qualifier;
  8. import org.springframework.context.annotation.Bean;
  9. import org.springframework.context.annotation.Configuration;
  10. @Configuration
  11. @EnableRabbit
  12. public class RabbitFactory {
  13. @Bean("zspConnectionFactory")
  14. public ConnectionFactory connectionFactory() {
  15. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  16. // 设置RabbitMQ的连接信息,如主机名、端口号、用户名和密码等
  17. connectionFactory.setHost("localhost");
  18. connectionFactory.setPort(5672);
  19. connectionFactory.setUsername("root");
  20. connectionFactory.setPassword("zsproot");
  21. return connectionFactory;
  22. }
  23. @Bean("rabbitListenerContainerFactory")
  24. public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(@Qualifier("zspConnectionFactory") ConnectionFactory connectionFactory) {
  25. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  26. factory.setConnectionFactory(connectionFactory);
  27. factory.setConcurrentConsumers(5);
  28. factory.setMaxConcurrentConsumers(10);
  29. factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  30. return factory;
  31. }
  32. }

2.监听队列增加try{}catch(){}finally{}代码块

以DirectSMS举例

  1. public void DirectSMS(String msg, Message message, Channel channel) {
  2. try {
  3. JSONObject jsonObject = JSONObject.parseObject(msg);
  4. System.out.println("Direct队列->sms队列" + jsonObject);
  5. } catch (Exception e) {
  6. e.printStackTrace();
  7. log.info("[referral],directSMS队列error", e);
  8. } finally {
  9. try {
  10. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  11. } catch (IOException e) {
  12. e.printStackTrace();
  13. }
  14. }
  15. }

解释:

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); 是用于手动确认消息的方法。在RabbitMQ中,当消费者成功处理消息后,需要向RabbitMQ服务器发送确认消息,告诉服务器可以将该消息从队列中移除。这个方法就是用来发送这个确认消息的。

在这行代码中,channel 是用来与RabbitMQ服务器通信的通道对象,basicAck 是发送确认消息的方法。message.getMessageProperties().getDeliveryTag() 是获取消息的交付标签,用于告诉RabbitMQ服务器是哪条消息需要被确认。false 是一个布尔值,表示是否批量确认,这里是单条确认,所以是false

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/399266?site
推荐阅读
相关标签
  

闽ICP备14008679号