当前位置:   article > 正文

RabbitMQ使用教程(超详细)_rabbitmq教学

rabbitmq教学

推荐springCloud教程:
https://blog.csdn.net/hellozpc/article/details/83692496

推荐Springboot2.0教程:
https://blog.csdn.net/hellozpc/article/details/82531834

RabbitMQ实战教程

1.什么是MQ

  • 消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。
    其主要用途:不同进程Process/线程Thread之间通信。

为什么会产生消息队列?有几个原因:

  • 不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;

  • 不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;

  • 关于消息队列的详细介绍请参阅:
    《Java帝国之消息队列》
    《一个故事告诉你什么是消息队列》
    《到底什么时候该使用MQ》

  • MQ框架非常之多,比较流行的有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里开源的RocketMQ。本文主要介绍RabbitMq。

  • 本教程pdf及代码下载地址
    代码:https://download.csdn.net/download/zpcandzhj/10585077
    教程:https://download.csdn.net/download/zpcandzhj/10585092

2.RabbitMQ

2.1.RabbitMQ的简介

这里写图片描述
开发语言:Erlang – 面向并发的编程语言。

这里写图片描述

2.1.1.AMQP
AMQP是消息队列的一个协议。

这里写图片描述

2.2.官网

这里写图片描述

2.3.MQ的其他产品

这里写图片描述

2.4.学习5种队列

这里写图片描述

2.5.安装文档

这里写图片描述

3.搭建RabbitMQ环境

3.1.下载

下载地址:http://www.rabbitmq.com/download.html

3.2.windows下安装

3.2.1.安装Erlang

下载:http://www.erlang.org/download/otp_win64_17.3.exe
安装:
这里写图片描述
这里写图片描述
这里写图片描述
这里写图片描述
这里写图片描述
安装完成。

3.2.2.安装RabbitMQ
这里写图片描述
这里写图片描述
这里写图片描述
安装完成。

开始菜单里出现如下选项:
这里写图片描述

启动、停止、重新安装等。

3.2.3.启用管理工具

1、双击这里写图片描述
2、进入C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.4.1\sbin输入命令:
rabbitmq-plugins enable rabbitmq_management
这里写图片描述

这样就启动了管理工具,可以试一下命令:
停止:net stop RabbitMQ
启动:net start RabbitMQ

3、在浏览器中输入地址查看:http://127.0.0.1:15672/
这里写图片描述
4、使用默认账号登录:guest/ guest

3.3.Linux下安装

3.3.1.安装Erlang

..


3.3.2.添加yum支持

  1. cd /usr/local/src/
  2. mkdir rabbitmq
  3. cd rabbitmq
  4. wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
  5. rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
  6. rpm --import http://packages.erlang-solutions.com/rpm/erlang_solutions.asc

使用yum安装:

sudo yum install erlang


这里写图片描述

3.3.3.安装RabbitMQ


上传rabbitmq-server-3.4.1-1.noarch.rpm文件到/usr/local/src/rabbitmq/
安装:

rpm -ivh rabbitmq-server-3.4.1-1.noarch.rpm

3.3.4.启动、停止

  1. service rabbitmq-server start
  2. service rabbitmq-server stop
  3. service rabbitmq-server restart

3.3.5.设置开机启动

chkconfig rabbitmq-server on

3.3.6.设置配置文件

  1. cd /etc/rabbitmq
  2. cp /usr/share/doc/rabbitmq-server-3.4.1/rabbitmq.config.example /etc/rabbitmq/
  3. mv rabbitmq.config.example rabbitmq.config

3.3.7.开启用户远程访问

vi /etc/rabbitmq/rabbitmq.config


这里写图片描述
注意要去掉后面的逗号。


3.3.8.开启web界面管理工具

  1. rabbitmq-plugins enable rabbitmq_management
  2. service rabbitmq-server restart

3.3.9.防火墙开放15672端口

  1. /sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
  2. /etc/rc.d/init.d/iptables save

3.4.安装的注意事项

1、推荐使用默认的安装路径
2、系统用户名必须是英文
Win10改名字非常麻烦,具体方法百度
这里写图片描述
3、计算机名必须是英文
这里写图片描述
4、系统的用户必须是管理员

如果安装失败应该如何解决:
1、重装系统 – 不推荐
2、将RabbitMQ安装到linux虚拟机中
a)推荐
3、使用别人安装好的RabbitMQ服务
a)只要给你开通一个账户即可。
b)使用公用的RabbitMQ服务,在192.168.50.22
c)推荐

常见错误:
这里写图片描述

3.5.安装完成后操作

1、系统服务中有RabbitMQ服务,停止、启动、重启
这里写图片描述
2、打开命令行工具
这里写图片描述
如果找不到命令行工具,直接cd到相应目录:
这里写图片描述
输入命令以下启用管理插件

rabbitmq-plugins enable rabbitmq_management


这里写图片描述
查看管理页面
这里写图片描述
通过默认账户 guest/guest 登录
如果能够登录,说明安装成功。
这里写图片描述

4.添加用户

4.1.添加admin用户

这里写图片描述

4.2.用户角色

1、超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
2、监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
3、策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
4、普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
5、其他
无法登陆管理控制台,通常就是普通的生产者和消费者。

4.3.创建Virtual Hosts

这里写图片描述

选中Admin用户,设置权限:
这里写图片描述
看到权限已加:
这里写图片描述

4.4.管理界面中的功能

 

 

5.学习五种队列

这里写图片描述

5.1.导入my-rabbitmq项目

项目下载地址:
https://download.csdn.net/download/zpcandzhj/10585077
这里写图片描述

5.2.简单队列

5.2.1.图示
这里写图片描述

P:消息的生产者
C:消息的消费者
红色:队列

