当前位置:   article > 正文

《Spring Boot 实战派》--12.集成RabbitMQ,实现系统间的 数据交换_convertandsend

convertandsend

第12章 集成RabbitMQ,实现系统间的数据交换

RabbitMQ是近年来使用非常广泛的消息中间件

本章首先介绍它的原理、概念、6种工作模式、常用的注解;

然后用实例讲解在Spring Boot 中如何使用AmqpTemplate接口实现消息的发送和监听。

12.1 认识 RabbitMQ

12.1.1 介绍 RabbitMQ

RabbitMQ 是开源的高级消息队列协议(Advanced Message Queueing Protocol, AMQP) 的实现,用Erlang语言编写,支持多种客户端。

RabbitMQ是目前应用相当广泛的消息中间件(其他同类的消息处理中间件有ActiveMQ、

Kafka等)。在企业级应用、微服务应 用中,RabbitMQ担当着十分重要的角 色。例如,在业务服务模块中解耦' 异 步通信、高并发限流、超时业务、数据 延迟处理等都可以使用RabbitMQo

RabbitMQ的处理流程如图

12.1.2使用场景

1、推送通知

“发布/订阅”是RabbitMQ的重要功能。可以用“发布/订阅”功能来实现通知功能。消费者 (consumer)-直监听RabbitM Q的数据。如果RabbitM Q有数据,则消费者会按照“先进先出” 规则逐条进行消费。而生产者(producer)只需要将数据存入RabbitMQ。这样既降低了不同系统 之间的耦合度,也确保了消息通知的及时性,且不影响系统的性能。

“发布/订阅”功能支持三种模式:一对一、一对多、广播。这三种模式都可以根据规则选择分发的对象。众多消费者(consumer)可以根据规则选择是否接收这些数据,扩展性非常强。

2、异步任务

        后台系统接到任务后,将其分解成多个小任务,只要分别完成这些小任务,整个任务便可以完 成。但是,如果某个小任务很费时,且延迟执行并不影响整个任务,则可以将该任务放入消息队列 中去处理,以便加快请求响应时间。

        如果用户注册会员时有一项需求——发送验证邮件或短信验证码以完成验证,则可以使用 RabbitMQ的消息队列来实现,这样可以及时提醒用户操作已经成功。等待收到邮件或验证码,然 后进行相应的确认,即完成验证。

3、多平台应用的通信

        RabbitMQ可以用于不同开发语言开发的应用间的通信(如Java开发的应用程序需要与C++ 开发的应用程序进行通信),实现企业应用集成。由于消息队列是无关平台和语言的,而且语义上也 不是函数调用,因此RabbitMQ适合作为多个应用之间的松耦合的接口,且不需要发送方和接收方同时在线。

不同语言的软件解耦,可以最大限度地减少程序之间的相互依赖,提高系统可用性及可扩展性, 同时还增加了消息的可靠传输和事务管理功能。

RabbitMQ提供两种事务模式:

  • AMQP事务模式。
  • Confirm事务模式。

4、消息延迟

        利用RabbitMQ消息队列延迟功能,可以实现订单、支付过期定时取消功能。因为延迟队列存 储延时消息,所以,当消息被发送以后,消费者不是立即拿到消息,而是等待指定时间后才拿到这个消息进行消费。

        当然,死信、计时器、定时任务也可以实现延迟或定时功能,但是需要开发者去处理。

        要实现消息队列延迟功能,一般采用官方提供的插件“rabbitmq_delayed_message_ exchange"来实现,但RabbitMQ版本必须是3.5.8版本以上才支持该插件。如果低于这个版本, 则可以利用“死信”来完成。

5、远程过程调用

        在实际的应用场景中,有时需要一些同步处理,以等待服务器端将消息处理完成后再进行下一 步处理,这相当于RPC ( Remote Procedure Call,远程过程调用)。RabbitMQ也支持RPC

12.1.3特性

