当前位置:   article > 正文

RabbitMQ消息中间件使用详解_消息中间件使用样例

消息中间件使用样例

RabbitMQ官网:https://www.rabbitmq.com

一、引言

        Message Queue(消息 队列),从字⾯上理解:⾸先它是⼀个队列。FIFO先进先出的数据结构——队列。消息队列就是所谓的存放消息的队列消息队列解决的不是存放消息的队列的⽬的,解决的是通信问题

        ⽐如以电商订单系统为例,如果各服务之间使⽤同步通信,不仅耗时较久,且过程中受到⽹络波动的影响,不能保证⾼成功率。因此,使⽤异步的通信⽅式对架构进⾏改造。

        使⽤异步通信⽅式对模块间的调⽤进⾏解耦,可以快速的提升系统的吞吐量。上游执⾏完消息的发送业务后⽴即获得结果,下游多个服务订阅到消息后各⾃消费。 通过消息队列,屏蔽底层的通信协议,使得解藕和并⾏消费得以实现。

二、RabbitMQ介绍

市⾯上⽐较⽕爆的⼏款MQ:

ActiveMQ,RocketMQ,Kafka,RabbitMQ。

  • 语⾔的⽀持:ActiveMQ,RocketMQ只⽀持Java语⾔,Kafka可以⽀持多们语⾔,RabbitMQ⽀持多种语⾔。
  • 效率⽅⾯:ActiveMQ,RocketMQ,Kafka效率都是毫秒级别,RabbitMQ是微 秒级别的。
  • 消息丢失,消息重复问题: RabbitMQ针对消息的持久化,和重复问题都有 ⽐较成熟的解决⽅案。
  • 学习成本:RabbitMQ⾮常简单。 RabbitMQ是由Rabbit

RabbitMQ是由Rabbit公司去研发和维护的,最终是在Pivotal。

RabbitMQ严格的遵循AMQP协议,⾼级消息队列协议,帮助我们在进程之间传 递异步消息。

三.RabbitMQ安装(Docker安装)

1、启动容器

  1. docker run -d -p 15672:15672 -p 5672:5672 \
  2. -e RABBITMQ_DEFAULT_VHOST=rabbitmq \
  3. -e RABBITMQ_DEFAULT_USER=admin \
  4. -e RABBITMQ_DEFAULT_PASS=admin \
  5. --hostname myRabbit --name rabbitmq \
  6. rabbitmq

 参数说明:

  • -d:表示在后台运行容器;
  • -p:将容器的端口 5672(通行端口)和 15672 (后台管理端口)映射到主机中;
  • -e:指定环境变量:
    • RABBITMQ_DEFAULT_VHOST:默认虚拟机名;
    • RABBITMQ_DEFAULT_USER:默认的用户名;
    • RABBITMQ_DEFAULT_PASS:默认的用户密码;
  • --hostname  :指定主机名(RabbitMQ 的一个重要注意事项是它根据所谓的 节点名称 存储数据,默认为主机名);
  • --name rabbitmq :设置容器名称;
  • rabbitmq :容器使用的镜像名称;

2、启动 rabbitmq_management

docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_management

3、访问 RabbitMQ 后台管理

  • 浏览器输入地址:http://虚拟机IP地址:15672 即可访问后台管理页面
  • 默认的用户名和密码都是 admin容器创建的时候指定用户名密码);

 注意:如果是云服务器记得开放相关端口。

四、RabbitMQ架构

  • Publisher - ⽣产者:发布消息到RabbitMQ中的Exchange
  • Consumer - 消费者:监听RabbitMQ中的Queue中的消息
  • Exchange - 交换机:和⽣产者建⽴连接并接收⽣产者的消息
  • Queue - 队列:Exchange会将消息分发到指定的Queue,Queue和消费者进⾏交互
  • Routes - 路由:交换机以什么样的策略将消息发布到Queue

1、简单架构

 2、RabbitMQ的完整架构图

3.查看图形化界⾯并创建⼀个Virtual Host

虚拟主机就是⽤来将⼀个rabbitmq内部划分成多个主机,给不同的⽤户来使 ⽤,⽽不会冲突。

 创建一个新的测试用户,添加 /test Virtual Host ,并将测试用户设置为可操作 /test 的权限。

 五、RabbitMQ的队列模式

1.RabbitMQ的通讯⽅式

2.HelloWorld模式-简单队列模式 

1)新建一个Maven项目,用于方便管理生产者和消费者。

2)  创建消息的生产者(发送消息)