生产者将消息发送到队列,消费者从队列中获取消息。
5.2.2.导入RabbitMQ的客户端依赖

  1. <dependency>
  2. <groupId>com.rabbitmq</groupId>
  3. <artifactId>amqp-client</artifactId>
  4. <version>3.4.1</version>
  5. </dependency>

5.2.3.获取MQ的连接

  1. package com.zpc.rabbitmq.util;
  2. import com.rabbitmq.client.ConnectionFactory;
  3. import com.rabbitmq.client.Connection;
  4. public class ConnectionUtil {
  5. public static Connection getConnection() throws Exception {
  6. //定义连接工厂
  7. ConnectionFactory factory = new ConnectionFactory();
  8. //设置服务地址
  9. factory.setHost("localhost");
  10. //端口
  11. factory.setPort(5672);
  12. //设置账号信息,用户名、密码、vhost
  13. factory.setVirtualHost("testhost");
  14. factory.setUsername("admin");
  15. factory.setPassword("admin");
  16. // 通过工程获取连接
  17. Connection connection = factory.newConnection();
  18. return connection;
  19. }
  20. }

5.2.4.生产者发送消息到队列

  1. package com.zpc.rabbitmq.simple;
  2. import com.zpc.rabbitmq.util.ConnectionUtil;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. public class Send {
  6. private final static String QUEUE_NAME = "q_test_01";
  7. public static void main(String[] argv) throws Exception {
  8. // 获取到连接以及mq通道
  9. Connection connection = ConnectionUtil.getConnection();
  10. // 从连接中创建通道
  11. Channel channel = connection.createChannel();
  12. // 声明(创建)队列
  13. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  14. // 消息内容
  15. String message = "Hello World!";
  16. channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  17. System.out.println(" [x] Sent '" + message + "'");
  18. //关闭通道和连接
  19. channel.close();
  20. connection.close();
  21. }
  22. }

5.2.5.管理工具中查看消息
这里写图片描述

点击上面的队列名称,查询具体的队列中的信息:
这里写图片描述
5.2.6.消费者从队列中获取消息

  1. package com.zpc.rabbitmq.simple;
  2. import com.zpc.rabbitmq.util.ConnectionUtil;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.QueueingConsumer;
  6. public class Recv {
  7. private final static String QUEUE_NAME = "q_test_01";
  8. public static void main(String[] argv) throws Exception {
  9. // 获取到连接以及mq通道
  10. Connection connection = ConnectionUtil.getConnection();
  11. // 从连接中创建通道
  12. Channel channel = connection.createChannel();
  13. // 声明队列
  14. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  15. // 定义队列的消费者
  16. QueueingConsumer consumer = new QueueingConsumer(channel);
  17. // 监听队列
  18. channel.basicConsume(QUEUE_NAME, true, consumer);
  19. // 获取消息
  20. while (true) {
  21. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  22. String message = new String(delivery.getBody());
  23. System.out.println(" [x] Received '" + message + "'");
  24. }
  25. }
  26. }

5.3.Work模式

这里写图片描述

5.3.1.图示
这里写图片描述

一个生产者、2个消费者。