RabbitMQ具有以下特性。

  • 信息确认:RabbitMQ有以下两种应答模式。
  • 自动应答:当RabbitMQ把消息发送到接收端,接收端从队列接收消息时,会自动发送 应答消息给服务器端。
  • 手动应答:需要开发人员手动调用方法告诉服务器端已经收到。

        

    注:   如果实际场景中对个别消息的丢失不是很敏感,则选用自动应答比较理想。

             如果是一个消息都不能丢的场景,则需要选用手动应答,在正确处理完以后才应答。

如果选择了自动应答,那消息重发这个功能就没有了;

  • 队列持久化:队列可以被持久化,但是否为持久化,要看持久化设置。
  • 信息持久化:设置properties.DeliveryMode值即可。默认值为1,代表不是持久的,2代表持久化。
  • 消息拒收:接收端可以拒收消息,而且在发送“reject”命令时,可以选择是否要把拒收的消息重新放回队列中。
  • 消息的QoS:在接收端设置的。发送端没有任何变化,接收端的代码也比较简单,只需要加上如 “channel.BasicQos(0, 1, false);"的代码即可。

12.2 RabbitMQ的基本概念

12.2.1生产者、消费者和代理

RabbitMQ的角色有以下三种。

  • 生产者:消息的创建者,负责创建和推送数据到消息服务器。
  • 消费者:消息的接收方,用于处理数据和确认消息。
  • 代理:RabbitMQ本身,扮演“快递”的角色,本身不生产消息。

注: 生产者和消费者并不属于RabbitMQ, RabbitMQ只是为生产者和消费者提供发送和接收消息的API

12.2.2消息队列

        Queue(队列)是RabbitMQ的内部对象,用于存储生产者的消息直到发送给消费者,也是消费者接收消息的地方。RabbitMQ中的消息也都只能存储在Queue中,多个消费者可以订阅同一个Queue。        

Queue有以下一些重要的属性。

  • 持久性:如果启用,则队列将会在消息协商器(broker)重启前都有效。
  • 自动删除:如果启用,则队列将会在所有的消费者停止使用之后自动删除掉。
  • 惰性:如果没有声明队列,则应用程序调用队列时会导致异常,并不会主动声明。
  • 排他性:如果启用,则声明它的消费者才能使用。

12.2.3交换机

        Exchange (交换机)用于接收、分配消息。生产者先要指定一个“routing key”,然后将消息 发送到交换机。这个“routing key”需要与“Exchange Type”及”binding key"联合使用才能最终生效,然后,交换机将消息路由到一个或多个Queue中,或丢弃。

        在虚拟主机的消息协商器(broker)中,每个Exchange都有唯一的名字。

        Exchange包含4种类型:direct、topic、fanout、headerso不同的类型代表绑定到队列的行为不同。

AMQP规范里还有两种交换机类型--system与自定义。

1. direct

        direct类型的行为是“先匹配,再投送”。在绑定队列时会设定一个routing key,只有在消息的routing key与队列匹配时,消息才会被交换机投送到绑定的队列中。允许一个队列通过一个固定的routing key (通常是队列的名字)进行绑定。Direct交换机将消息根据其routing key属性投 递到包含对应key属性的绑定器上。

        Direct Exchange是RabbitMQ默认的交换机模式,也是最简单的模式。它根据routing key 全文匹配去寻找队列。

2、topic

        按规则转发消息(最灵活)。主题交换机(topic exchange )转发消息主要根据通配符。队列和交换机的绑定会定义一种路由模式,通配符就要在这种路由模式和路由键之间匹配后,交换机才能转发消息。

        在这种交换机模式下,路由键必须是一串字符,用"."隔开。

        路由模式必须包含一个星号“*”,主要用于匹配路由键指定位置的一个单词。

        topic还支持消息的routing key,用“*”或“#”的模式进行绑定。“*”匹配一个单词,“#” 匹配0个或多个单词。例如,"'binding key *.user.#"匹配 routing key 为“cn.user”和“us.user.db”, 但是不匹配"user.hello"。

3、headers

        它根据应用程序消息的特定属性进行匹配,可以在binding key中标记消息为可选或必选。在队列与交换机绑定时,会设定一组键值对规则。消息中也包括一组键值对(headers属性),当这些 键值对中有一对,或全部匹配时,消息被投送到对应队列。