步骤:

  • 创建一个SpringBoot项目,命名为my-priduer-demo
  • 引入依赖
  1. <dependency>
  2. <groupId>com.rabbitmq</groupId>
  3. <artifactId>amqp-client</artifactId>
  4. <version>>5.10.0</version>
  5. </dependency>
  • 编写生产者
  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import java.io.IOException;
  5. import java.nio.charset.StandardCharsets;
  6. import java.util.concurrent.TimeoutException;
  7. public class MyProducerDemoApplication {
  8. public static final String QUEUE_NAME = "my_queue";
  9. public static void main(String[] args) throws IOException, TimeoutException {
  10. // 1.连接Broker
  11. // 1.1 获得连接工厂
  12. ConnectionFactory factory = new ConnectionFactory();
  13. factory.setHost("虚拟机地址");
  14. factory.setPort(5672);
  15. factory.setUsername("test_user");
  16. factory.setPassword("test");
  17. factory.setVirtualHost("/test"); //设置连接的虚拟主机
  18. factory.setHandshakeTimeout(3000000);
  19. // 1.2 从连接工厂获得连接对象
  20. Connection connection = factory.newConnection();
  21. // 1.3 获取chanel,用于之后发送消息的对象
  22. Channel channel = connection.createChannel();
  23. // 1.4 声明队列 (队列不存在则创建,存在则使用)
  24. /*
  25. * queue – 队列的名称 the name of the queue
  26. * durable – 是否开启持久化 true if we are declaring a durable queue (the queue will survive a server restart)
  27. * exclusive – 是否独占连接(只允许当前客户端连接) true if we are declaring an exclusive queue (restricted to this connection)
  28. * autoDelete – 是否自动删除(长时间空闲未使用) true if we are declaring an autodelete queue (server will delete it when no longer in use)
  29. * arguments – 用于封装描述队列中的其他数据 other properties (construction arguments) for the queue
  30. */
  31. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  32. // 1.5 定义消息
  33. String message = "hello,rabbitmq!";
  34. // 1.6 发送消息
  35. /*
  36. * exchange – 交换机(Hello world模式下,一定是空串,不能为Null) the exchange to publish the message to
  37. * routingKey – 路由键(当exchange为空串时,路由键为队列名称) the routing key
  38. * immediate – 立即的 true if the 'immediate' flag is to be set. Note that the RabbitMQ server does not support this flag.
  39. * mandatory – 强制的 true if the 'mandatory' flag is to be set
  40. * props – 封装描述消息的数据 other properties for the message - routing headers etc
  41. * body – 消息体 the message body
  42. */
  43. channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
  44. System.out.println("发送完毕!");
  45. // 1.7 断开连接
  46. channel.close();
  47. connection.close();
  48. }
  49. }

在客户端查看:

3) 创建消息的消费者

  • 引入依赖
  • 编写消费者
  1. import java.nio.charset.StandardCharsets;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.nio.charset.StandardCharsets;
  5. import java.util.concurrent.TimeoutException;
  6. public class MyConsumer {
  7. public static final String QUEUE_NAME = "my_queue";
  8. public static void main(String[] args) throws IOException, TimeoutException {
  9. // 1.连接Broker
  10. // 1.1 获得连接工厂
  11. ConnectionFactory factory = new ConnectionFactory();
  12. factory.setHost("虚拟机ip");
  13. factory.setPort(5672);
  14. factory.setUsername("test_user");
  15. factory.setPassword("test");
  16. factory.setVirtualHost("/test"); //设置连接的虚拟主机
  17. factory.setHandshakeTimeout(3000000);
  18. // 1.2 从连接工厂获得连接对象
  19. Connection connection = factory.newConnection();
  20. // 1.3 获取chanel,用于之后发送消息的对象
  21. Channel channel = connection.createChannel();
  22. // 1.4 创建一个Consumer对象,来处理消息----打印消息
  23. Consumer consumer = new DefaultConsumer(channel) {
  24. @Override
  25. public void handleDelivery(String consumerTag,
  26. Envelope envelope,
  27. AMQP.BasicProperties properties,
  28. byte[] body) throws IOException {
  29. System.out.println(new String(body));
  30. }
  31. };
  32. // 设置消费者监听某个队列
  33. channel.basicConsume(QUEUE_NAME,consumer);
  34. }
  35. }
  • 启动并查看控制台

  • 打开管理页面发现消息并没有被确认消费,当我们重启消费者还是会接收到这条消息

 只需要在消费者监听队列时,将AutoAck置为 true 即可

  1. // 设置消费者监听某个队列
  2. channel.basicConsume(QUEUE_NAME,true,consumer);

简单队列的问题:

        当多个消费者消费同⼀个队列时。这个时候rabbitmq的公平调度机制就开启了, 于是,⽆论消费者的消费能⼒如何,每个消费者都能公平均分到相同数量的消息, ⽽不能出现能者多劳的情况。 

手动ACK存在的问题:

        不管消费者是否消费完毕,都会马上发送ACK告知Broker消费完成,意味着Broker马上会推送下一条消息给消费者,如果此消费者消费能力较弱,则会造成消息堆积,或影响整个消息队列的消费能力。

解决方法:手动ACK

3. work 队列模式: 能者多劳模式   