一个消息只能被一个消费者获取。

 5.3.2.消费者1

  1. package com.zpc.rabbitmq.work;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.QueueingConsumer;
  5. import com.zpc.rabbitmq.util.ConnectionUtil;
  6. public class Recv {
  7. private final static String QUEUE_NAME = "test_queue_work";
  8. public static void main(String[] argv) throws Exception {
  9. // 获取到连接以及mq通道
  10. Connection connection = ConnectionUtil.getConnection();
  11. Channel channel = connection.createChannel();
  12. // 声明队列
  13. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  14. // 同一时刻服务器只会发一条消息给消费者
  15. //channel.basicQos(1);
  16. // 定义队列的消费者
  17. QueueingConsumer consumer = new QueueingConsumer(channel);
  18. // 监听队列,false表示手动返回完成状态,true表示自动
  19. channel.basicConsume(QUEUE_NAME, true, consumer);
  20. // 获取消息
  21. while (true) {
  22. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  23. String message = new String(delivery.getBody());
  24. System.out.println(" [y] Received '" + message + "'");
  25. //休眠
  26. Thread.sleep(10);
  27. // 返回确认状态,注释掉表示使用自动确认模式
  28. //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  29. }
  30. }
  31. }

5.3.3.消费者2

  1. package com.zpc.rabbitmq.work;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.QueueingConsumer;
  5. import com.zpc.rabbitmq.util.ConnectionUtil;
  6. public class Recv2 {
  7. private final static String QUEUE_NAME = "test_queue_work";
  8. public static void main(String[] argv) throws Exception {
  9. // 获取到连接以及mq通道
  10. Connection connection = ConnectionUtil.getConnection();
  11. Channel channel = connection.createChannel();
  12. // 声明队列
  13. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  14. // 同一时刻服务器只会发一条消息给消费者
  15. //channel.basicQos(1);
  16. // 定义队列的消费者
  17. QueueingConsumer consumer = new QueueingConsumer(channel);
  18. // 监听队列,false表示手动返回完成状态,true表示自动
  19. channel.basicConsume(QUEUE_NAME, true, consumer);
  20. // 获取消息
  21. while (true) {
  22. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  23. String message = new String(delivery.getBody());
  24. System.out.println(" [x] Received '" + message + "'");
  25. // 休眠1
  26. Thread.sleep(1000);
  27. //下面这行注释掉表示使用自动确认模式
  28. //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  29. }
  30. }
  31. }

5.3.4.生产者
向队列中发送100条消息。

  1. package com.zpc.rabbitmq.work;
  2. import com.zpc.rabbitmq.util.ConnectionUtil;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. public class Send {
  6. private final static String QUEUE_NAME = "test_queue_work";
  7. public static void main(String[] argv) throws Exception {
  8. // 获取到连接以及mq通道
  9. Connection connection = ConnectionUtil.getConnection();
  10. Channel channel = connection.createChannel();
  11. // 声明队列
  12. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  13. for (int i = 0; i < 100; i++) {
  14. // 消息内容
  15. String message = "" + i;
  16. channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  17. System.out.println(" [x] Sent '" + message + "'");
  18. Thread.sleep(i * 10);
  19. }
  20. channel.close();
  21. connection.close();
  22. }
  23. }

5.3.5.测试
测试结果:
1、消费者1和消费者2获取到的消息内容是不同的,同一个消息只能被一个消费者获取。
2、消费者1和消费者2获取到的消息的数量是相同的,一个是消费奇数号消息,一个是偶数。

  • 其实,这样是不合理的,因为消费者1线程停顿的时间短。应该是消费者1要比消费者2获取到的消息多才对。
    RabbitMQ 默认将消息顺序发送给下一个消费者,这样,每个消费者会得到相同数量的消息。即轮询(round-robin)分发消息。

  • 怎样才能做到按照每个消费者的能力分配消息呢?联合使用 Qos 和 Acknowledge 就可以做到。
    basicQos 方法设置了当前信道最大预获取(prefetch)消息数量为1。消息从队列异步推送给消费者,消费者的 ack 也是异步发送给队列,从队列的视角去看,总是会有一批消息已推送但尚未获得 ack 确认,Qos 的 prefetchCount 参数就是用来限制这批未确认消息数量的。设为1时,队列只有在收到消费者发回的上一条消息 ack 确认后,才会向该消费者发送下一条消息。prefetchCount 的默认值为0,即没有限制,队列会将所有消息尽快发给消费者。

  • 2个概念

  • 轮询分发 :使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作,我们可以通过增加工作者(消费者)来解决这一问题,使得系统的伸缩性更加容易。在默认情况下,RabbitMQ将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息。这种方式分发消息机制称为Round-Robin(轮询)。

  • 公平分发 :虽然上面的分配法方式也还行,但是有个问题就是:比如:现在有2个消费者,所有的奇数的消息都是繁忙的,而偶数则是轻松的。按照轮询的方式,奇数的任务交给了第一个消费者,所以一直在忙个不停。偶数的任务交给另一个消费者,则立即完成任务,然后闲得不行。而RabbitMQ则是不了解这些的。这是因为当消息进入队列,RabbitMQ就会分派消息。它不看消费者为应答的数目,只是盲目的将消息发给轮询指定的消费者。

为了解决这个问题,我们使用basicQos( prefetchCount = 1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。
还有一点需要注意,使用公平分发,必须关闭自动应答,改为手动应答。

5.4.Work模式的“能者多劳”

打开上述代码的注释:

  1. // 同一时刻服务器只会发一条消息给消费者
  2. channel.basicQos(1);
  1. //开启这行 表示使用手动确认模式
  2. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

同时改为手动确认:

  1. // 监听队列,false表示手动返回完成状态,true表示自动
  2. channel.basicConsume(QUEUE_NAME, false, consumer);

测试:
消费者1比消费者2获取的消息更多。

5.5.消息的确认模式

消费者从队列中获取消息,服务端如何知道消息已经被消费呢?

模式1:自动确认
只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费。
模式2:手动确认
消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。

手动模式:
这里写图片描述

自动模式:
这里写图片描述

5.6.订阅模式

这里写图片描述
5.6.1.图示
这里写图片描述

解读:
1、1个生产者,多个消费者
2、每一个消费者都有自己的一个队列
3、生产者没有将消息直接发送到队列,而是发送到了交换机
4、每个队列都要绑定到交换机
5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的
注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费

这里写图片描述

5.6.2.消息的生产者(看作是后台系统)
向交换机中发送消息。

  1. package com.zpc.rabbitmq.subscribe;
  2. import com.zpc.rabbitmq.util.ConnectionUtil;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. public class Send {
  6. private final static String EXCHANGE_NAME = "test_exchange_fanout";
  7. public static void main(String[] argv) throws Exception {
  8. // 获取到连接以及mq通道
  9. Connection connection = ConnectionUtil.getConnection();
  10. Channel channel = connection.createChannel();
  11. // 声明exchange
  12. channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  13. // 消息内容
  14. String message = "Hello World!";
  15. channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
  16. System.out.println(" [x] Sent '" + message + "'");
  17. channel.close();
  18. connection.close();
  19. }
  20. }

注意:消息发送到没有队列绑定的交换机时,消息将丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中。
5.6.3.消费者1(看作是前台系统)

  1. package com.zpc.rabbitmq.subscribe;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.QueueingConsumer;
  5. import com.zpc.rabbitmq.util.ConnectionUtil;
  6. public class Recv {
  7. private final static String QUEUE_NAME = "test_queue_work1";
  8. private final static String EXCHANGE_NAME = "test_exchange_fanout";
  9. public static void main(String[] argv) throws Exception {
  10. // 获取到连接以及mq通道
  11. Connection connection = ConnectionUtil.getConnection();
  12. Channel channel = connection.createChannel();
  13. // 声明队列
  14. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  15. // 绑定队列到交换机
  16. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
  17. // 同一时刻服务器只会发一条消息给消费者
  18. channel.basicQos(1);
  19. // 定义队列的消费者
  20. QueueingConsumer consumer = new QueueingConsumer(channel);
  21. // 监听队列,手动返回完成
  22. channel.basicConsume(QUEUE_NAME, false, consumer);
  23. // 获取消息
  24. while (true) {
  25. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  26. String message = new String(delivery.getBody());
  27. System.out.println(" [Recv] Received '" + message + "'");
  28. Thread.sleep(10);
  29. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  30. }
  31. }
  32. }

5.6.4.消费者2(看作是搜索系统)

  1. package com.zpc.rabbitmq.subscribe;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.QueueingConsumer;
  5. import com.zpc.rabbitmq.util.ConnectionUtil;
  6. public class Recv2 {
  7. private final static String QUEUE_NAME = "test_queue_work2";
  8. private final static String EXCHANGE_NAME = "test_exchange_fanout";
  9. public static void main(String[] argv) throws Exception {
  10. // 获取到连接以及mq通道
  11. Connection connection = ConnectionUtil.getConnection();
  12. Channel channel = connection.createChannel();
  13. // 声明队列
  14. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  15. // 绑定队列到交换机
  16. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
  17. // 同一时刻服务器只会发一条消息给消费者
  18. channel.basicQos(1);
  19. // 定义队列的消费者
  20. QueueingConsumer consumer = new QueueingConsumer(channel);
  21. // 监听队列,手动返回完成
  22. channel.basicConsume(QUEUE_NAME, false, consumer);
  23. // 获取消息
  24. while (true) {
  25. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  26. String message = new String(delivery.getBody());
  27. System.out.println(" [Recv2] Received '" + message + "'");
  28. Thread.sleep(10);
  29. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  30. }
  31. }
  32. }

5.6.5.测试
测试结果:
同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费到消息。

在管理工具中查看队列和交换机的绑定关系:

这里写图片描述

5.7.路由模式

这里写图片描述
5.7.1.图示
这里写图片描述

5.7.2.生产者
这里写图片描述
5.7.3.消费者1(假设是前台系统)
这里写图片描述
5.7.4.消费2(假设是搜索系统)
这里写图片描述

5.8.主题模式(通配符模式)

这里写图片描述

这里写图片描述

5.8.1.图示
这里写图片描述

同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费到消息。

5.8.2.生产者

  1. package com.zpc.rabbitmq.topic;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.zpc.rabbitmq.util.ConnectionUtil;
  5. public class Send {
  6. private final static String EXCHANGE_NAME = "test_exchange_topic";
  7. public static void main(String[] argv) throws Exception {
  8. // 获取到连接以及mq通道
  9. Connection connection = ConnectionUtil.getConnection();
  10. Channel channel = connection.createChannel();
  11. // 声明exchange
  12. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
  13. // 消息内容
  14. String message = "Hello World!!";
  15. channel.basicPublish(EXCHANGE_NAME, "routekey.1", null, message.getBytes());
  16. System.out.println(" [x] Sent '" + message + "'");
  17. channel.close();
  18. connection.close();
  19. }
  20. }

5.8.3.消费者1(前台系统)

  1. package com.zpc.rabbitmq.topic;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.QueueingConsumer;
  5. import com.zpc.rabbitmq.util.ConnectionUtil;
  6. public class Recv {
  7. private final static String QUEUE_NAME = "test_queue_topic_work_1";
  8. private final static String EXCHANGE_NAME = "test_exchange_topic";
  9. public static void main(String[] argv) throws Exception {
  10. // 获取到连接以及mq通道
  11. Connection connection = ConnectionUtil.getConnection();
  12. Channel channel = connection.createChannel();
  13. // 声明队列
  14. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  15. // 绑定队列到交换机
  16. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routekey.*");
  17. // 同一时刻服务器只会发一条消息给消费者
  18. channel.basicQos(1);
  19. // 定义队列的消费者
  20. QueueingConsumer consumer = new QueueingConsumer(channel);
  21. // 监听队列,手动返回完成
  22. channel.basicConsume(QUEUE_NAME, false, consumer);
  23. // 获取消息
  24. while (true) {
  25. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  26. String message = new String(delivery.getBody());
  27. System.out.println(" [Recv_x] Received '" + message + "'");
  28. Thread.sleep(10);
  29. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  30. }
  31. }
  32. }

5.8.4.消费者2(搜索系统)

  1. package com.zpc.rabbitmq.topic;
  2. import com.zpc.rabbitmq.util.ConnectionUtil;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.QueueingConsumer;
  6. public class Recv2 {
  7. private final static String QUEUE_NAME = "test_queue_topic_work_2";
  8. private final static String EXCHANGE_NAME = "test_exchange_topic";
  9. public static void main(String[] argv) throws Exception {
  10. // 获取到连接以及mq通道
  11. Connection connection = ConnectionUtil.getConnection();
  12. Channel channel = connection.createChannel();
  13. // 声明队列
  14. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  15. // 绑定队列到交换机
  16. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");
  17. // 同一时刻服务器只会发一条消息给消费者
  18. channel.basicQos(1);
  19. // 定义队列的消费者
  20. QueueingConsumer consumer = new QueueingConsumer(channel);
  21. // 监听队列,手动返回完成
  22. channel.basicConsume(QUEUE_NAME, false, consumer);
  23. // 获取消息
  24. while (true) {
  25. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  26. String message = new String(delivery.getBody());
  27. System.out.println(" [Recv2_x] Received '" + message + "'");
  28. Thread.sleep(10);
  29. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  30. }
  31. }
  32. }

6.Spring-Rabbit

6.1.Spring项目

http://spring.io/projects

这里写图片描述

6.2.简介

这里写图片描述
这里写图片描述

6.3.使用

6.3.1.消费者

  1. package com.zpc.rabbitmq.spring;
  2. /**
  3. * 消费者
  4. *
  5. * @author Evan
  6. */
  7. public class Foo {
  8. //具体执行业务的方法
  9. public void listen(String foo) {
  10. System.out.println("\n消费者: " + foo + "\n");
  11. }
  12. }

6.3.2.生产者

  1. package com.zpc.rabbitmq.spring;
  2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  3. import org.springframework.context.support.AbstractApplicationContext;
  4. import org.springframework.context.support.ClassPathXmlApplicationContext;
  5. public class SpringMain {
  6. public static void main(final String... args) throws Exception {
  7. AbstractApplicationContext ctx = new ClassPathXmlApplicationContext(
  8. "classpath:spring/rabbitmq-context.xml");
  9. //RabbitMQ模板
  10. RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
  11. //发送消息
  12. template.convertAndSend("Hello, 鸟鹏!");
  13. Thread.sleep(1000);// 休眠1秒
  14. ctx.destroy(); //容器销毁
  15. }
  16. }

6.3.3.配置文件
1、定义连接工厂

  1. <!-- 定义RabbitMQ的连接工厂 -->
  2. <rabbit:connection-factory id="connectionFactory"
  3. host="127.0.0.1" port="5672" username="admin" password="admin"
  4. virtual-host="testhost" />

2、定义模板(可以指定交换机或队列)

  1. <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" />

3、定义队列、交换机、以及完成队列和交换机的绑定

  1. <!-- 定义队列,自动声明 -->
  2. <rabbit:queue name="zpcQueue" auto-declare="true"/>
  3. <!-- 定义交换器,把Q绑定到交换机,自动声明 -->
  4. <rabbit:fanout-exchange name="fanoutExchange" auto-declare="true">
  5. <rabbit:bindings>
  6. <rabbit:binding queue="zpcQueue"/>
  7. </rabbit:bindings>
  8. </rabbit:fanout-exchange>

4、定义监听

  1. <rabbit:listener-container connection-factory="connectionFactory">
  2. <rabbit:listener ref="foo" method="listen" queue-names="zpcQueue" />
  3. </rabbit:listener-container>
  4. <bean id="foo" class="com.zpc.rabbitmq.spring.Foo" />

5、定义管理,用于管理队列、交换机等:

  1. <!-- MQ的管理,包括队列、交换器等 -->
  2. <rabbit:admin connection-factory="connectionFactory" />

完整配置文件rabbitmq-context.xml

  1. <beans xmlns="http://www.springframework.org/schema/beans"
  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  3. xsi:schemaLocation="http://www.springframework.org/schema/rabbit
  4. http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
  5. http://www.springframework.org/schema/beans
  6. http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
  7. <!-- 定义RabbitMQ的连接工厂 -->
  8. <rabbit:connection-factory id="connectionFactory"
  9. host="127.0.0.1" port="5672" username="admin" password="admin"
  10. virtual-host="testhost" />
  11. <!-- 定义Rabbit模板,指定连接工厂以及定义exchange -->
  12. <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" />
  13. <!-- <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
  14. exchange="fanoutExchange" routing-key="foo.bar" /> -->
  15. <!-- MQ的管理,包括队列、交换器等 -->
  16. <rabbit:admin connection-factory="connectionFactory" />
  17. <!-- 定义队列,自动声明 -->
  18. <rabbit:queue name="zpcQueue" auto-declare="true"/>
  19. <!-- 定义交换器,把Q绑定到交换机,自动声明 -->
  20. <rabbit:fanout-exchange name="fanoutExchange" auto-declare="true">
  21. <rabbit:bindings>
  22. <rabbit:binding queue="zpcQueue"/>
  23. </rabbit:bindings>
  24. </rabbit:fanout-exchange>
  25. <!-- <rabbit:topic-exchange name="myExchange">
  26. <rabbit:bindings>
  27. <rabbit:binding queue="myQueue" pattern="foo.*" />
  28. </rabbit:bindings>
  29. </rabbit:topic-exchange> -->
  30. <!-- 队列监听 -->
  31. <rabbit:listener-container connection-factory="connectionFactory">
  32. <rabbit:listener ref="foo" method="listen" queue-names="zpcQueue" />
  33. </rabbit:listener-container>
  34. <bean id="foo" class="com.zpc.rabbitmq.spring.Foo" />
  35. </beans>

6.4.持久化交换机和队列

这里写图片描述

持久化:将交换机或队列的数据保存到磁盘,服务器宕机或重启之后依然存在。
非持久化:将交换机或队列的数据保存到内存,服务器宕机或重启之后将不存在。

非持久化的性能高于持久化。

如何选择持久化?非持久化? – 看需求。

7.Spring集成RabbitMQ一个完整案例

创建三个系统A,B,C
A作为生产者,B、C作为消费者(B,C作为web项目启动)
项目下载地址:https://download.csdn.net/download/zpcandzhj/10585077

7.1.在A系统中发送消息到交换机

7.1.1.导入依赖

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <groupId>com.zpc</groupId>
  6. <artifactId>myrabbitA</artifactId>
  7. <version>0.0.1-SNAPSHOT</version>
  8. <packaging>jar</packaging>
  9. <name>myrabbit</name>
  10. <dependencies>
  11. <dependency>
  12. <groupId>org.springframework.amqp</groupId>
  13. <artifactId>spring-rabbit</artifactId>
  14. <version>1.4.0.RELEASE</version>
  15. </dependency>
  16. <dependency>
  17. <groupId>com.alibaba</groupId>
  18. <artifactId>fastjson</artifactId>
  19. <version>1.2.47</version>
  20. </dependency>
  21. </dependencies>
  22. </project>

7.1.2.队列和交换机的绑定关系
实现:
1、在配置文件中将队列和交换机完成绑定
2、可以在管理界面中完成绑定
a)绑定关系如果发生变化,需要修改配置文件,并且服务需要重启
b)管理更加灵活
c)更容易对绑定关系的权限管理,流程管理
本例选择第2种方式
7.1.3.配置
rabbitmq-context.xml

  1. <beans xmlns="http://www.springframework.org/schema/beans"
  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  3. xsi:schemaLocation="http://www.springframework.org/schema/rabbit
  4. http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
  5. http://www.springframework.org/schema/beans
  6. http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
  7. <!-- 定义RabbitMQ的连接工厂 -->
  8. <rabbit:connection-factory id="connectionFactory"
  9. host="127.0.0.1" port="5672" username="admin" password="admin"
  10. virtual-host="testhost" />
  11. <!-- MQ的管理,包括队列、交换器等 -->
  12. <rabbit:admin connection-factory="connectionFactory" />
  13. <!-- 定义交换器,暂时不把Q绑定到交换机,在管理界面去绑定 -->
  14. <!--<rabbit:topic-exchange name="topicExchange" auto-declare="true" ></rabbit:topic-exchange>-->
  15. <rabbit:direct-exchange name="directExchange" auto-declare="true" ></rabbit:direct-exchange>
  16. <!--<rabbit:fanout-exchange name="fanoutExchange" auto-declare="true" ></rabbit:fanout-exchange>-->
  17. <!-- 定义Rabbit模板,指定连接工厂以及定义exchange(exchange要和上面的一致) -->
  18. <!--<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="topicExchange" />-->
  19. <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="directExchange" />
  20. <!--<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" />-->
  21. </beans>