4、fanout

        消息广播的模式,即将消息广播到所有绑定到它的队列中,而不考虑routing key的值(不管路由键或是路由模式)。如果配置了 routing key,则routing key依然会被忽略。

12.2.4绑定

        RabbitMQ中通过绑定(binding ),将Exchange与Queue关联起来。这样RabbitMQ就知道如何正确地将消息路由到指定的Queue 了。

        在绑定Exchange与Queue时,一般会指定一个binding key。消费者将消息发送给Exchange 时,一般会指定一个routing key。如果binding key与routing key相匹配,则消息将会被路由到对应的Queue中。

        绑定是生产者和消费者消息传递的连接。生产者发送消息到Exchange,消费者从Queue接 收消息,都是根据绑定来执行的。

12.2.5 通道

        有些应用需要与AMQP代理建立多个连接。但同时开启多个TCP ( Transmission Control Protocol,传输控制协议)连接会消耗过多的系统资源,并使得防火墙的配置变得更加困难。“AMQP 0-9-1”协议用通道(channel)来处理多连接,可以把通道理解成“共享一个TCP连接的多个轻 量化连接”。

        —个特定通道上的通信与其他通道上的通信是完全隔离的,因此,每个AMQP方法都需要携带 一个通道号。这样客户端就可以指定此方法是为哪个通道准备的。

12.2.6 消息确认

        消息确认(message acknowledgement)是指:当一个消息从队列中投递给消费者 (consumer)后,消费者会通知一下消息代理(broker),这个过程可以是自动的,也可以由处理 消息的应用的开发者执行。当“消息确认”启用时,消息代理需要收到来自消费者的确认回执后, 才完全将消息从队列中删除。

        如果消息无法被成功路由,或被返给发送者并被丢弃,或消息代理执行了延期操作,则消息会被放入一个“死信”队列中。此时,消息发送者可以选择某些参数来处理这些特殊情况。

12.3 RabbitMQ的6种工作模式

12.3.1简单模式

        生产者把消息放入队列,消费者获得消息,如图12-2所示。这个模式只有一个消费者、一个 生产者、一个队列,只需要配置主机参数,其他参数使用默认值即可通信。

12.3.2工作队列模式

        这种模式出现了多个消费者,如图12-3所示。为了保证消费者之间的负载均衡和同步,需要 在消息队列之间加上同步功能。

        工作队列(任务队列)背后的主要思想是:避免立即执行资源密集型任务(耗时),以便下一个 任务执行时不用等待它完成。工作队列将任务封装为消息并将其发送到队列中。

12.3.3交换机模式

        实际上,前两种模式也使用了交换机,只是使用了采用默认设置的交换机。交换机参数是可以 配置的,如果消息配置的交换机参数和RabbitMQ队列绑定(binding)的交换机名称相同,则转发,否则丢弃,如图12-4所示。

12.3.4 Routing 转发模式

        交换机要配置为direct类型,转发的规则变为检查队列的routing key值。如果routing key 值相同,则转发,否则丢弃,如图12-5所示。

12.3.5主题转发模式

        这种模式下交换机要配置为topic类型,routing key配置失效。发送到主题交换机的信息, 不能是任意routing key,它必须是一个单词的列表,用逗号分隔。

        特点是可以模糊匹配,匹配 规则为:*(星号)可以代替一个词;#(#号)可以代替零个或更多的单词,其模式情况如图12-6 所示。

12.3.6 RPC 模式

        这种模式主要使用在远程调用的场景下。如果一个应用程序需要另外一个应用程序来最终返回 运行结果,那这个过程可能是比较耗时的操作,使用RPC模式是最合适的。其模式情况如图12-7 所示。