将⾃动ack 改成⼿动ack

  • 消费者1
  1. public class MyConsumer2 {
  2. public static final String QUEUE_NAME = "my_work_queue";
  3. public static void main(String[] args) throws Exception {
  4. // 1.连接Broker
  5. // 1.1 获得连接工厂
  6. ConnectionFactory factory = new ConnectionFactory();
  7. factory.setHost("虚拟机ip");
  8. factory.setPort(5672);
  9. factory.setUsername("test_user");
  10. factory.setPassword("test");
  11. factory.setVirtualHost("/test"); //设置连接的虚拟主机
  12. factory.setHandshakeTimeout(3000000);
  13. // 1.2 从连接工厂获得连接对象
  14. Connection connection = factory.newConnection();
  15. // 1.3 获取chanel,用于之后发送消息的对象
  16. Channel channel = connection.createChannel();
  17. // 声明队列
  18. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  19. // 声明一次只能消费一条消息
  20. channel.basicQos(1);
  21. // 1.4 创建一个Consumer对象,来处理消息----打印消息
  22. Consumer consumer = new DefaultConsumer(channel) {
  23. @Override
  24. public void handleDelivery(String consumerTag,
  25. Envelope envelope,
  26. AMQP.BasicProperties properties,
  27. byte[] body) throws IOException {
  28. System.out.println(new String(body));
  29. // 手动ASC,告诉Broker这条消息已经被消费,可以被移除队列,并且不需要批量确认消费
  30. channel.basicAck(envelope.getDeliveryTag(),false);
  31. }
  32. };
  33. // 设置消费者监听某个队列,并修改ASC模式为手动
  34. channel.basicConsume(QUEUE_NAME,false,consumer);
  35. }
  36. }
  • 消费者2

        在消费者1的基础上将当前线程睡眠3秒,来体现消费者2消费能力弱于消费者1

        channel.basicQos(1) 声明一次只能消费一条消息

        channel.basicAck(envelope.getDeliveryTag(),false) 手动ASC,告诉Broker这条消息已经被消费,可以被移除消息队列,并且不需要批量确认消费

  • 生产者(向队列发送100条消息)
  1. public class MyProducerDemoApplication {
  2. public static final String QUEUE_NAME = "my_work_queue";
  3. public static void main(String[] args) throws IOException, TimeoutException {
  4. ConnectionFactory factory = new ConnectionFactory();
  5. factory.setHost("虚拟机ip地址");
  6. factory.setPort(5672);
  7. factory.setUsername("test_user");
  8. factory.setPassword("test");
  9. factory.setVirtualHost("/test"); //设置连接的虚拟主机
  10. factory.setHandshakeTimeout(3000000);
  11. // 1.2 从连接工厂获得连接对象
  12. Connection connection = factory.newConnection();
  13. // 1.3 获取chanel,用于之后发送消息的对象
  14. Channel channel = connection.createChannel();
  15. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  16. for (int i = 0;i<=99;i++) {
  17. // 1.5 定义消息
  18. String message = "hello,rabbitmq!"+i;
  19. channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
  20. }
  21. System.out.println("发送完毕!");
  22. // 1.7 断开连接
  23. channel.close();
  24. connection.close();
  25. }
  26. }
结果:消费者1(消费能力正常) 消费了97条消息,消费者2(消费能力较弱) 消费了3条消息,体现出了Work模式下的“能者多劳”。

4. 发布订阅模式-fanout

        对于之前的队列模式,是没有办法解决⼀条消息同时被多个消费者消费。于是使⽤ 发布订阅模式来实现。

步骤:

        生产者声明交换机,并向交换机发送消息(不再向队列发送消息)

        -->

        消费者声明队列与交换机,并将队列与交换机进行绑定

  • 编写生产者
  1. public class MyProducer {
  2. // 定义交换机名称
  3. public static final String EXCHANGE_NAME = "my_fanout_exchange";
  4. public static void main(String[] args) throws IOException, TimeoutException {
  5. // 获取连接对象
  6. Connection connection = RabbitUtil.getConnection();
  7. // 获取channel通道
  8. Channel channel = connection.createChannel();
  9. // 1、声明交换机
  10. channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
  11. // 2、生产消息,发送给交换机
  12. for (int i = 0; i < 10; i++) {
  13. String message = "message:"+i;
  14. channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8));
  15. }
  16. System.out.println("消息已全部发送!");
  17. }
  18. }
  • 编写消费者1

        关键动作:

                创建队列

                创建交换机

                把队列绑定在交换机上

                让消费者监听队列

  1. public class MyConsumer1 {
  2. private static String EXCHANGE_NAME = "my_fanout_exchange";
  3. private static String QUEUE_NAME = "my_fanout_queue_1";
  4. public static void main(String[] args) throws Exception {
  5. Connection connection = RabbitUtil.getConnection();
  6. Channel channel = connection.createChannel();
  7. // 1、声明交换机
  8. channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
  9. // 2、声明队列
  10. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  11. // 3、将队列与交换机进行绑定
  12. channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
  13. // 4、创建消费者
  14. Consumer consumer = new DefaultConsumer(channel){
  15. @Override
  16. public void handleDelivery(String consumerTag,
  17. Envelope envelope,
  18. AMQP.BasicProperties properties,
  19. byte[] body) throws IOException {
  20. // 消费消息
  21. System.out.println("消费者1:"+new String(body));
  22. }
  23. };
  24. // 5、让消费者监听队列
  25. channel.basicConsume(QUEUE_NAME,true,consumer);
  26. }
  27. }
  • 编写消费者2
  1. public class MyConsumer2 {
  2. private static String EXCHANGE_NAME = "my_fanout_exchange";
  3. private static String QUEUE_NAME = "my_fanout_queue_2";
  4. public static void main(String[] args) throws Exception {
  5. Connection connection = RabbitUtil.getConnection();
  6. Channel channel = connection.createChannel();
  7. // 1、声明交换机
  8. channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
  9. // 2、声明队列
  10. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  11. // 3、将队列与交换机进行绑定
  12. channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
  13. // 4、创建消费者
  14. Consumer consumer = new DefaultConsumer(channel){
  15. @Override
  16. public void handleDelivery(String consumerTag,
  17. Envelope envelope,
  18. AMQP.BasicProperties properties,
  19. byte[] body) throws IOException {
  20. // 消费消息
  21. System.out.println("消费者2:"+new String(body));
  22. }
  23. };
  24. // 5、让消费者监听队列
  25. channel.basicConsume(QUEUE_NAME,true,consumer);
  26. }
  27. }

分别启动两个消费者和一个生产者,发现两个消费者都接收到了所有的消息。

5.routing模式-direct 