7.1.4.消息内容
方案:
1、消息内容使用对象做json序列化发送
a)数据大
b)有些数据其他人是可能用不到的
2、发送特定的业务字段,如id、操作类型

7.1.5.实现
生产者MsgSender.java:

  1. package com.zpc.myrabbit;
  2. import com.alibaba.fastjson.JSON;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.context.support.AbstractApplicationContext;
  5. import org.springframework.context.support.ClassPathXmlApplicationContext;
  6. import java.text.SimpleDateFormat;
  7. import java.util.Date;
  8. import java.util.HashMap;
  9. import java.util.Map;
  10. /**
  11. * 消息生产者
  12. */
  13. public class MsgSender {
  14. public static void main(String[] args) throws Exception {
  15. AbstractApplicationContext ctx = new ClassPathXmlApplicationContext(
  16. "classpath:spring/rabbitmq-context.xml");
  17. //RabbitMQ模板
  18. RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
  19. String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制
  20. //发送消息
  21. Map<String, Object> msg = new HashMap<String, Object>();
  22. msg.put("type", "1");
  23. msg.put("date", date);
  24. template.convertAndSend("type2", JSON.toJSONString(msg));
  25. Thread.sleep(1000);// 休眠1
  26. ctx.destroy(); //容器销毁
  27. }
  28. }