6种工作模式的主要特点如下。

  • 简单模式:只有一个生产者,一个消费者
  • 工作队列模式:一个生产者,多个消费者,每个消费者获取到的消息唯一。
  • 订阅模式:一个生产者发送的消息会被多个消费者获取。
  • 路由模式:发送消息到交换机,并且要指定路由key,消费者在将队列绑定到交换机时需要 指定路由key。
  • topic模式:根据主题进行匹配,此时队列需要绑定在一个模式上,“#”匹配一个词或多个词,“*”只匹配一个词。

12.4 认识 AmqpTemplate 接口

        Spring AMQP提供了操作AMQP协议的模板类AmqpTemplate,用于发送和接收消息, 它定义发送和接收消息等操作,还提供了 RabbitTemplate用于实现AmqpTemplate接口, 而且还提供了错误抛出类AmqpException;RabbitTemplate支持消息的确认与返回(默认禁用)

12.4.1发送消息

1、send方法

AmqpTemplate模板提供了 send方法用来发送消息,它有以下3个“重载”:

  • void send(Message message) throws AmqpException
  • void send(String routingKey, Message message) throws AmqpException
  • void send(String exchange, String routingKey, Message message)throws Amqp- Exceptiori

2、convertAndSend 方法

        AmqpTemplate模板还提供了 convertAndSend方法用来发送消息。convertAndSend方法相当于简化了的send方法,可以自动处理消息的序列化。下面通过两个功能一样的代码来比较两者的区别:

  1. @Test
  2. public void send() {
  3. Message message = MessageBuilder.withBody("body content".getBytes())
  4. .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
  5. .setMessageld("1")
  6. .setHeader("header", "header")
  7. .build();
  8. amqpTemplate.send("QueueHello", message);
  9. }

  

上面代码和下面代码的效果一样。

  1. @Test
  2. public void send2() {
  3. amqpTemplate.convertAndSend("QueueHello","body content");
  4. }

  

12.4.2接收消息

接收消息可以有两种方式。

  • 直接去查询获取消息,即调用receive方法。如果该方法没有获得消息,则直接返回null, 因为receive方法不阻塞。
  • 异步接收,通过注册一个Listener (监听器)来实现消息接收。接收消息需要指定队列 (Queue),或设置默认的队列。

AmqpTemplate提供的直接获得消息的方法是receive;

        另外,AmqpTemplate也提供了直接接收POJO (代替消息对象)的方法receiveAndConvert,并提供了各种的Messageconverter用来处理返回的Object (对象)。

        从 Spring-Rabbit 1.3 版本开始,AmqpTemplate 也提供了 receiveAndReply 方法来异步接收、处理及回复消息。

12.4.3异步接收消息

        Spring AMQP也提供了多种不同的方式来实现异步接收消息,比如常用通过 MessageListener (消息监听器)的方式来实现。

        从Spring-rabbit 1.4版本开始,可使用注解@RabbitListener来异步接收消息,它更为简便。 使用方法见以下代码:

  1. @Component
  2. //监听QueueHello的消息队列
  3. @RabbitListener(queues = "QueueHello")
  4. public class QueueReceiver {
  5. //注解@RabbitHandler用来实现具体消费消息
  6. @RabbitHandler
  7. public void QueueReceiver(String QueueHello) {
  8. System.out.println("Receiver(QueueHello):" + QueueHello);
  9. }
  10. }

  在较低版本中,需要在容器中配置@EnableRabbit来支持@RabbitListener

12.5 在 Spring Boot 中集成 RabbitMQ

12.5.1 安装 RabbitMQ

        RabbitMQ是用Erlang语言开发的。所以,需要先安装Erlang环境,再安装RabbitMQ

(1)下载 Erlang 环境和 RabbitMQ。

                到Erlang官网下载Erlang环境。

                到 RabbitMQ 官网下载 RabbitMQ

(2)安装。

        下载完成后,先单击Erlang安装文件进行安装,然后单击RabbitMQ安装文件进行安装。在 安装过程中,按照提示一步一步操作即可。在RabbitMQ成功安装后,会自动启动服务器。

(3)开启网页管理界面。

        虽然可以在命令行管理RabbitMQ,但稍微麻烦。RabbitMQ提供了可视化的网页管理平台, 可以使用 11 rabbitmq-plugins.bat enable rabbitmq_management”命令开启网页管理界面。