关键动作:

         在⽣产者发送消息时指明routing-key

         在消费者声明队列和交换机的绑定关系时,指明routing-key

  • 编写⽣产者

        对交换机发送消息时,指定 routing-key 为 apple ,

  1. public class MyProducer {
  2. // 定义交换机名称
  3. public static final String EXCHANGE_NAME = "my_routing_exchange";
  4. public static void main(String[] args) throws IOException, TimeoutException {
  5. // 获取连接对象
  6. Connection connection = RabbitUtil.getConnection();
  7. // 获取channel通道
  8. Channel channel = connection.createChannel();
  9. // 1、声明路由模式的交换机
  10. channel.exchangeDeclare(EXCHANGE_NAME,"direct");
  11. // 2、生产消息,发送给交换机
  12. String message = "apple-message:";
  13. channel.basicPublish(EXCHANGE_NAME,"apple",null,message.getBytes(StandardCharsets.UTF_8));
  14. System.out.println("消息已全部发送!");
  15. // 3、关闭连接
  16. channel.close();
  17. connection.close();
  18. }
  19. }
  • 编写消费者1

        绑定交换机与队列时,指定 routing-key 为 apple

  1. public class MyConsumer1 {
  2. private static String EXCHANGE_NAME = "my_routing_exchange";
  3. private static String QUEUE_NAME = "my_routing_queue_1";
  4. private static String ROUTING_KEY = "apple";
  5. public static void main(String[] args) throws Exception {
  6. Connection connection = RabbitUtil.getConnection();
  7. Channel channel = connection.createChannel();
  8. // 1、声明交换机
  9. channel.exchangeDeclare(EXCHANGE_NAME,"direct");
  10. // 2、声明队列
  11. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  12. // 3、将队列与交换机进行绑定,并指定routingKey
  13. channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
  14. // 4、创建消费者
  15. Consumer consumer = new DefaultConsumer(channel){
  16. @Override
  17. public void handleDelivery(String consumerTag,
  18. Envelope envelope,
  19. AMQP.BasicProperties properties,
  20. byte[] body) throws IOException {
  21. // 消费消息
  22. System.out.println(ROUTING_KEY+":"+new String(body));
  23. }
  24. };
  25. // 5、让消费者监听队列
  26. channel.basicConsume(QUEUE_NAME,true,consumer);
  27. }
  28. }
  • 编写消费者2

        绑定交换机与队列时,指定 routing-key 为 banana

  1. public class MyConsumer2 {
  2. private static String EXCHANGE_NAME = "my_routing_exchange";
  3. private static String QUEUE_NAME = "my_routing_queue_2";
  4. private static String ROUTING_KEY = "banana";
  5. public static void main(String[] args) throws Exception {
  6. Connection connection = RabbitUtil.getConnection();
  7. Channel channel = connection.createChannel();
  8. // 1、声明交换机
  9. channel.exchangeDeclare(EXCHANGE_NAME,"direct");
  10. // 2、声明队列
  11. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  12. // 3、将队列与交换机进行绑定,并指定routingKey
  13. channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
  14. // 4、创建消费者
  15. Consumer consumer = new DefaultConsumer(channel){
  16. @Override
  17. public void handleDelivery(String consumerTag,
  18. Envelope envelope,
  19. AMQP.BasicProperties properties,
  20. byte[] body) throws IOException {
  21. // 消费消息
  22. System.out.println(ROUTING_KEY+":"+new String(body));
  23. }
  24. };
  25. // 5、让消费者监听队列
  26. channel.basicConsume(QUEUE_NAME,true,consumer);
  27. }
  28. }

分别启动两个消费者和一个生产者,发现只有队列绑定的routing-key为apple的消费者才接收到了消息,则实现了给指定队列发送消息

6.topics模式

在routing模式的基础上,对routing-key使⽤了通配符,提⾼了匹配的范围,增加了可玩性。

- *.orange.*

- *.*.rabbit 只⽀持单层级

- lazy.# 可以⽀持多层级的routing-key 

绑定关系中如果使⽤了product.* ,那么在发送消息时:

  • product.add ok product.del ok
  • product.add.one 不ok

绑定关系中如果使⽤了product.#,那么在发送消息时:

  • product.add ok
  • product.add.one ok 

编写生产者:

        用 topic模式,并设置routing-key为多层级

  1. public class MyProducer {
  2. public static final String EXCHANGE_NAME = "my_topic_exchange";
  3. public static void main(String[] args) throws Exception {
  4. // 获得连接对象与通道
  5. Connection connection = RabbitUtil.getConnection();
  6. Channel channel = connection.createChannel();
  7. // 声明交换机
  8. channel.exchangeDeclare(EXCHANGE_NAME,"topic");
  9. // 发送消息
  10. channel.basicPublish(EXCHANGE_NAME,"product.add.one",false,false,null,"hello,topic".getBytes(StandardCharsets.UTF_8));
  11. // 关闭连接
  12. channel.close();
  13. connection.close();
  14. }
  15. }

编写消费者1:

        用 topic模式,并设置routing-key为 product.*  (单层级)

  1. public class MyConsumer1 {
  2. // 交换机名称
  3. private static String EXCHANGE_NAME = "my_topic_exchange";
  4. // 队列名称
  5. private static String QUEUE_NAME = "my_topic_queue_1";
  6. public static void main(String[] args) throws Exception {
  7. Connection connection = RabbitUtil.getConnection();
  8. Channel channel = connection.createChannel();
  9. // 1、声明交换机
  10. channel.exchangeDeclare(EXCHANGE_NAME,"topic");
  11. // 2、声明队列
  12. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  13. // 3、将队列与交换机进行绑定,并指定routingKey,为 product.任意字符 都能接收
  14. channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"product.*");
  15. // 4、创建消费者
  16. Consumer consumer = new DefaultConsumer(channel){
  17. @Override
  18. public void handleDelivery(String consumerTag,
  19. Envelope envelope,
  20. AMQP.BasicProperties properties,
  21. byte[] body) throws IOException {
  22. // 消费消息
  23. System.out.println("product.*消费者:"+":"+new String(body));
  24. }
  25. };
  26. // 5、让消费者监听队列
  27. channel.basicConsume(QUEUE_NAME,true,consumer);
  28. }
  29. }