7.2.在B系统接收消息

7.2.1.导入依赖

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <groupId>com.zpc</groupId>
  6. <artifactId>myrabbitB</artifactId>
  7. <version>0.0.1-SNAPSHOT</version>
  8. <packaging>war</packaging>
  9. <name>myrabbit</name>
  10. <properties>
  11. <spring.version>4.1.3.RELEASE</spring.version>
  12. <fastjson.version>1.2.46</fastjson.version>
  13. </properties>
  14. <dependencies>
  15. <dependency>
  16. <groupId>com.rabbitmq</groupId>
  17. <artifactId>amqp-client</artifactId>
  18. <version>3.4.1</version>
  19. </dependency>
  20. <dependency>
  21. <groupId>org.springframework.amqp</groupId>
  22. <artifactId>spring-rabbit</artifactId>
  23. <version>1.4.0.RELEASE</version>
  24. </dependency>
  25. <dependency>
  26. <groupId>org.springframework</groupId>
  27. <artifactId>spring-webmvc</artifactId>
  28. <version>${spring.version}</version>
  29. </dependency>
  30. <dependency>
  31. <groupId>com.alibaba</groupId>
  32. <artifactId>fastjson</artifactId>
  33. <version>1.2.47</version>
  34. </dependency>
  35. </dependencies>
  36. <build>
  37. <finalName>${project.artifactId}</finalName>
  38. <plugins>
  39. <!-- web层需要配置Tomcat插件 -->
  40. <plugin>
  41. <groupId>org.apache.tomcat.maven</groupId>
  42. <artifactId>tomcat7-maven-plugin</artifactId>
  43. <configuration>
  44. <path>/testRabbit</path>
  45. <uriEncoding>UTF-8</uriEncoding>
  46. <port>8081</port>
  47. </configuration>
  48. </plugin>
  49. </plugins>
  50. </build>
  51. </project>