12.5.2界面化管理RabbitMQ

1.概览

        在安装配置完成后,开启网页管理,然后可以通过"http://localhost:15672"进行查看和管理, 输入默认的用户名“guest”和密码"guest”进行登录。RabbitMQ的后台界面如图12-8所示

2.管理交换机

        进入交换机管理页面后,单击"Add exchange (添加交换机)”按钮,弹出添加界面,可以看 到列出了 RabbitMQ默认的4种类型,由于笔者已经添加了消息延迟插件,所以会有 “x-delayed-message”类型,

3.管理管理员

        消息中间件的安全配置也是必不可少的。在RabbitMQ中,可以通过命令行创建用户、设置密 码、绑定角色。常用的命令如下:

  • rabbitmqctl.bat list_users:查看现有用户。
  • abbitmqctl.bat add_user username password:新增用户。新增的用户只有用户名、密码,没有管理员、超级管理员等角色。
  • rabbitmqctl.bat set_user_tags username administrator:设置角色。角色分         为 none、 managements policymaker、monitorings administrator。
  • rabbitmqctl change_password userName newPassword:修改密码命令。
  • rabbitmqctl.batdelete_user username:删除用户命令。

 

        还可以在开启RabbitMQ网页管理界面之后,用可视化界面进行操作,如图12-10所示。其中“Tags"是管理员类型。

        在创建用户后,需要指定用户访问一个虚拟机(如图12-11所示),并且该用户只能访问该虚 拟机下的队列和交换机。如果没有指定,则默认是“No access",而不是"/”(所有)。在一个 RabbitMQ服务器上可以运行多个vhost,以适应不同的业务需要。这样做既可以满足权限配置的要求,也可以避免不同业务之间队列、交换机的命名冲突问题,因为不同vhost之间是隔离的,权限设置可以细化到主题。

12.5.3 在 Spring Boot 中酉己置 RabbitMQ

(1)添加依赖,见以下代码:

  1. <dependency>
  2.   <groupld>org.springframework. boot</groupld>
  3.   <artifactld>spring-boot-starter-web</artifactld>
  4. </dependency>
  5. <dependency>
  6.   <groupld>org.springframework.boot</groupld>
  7.   <artifactld>spring-boot-starter-amqp</artifactld>
  8. </dependency>

  

(2 )配置 application.properties 文件。 设置好连接的地址、端口号、用户名和密码。

  1. spring.application.name=rabbitmq-hello
  2. spring.rabbitmq.host=localhost
  3. spring.rabbitmq.port=5672
  4. spring.rabbitmq.usemame=guest
  5. spring. rabbitmq.password=guest

  

12.6 在Spring Boot中实现RabbitMQ的4种发送/接收模式

12.6.1 实例50:实现发送和接收队列

        本实例实现发送和接收队列。读者通过此实例可以很快理解本章之前所讲解的相关知识点。

        本实例的源代码可以在"/12/RabbitmcLQueueDemo"目录下找到。

(1 )配置队列。

首先要配置队列的名称,并将队列交由loC管理,见以下代码:

  1. @Configuration
  2. public class RabbitmqConfig {
  3. @Bean
  4. public Queue queue() {
  5. return new Queue("Queue1");
  6. }
  7. }

  

(2 )创建接收者。

注意,发送者和接收者的Queue名称必须一致,否则不能接收,见以下代码:

  1. @Component
  2. //监听QueueHello的消息队列
  3. @RabbitListener(queues = "Queue1")
  4. public class ReceiverA {
  5. //@RabbitHandler来实现具体消费
  6. @RabbitHandler
  7. public void QueueReceiver(String Queue1) {
  8. System.out.println("Receiver A:" + Queue1);
  9. }
  10. }

  

(3) 创建发送者。