编写消费者2:

        用 topic模式,并设置routing-key为 product.#  (多层级)

  1. public class MyConsumer2 {
  2. // 交换机名称
  3. private static String EXCHANGE_NAME = "my_topic_exchange";
  4. // 队列名称
  5. private static String QUEUE_NAME = "my_topic_queue_2";
  6. public static void main(String[] args) throws Exception {
  7. Connection connection = RabbitUtil.getConnection();
  8. Channel channel = connection.createChannel();
  9. // 1、声明交换机
  10. channel.exchangeDeclare(EXCHANGE_NAME,"topic");
  11. // 2、声明队列
  12. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  13. // 3、将队列与交换机进行绑定,并指定routingKey,为 product.任意字符 都能接收
  14. channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"product.#");
  15. // 4、创建消费者
  16. Consumer consumer = new DefaultConsumer(channel){
  17. @Override
  18. public void handleDelivery(String consumerTag,
  19. Envelope envelope,
  20. AMQP.BasicProperties properties,
  21. byte[] body) throws IOException {
  22. // 消费消息
  23. System.out.println("product.*消费者:"+":"+new String(body));
  24. }
  25. };
  26. // 5、让消费者监听队列
  27. channel.basicConsume(QUEUE_NAME,true,consumer);
  28. }
  29. }

分别启动两个消费者和一个生产者,发现只有队列绑定的routing-key为product.#的消费者才接收到了消息。

六、在Springboot中使⽤RabbitMQ

1.引⼊依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

  2.编写配置⽂件      

  1. server.port=8091
  2. spring.rabbitmq.addresses=虚拟机ip地址
  3. spring.rabbitmq.port=5672
  4. spring.rabbitmq.username=test_user
  5. spring.rabbitmq.password=test
  6. spring.rabbitmq.virtual-host=/test #虚拟主机名

3.使⽤发布订阅模式

发布订阅回顾:

        消费者定义交换机与队列,将两者绑定;

        生产者向交换机发送消息;

1)编写消费者

  • 编写配置类

        定义交换机与队列,将交换机与队列进行绑定

  1. /**
  2. * RabbitMQ消费者配置类
  3. * springBoot实现消息订阅模式
  4. */
  5. @Configuration
  6. public class MyRabbitConfig {
  7. public static final String EXCHANGE_NAME = "my_boot_fanout_exchange";
  8. public static final String QUEUE_NAME = "my_boot_fanout_queue1";
  9. /**
  10. * 声明交换机
  11. */
  12. @Bean
  13. public FanoutExchange exchange(){
  14. return new FanoutExchange(EXCHANGE_NAME,true,false);
  15. }
  16. /**
  17. * 声明队列
  18. */
  19. @Bean
  20. public Queue queue(){
  21. return new Queue(QUEUE_NAME,true,false,false);
  22. }
  23. /**
  24. * 绑定交换机与队列
  25. */
  26. @Bean
  27. public Binding queueBinding(Queue queue,FanoutExchange exchange){
  28. return BindingBuilder.bind(queue).to(exchange);
  29. }
  30. }
  • 编写消费消息的⽅法

        关键:使⽤该注解来指定监听的队列@RabbitListener(queues = "要监听的队列名称")

  1. @Component
  2. public class MyConsumer {
  3. /**
  4. * 监听队列:当队列中有消息,则监听器工作,处理接收到的消息
  5. * @param message 消息体
  6. */
  7. @RabbitListener(queues = "my_boot_fanout_queue1")
  8. public void process(Message message){
  9. byte[] messageBody = message.getBody();
  10. System.out.println(new String(messageBody));
  11. }
  12. }

2)编写⽣产者

  • 编写配置类

        因为生产者只需要给交换机发送消息,所以只需要声明交换机即可

  1. @Configuration
  2. public class MyProducerConfig {
  3. public static final String EXCHANGE_NAME = "my_boot_fanout_exchange";
  4. /**
  5. * 声明交换机
  6. */
  7. @Bean
  8. public FanoutExchange exchange(){
  9. return new FanoutExchange(EXCHANGE_NAME,true,false);
  10. }
  11. }
  • 使⽤RabbitTemplate发送消息
  1. @Autowired
  2. RabbitTemplate rabbitTemplate;
  3. public static final String EXCHANGE_NAME = "my_boot_fanout_exchange";
  4. @Test
  5. void testSendMsg(){
  6. String msg = "Hello,SpringBootRabbitMQ!";
  7. rabbitTemplate.convertAndSend(EXCHANGE_NAME,"",msg);
  8. System.out.println("消息发送成功!");
  9. }

4.使⽤topic模式

        topic模式相⽐发布订阅模式,多了routing-key的使⽤