7.2.2.配置

  1. <beans xmlns="http://www.springframework.org/schema/beans"
  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  3. xsi:schemaLocation="http://www.springframework.org/schema/rabbit
  4. http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
  5. http://www.springframework.org/schema/beans
  6. http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
  7. <!-- 定义RabbitMQ的连接工厂 -->
  8. <rabbit:connection-factory id="connectionFactory"
  9. host="127.0.0.1" port="5672" username="admin" password="admin"
  10. virtual-host="testhost" />
  11. <!-- MQ的管理,包括队列、交换器等 -->
  12. <rabbit:admin connection-factory="connectionFactory" />
  13. <!-- 定义B系统需要监听的队列,自动声明 -->
  14. <rabbit:queue name="q_topic_testB" auto-declare="true"/>
  15. <!-- 队列监听 -->
  16. <rabbit:listener-container connection-factory="connectionFactory">
  17. <rabbit:listener ref="myMQlistener" method="listen" queue-names="q_topic_testB" />
  18. </rabbit:listener-container>
  19. <bean id="myMQlistener" class="com.zpc.myrabbit.listener.Listener" />
  20. </beans>

7.2.3.具体处理逻辑

  1. public class Listener {
  2. //具体执行业务的方法
  3. public void listen(String msg) {
  4. System.out.println("\n消费者B开始处理消息: " + msg + "\n");
  5. }
  6. }

7.2.4.在界面管理工具中完成绑定关系
选中定义好的交换机(exchange)
这里写图片描述
1)direct
这里写图片描述
2)fanout
这里写图片描述
3)topic
这里写图片描述

7.3.在C系统中接收消息

(和B系统配置差不多,无非是Q名和Q对应的处理逻辑变了)

