当前位置:   article > 正文

基于Spring Boot实现RabbitMQ生产者和消费者的简单通信以及原理说明_mq消费者和生产者 怎么互通消息

mq消费者和生产者 怎么互通消息

1.设计概要

主要是基于springboot框架,配置好rabbitMQ服务器,创建两个微服务分别实现消费者和生产者。

2.主要步骤

1.交换机的创建

2.队列的创建

3.交换机与队列信息的绑定

前三步都是基于RabbitMQ的网页版管理页面实现,基本有手就行,但是创建队列与交换机时会有一些参数的选择,我来简述一下各个参数的含义

type:代表交换机的种类,主要区别就是在绑定队列,根据路由键选取队列传递信息上的区别。

  1. 直接交换机(Direct Exchange): 直接交换机将消息发送到与指定路由键完全匹配的队列。只有当消息的路由键与队列的绑定路由键完全相同时,消息才会被路由到队列。这是一种点对点的消息路由模式。

  2. 扇出交换机(Fanout Exchange): 扇出交换机将消息发送到与交换机绑定的所有队列,无论消息的路由键是什么。这种模式适用于广播消息给多个消费者的场景。

  3. 主题交换机(Topic Exchange): 主题交换机根据消息的路由键和绑定队列的通配符模式来进行匹配。这允许更复杂的消息路由策略,可以根据路由键的一部分进行匹配。例如,可以使用通配符*(匹配一个单词)和#(匹配零个或多个单词)来定义路由键的模式。

  4. 头交换机(Headers Exchange): 头交换机使用消息的属性(而不是路由键)来匹配队列。发送者在发送消息时可以定义一组键值对属性,这些属性将与队列的绑定条件进行匹配。这种模式适用于基于消息属性的路由。

Durability:是否持久化,如果你将 Durability 设置为 true,那么创建的交换机会被标记为持久的(durable),这意味着它的定义会被保存到磁盘上。即使 RabbitMQ 服务器在创建交换机后重启,该交换机的定义也会被保留,不会丢失。这是一种防止定义丢失的机制。

4.定义生产者

生产者这里首先要开启微服务,利用springboot中application.yaml上的配置信息连接到exchange上,然后实现消息的序列化,回调函数的实现,以及消息发送。

5.定义消费者

消费者就比较简单了,连接到RabbitMQ服务,然后利用注解设立RabbitMQ消费者,连接到对应的队列,就可以实现消息的接收,连接到队列后,RabbitMQ控制端会显示出连接信息,包括消费者的地址信息,tag等。

后面实现信息消费时,会根据这个信息发送到对应的消费者。

3.实现代码

配置信息:
  1. spring:
  2. rabbitmq:
  3. host: localhost
  4. port: 5672
  5. username: guest
  6. password: guest
  7. publisherConfirms: true
  8. publisherReturns: true
  9. template:
  10. mandatory: true
生产者实现:

序列化信息以及获取回调信息

  1. @Bean
  2. public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
  3. final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  4. rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
  5. rabbitTemplate.setMandatory(true);
  6. rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
  7. @Override
  8. public void returnedMessage(Message message, int replyCode, String replyText,
  9. String exchange, String routingKey) {
  10. try {
  11. System.out.println("--------收到无法路由回发的消息--------");
  12. System.out.println("ReturnCallback: " + "消息:" + message);
  13. System.out.println("ReturnCallback: " + "回应码:" + replyCode);
  14. System.out.println("ReturnCallback: " + "回应信息:" + replyText);
  15. System.out.println("ReturnCallback: " + "交换机:" + exchange);
  16. System.out.println("ReturnCallback: " + "路由键:" + routingKey);
  17. System.out.println("properties:" + message.getMessageProperties());
  18. System.out.println("body:" + new String(message.getBody(), "UTF-8"));
  19. } catch (UnsupportedEncodingException e) {
  20. e.printStackTrace();
  21. }
  22. }
  23. });
  24. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  25. @Override
  26. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  27. System.out.println("--------收到服务端异步确认--------");
  28. System.out.println("ConfirmCallback: " + "相关数据:" + correlationData);
  29. System.out.println("ConfirmCallback: " + "确认情况:" + ack);
  30. System.out.println("ConfirmCallback: " + "原因:" + cause);
  31. }
  32. });
  33. return rabbitTemplate;
  34. }

序列化主要是转为json实现消息的传递

returnedMessage:实现交换机无法接收信息后的处理逻辑

ConfirmCallback:服务器端异步确认的实现

这么设计主要由于RabbitMQ保证可靠性的机制,确保生产者已经将消息交给RabbitMQ:

  1. 连接到 RabbitMQ 服务器: 生产者应用程序首先与 RabbitMQ 服务器建立连接,通常是通过 AMQP 协议来实现。
  2. 创建通道: 在连接建立后,生产者在连接上创建一个通道,用于执行后续操作。
  3. 声明队列: 如果消息队列不存在,生产者可以声明一个消息队列,该队列用于存储消息。
  4. 构造消息: 生产者构造要发送的消息,将数据和元数据(如路由键、优先级等)组成消息对象。
  5. 将消息发送到队列: 生产者将构造的消息发送到先前声明的队列中,消息会暂时存储在队列中等待消费者获取。
  6. 等待确认: 生产者可以等待来自 RabbitMQ 的确认信号,表示消息已被接收。确认机制可确保消息成功进入队列。
  7. 关闭连接: 在生产者完成消息发送后,通常会关闭连接或通道,以释放资源。