利用convertAndSend方法发送消息,见以下代码:

  1. ©Component
  2. public class SenderA {
  3. @Autowired
  4. private AmqpTemplate rabbitTemplate;
  5. public void send(String context) {
  6. System.out.println("Sender:" + context);
  7. //使用AmqpTemplate将消息发送到消息队列中
  8. this.rabbitTemplate.convertAndSend("Queue1", context);
  9. }
  10. }

  

(4) 测试发送和接收情况。

这里测试一次发送两条信息,见以下代码:

  1. @Test
  2. public void QueueSend() {
  3. int i = 2;
  4. for (int j = 0; j < i; j++) {
  5. String msg = "Queue1 msg" + j + new Date();
  6. try{
  7. queueSender.send(msg);
  8. } catch (Exception e) {
  9. e.printStackTrace();
  10. }
  11. }
  12. }

  运行测试,可以看到控制台输出如下结果:

  1. Receiver A: Queuel msgOWed May 08 23:41:00 CST 2019
  2. Receiver B: Queuel msg 1 Wed May 08 23:41:00 CST 2019

  上述信息表示发送成功,且接收成功。

        如果是多个接收者,则会均匀地将消息发送到N个接收者中,并不是全部发送一遍,“多对多” 也会和“一对多”一样,接收端仍然会均匀地接收到消息。

12.6.2实例51:实现发送和接收对象

        本实例实现发送和接收对象。读者通过此实例可以很快理解本章之前所讲解的相关知识点。

本实例的源代码可以在712/Rabbitmq_ObjectDemo,'目录下找到。

(1 )编辑配置类。

配置发送接收对象的队列,见以下代码:

  1. @Configuration
  2. public class RabbitmqConfig {
  3. @Bean
  4. public Queue objectQueue() {
  5. return new Queue("object");
  6. }
  7. }

  

(2)编写接收类。

用于接收消息,见以下代码:

  1. @Component
  2. @RabbitListener(queues = "object")
  3. public class ObjectReceiver {
  4. @RabbitHandler
  5. public void process(User user) {
  6. System.out.println("Receiver object:" + user);
  7. }
  8. }

  

(3 )编写发送类。

用convertAndSend方法发送,见以下代码:

  1. @Component
  2. public class Objectsender {
  3. @Autowired
  4. private AmqpTemplate amqpTemplate;
  5. public void send(User user) {
  6. System.out.println("Sender object:" + user.toString());
  7. this.amqpTemplate.convertAndSend("object", user);
  8. }
  9. }

  

(4)编写测试。

这里实例化了一个User对象,用于发送消息,见以下代码:

  1. @Autowired
  2. private Objectsender objectsender;
  3. @Test
  4. public void sendOjectController() {
  5. try{
  6. User user = new User();
  7. user.setName("longzhiran");
  8. user.setAge("2");
  9. objectSender.send(user);
  10. } catch (Exception e) {
  11. e.printStackTrace();
  12. }
  13. }

  

运行测试,可以看到控制台输出如下结果:

  1. Sender object: User(name=longzhiran, age=2)
  2. Receiver object: User(name=longzhiran, age=2)

  

12.6.3 实例52:实现用接收器接收多个主题

topic是RabbitMQ中最灵活的一种方式,可以根据routing key自由地绑定不同的队列。

本实例的源代码可以在"12/Rabbitmq_TopicDemo”目录下找到。

(1 )配置 topiC

配置处理消息的队列,见以下代码:

  1. @Configuration
  2. public class RabbitmqConfig {
  3. @Bean
  4. public Queue queueMessage() {
  5. return new Queue("topic.a");
  6. }
  7. @Bean
  8. public Queue queueMessages() {
  9. return new Queue("topic.b");
  10. }
  11. @Bean
  12. TopicExchange exchange() {
  13. return new TopicExchange("topicExchange");
  14. }
  15. @Bean
  16. Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
  17. return BindingBuilder.bind(queueMessage).to(exchange).with("topic.a");
  18. }
  19. @Bean
  20. Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
  21. return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
  22. }
  23. }

  

(2) 编写接收者A。