7.3.1.配置

  1. <beans xmlns="http://www.springframework.org/schema/beans"
  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  3. xsi:schemaLocation="http://www.springframework.org/schema/rabbit
  4. http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
  5. http://www.springframework.org/schema/beans
  6. http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
  7. <!-- 定义RabbitMQ的连接工厂 -->
  8. <rabbit:connection-factory id="connectionFactory"
  9. host="127.0.0.1" port="5672" username="admin" password="admin"
  10. virtual-host="testhost" />
  11. <!-- MQ的管理,包括队列、交换器等 -->
  12. <rabbit:admin connection-factory="connectionFactory" />
  13. <!-- 定义C系统需要监听的队列,自动声明 -->
  14. <rabbit:queue name="q_topic_testC" auto-declare="true"/>
  15. <!-- 队列监听 -->
  16. <rabbit:listener-container connection-factory="connectionFactory">
  17. <rabbit:listener ref="myMQlistener" method="listen" queue-names="q_topic_testC" />
  18. </rabbit:listener-container>
  19. <bean id="myMQlistener" class="com.zpc.myrabbit.listener.Listener" />
  20. </beans>

7.3.2.处理业务逻辑

  1. public class Listener {
  2. //具体执行业务的方法
  3. public void listen(String msg) {
  4. System.out.println("\n消费者C开始处理消息: " + msg + "\n");
  5. }
  6. }

7.3.3.在管理工具中绑定队列和交换机
见7.2.4

7.3.4.测试
分别启动B,C两个web应用,然后运行A的MsgSender主方法发送消息,分别测试fanout、direct、topic三种类型

8.Springboot集成RabbitMQ

8.1.简单队列

1、配置pom文件,主要是添加spring-boot-starter-amqp的支持

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

2、配置application.properties文件
配置rabbitmq的安装地址、端口以及账户信息

  1. spring.application.name=spirng-boot-rabbitmq
  2. spring.rabbitmq.host=127.0.0.1
  3. spring.rabbitmq.port=5672
  4. spring.rabbitmq.username=admin
  5. spring.rabbitmq.password=admin

3、配置队列

  1. package com.zpc.rabbitmq;
  2. import org.springframework.amqp.core.Queue;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. @Configuration
  6. public class RabbitConfig {
  7. @Bean
  8. public Queue queue() {
  9. return new Queue("q_hello");
  10. }
  11. }

4、发送者

  1. package com.zpc.rabbitmq;
  2. import org.springframework.amqp.core.AmqpTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Component;
  5. import java.text.SimpleDateFormat;
  6. import java.util.Date;
  7. @Component
  8. public class HelloSender {
  9. @Autowired
  10. private AmqpTemplate rabbitTemplate;
  11. public void send() {
  12. String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制
  13. String context = "hello " + date;
  14. System.out.println("Sender : " + context);
  15. //简单对列的情况下routingKey即为Q名
  16. this.rabbitTemplate.convertAndSend("q_hello", context);
  17. }
  18. }

5、接收者

  1. package com.zpc.rabbitmq;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. @RabbitListener(queues = "q_hello")
  7. public class HelloReceiver {
  8. @RabbitHandler
  9. public void process(String hello) {
  10. System.out.println("Receiver : " + hello);
  11. }
  12. }

6、测试

  1. package com.zpc.rabbitmq;
  2. import org.junit.Test;
  3. import org.junit.runner.RunWith;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.boot.test.context.SpringBootTest;
  6. import org.springframework.test.context.junit4.SpringRunner;
  7. @RunWith(SpringRunner.class)
  8. @SpringBootTest
  9. public class RabbitMqHelloTest {
  10. @Autowired
  11. private HelloSender helloSender;
  12. @Test
  13. public void hello() throws Exception {
  14. helloSender.send();
  15. }
  16. }

8.2.多对多使用(Work模式)

注册两个Receiver:

  1. package com.zpc.rabbitmq;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. @RabbitListener(queues = "q_hello")
  7. public class HelloReceiver2 {
  8. @RabbitHandler
  9. public void process(String hello) {
  10. System.out.println("Receiver2 : " + hello);
  11. }
  12. }
  1. @Test
  2. public void oneToMany() throws Exception {
  3. for (int i=0;i<100;i++){
  4. helloSender.send(i);
  5. Thread.sleep(300);
  6. }
  7. }
  1. public void send(int i) {
  2. String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制
  3. String context = "hello " + i + " " + date;
  4. System.out.println("Sender : " + context);
  5. //简单对列的情况下routingKey即为Q名
  6. this.rabbitTemplate.convertAndSend("q_hello", context);
  7. }

8.3.Topic Exchange(主题模式)

  • topic 是RabbitMQ中最灵活的一种方式,可以根据routing_key自由的绑定不同的队列

首先对topic规则配置,这里使用两个队列(消费者)来演示。
1)配置队列,绑定交换机

  1. package com.zpc.rabbitmq.topic;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.amqp.core.TopicExchange;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. @Configuration
  9. public class TopicRabbitConfig {
  10. final static String message = "q_topic_message";
  11. final static String messages = "q_topic_messages";
  12. @Bean
  13. public Queue queueMessage() {
  14. return new Queue(TopicRabbitConfig.message);
  15. }
  16. @Bean
  17. public Queue queueMessages() {
  18. return new Queue(TopicRabbitConfig.messages);
  19. }
  20. /**
  21. * 声明一个Topic类型的交换机
  22. * @return
  23. */
  24. @Bean
  25. TopicExchange exchange() {
  26. return new TopicExchange("mybootexchange");
  27. }
  28. /**
  29. * 绑定Q到交换机,并且指定routingKey
  30. * @param queueMessage
  31. * @param exchange
  32. * @return
  33. */
  34. @Bean
  35. Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
  36. return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
  37. }
  38. @Bean
  39. Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
  40. return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
  41. }
  42. }