发送消息:

  1. final String directExchange="exchange2";
  2. // 自定义的模板,所有的消息都会转换成JSON发送
  3. @Autowired
  4. AmqpTemplate amqpTemplate;
  5. public void send(String message) {
  6. // 发送JSON字符串
  7. ObjectMapper mapper = new ObjectMapper();
  8. String sycMsg = null;
  9. try {
  10. sycMsg = mapper.writeValueAsString(message);
  11. } catch (JsonProcessingException e) {
  12. e.printStackTrace();
  13. }
  14. //System.out.println(sycMsg);
  15. amqpTemplate.convertAndSend(directExchange,"spring.#", sycMsg);
  16. }

convertAndSend方法的三个传参分别是,交换机名称,路由键,消息内容

消费者实现:

  1. @RabbitHandler
  2. @RabbitListener(queues = "queue4")
  3. public void Show2(Object message){
  4. System.out.println("queue4订阅的消息为:"+message);
  5. }
  1. @RabbitListener 注解: 这个注解用于标记一个方法,表示它是一个消息监听器,会监听指定队列中的消息。在这个注解中,你可以指定队列的名称,表示这个监听器关注哪个队列。在你的例子中,@RabbitListener(queues = "queue4") 表示这个监听器关注名为 "queue4" 的队列。

  2. @RabbitHandler 注解: 这个注解用于标记被 @RabbitListener 注解的方法中的具体处理方法。如果一个消息监听器监听的队列可能接收多种不同类型的消息,你可以在同一个监听器类中定义多个处理方法,并用 @RabbitHandler 标记每个方法来处理不同类型的消息。@RabbitHandler 可以根据消息的类型来选择调用哪个处理方法。

消费者处理消息的流程:

  1. 连接到 RabbitMQ 服务器: 消费者应用程序与 RabbitMQ 服务器建立连接。
  2. 创建通道: 类似于生产者,消费者在连接上创建通道。
  3. 声明队列: 消费者需要在相同的队列上声明,以便从中获取消息。
  4. 注册消费者回调: 消费者注册一个回调函数,用于接收从队列中获取的消息。
  5. 开始消费消息: 一旦回调函数注册完成,消费者开始从队列中获取消息,并将消息传递给回调函数进行处理。
  6. 处理消息: 回调函数处理消息,可以是对消息的业务逻辑操作,如数据处理、计算等。
  7. 发送确认: 消费者在成功处理消息后,向 RabbitMQ 发送确认信号,表示消息已被成功处理。根据配置,消息将从队列中删除或标记为已处理。
  8. 继续消费: 消费者回到等待状态,继续从队列中获取和处理下一条消息。
  9. 关闭连接: 在消费者完成处理后,通常会关闭连接或通道。

其他有关RabbitMQ的机制

1.基于什么协议实现消息的传递

RabbitMQ基于AMQP(Advanced Message Queuing Protocol)协议实现消息的传递

2.RabbitMQ队列的种类有哪些

  1. 普通队列(Standard Queue): 普通队列是最基本的队列类型。消息会按照先进先出(FIFO)的顺序排列在队列中。多个消费者可以从队列中获取消息,但每条消息只会被一个消费者接收。

  2. 优先级队列(Priority Queue): 优先级队列允许消息设置不同的优先级。具有更高优先级的消息将在队列中排在具有较低优先级的消息之前,以确保高优先级消息尽快被处理。

  3. 延迟队列(Delayed Queue): 延迟队列允许将消息延迟发送到队列中。消息会在指定的延迟时间之后才可被消费者获取。这对于处理定时任务和调度任务非常有用。

  4. 死信队列(Dead Letter Queue,DLQ): 死信队列用于处理无法被消费者成功处理的消息。如果消息因为某种原因无法被消费,它们可以被重新路由到死信队列,以便后续处理或分析失败的消息。

  5. 绑定队列(Binding Queue): 绑定队列是一种特殊的队列,它不用于存储消息,而是用于表示队列和交换机之间的绑定关系。消息不会在绑定队列中排队等待,而是直接路由到与之绑定的队列。

  6. 镜像队列(Mirrored Queue): 镜像队列是一种高可用性队列,它将队列的数据镜像到多个节点,以防止单个节点的故障导致消息丢失。

  7. 自动删除队列(Auto-Delete Queue): 自动删除队列是一种在不再使用时自动删除自身的队列类型。当最后一个消费者断开连接后,队列会被自动删除。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Guff_9hys/article/detail/774530
推荐阅读
相关标签
  

闽ICP备14008679号