接收者A监听主题是“topic.a”,见以下代码:

  1. ©Component
  2. @RabbitListener(queues = "topic.a")
  3. public class TopicReceiverA {
  4. @RabbitHandler
  5. public void process(String msg) {
  6. System.out.println("Topic ReceiverA:" + msg);
  7. }
  8. }

  

(3) 编写接收者B。

接收者B监听主题是“topic.b”,见以下代码:

  1. @Component
  2. @RabbitListener(queues = "topic.b")
  3. public class TopicReceiverB {
  4. @RabbitHandler
  5. public void process(String msg) {
  6. System.out.println("Topic ReceiverB:" + msg);
  7. }
  8. }

  

(4) 编写发送者。

编写发送者,通过发送不同的“topic”来测试效果,见以下代码:

  1. @Component
  2. public class TopicSender {
  3. @Autowired
  4. private AmqpTemplate amqpTemplate;
  5. public void send() {
  6. String context = "topic";
  7. System.out.println("Sender:" + context);
  8. this.amqpTemplate.convertAndSend("topicExchange", "topic.1", context);
  9. }
  10. public void send2() {
  11. String context = "topic 2";
  12. System.out.println("Sender:" + context);
  13. this.amqpTemplate.convertAndSend("topicExchange", "topic.a", context);
  14. }
  15. public void send3() {
  16. String context = "topic3";
  17. System.out.println("Sender:" + context);
  18. this.amqpTemplate.convertAndSend("topicExchange", "topic.b", context);
  19. }
  20. }

  

(5)编写测试,见以下代码:

  1. public class TopicSendControllerTest {
  2. @Autowired
  3. private TopicSender sender;
  4. @Test
  5. public void topic() throws Exception {
  6. sender.send();
  7. }
  8. @Test
  9. public void topic1() throws Exception {
  10. sender.send2();
  11. }
  12. @Test
  13. public void topic2() throws Exception {
  14. sender.send3();
  15. }
  16. }

  

运行测试,可以看到控制台输出如下结果:

  1. Topic Receiver2 : topic
  2. Topic Receiver2 : topic 2
  3. Topic Receiver1 : topic 2
  4. Topic Receiver2 : topic 3

  

12.6.4实例53:实现广播模式

        fanout是广播模式。在该模式下,绑定了交换机的所有队列都能接收到这个消息。

        本实例的源代码可以在"/12/Rabbitmq_FanoutDemon目录下找到。

(1)配置 fanouto

配置广播模式的队列,见以下代码:

  

  1. @Configuration
  2. public class RabbitmqConfig {
  3. @Bean
  4. public Queue queueA() {
  5. return new Queue("fanout.A");
  6. }
  7. @Bean
  8. public Queue queueB() {
  9. return new Queue("fanout.B");
  10. }
  11. @Bean
  12. FanoutExchange fanoutExchange() {
  13. return new FanoutExchange("fanoutExchanger");
  14. }
  15. @Bean
  16. Binding bindingExchangeA(Queue queueA, FanoutExchange fanoutExchanger) {
  17. return BindingBuilder.bind(queueA).to(fanoutExchanger);
  18. }
  19. @Bean
  20. Binding bindingExchangeB(Queue queueB, FanoutExchange fanoutExchanger) {
  21. return BindingBuilder.bind(queueB).to(fanoutExchanger);
  22. }
  23. }

(2) 编写发送者。

编写发送者发送广播,见以下代码:

  1. @Component
  2. public class FanoutSender {
  3. @Autowired
  4. private AmqpTemplate rabbitTemplate;
  5. public void send() throws Exception{
  6. String context = "Fanout";
  7. System.out.println("Sender:" + context);
  8. this.rabbitTemplate.convertAndSend("fanoutExchange", "", context);
  9. }
  10. }

  

(3) 编写接收者A。

接收者A监听“fanout.A”,见以下代码:

  1. @Component
  2. @RabbitListener(queues = "fanout.A")
  3. public class FanoutReceiverA {
  4. @RabbitHandler
  5. public void process(String message) {
  6. System.out.println("fanout Receiver A:" + message);
  7. }
  8. }

  

(4) 编写接收者B。