1)调整消费者配置类 (要指定Routing-key)

  1. /**
  2. * RabbitMQ消费者配置类
  3. * springBoot实现Topic模式
  4. */
  5. @Configuration
  6. public class MyRabbitTopicConfig {
  7. public static final String TOPIC_EXCHANGE_NAME = "my_boot_topic_exchange";
  8. public static final String TOPIC_QUEUE_NAME = "my_boot_topic_queue";
  9. /**
  10. * 声明交换机
  11. */
  12. @Bean
  13. public TopicExchange exchange(){
  14. return new TopicExchange(TOPIC_EXCHANGE_NAME,true,false);
  15. }
  16. /**
  17. * 声明队列
  18. */
  19. @Bean
  20. public Queue queue(){
  21. return new Queue(TOPIC_QUEUE_NAME,true,false,false);
  22. }
  23. /**
  24. * 绑定交换机与队列
  25. */
  26. @Bean
  27. public Binding queueBinding(Queue queue,TopicExchange exchange){
  28. return BindingBuilder.bind(queue).to(exchange).with("product.*"); // 能接受到routing-key为product.任意字符的消息(*单层 #多层)
  29. }
  30. }

        还需修改消费者监听的队列为这里声明的队列名

 2)编写⽣产者

  • 调整⽣产者的配置类
  1. public class MyTopicProducerConfig {
  2. public static final String TOPIC_EXCHANGE_NAME = "my_boot_topic_queue";
  3. /**
  4. * 声明交换机
  5. */
  6. @Bean
  7. public TopicExchange exchange(){
  8. return new TopicExchange(TOPIC_EXCHANGE_NAME,true,false);
  9. }
  10. }
  • 发消息时携带routing-key
  1. @Autowired
  2. RabbitTemplate rabbitTemplate;
  3. public static final String TOPIC_EXCHANGE_NAME = "my_boot_fanout_exchange";
  4. @Test
  5. void testSendMsg(){
  6. String msg = "Hello,SpringBootRabbitMQ!";
  7. rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME,"product.add",msg); //指定routing-key
  8. System.out.println("消息发送成功!");
  9. }

5.⼿动ack的实现

  • 在配置⽂件中添加⼿动ack的配置
spring.rabbitmq.listener.direct.acknowledge-mode=manual
  • 在消费者中消费完后进⾏⼿动ack
  1. @RabbitListener(queues = "my_boot_topic_queue")
  2. public void process(Message message, Channel channel) throws IOException {
  3. System.out.println("接收到的消息"+message.toString());
  4. // 手动ACK,告知Broker确认已被消费的消息id(DeliveryTag)
  5. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  6. }

 其中,message的请求头中的这两个键值对分别为:

  • spring_listener_return_correlation:该属性是⽤来确定消息被退回时调⽤哪个监听器
  • spring_returned_message_correlation:该属性是指退回待确认消息的唯⼀标识

七、消息的可靠性投递

消息的可靠性的三个保障:

        1.生产者将消息准确的投递给交换机(使用Confirm机制)

        2.交换机将消息准确的投递给队列(使用Return机制)

        3.队列将消息准确的推送给消费者(消费者手动ACK)

1.通过confirm机制保证⽣产者消息能够投递到 MQ 

  • 在spring项⽬中为生产者发送消息时使用confirm
  1. public class MyProducer {
  2. public static final String EXCHANGE_NAME = "my_topic_exchange";
  3. public static void main(String[] args) throws Exception {
  4. // 获得连接对象与通道
  5. Connection connection = RabbitUtil.getConnection();
  6. Channel channel = connection.createChannel();
  7. // 声明交换机
  8. channel.exchangeDeclare(EXCHANGE_NAME,"topic");
  9. // 开启confirm机制
  10. channel.confirmSelect();
  11. // 设置confirm监听器
  12. channel.addConfirmListener(new ConfirmListener() {
  13. // 消息被Broker确认接收了,将会回调此方法
  14. @Override
  15. public void handleAck(long l, boolean b) throws IOException {
  16. // 消息发送成功
  17. System.out.println("消息被成功投递!");
  18. }
  19. // 消息被Broker接收失败了,将会回调此方法
  20. @Override
  21. public void handleNack(long l, boolean b) throws IOException {
  22. //开启重试机制,重试达到阈值,则考虑人工介入
  23. System.out.println("消息投递失败!");
  24. }
  25. });
  26. byte[] msg = "hello,confirm message".getBytes(StandardCharsets.UTF_8);
  27. // 发送消息
  28. channel.basicPublish(EXCHANGE_NAME,"product.add",false,false,null,msg);
  29. }
  30. }

在发送消息前 使用 channel.confirmSelect()  以开启confirm机制;

                       使用  channel.addConfirmListener 来设置confirm监听器

                                其中 handleAck 为消息成功投递给交换机的回调函数 

                                        handleNack 为消息未成功投递给交换机的回调函数

  • 在SpringBoot中使用Confirm

步骤⼀:修改⽣产者的配置:

  1. server.port=8091
  2. spring.rabbitmq.addresses=虚拟机ip地址
  3. spring.rabbitmq.port=5672
  4. spring.rabbitmq.username=test_user
  5. spring.rabbitmq.password=test
  6. spring.rabbitmq.virtual-host=/test
  7. spring.rabbitmq.publisher-confirm-type: correlated

publisher-confirm-type:有三种配置:

  • simple:简单的执⾏ack的判断;在发布消息成功后使⽤rabbitTemplate调⽤ waitForConfirms或waitForConfirmsOrDie⽅法等待broker节点返回发送结果,根据返回结果来判断下⼀步的逻辑。但是要注意的是当waitForConfirmsOrDie⽅法 如果返回false则会关闭channel。
  • correlated: 执⾏ack的时候还会携带数据(消息的元数据);
  • none: 禁⽤发布确认模式, 默认的

步骤⼆:编写⼀个ConfirmCallback的实现类(监听器),并注⼊到RabbitTemplate

  1. @Component
  2. public class MyConfirmCallfack implements RabbitTemplate.ConfirmCallback {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. // 将监听器注入到RabbitTemplate中
  6. @PostConstruct
  7. public void init(){
  8. rabbitTemplate.setConfirmCallback(this);
  9. }
  10. /**
  11. * @param correlationData 消息元数据(消息id,消息内容)
  12. * @param ack 布尔值,Broker是否成功接收到消息
  13. * @param cause 投递失败的原因
  14. */
  15. @Override
  16. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  17. // 消息的id
  18. String id = correlationData.getId();
  19. if (ack){
  20. // 消息投递成功
  21. System.out.println("消息投递成功,id为:"+id);
  22. }else{
  23. // 消息投递失败,可对失败的消息进行定时重试
  24. System.out.println("消息投递失败,原因为:"+cause);
  25. }
  26. }
  27. }

 我们发现当把交换机改成一个不存在的交换机,会得到消息失败的反馈,但如果修改了错误的routing-key而导致消息未成功投递,则不会收到消息投递失败的反馈,这是因为Confirm只会关注生产者与交换机的消息投递情况。