2)创建2个消费者
q_topic_message 和q_topic_messages

  1. package com.zpc.rabbitmq.topic;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. @RabbitListener(queues = "q_topic_message")
  7. public class Receiver1 {
  8. @RabbitHandler
  9. public void process(String hello) {
  10. System.out.println("Receiver1 : " + hello);
  11. }
  12. }
  1. package com.zpc.rabbitmq.topic;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. @RabbitListener(queues = "q_topic_messages")
  7. public class Receiver2 {
  8. @RabbitHandler
  9. public void process(String hello) {
  10. System.out.println("Receiver2 : " + hello);
  11. }
  12. }

3)消息发送者(生产者)

  1. package com.zpc.rabbitmq.topic;
  2. import org.springframework.amqp.core.AmqpTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. public class MsgSender {
  7. @Autowired
  8. private AmqpTemplate rabbitTemplate;
  9. public void send1() {
  10. String context = "hi, i am message 1";
  11. System.out.println("Sender : " + context);
  12. this.rabbitTemplate.convertAndSend("mybootexchange", "topic.message", context);
  13. }
  14. public void send2() {
  15. String context = "hi, i am messages 2";
  16. System.out.println("Sender : " + context);
  17. this.rabbitTemplate.convertAndSend("mybootexchange", "topic.messages", context);
  18. }
  19. }

send1方法会匹配到topic.#和topic.message,两个Receiver都可以收到消息,发送send2只有topic.#可以匹配所有只有Receiver2监听到消息。
4)测试

  1. package com.zpc.rabbitmq.topic;
  2. import org.junit.Test;
  3. import org.junit.runner.RunWith;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.boot.test.context.SpringBootTest;
  6. import org.springframework.test.context.junit4.SpringRunner;
  7. @RunWith(SpringRunner.class)
  8. @SpringBootTest
  9. public class RabbitTopicTest {
  10. @Autowired
  11. private MsgSender msgSender;
  12. @Test
  13. public void send1() throws Exception {
  14. msgSender.send1();
  15. }
  16. @Test
  17. public void send2() throws Exception {
  18. msgSender.send2();
  19. }
  20. }

8.4.Fanout Exchange(订阅模式)

  • Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
    1)配置队列,绑定交换机
  1. package com.zpc.rabbitmq.fanout;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.FanoutExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. @Configuration
  9. public class FanoutRabbitConfig {
  10. @Bean
  11. public Queue aMessage() {
  12. return new Queue("q_fanout_A");
  13. }
  14. @Bean
  15. public Queue bMessage() {
  16. return new Queue("q_fanout_B");
  17. }
  18. @Bean
  19. public Queue cMessage() {
  20. return new Queue("q_fanout_C");
  21. }
  22. @Bean
  23. FanoutExchange fanoutExchange() {
  24. return new FanoutExchange("mybootfanoutExchange");
  25. }
  26. @Bean
  27. Binding bindingExchangeA(Queue aMessage, FanoutExchange fanoutExchange) {
  28. return BindingBuilder.bind(aMessage).to(fanoutExchange);
  29. }
  30. @Bean
  31. Binding bindingExchangeB(Queue bMessage, FanoutExchange fanoutExchange) {
  32. return BindingBuilder.bind(bMessage).to(fanoutExchange);
  33. }
  34. @Bean
  35. Binding bindingExchangeC(Queue cMessage, FanoutExchange fanoutExchange) {
  36. return BindingBuilder.bind(cMessage).to(fanoutExchange);
  37. }
  38. }

2)创建3个消费者

  1. package com.zpc.rabbitmq.fanout;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. @RabbitListener(queues = "q_fanout_A")
  7. public class ReceiverA {
  8. @RabbitHandler
  9. public void process(String hello) {
  10. System.out.println("AReceiver : " + hello + "/n");
  11. }
  12. }
  1. package com.zpc.rabbitmq.fanout;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. @RabbitListener(queues = "q_fanout_B")
  7. public class ReceiverB {
  8. @RabbitHandler
  9. public void process(String hello) {
  10. System.out.println("BReceiver : " + hello + "/n");
  11. }
  12. }
  1. package com.zpc.rabbitmq.fanout;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. @RabbitListener(queues = "q_fanout_C")
  7. public class ReceiverC {
  8. @RabbitHandler
  9. public void process(String hello) {
  10. System.out.println("CReceiver : " + hello + "/n");
  11. }
  12. }

3)生产者

  1. package com.zpc.rabbitmq.fanout;
  2. import org.springframework.amqp.core.AmqpTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. public class MsgSenderFanout {
  7. @Autowired
  8. private AmqpTemplate rabbitTemplate;
  9. public void send() {
  10. String context = "hi, fanout msg ";
  11. System.out.println("Sender : " + context);
  12. this.rabbitTemplate.convertAndSend("mybootfanoutExchange","", context);
  13. }
  14. }

4)测试

  1. package com.zpc.rabbitmq.fanout;
  2. import org.junit.Test;
  3. import org.junit.runner.RunWith;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.boot.test.context.SpringBootTest;
  6. import org.springframework.test.context.junit4.SpringRunner;
  7. @RunWith(SpringRunner.class)
  8. @SpringBootTest
  9. public class RabbitFanoutTest {
  10. @Autowired
  11. private MsgSenderFanout msgSender;
  12. @Test
  13. public void send1() throws Exception {
  14. msgSender.send();
  15. }
  16. }

结果如下,三个消费者都收到消息:
AReceiver : hi, fanout msg
CReceiver : hi, fanout msg
BReceiver : hi, fanout msg

9.总结

推荐springCloud教程:
https://blog.csdn.net/hellozpc/article/details/83692496

推荐Springboot2.0教程:
https://blog.csdn.net/hellozpc/article/details/82531834

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/煮酒与君饮/article/detail/808778
推荐阅读
相关标签
  

闽ICP备14008679号