接收者B监听“fanout.B”,见以下代码:

  1. @Component
  2. @RabbitListener(queues = "fanout.B")
  3. public class FanoutReceiverB {
  4. @RabbitHandler
  5. public void process(String message) {
  6. System.out.println("fanout Receiver B:" + message);
  7. }
  8. }

  

(5 )编写测试。

编写测试,用于测试效果,见以下代码:

  1. public class FanoutSendControllerTest {
  2. @Autowired
  3. private FanoutSender sender;
  4. @Test
  5. public void fanoutSender() throws Exception {
  6. sender.send();
  7. }
  8. }

  运行测试,可以看到控制台输出如下结果:

  1. fanout Receiver A:Fanout
  2. fanout Receiver B:Fanout

  这表示绑定到fanout交换机上的队列都接收到了消息。

12.7 实例54:实现消息队列延迟功能

        要实现这个功能,一般使用RabbitMQ的消息队列延迟功能,即采用官方提供的插件 “rabbitmq_delayed_message_exchange” 来实现。但 RabbitMQ 版本必须是 3.5.8 以上才支 持该插件,否则得用其“死信”功能。

本实例的源代码可以在“/12/Rabbitmq_DelayedDemo”目录下找到。

(1)安装延迟插件。

        用rabbitmq-plugins list命令可以查看安装的插件。如果没有,则直接访问官网进行下载,下载完成后,将其解压到RabbitMQ的plugins目录,如笔者的目录路径是uG:\Program Files\ RabbitMQ Server\rabbitmq_server-3.7.12\plugins” 

然后执行下面的命令进行安装:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

(2)配置交换机,见以下代码:

  1. @Bean
  2. public CustomExchange delayExchange() {
  3. Map<String, Object> args = new HashMap<>();
  4. args.put("x-delayed-type", "direct");
  5. return new CustomExchange("delayed_exchange", "x-delayed-message", true, false, args);
  6. }

  

这里要使用 CustomExchange,而不是 DirectExchange。CustomExchange 的类型必须是 x-delayed - message。

(3)实现消息发送。这里设置消息延迟5s,见以下代码:

  1. @Service
  2. public class CustomSender {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. public void sendMsg(String queueName, String msg) {
  6. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  7. System.out.println("消息发送时间:"+ sdf.format(new Date()));
  8. rabbitTemplate.convertAndSend("delayed_exchange", queueName, msg, new
  9. MessagePostProcessor() {
  10. @Override
  11. public Message postProcessMessage(Message message) throws AmqpException {
  12. //消息延迟5s
  13. message.getMessageProperties().setHeader("x-delay", 5000);
  14. return message;
  15. }
  16. });
  17. }
  18. }

  

(4) 实现消息接收,见以下代码:

  1. @Component
  2. public class CustomReceiver {
  3. @RabbitListener(queues = "delay_queue_1")
  4. public void receive(String msg) {
  5. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  6. System.out.println(sdf.format(new Date()) + msg);
  7. System.out.println("Receiver :执行取消订单");
  8. }
  9. }

  

(5) 测试发送延迟消息,见以下代码:

  1. @Autowired
  2. private CustomSender CustomSender;
  3. @Test
  4. public void send() {
  5. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  6. customSender.sendMsg("delay_queue_1","支付超时,取消订单通知");
  7. }

  

运行测试,可以看到控制台输出如下结果:

  1. 2019-05-08 21:50:37支付超时,取消订单通知!
  2. Receiver :执行取消订单

  

至此,消息队列延迟功能成功实现。在rabbitmq_delayed_message_exchange插件产生之 前,我们大都是使用“死信”功能来达到延迟队列的效果。

        “死信”在创建Queue(队列)时,要声明“死信”队列。队列里的消息到一定时间没被消费, 就会变成死信转发到死信相应的Exchange或Queue中。

        延迟消息是Exchange到Queue或其他Exchange的延迟。但如果消息延迟到期了,或消息 不能被分配给其他的Exchange或Queue,则消息会被丢弃。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/728276
推荐阅读
相关标签
  

闽ICP备14008679号