2.通过return机制保证消息在rabbitmq中能够成功的投递到队列⾥

⽣产者将消息投递到mq的交换机上——Confirm机制来保证的。

如果交换机没办法将消息投递到队列上,就可以通过Return机制来进⾏重试。 

步骤⼀:修改配置⽂件 

开启return机制的话。需要把mandatory设置成true。

  1. server.port=8091
  2. spring.rabbitmq.addresses=虚拟机IP地址
  3. spring.rabbitmq.port=5672
  4. spring.rabbitmq.username=test_user
  5. spring.rabbitmq.password=test
  6. spring.rabbitmq.virtual-host=/test
  7. spring.rabbitmq.publisher-confirm-type: correlated
  8. spring.rabbitmq.publisher-returns: true

步骤⼆:在监听类中实现RabbitTemplate.ReturnCallback接⼝

  1. import org.springframework.amqp.core.Message;
  2. import org.springframework.amqp.rabbit.connection.CorrelationData;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Component;
  6. import javax.annotation.PostConstruct;
  7. @Component
  8. public class MyConfirmCallfack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
  9. @Autowired
  10. private RabbitTemplate rabbitTemplate;
  11. // 将监听器注入到RabbitTemplate中
  12. @PostConstruct
  13. public void init(){
  14. rabbitTemplate.setConfirmCallback(this);
  15. rabbitTemplate.setReturnCallback(this);
  16. }
  17. /**
  18. * @param correlationData 消息元数据(消息id,消息内容)
  19. * @param ack 布尔值,Broker是否成功接收到消息
  20. * @param cause 投递失败的原因
  21. */
  22. @Override
  23. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  24. // 消息的id
  25. String id = correlationData.getId();
  26. if (ack){
  27. // 消息投递成功
  28. System.out.println("消息投递成功,id为:"+id);
  29. }else{
  30. // 消息投递失败,可对失败的消息进行定时重试
  31. System.out.println("消息投递失败,原因为:"+cause);
  32. }
  33. }
  34. /**
  35. 当消息未成功被投递到队列,调用此方法
  36. */
  37. @Override
  38. public void returnedMessage(Message message, int i, String s, String s1, String s2) {
  39. System.out.println("消息"+new String(message.getBody()+"没有被成功投递到队列"));
  40. }
  41. }

3.⼿动ack、nack、reject的区别

1) 不做任何的ack

        RabbitMQ会把消息标记成unacked,此时mq是在等待消费者进⾏ack,如果消费者失去了会话,此时消息会重新回到ready状态,被其他消费者消费。

2)ack

        确认签收,之后消息会从队列中剔除

3)reject

        reject就是拒绝此条消息。

        reject⼀次只⽀持处理⼀条消息。消息被拒绝掉之后,并且requeue设置成了false, 将会进⼊到死信队列中。如果requeue设置成true,将会重回队列,但是这种情况很少使⽤。

4)nack 

  1. public class MyConsumer1 {
  2. // 交换机名称
  3. private static String EXCHANGE_NAME = "my_topic_exchange";
  4. // 队列名称
  5. private static String QUEUE_NAME = "my_topic_queue_1";
  6. public static void main(String[] args) throws Exception {
  7. Connection connection = RabbitUtil.getConnection();
  8. Channel channel = connection.createChannel();
  9. // 1、声明交换机
  10. channel.exchangeDeclare(EXCHANGE_NAME,"topic");
  11. // 2、声明队列
  12. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  13. // 3、将队列与交换机进行绑定,并指定routingKey,为 product.任意字符 都能接收
  14. channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"product.*");
  15. // 4、创建消费者
  16. Consumer consumer = new DefaultConsumer(channel){
  17. @Override
  18. public void handleDelivery(String consumerTag,
  19. Envelope envelope,
  20. AMQP.BasicProperties properties,
  21. byte[] body) throws IOException {
  22. // 消费消息
  23. System.out.println("product.*消费者:"+":"+new String(body));
  24. //⼿动ack
  25. // channel.basicAck(envelope.getDeliveryTag(),false);
  26. //reject拒签消息 ⼀次只⽀持处理⼀条消息
  27. // channel.basicReject(envelope.getDeliveryTag(),false);
  28. //nack 拒签消息 ⽀持批处理多条消息
  29. channel.basicNack(envelope.getDeliveryTag(), true,false);
  30. }
  31. };
  32. // 5、让消费者监听队列
  33. channel.basicConsume(QUEUE_NAME,true,consumer);
  34. }
  35. }

 5)消息元数据的封装

        在生产者发送消息前,可构建消息的元数据,例如消息是否持久化,消息过期时间,消息的ID及自定义的Map数据。

⽣产者端: 

  1. public class MyProducer {
  2. public static final String EXCHANGE_NAME = "my_topic_exchange";
  3. public static void main(String[] args) throws Exception {
  4. // 获得连接对象与通道
  5. Connection connection = RabbitUtil.getConnection();
  6. Channel channel = connection.createChannel();
  7. // 声明交换机
  8. channel.exchangeDeclare(EXCHANGE_NAME,"topic");
  9. // 创建消息的元数据
  10. HashMap<String, Object> map = new HashMap<>();
  11. map.put("name","zhangsan");
  12. map.put("age","18");
  13. AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
  14. .deliveryMode(2) //消息是否支持持久化:1不支持2支付
  15. .messageId(UUID.randomUUID().toString()) //定义消息的业务id
  16. .expiration("100000000") // 定义消息的过期时间
  17. .headers(map) // 头信息
  18. .build();
  19. // 发送消息
  20. channel.basicPublish(EXCHANGE_NAME,"product.#",false,false,properties,"hello,topic".getBytes(StandardCharsets.UTF_8));
  21. // 关闭连接
  22. channel.close();
  23. connection.close();
  24. }
  25. }

通过new AMQP.BasicProperties.Builder构造消息元数据

消费者端:

  1. public class MyConsumer1 {
  2. // 交换机名称
  3. private static String EXCHANGE_NAME = "my_topic_exchange";
  4. // 队列名称
  5. private static String QUEUE_NAME = "my_topic_queue_1";
  6. public static void main(String[] args) throws Exception {
  7. Connection connection = RabbitUtil.getConnection();
  8. Channel channel = connection.createChannel();
  9. // 1、声明交换机
  10. channel.exchangeDeclare(EXCHANGE_NAME,"topic");
  11. // 2、声明队列
  12. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  13. // 3、将队列与交换机进行绑定,并指定routingKey,为 product.任意字符 都能接收
  14. channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"product.add");
  15. // 4、创建消费者
  16. Consumer consumer = new DefaultConsumer(channel){
  17. @Override
  18. public void handleDelivery(String consumerTag,
  19. Envelope envelope,
  20. AMQP.BasicProperties properties,
  21. byte[] body) throws IOException {
  22. // 消费消息
  23. Map<String, Object> map = properties.getHeaders();//获取消息元数据
  24. System.out.println(map);
  25. //⼿动ack
  26. channel.basicAck(envelope.getDeliveryTag(),false);
  27. }
  28. };
  29. // 5、让消费者监听队列
  30. channel.basicConsume(QUEUE_NAME,false,consumer);
  31. }
  32. }

通过AMQP.BasicProperties获取消息元数据

⼋、消息的重复消费问题

1.什么幂等性

        幂等性:多次操作造成的结果是⼀致的。对于⾮幂等的操作,幂等性如何保证? ——使⽤分布式锁。

        使用分布式锁解决因网络抖动造成消费者成功消费后没有手动ACK而消息重复消费思路:为消息生成全局唯一ID,生产者发送消息时携带此ID,消费者成功消费后将这条消息的ID通过Redis的setnx缓存,当准备重复消费时从redis中去判断是否有该ID。

九、死信队列——“延迟”队列 

1.死信队列的介绍 

        死信队列 ,让⼀条消息,在满⾜⼀定的条件下,成为死信,会被发送到另⼀个交换机上,再被消费。 这个过程就是死信队列的作⽤。 死信队列就可以做出“延迟”队列的效果。⽐如,在订单超时未⽀付 ,将订单状态改 成“已取消”,这个操作就可以使⽤死信队列来完成。设置消息的超时时间,当消息超时则消息成为死信,于是通过监听死信队列的消费者来做取消订单的动作。 

 要掌握两个知识:

  • 消息如何成为死信? 成为死信的条件
  • 怎样创建死信队列,完成死信队列的效果

2.消息成为死信的条件

  • 消息被拒签,并且没有重回队列,消息将成为死信。(nack、reject且requeue为false)
  • 消息过期了,消息将成为死信。
  • 队列⻓度有限,存不下消息了,存不下的消息将会成为死信。

3.创建死信队列 

        关键点:让正常的队列,绑定上死信交换机即可。注意:这个死信交换机实际上也是⼀个正常交换机。

消费者):

  1. Connection connection = RabbitUtil.getConnection();
  2. Channel channel = connection.createChannel();
  3. // 声明普通交换机、普通队列 声明死信交换机、死信队列 建立他们的关系
  4. String normalExchangeName = "normal.exchange";
  5. String exchangeType = "topic";
  6. String normalQueueName = "normal.queue";
  7. String routingKey = "dlx.#";
  8. // 声明死信队列
  9. String dlxExchangeName = "dlx.exchange";
  10. String dlxQueueName = "dlx.queue";
  11. // 声明普通交换机
  12. channel.exchangeDeclare(normalExchangeName,exchangeType,true,false,null);
  13. // 为队列绑定死信交换机
  14. Map<String,Object> queueArgs = new HashMap<>();
  15. queueArgs.put("x-dead-letter-exchange",dlxExchangeName);//正常队列绑定⼀个交换机,让该交换机是死信交换机
  16. queueArgs.put("x-max-length",4); //设置队列的⻓度是4
  17. // 声明普通队列,并将带有死信交换机的消息元数据
  18. channel.queueDeclare(normalQueueName,true,false,false,queueArgs);
  19. channel.queueBind(normalQueueName,normalExchangeName,routingKey);
  20. //创建死信队列
  21. channel.exchangeDeclare(dlxExchangeName,exchangeType,true,false,null);
  22. channel.queueDeclare(dlxQueueName,true,false,false,null);
  23. channel.queueBind(dlxQueueName,dlxExchangeName,"#");

4.延迟队列

        创建一个监听死信队列的消费者,当有消息进入死信队列时,取出消息的元数据进行业务处理(例如超时取消订单),成功消费死信队列的消息后手动ACK。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/天景科技苑/article/detail/907623
推荐阅读
相关标签
  

闽ICP备14008679号