当前位置:   article > 正文

RabbitMQ_rabbitmq启动修改端口

rabbitmq启动修改端口

测试代码——消费者1

一、rabbitmq的安装及启动

1.下载对应版本的erlang和rabbitmq文件。(第二推荐)

或者你可以打开下面的网站找到你想要的版本(推荐,使用此方法可跳过2、3两步)

在创建文件夹并进入后输入页面的命令,例如:

  1. #下载erlang
  2. curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
  3. #安装erlang
  4. sudo yum install erlang-debuginfo-23.3.4.11-1.el7.x86_64
  5. #下载rabbitmq
  6. curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
  7. #安装rabbitmq
  8. sudo yum install rabbitmq-server-3.10.0-1.el7.noarch

下载地址:rabbitmq/erlang - Packages · packagecloud

rabbitmq/rabbitmq-server - Packages · packagecloud

查看对应版本:RabbitMQ Erlang Version Requirements — RabbitMQ

我这里使用的本虚拟机是centos7于是选择了这两个版

2.在虚拟机中使用mkdir命令创建文件夹 ——/root/rabbitmq(使用推荐方法后跳过此步骤)

3.把文件传输到虚拟机中(使用推荐方法后跳过此步骤)

首先使用ip addr命令查看虚拟机的ip

 在物理机上win+r -->cmd -->scp "C:/Users/q/Downloads/rabbitmq-server-3.10.0.rc.3-1.el7.noarch.rpm" root@172.20.10.2:/root/rabbitmq

 在执行命令前物理机会尝试连接虚拟机,如果超时可以参考如下三个教程(练习环境中可以关闭防火墙,如是生产环境,为了网络安全,请看其他教程开放15672及5672端口)

Centos怎么关闭防火墙?Centos7关闭防火墙详细教程 -Win7系统之家

CentOS7配置IP_centos7配置ip地址_凡梦千华的博客-CSDN博客

Permission denied (publickey,gssapi-keyex,gssapi-with-mic)_H.S.T不想卷的博客-CSDN博客

在执行命令时会要求你输入密码,输入对应用户的密码就可以了。

4.安装完成启动rabbitmq

  1. #安装rabbitmq
  2. yum localinstall rabbitmq-server3.8.6-1.el7.noarch.rpm

 注意e后面是小的l不是1,当出现如下页面时输入y并按回车键

输入如下命令启动并查看rabbitmq服务的状态

  1. #启动rabbitmq
  2. systemctl start rabbitmq-server
  3. #查看rabbitmq的状态
  4. systemctl status rabbitmq-server

 显示如下内容则为rabbitmq已经成功启动

5.安装rabbitmq管理页面及创建用户

 默认情况下rabbimq没有安装web端的客户端软件,需要安装才能生效

  1. # 打开rabbitmq管理界面插件
  2. rabbitmq-plugins enable rabbitmq_management

安装完成后即可通过服务器ip:15672打开管理后台

如果此时你使用的是服务器本机那么可以使用默认用户guest访问,如果使用其他电脑那么需要新创建一个用户并授权

 6.新建远程用户

操作权限分为下列四种:
administrator:(管理员)可以登录控制台、查看所有信息、并对rabbitmq进行管理
monToring:(监控者)可以登录控制台,查看所有信息
policymaker:(策略制定者)可以登录控制台指定策略
managment:(普通管理员)可以进行登录控制

  1. #添加用户
  2. rabbitmqctl add_user 用户名 密码
  3. #设置用户角色,并分配操作权限
  4. rabbitmqctl set_user_tags 用户名 角色
  5. #为用户添加资源权限(授予虚拟机根节点的所有权限)
  6. rabbitmqctl set_permissions -p / 用户名 ".*" ".*" ".*"

7.修改默认端口

使用whereis reabbitmq查看rabbitmq的安装路径

新增配置文件(在/etc/rabbitmq目录下)

vim /etc/rabbitmq/rabbitmq.conf

进入文本编辑器后按i键输入如下配置即更改了通讯端口为5673 

输入如下命令修改默认配置,请根据你们的rabbitmq版本对命令进行修改

输入结束后按ESC键,输入:wq并按回车键

重启rabbitmq即完成修改,如想测试请更改management.tcp.port后的端口,重启后使用服务器ip+新端口号才可访问rabbitmq的管理后台

systemctl restart rabbitmq.server

以下步骤因本人测试后出现问题,无法启动rabbitmq,故不推荐修改:

vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.6/sbin/rabbitmq-defaults

 进入文本编辑器后按i键,将光标定位到BOOT_MODULE="rabbit"下面并输入(根据你们放置的路径修改):

CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf

 输入结束后按ESC键,输入:wq并按回车键

重启rabbitmq

systemctl restart rabbitmq-server

二、编写测试程序

1.新建一个maven工程

2.引入依赖(当我未引入slf4j-nop时程序启动报错SLF4J: Failed to load class “org.slf4j.impl.StaticLoggerBinder,引入改依赖后错误消失)

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.slf4j</groupId>
  4. <artifactId>slf4j-nop</artifactId>
  5. <version>1.7.2</version>
  6. </dependency>
  7. <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
  8. <dependency>
  9. <groupId>com.rabbitmq</groupId>
  10. <artifactId>amqp-client</artifactId>
  11. <version>5.18.0</version>
  12. </dependency>
  13. <dependency>
  14. <groupId>commons-io</groupId>
  15. <artifactId>commons-io</artifactId>
  16. <version>2.6</version>
  17. </dependency>
  18. </dependencies>

参考:Maven项目报错:“ SLF4J: Failed to load class “org.slf4j.impl.StaticLoggerBinder”的解决方案_java_脚本之家

3.编写 生产者代码(其中的异常既可以像下面这样写,也可以throws)

  1. public class Producer {
  2. public static final String QUEUE_NAME="hello";
  3. public static void main(String[] args) {
  4. //创建一个连接工厂
  5. ConnectionFactory factory=new ConnectionFactory();
  6. //工厂IP连接RabbitMQ的队列
  7. factory.setHost("172.20.10.2");
  8. //用户名
  9. factory.setUsername("admin");
  10. //密码
  11. factory.setPassword("root");
  12. try {
  13. //创建连接
  14. Connection connection = factory.newConnection();
  15. //获取信道
  16. Channel channel = connection.createChannel();
  17. /**
  18. * 生成一个队列
  19. * 1.队列名称
  20. * 2.队列里面的消息是否持久化(写入磁盘)默认情况下是不持久化的(写入内存)
  21. * 3.该队列是否只供一个消费者进行消费,是否进行消息共享,true可以多个消费者消费,false不能多个消费者消费
  22. * 4.是否自动删除 最后一个消费者断开连接以后,该队列是否自动删除 true自动删除,false不自动删除
  23. * 5.其他参数
  24. */
  25. AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  26. //发消息
  27. String message="hello world";//初次使用
  28. /**
  29. * 发送一个消费
  30. * 1.发送到哪个交换机
  31. * 2.路由的key值是哪个 本次队列的名称
  32. * 3.其他参数消息
  33. * 4.发送消息的消息体
  34. */
  35. channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
  36. System.out.println("发送完成~");
  37. } catch (IOException e) {
  38. throw new RuntimeException(e);
  39. } catch (TimeoutException e) {
  40. throw new RuntimeException(e);
  41. }
  42. }
  43. }

 编写完成后即开始测试,发送后rabbitmq后台可看到“hello”队列

4.编写消费者代码

  1. public class Consumer {
  2. //队列的名称
  3. public static final String QUEUE_NAME="hello";
  4. //接收消息
  5. public static void main(String[] args) {
  6. //创建连接工厂
  7. ConnectionFactory factory = new ConnectionFactory();
  8. try {
  9. factory.setHost("172.20.10.2");
  10. factory.setUsername("admin");
  11. factory.setPassword("root");
  12. //创建连接
  13. Connection connection = factory.newConnection();
  14. //创建信道
  15. Channel channel = connection.createChannel();
  16. //声明接收消息
  17. DeliverCallback deliverCallback = (consumerTag,message) ->{
  18. System.out.println(message.getBody().toString());
  19. };
  20. //声明取消消息
  21. CancelCallback cancel =(String var1) ->{
  22. System.out.println("接收失败");
  23. };
  24. /**
  25. * 消费者消费消息
  26. * 1.消费哪个队列
  27. * 2.消费成功之后是否要自动应答 true代表的是自动应答,false代表手动应答
  28. * 3.消费者未成功消费的回调
  29. * 4.消费者取消费的回调
  30. */
  31. channel.basicConsume(QUEUE_NAME,true,deliverCallback, cancel);
  32. } catch (IOException e) {
  33. throw new RuntimeException(e);
  34. } catch (TimeoutException e) {
  35. throw new RuntimeException(e);
  36. }
  37. }
  38. }

测试打印出来的是一个对象,修改输出语句为,输出即为可读的hello word

System.out.println(new String(message.getBody()));

 小插曲:win10设置打不开

1.按下WIN+X打开超级菜单,点击命令提示符(powershell管理员)

2.输入 sfc /SCANNOW 按下回车开始扫描

3.完成后重启即可解决

三、工作队列

1.编写测试生产者,使用控制台输入发送内容

  1. public class WorkQueueTest {
  2. public static void main(String[] args) throws Exception {
  3. Channel channel = RabbitMQUtil.getChannel();
  4. channel.queueDeclare("hello", false, false, false, null);
  5. Scanner scanner = new Scanner(System.in);
  6. while (scanner.hasNext()) {
  7. String message = scanner.next();
  8. channel.basicPublish("", "hello",null, message.getBytes());
  9. System.out.println("发送完成");
  10. }
  11. }
  12. }

2.启动两个消费者

启动一个消费者后点击如下按钮-->Edit Configurations-->Modify options-->勾选Allow multiple instances-->启动

 

 随后在控制台键入输入4次数据,并成功接收,测试结束。(轮训分发,公平分发)

四、消息应答

为了保证消息在发送过程中不丢失,rabbitmq引入消息应答。消息应答:消费者在接收到消息并处理该消息后,告诉rabbitmq它已经处理了,rabbitmq可以把该消息删除了。

当关闭消息自动应答时,消费者未ack,消息将自动重新入队。

推荐:关闭消息自动应答,关闭消息批量应答

1.测试

生产者:

  1. /**
  2. * @author tom
  3. * @create 2023-08-23 16:42
  4. *
  5. * 消息在手动应答时是不丢失、放回队列中重新消费
  6. */
  7. public class Task2 {
  8. //信道名
  9. public static final String QUEUE_NAME="ack_queue";
  10. public static void main(String[] args) throws Exception {
  11. //获取信道
  12. Channel channel = RabbitMQUtil.getChannel();
  13. //声明队列
  14. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  15. //从控制台中输入信息
  16. Scanner scanner = new Scanner(System.in);
  17. while (scanner.hasNext()){
  18. String message = scanner.next();
  19. channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
  20. System.out.println("生产者发出消息:"+message);
  21. }
  22. }
  23. }

消费者1:

  1. /**
  2. * @author tom
  3. * @create 2023-08-23 17:06
  4. * <p>
  5. * 消息收到应答时不丢失、放回队列中重新消费
  6. */
  7. public class Work02 {
  8. public static final String TASK_NAME = "ack_queue";
  9. public static void main(String[] args) throws Exception {
  10. Channel channel = RabbitMQUtil.getChannel();
  11. //消费成功回调
  12. DeliverCallback deliver = (consumerTag, message) -> {
  13. //沉睡1S
  14. SleepUtils.sleep(1);
  15. System.out.println("消息消费成功"+new String(message.getBody(),"UTF-8"));
  16. //手动应答
  17. channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
  18. };
  19. //消费失败回调
  20. CancelCallback handle = (String var1) -> {
  21. System.out.println("消息消费失败");
  22. };
  23. boolean autoAck = false;
  24. //消费信息
  25. channel.basicConsume(TASK_NAME, autoAck, deliver, handle);
  26. }
  27. }

消费者2:

  1. /**
  2. * @author tom
  3. * @create 2023-08-23 17:06
  4. * <p>
  5. * 消息收到应答时不丢失、放回队列中重新消费
  6. */
  7. public class Work03 {
  8. public static final String TASK_NAME = "ack_queue";
  9. public static void main(String[] args) throws Exception {
  10. Channel channel = RabbitMQUtil.getChannel();
  11. //消费成功回调
  12. DeliverCallback deliver = (consumerTag, message) -> {
  13. //沉睡1S
  14. SleepUtils.sleep(10);
  15. System.out.println("消息消费成功"+new String(message.getBody(),"UTF-8"));
  16. //手动应答
  17. channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
  18. };
  19. //消费失败回调
  20. CancelCallback handle = (String var1) -> {
  21. System.out.println("消息消费失败");
  22. };
  23. boolean autoAck = false;
  24. //消费信息
  25. channel.basicConsume(TASK_NAME, autoAck, deliver, handle);
  26. }
  27. }

测试结果:

当发送44时关闭work03,消息没有被丢弃而是重新回到队列中

2. 开启队列持久化(在生产者处设置)

首先删除队列

点击对应队列 

 已开启持久化

 当队列被持久化后,即便重启rabbitmq,队列也依然存在

3.消息实现持久化(在生产者处设置)

  1. //设置生产者发送消息为持久化消息(保存到磁盘上),当前无法完全确保消息持久化,需要完全支持化请搜索发布确认相关内容
  2. channel.basicPublish("",QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));

4.不公平分发(在2个消费者处都设置一下)

  1. //设置为不公平分发
  2. int prefetchCount=1;
  3. channel.basicQos(prefetchCount);

测试结果:

 

 

 因work02沉睡1s而work03沉睡10s,所以更多的消息被发送给了work02

5.预取值prefetch

设置消费者一次取几条消息

 设置预取值(同不公平分发的设置一样)

  1. //预期值
  2. int prefetchCount=5;
  3. channel.basicQos(prefetchCount);

在信道推送消息时将按顺序推送,即轮流给两个消费者推送,当消费者未消费的信息等于预取值时,队列将直接将该消息推送给其他消费者

五、发布确认

发布确认的流程

 开启发布确认(生产者)

  1. //开启发布确认
  2. channel.confirmSelect();

发布确认 (生产者)

channel.waitForConfirms();
  1. package com.atguigu.rabbitmq.four;
  2. import com.atguigu.rabbitmq.util.RabbitMQUtil;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.ConfirmCallback;
  5. import java.util.UUID;
  6. import java.util.concurrent.ConcurrentHashMap;
  7. /**
  8. * @author tom
  9. * @create 2023-08-31 9:04
  10. */
  11. public class AsyncWorkQueue {
  12. static int MAX = 1000;
  13. public static void main(String[] args) throws Exception {
  14. //单个发布确认
  15. // publishMessageIndividually(); //600ms
  16. //批量发布确认
  17. // BatchReleaseConfirmation(); //80ms
  18. //异步发布确认
  19. AsyncReleaseConfirmation(); //35ms
  20. }
  21. public static void publishMessageIndividually() throws Exception {
  22. //获取信道
  23. Channel channel = RabbitMQUtil.getChannel();
  24. //开启发布确认
  25. channel.confirmSelect();
  26. //随机生成队列名
  27. String queueName = UUID.randomUUID().toString();
  28. //队列构造 队列名/是否持久化/是否供多个消费者消费/是否自动删除/其他参数
  29. channel.queueDeclare(queueName, true, false, false, null);
  30. long begin = System.currentTimeMillis();
  31. int counter = 0;
  32. for (int i = 0; i < MAX; i++) {
  33. String message = "消息" + i;
  34. //发送到哪个交换机/队列名/其他参数/发送消息的消息体
  35. channel.basicPublish("", queueName, null, message.getBytes("UTF-8"));
  36. //单个消息就马上进行发布确认
  37. boolean flag = channel.waitForConfirms();
  38. if (flag = true)
  39. counter++;
  40. }
  41. if (counter == MAX) {
  42. long end = System.currentTimeMillis();
  43. long result = end - begin;
  44. System.out.println("发送1000条消息使用了" + result + "ms");
  45. }
  46. }
  47. public static void BatchReleaseConfirmation() throws Exception {
  48. //获取信道
  49. Channel channel = RabbitMQUtil.getChannel();
  50. //开启发布确认
  51. channel.confirmSelect();
  52. //随机生成队列名
  53. String queueName = UUID.randomUUID().toString();
  54. //队列构造 队列名/是否持久化/是否供多个消费者消费/是否自动删除/其他参数
  55. channel.queueDeclare(queueName, true, false, false, null);
  56. long begin = System.currentTimeMillis();
  57. int counter = 0;
  58. for (int i = 0; i < MAX; i++) {
  59. String message = "消息" + i;
  60. //发送到哪个交换机/队列名/其他参数/发送消息的消息体
  61. channel.basicPublish("", queueName, null, message.getBytes("UTF-8"));
  62. if (i / 100 == 0) {
  63. channel.waitForConfirms();
  64. }
  65. counter++;
  66. }
  67. if (counter == MAX) {
  68. long end = System.currentTimeMillis();
  69. long result = end - begin;
  70. System.out.println("发送1000条消息使用了" + result + "ms");
  71. }
  72. }
  73. public static void AsyncReleaseConfirmation() throws Exception {
  74. //获取信道
  75. Channel channel = RabbitMQUtil.getChannel();
  76. //开启发布确认
  77. channel.confirmSelect();
  78. /**
  79. * 线程安全有序的一个哈希表 设用于高并发的情况下
  80. *1.能轻松的将序号与消息进行关联
  81. * 2.可以轻松的批量删除内容,只要给到序号
  82. * 3.支持高并发(多线程)
  83. */
  84. ConcurrentHashMap<Long, String> OutStandingConfirms = new ConcurrentHashMap<>();
  85. //随机生成队列名
  86. String queueName = UUID.randomUUID().toString();
  87. //队列构造 队列名/是否持久化/是否供多个消费者消费/是否自动删除/其他参数
  88. channel.queueDeclare(queueName, true, false, false, null);
  89. //成功回调
  90. /**
  91. *
  92. * deliveryTag 标记
  93. * multiple 是否批量确认
  94. */
  95. ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
  96. //2.判断时否是批量确认
  97. if (multiple){
  98. //如果为批量确认,则直接清理
  99. OutStandingConfirms.clear();
  100. }else {
  101. //若不为批量确认,则删除对应序号的消息
  102. OutStandingConfirms.remove(deliveryTag);
  103. }
  104. System.out.println("确认成功的消息"+deliveryTag);
  105. };
  106. //失败回调
  107. ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
  108. System.out.println("确认失败的消息"+deliveryTag);
  109. };
  110. //开启监听
  111. /**
  112. *
  113. * 1.监听成功的消息
  114. * 2.监听失败的消息
  115. */
  116. channel.addConfirmListener(ackCallback, nackCallback);//异步通知
  117. //开始时间
  118. long begin = System.currentTimeMillis();
  119. int counter = 0;
  120. for (int i = 0; i < MAX; i++) {
  121. String message = "消息" + i;
  122. //发送到哪个交换机/队列名/其他参数/发送消息的消息体
  123. channel.basicPublish("", queueName, null, message.getBytes("UTF-8"));
  124. //1:记录下所有要发送的消息 消息的总和
  125. OutStandingConfirms.put(channel.getNextPublishSeqNo(),message);
  126. counter++;
  127. }
  128. if (counter == MAX) {
  129. long end = System.currentTimeMillis();
  130. long result = end - begin;
  131. System.out.println("发送1000条消息使用了" + result + "ms");
  132. }
  133. }
  134. }

五、交换机

发布、订阅模式fanout        (广播:多个交换机都会收到)——效率最低

测试代码——生产者:

  1. public class EmitLog {
  2. public static final String EXCHANGE_NAME="logs" ;
  3. public static void main(String[] args) throws Exception {
  4. //获取信道
  5. Channel channel = RabbitMQUtil.getChannel();
  6. //构造交换机
  7. channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
  8. //输入
  9. Scanner scanner = new Scanner(System.in);
  10. while (scanner.hasNext()){
  11. String message = scanner.next();
  12. channel.basicPublish(EXCHANGE_NAME,"123",null,message.getBytes("UTF-8"));
  13. System.out.print("发送消息"+message);
  14. }
  15. }
  16. }

测试代码——消费者1

  1. public class ReceiveLogs01 {
  2. //交换机名称
  3. public static final String EXCHANGE_NAME="logs";
  4. public static void main(String[] args) throws Exception{
  5. Channel channel = RabbitMQUtil.getChannel();
  6. //声明一个交换机
  7. channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  8. //声明一个队列 临时队列
  9. /**
  10. * 队列名称是随机的
  11. * 当消费者断开与队列的连接的时候 队列就自动删除
  12. */
  13. String queue = channel.queueDeclare().getQueue();
  14. /**
  15. * 绑定交换机与队列
  16. */
  17. channel.queueBind(queue,EXCHANGE_NAME,"123");
  18. System.out.println("ReceiveLogs01等待接收消息");
  19. //接收回调
  20. DeliverCallback handle=(consumerTag, message)->{
  21. System.out.println("ReceiveLogs01接收成功"+new String(message.getBody(),"UTF-8"));
  22. };
  23. //失败回调
  24. CancelCallback Cancel=(consumerTag)->{
  25. System.out.println("ReceiveLogs01接收失败,序号"+consumerTag);
  26. };
  27. channel.basicConsume(queue,true,handle,Cancel);
  28. }
  29. }

测试代码——消费者2 

  1. public class ReceiveLogs02 {
  2. //交换机名称
  3. public static final String EXCHANGE_NAME = "logs";
  4. public static void main(String[] args) throws Exception {
  5. Channel channel = RabbitMQUtil.getChannel();
  6. //声明一个交换机
  7. channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  8. //声明一个队列 临时队列
  9. /**
  10. * 队列名称是随机的
  11. * 当消费者断开与队列的连接的时候 队列就自动删除
  12. */
  13. String queue = channel.queueDeclare().getQueue();
  14. /**
  15. * 绑定交换机与队列
  16. */
  17. channel.queueBind(queue, "logs", "123");
  18. System.out.println("ReceiveLogs02等待接收消息");
  19. //接收回调
  20. DeliverCallback handle = (consumerTag, message) -> {
  21. System.out.println("ReceiveLogs02接收成功" + new String(message.getBody(), "UTF-8"));
  22. };
  23. //失败回调
  24. CancelCallback Cancel = (consumerTag) -> {
  25. System.out.println("ReceiveLogs02接收失败,序号" + consumerTag);
  26. };
  27. channel.basicConsume(queue, true, handle, Cancel);
  28. }
  29. }

直接交换机direct                 (竞争:如果有多个交换机,消息只会被发送给其中一个)——效率大大优于fanout,但是低于topics

代码与上面类似,仅需修改交换机的类型

主题交换机topics                 (指定:消息会发给对应routingKey的交换机,如果与多个交换机的routingKey符合,多个交换机都会收到消息)——效率最高

指定routingKey时:

*表示一个任意单词

#表示0-无数个单词

各个单词使用.分隔

测试代码同上,仅需修改routingKey与交换机类型

六、死信队列

死信产生的原因:

1.消息被拒绝

关闭自动应答

 channel.basicConsume(NORMAL_QUEUE,false,delivercallback,callback);

 修改成功回调

  1. //成功回调
  2. DeliverCallback delivercallback=(consumerTag,message)->{
  3. String msg = new String(message.getBody(), "UTF-8");
  4. if (msg.equals("info5")){
  5. //拒绝应答,且不放回normal队列
  6. channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
  7. System.out.println("consumer01拒绝接收,消息为"+new String(message.getBody(),"UTF-8")+message.getEnvelope().getExchange());
  8. }else {
  9. //应答
  10. channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
  11. System.out.println("consumer01成功接收,消息为"+new String(message.getBody(),"UTF-8")+message.getEnvelope().getExchange());
  12. }
  13. };

2.ttl时间过期

ttl即 time to live 存活时间,可以通过消费者或者生产者设置,推荐使用生产者设置。

测试代码--生产者

  1. public class Producer {
  2. public static final String NORMAL_EXCHANGE = "normal_exchange";
  3. public static void main(String[] args) throws Exception {
  4. //获取信道
  5. Channel channel = RabbitMQUtil.getChannel();
  6. // 声明存活时间(time to live)为10s
  7. AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().expiration("10000").build();
  8. for (int i = 0; i < 11; i++) {
  9. //发送的消息
  10. String message = "info" + i;
  11. //推送消息
  12. channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", props, message.getBytes());
  13. }
  14. }
  15. }

 测试代码--NORMAL消费者

  1. public class Consumer01 {
  2. //声明交换机名
  3. public static final String NORMAL_EXCHANGE="normal_exchange";
  4. public static final String DEAD_EXCHANGE="dead_exchange";
  5. //声明队列名
  6. public static final String NORMAL_QUEUE="normal_queue";
  7. public static final String DEAD_QUEUE="dead_queue";
  8. public static void main(String[] args) throws Exception{
  9. Channel channel = RabbitMQUtil.getChannel();
  10. //声明死信与普通交换机
  11. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
  12. channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
  13. //声明普通队列
  14. HashMap<String, Object> arguments = new HashMap<>();
  15. //设置ttl时间为10s
  16. arguments.put("x-message-ttl",10000);
  17. //设置死信交换机
  18. arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
  19. //设置死信交换机的routingKey
  20. arguments.put("x-dead-letter-routing-key","lisi");
  21. channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
  22. //声明死信队列
  23. channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
  24. //绑定交换机
  25. channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
  26. channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
  27. //成功回调
  28. DeliverCallback delivercallback=(consumerTag,message)->{
  29. System.out.println("consumer01成功接收,消息为"+new String(message.getBody(),"UTF-8")+message.getEnvelope().getExchange());
  30. };
  31. //失败回调
  32. CancelCallback callback=(consumerTag)->{
  33. System.out.println("接收失败,标记为"+consumerTag);
  34. };
  35. channel.basicConsume(NORMAL_QUEUE,delivercallback,callback);
  36. }
  37. }

测试代码--DEAD消费者

  1. public class DeadConsumer {
  2. public static final String DEAD_QUEUE="dead_queue";
  3. public static void main(String[] args) throws Exception {
  4. Channel channel = RabbitMQUtil.getChannel();
  5. //成功回调
  6. DeliverCallback delivercallback=(consumerTag, message)->{
  7. System.out.println("deadConsumer成功接收,消息为"+new String(message.getBody(),"UTF-8")+message.getEnvelope().getExchange());
  8. };
  9. //失败回调
  10. CancelCallback callback=(consumerTag)->{
  11. System.out.println("接收失败,标记为"+consumerTag);
  12. };
  13. channel.basicConsume(DEAD_QUEUE,delivercallback,callback);
  14. }
  15. }

3.超过队列最大长度

测试代码如上,仅需在设置中增加一个队列长度

  1. //设置队列NORMAL_QUEUE的最大长度为6 注意该处第二个参数不是string类型
  2. arguments.put("x-max-length",6);

注意,在完成上面的测试后,如想修改配置并测试本项,需将之前生成的队列删除,以避免报错

七、延迟队列

1、利用死信队列来实现延迟

新建一个springboot项目,引入如下依赖

  1. <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>
  6. <!--swagger-->
  7. <!-- https://mvnrepository.com/artifact/com.github.xiaoymin/knife4j-spring-boot-starter -->
  8. <dependency>
  9. <groupId>com.github.xiaoymin</groupId>
  10. <artifactId>knife4j-spring-boot-starter</artifactId>
  11. <version>3.0.3</version>
  12. </dependency>
  13. <!--swagger-ui-->
  14. <dependency>
  15. <groupId>io.springfox</groupId>
  16. <artifactId>springfox-swagger-ui</artifactId>
  17. <version>3.0.0</version>
  18. <scope>compile</scope>
  19. </dependency>
  20. <!--json-->
  21. <!-- https://mvnrepository.com/artifact/com.alibaba.fastjson2/fastjson2 -->
  22. <dependency>
  23. <groupId>com.alibaba.fastjson2</groupId>
  24. <artifactId>fastjson2</artifactId>
  25. <version>2.0.23</version>
  26. </dependency>
  27. <!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit-test -->
  28. <dependency>
  29. <groupId>org.springframework.amqp</groupId>
  30. <artifactId>spring-rabbit-test</artifactId>
  31. <version>3.0.8</version>
  32. <scope>test</scope>
  33. </dependency>
  34. <dependency>
  35. <groupId>org.springframework.boot</groupId>
  36. <artifactId>spring-boot-starter-thymeleaf</artifactId>
  37. </dependency>
  38. <dependency>
  39. <groupId>org.springframework.boot</groupId>
  40. <artifactId>spring-boot-starter-web</artifactId>
  41. </dependency>
  42. <dependency>
  43. <groupId>org.springframework.boot</groupId>
  44. <artifactId>spring-boot-devtools</artifactId>
  45. <scope>runtime</scope>
  46. <optional>true</optional>
  47. </dependency>
  48. <dependency>
  49. <groupId>org.projectlombok</groupId>
  50. <artifactId>lombok</artifactId>
  51. <optional>true</optional>
  52. </dependency>
  53. <dependency>
  54. <groupId>org.springframework.boot</groupId>
  55. <artifactId>spring-boot-starter-test</artifactId>
  56. <scope>test</scope>
  57. </dependency>

 编写swagger config

  1. @Configuration
  2. @EnableSwagger2
  3. public class SwaggerConfig {
  4. public Docket webApiConfig(){
  5. return new Docket(DocumentationType.SWAGGER_2)
  6. .groupName("webApi")
  7. .apiInfo(webApiInfo())
  8. .select()
  9. .build();
  10. }
  11. private ApiInfo webApiInfo(){
  12. return new ApiInfoBuilder()
  13. .title("rabbitmq接口文档")
  14. .description("本文档描述了rabbitmq微服务接口定义")
  15. .version("1.0")
  16. .contact(new Contact("WangXu","http://swagger.com","xu.wangcn@foxmail.com"))
  17. .build();
  18. }
  19. }

 声明队列、交换机并绑定

  1. @Component
  2. public class TtlQueueConfig {
  3. //普通交换机名
  4. public static final String X_EXCHANGE="X";
  5. //死信交换机名
  6. public static final String Y_DEAD_LETTER_EXCHANGE="Y";
  7. //普通队列名
  8. public static final String QUEUE_A="QA";
  9. public static final String QUEUE_B="QB";
  10. //死信队列名
  11. public static final String DEAD_LETTER_QUEUE="QD";
  12. //声明xExchange
  13. @Bean("xExchange")
  14. public DirectExchange xExchange(){
  15. return new DirectExchange(X_EXCHANGE);
  16. }
  17. //声明xExchange
  18. @Bean("yExchange")
  19. public DirectExchange yExchange(){
  20. return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
  21. }
  22. //声明队列A
  23. @Bean("queueA")
  24. public Queue aQueue(){
  25. Map<String, Object> arguments=new HashMap<>();
  26. //设置死信交换机
  27. arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
  28. //设置死信routing key
  29. arguments.put("x-dead-letter-routing-key","YD");
  30. //设置消息过期时间
  31. arguments.put("x-message-ttl",10000);
  32. return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
  33. }
  34. //声明队列B
  35. @Bean("queueB")
  36. public Queue bQueue(){
  37. Map<String, Object> arguments=new HashMap<>();
  38. //设置死信交换机
  39. arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
  40. //设置死信routing key
  41. arguments.put("x-dead-letter-routing-key","YD");
  42. //设置消息过期时间
  43. arguments.put("x-message-ttl",40000);
  44. return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
  45. }
  46. //声明队列D
  47. @Bean("queueD")
  48. public Queue dQueue(){
  49. return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
  50. }
  51. //绑定交换机与队列
  52. @Bean
  53. public Binding xExchangeBindingQaQueue(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){
  54. return BindingBuilder.bind(queueA).to(xExchange).with("XA");
  55. }
  56. //绑定交换机与队列
  57. @Bean
  58. public Binding xExchangeBindingQbQueue(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){
  59. return BindingBuilder.bind(queueB).to(xExchange).with("XB");
  60. }
  61. //绑定死信交换机与死信队列
  62. @Bean
  63. public Binding yExchangeBindingQdQueue(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){
  64. return BindingBuilder.bind(queueD).to(yExchange).with("YD");
  65. }
  66. }

 编写死信消费者

  1. @Slf4j
  2. @Component
  3. public class DeadLetterQueueConsumer {
  4. //接收消息
  5. @RabbitListener(queues="QD")
  6. public void receiveD(Message message, Channel channel) throws Exception{
  7. String msg=new String(message.getBody(),"UTF-8");
  8. log.info("当前时间为:{},接收到的消息为{}",new Date().toString(),msg);
  9. }
  10. }

 配置rabbitmq参数

  1. spring.rabbitmq.host=172.20.10.2
  2. spring.rabbitmq.username=admin
  3. spring.rabbitmq.password=root
  4. spring.mvc.pathmatch.matching-strategy=ant_path_matcher

消费者 & controller

  1. @Slf4j
  2. @RestController()
  3. @RequestMapping("/ttl")
  4. public class SendMsgController {
  5. @Autowired
  6. private RabbitTemplate rt;
  7. @GetMapping("/sendMsg/{message}")
  8. public void postMsg(@PathVariable("message") String message) {
  9. log.info("当前时间:{},发送一条消息给两个TTL队列:{}", new Date().toString(), message);
  10. rt.convertAndSend("X", "XA", "消息来自ttl为10s的交换机" + message);
  11. rt.convertAndSend("X", "XB", "消息来自ttl为40s的交换机" + message);
  12. }
  13. }

swagger后台访问地址

Swagger各版本访问地址:

2.9.x 访问地址:
http://ip:port/{context-path}/swagger-ui.html

3.0.x 访问地址:
http://ip:port/{context-path}/swagger-ui/index.html

3.0集成knife4j 访问地址:
http://ip:port/{context-path}/doc.html

注意点:

Channel需要引用import com.rabbitmq.client.Channel;包

测试结果 

使用死信队列的方式实现延迟虽然实现了,但是它有一个问题,该方式只能顺序消费消息,当第一个消息的ttl时间大于后面消息的ttl时间时,延迟将为第一个消息的ttl

 2、利用插件实现延迟

 于是,为了解决这个问题,我们就需要用到rabbitmq的延迟队列插件了

 首先我们先前往github上下载延迟队列的插件

Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub

随后将下载的插件放到linux服务器中

scp "C:\Users\q\Downloads\rabbitmq_delayed_message_exchange-3.8.0.ez" root@172.20.10.2:/root/rabbit

 复制插件到rabbitmq插件目录下

cp rabbitmq_delayed_message_exchange-3.8.0.ez /usr/lib/rabbitmq/lib/rabbimtq_server-3.8.6/plugins

 安装插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

 重启rabbitmq服务

systemctl restart rabbitmq-server

 打开后台查看交换机类型是否多出一种x_delayed-message,如果有那就是成功了

测试通过插件来实现延迟队列

编写配置类

  1. @Configuration
  2. public class DelayedQueueConfig {
  3. //交换机名称
  4. public static final String DELAYED_EXCHANGE_NAME="delayed.exchange";
  5. //队列名称
  6. public static final String DELAYED_QUEUE_NAME="delayed.queue";
  7. //routing key
  8. public static final String DELAYED_ROUTING_KEY_NAME="delayed.routing.key";
  9. //声明队列
  10. @Bean("dQueue")
  11. public Queue getQueue(){
  12. return QueueBuilder.durable(DELAYED_QUEUE_NAME).build();
  13. }
  14. //声明交换机
  15. @Bean("dExchange")
  16. public CustomExchange getExchange(){
  17. HashMap<String, Object> arguments = new HashMap<>();
  18. arguments.put("x-delayed-type","direct");
  19. /**
  20. * 1.交换机名称
  21. * 2.交换机类型
  22. * 3.是否需要持久化
  23. * 4.是否自动删除
  24. * 5.其他参数
  25. */
  26. return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",false,false,arguments);
  27. }
  28. //声明绑定关系
  29. @Bean
  30. public Binding dQueueBindingD(@Qualifier("dQueue")Queue dQueue, @Qualifier("dExchange")CustomExchange dExchange){
  31. return BindingBuilder.bind(dQueue).to(dExchange).with(DELAYED_ROUTING_KEY_NAME).noargs();
  32. }
  33. }

在controller类中编写请求

  1. //编写get请求
  2. @GetMapping("/delayed/{message}/{delayedTime}")
  3. public void testTwo(@PathVariable("message") String message, @PathVariable("delayedTime") Integer delayedTime) {
  4. //控制台打印消息
  5. log.info("{}发送了一个消息:{},delayedTime:{}", new Date(), message,delayedTime);
  6. //指明消息要发送的交换机及routing key,并设置延迟时间
  7. rt.convertAndSend("delayed.exchange", "delayed.routing.key", message,msg->{
  8. msg.getMessageProperties().setDelay(delayedTime);
  9. return msg;
  10. });
  11. }

编写消费者

  1. @Slf4j
  2. @Component
  3. public class DelayedQueueConsumer {
  4. //接收消息
  5. @RabbitListener(queues="delayed.queue")
  6. public void receiveD(Message message, Channel channel) throws Exception{
  7. String msg=new String(message.getBody(),"UTF-8");
  8. log.info("延迟队列当前时间为:{},接收到的消息为{}",new Date().toString(),msg);
  9. }
  10. }

 测试结果

 总结

延时队列在需要延时处理的场景下非常有用,延迟队列可以有效地利用Rabbitmq的特性,如消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次,以及未确认处理的消息不会被丢弃。另外,通过RabbitMQ集群的特性,可以很好的解决单点故障问题,不会因为单个接待你挂掉导致延时队列不可用或者消息丢失。

当然,延时队列还有其它选择,比如java的DelayQueue,利用Redis的zset,利用quartz或者利用kafka的时间轮,这些方式各有特点,看需要适用的场景。

八、发布确认高级内容

在生产环境中由于一些不明原因导致rabbitmq重启,在rabbitmq重启期间生产者消息投递失败导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行rabbitmq的消息可靠投递呢?特别是在这样比较极端的情况,rabbitmq集群不可用的时候,无法投递的消息该如何处理呢?

 1、确认回调接口

编写测试用例:当交换机没有接收到消息时调用confirmCallBack

配置类

  1. //开启日志支持
  2. @Slf4j
  3. //将该类注入容器
  4. @Component
  5. public class ConfirmConfig {
  6. //声明交换机名
  7. public static final String EXCHANGE_NAME="confirm.exchange";
  8. //声明队列名
  9. public static final String QUEUE_NAME="confirm.queue";
  10. //声明routing key
  11. public static final String ROUTING_KEY="key1";
  12. @Bean("confirmExchange")
  13. public DirectExchange confirmExchange(){
  14. //构建交换机
  15. return new DirectExchange(EXCHANGE_NAME,true,false);
  16. }
  17. @Bean("confirmQueue")
  18. public Queue confirmQueue(){
  19. //构建队列
  20. return QueueBuilder.durable(QUEUE_NAME).build();
  21. }
  22. @Bean
  23. public Binding qBindingExchange(@Qualifier("confirmExchange")DirectExchange confirmExchange,
  24. @Qualifier("confirmQueue")Queue confirmQueue){
  25. //绑定队列、交换机、routing key
  26. return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(ROUTING_KEY);
  27. }
  28. }

 生产者

  1. @GetMapping("/confirm/{message}")
  2. public void confirmTest(@PathVariable("message") String message) {
  3. CorrelationData correlationData = new CorrelationData("1");
  4. log.info("当前时间:{},发送一条消息:{}", new Date().toString(), message);
  5. rt.convertAndSend(ConfirmConfig.EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY, "消息:" +
  6. message,correlationData);
  7. }

消费者

  1. @Slf4j
  2. @Component
  3. public class ConfirmConsumer {
  4. @RabbitListener(queues = "confirm.queue")
  5. public void receiveQ(Message message, Channel channel) throws UnsupportedEncodingException {
  6. String msg=new String(message.getBody(),"UTF-8");
  7. log.info("已经收到消息:{},当前时间为:{}",msg,new Date());
  8. }
  9. }

编写confirmCallBack实现类,实现消息不管是否到达交换机都会有回调

  1. @Slf4j
  2. @Component
  3. public class MyCallBack implements RabbitTemplate.ConfirmCallback {
  4. //如果我们实现的是一个类的内部接口,而当我们通过该类调用该接口时无法调到我们写的实现类,
  5. // 所以要先注入该类
  6. @Autowired
  7. private RabbitTemplate rabbitTemplate;
  8. @PostConstruct
  9. public void setConfirmCallback(){
  10. //将该实现类设置为rabbitTemplate的确认回调
  11. rabbitTemplate.setConfirmCallback(this);
  12. }
  13. /**
  14. * @param correlationData correlation data for the callback.
  15. * @param ack true for ack, false for nack
  16. * @param cause An optional cause, for nack, when available, otherwise null.
  17. * 交换机确认回调方法
  18. * 1.发消息 交换机收到了 回调
  19. * 1.1 correlationData 保存回调消息的id及相关信息
  20. * 1.2 交换机收到了消息 true
  21. * 1.3 cause null
  22. *
  23. * 2.发消息 交换机没收到 回调
  24. * 2.1 correlationData 保存回调消息的id及相关信息
  25. * 2.2 交换机收到了消息 false
  26. * 2.3 cause 失败的原因
  27. */
  28. @Override
  29. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  30. if(ack){
  31. log.info("成功应答,消息id为:{}",correlationData.getId());
  32. }else {
  33. log.info("交换机没有收到消息,消息的id为{},失败的原因为:{}",correlationData.getId(),cause);
  34. }
  35. }
  36. }

配置确认类型

  1. #默认是none 禁止发布确认模式 correlated 发布消息成功到交换机后会触发回调方法
  2. #simple 有两种效果,其一与correlated一样会触发回调方法,其二根据返回结果来判定下一步的逻辑
  3. spring.rabbitmq.publisher-confirm-type=correlated

 在生产者处将交换机名写错,测试结果如下:

 2、回退消息

当交换机可以正常接收,而队列无法接收时我们需要让消息回退

首先我们需要在MyCallBack中实现回退接口

  1. @Slf4j
  2. @Component
  3. public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
  4. //如果我们实现的是一个类的内部接口,而当我们通过该类调用该接口时无法调到我们写的实现类,
  5. // 所以要先注入该类
  6. @Autowired
  7. private RabbitTemplate rabbitTemplate;
  8. @PostConstruct
  9. public void setConfirmCallback(){
  10. //将该实现类设置为rabbitTemplate的确认回调
  11. rabbitTemplate.setConfirmCallback(this);
  12. }
  13. @PostConstruct
  14. public void setReturnsCallBack(){
  15. //将该实现类设置为rabbitTemplate的回退方法
  16. rabbitTemplate.setReturnsCallback(this);
  17. }
  18. /**
  19. * @param correlationData correlation data for the callback.
  20. * @param ack true for ack, false for nack
  21. * @param cause An optional cause, for nack, when available, otherwise null.
  22. * 交换机确认回调方法
  23. * 1.发消息 交换机收到了 回调
  24. * 1.1 correlationData 保存回调消息的id及相关信息
  25. * 1.2 交换机收到了消息 true
  26. * 1.3 cause null
  27. *
  28. * 2.发消息 交换机没收到 回调
  29. * 2.1 correlationData 保存回调消息的id及相关信息
  30. * 2.2 交换机收到了消息 false
  31. * 2.3 cause 失败的原因
  32. */
  33. @Override
  34. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  35. if(ack){
  36. log.info("交换机成功应答,消息id为:{}",correlationData.getId());
  37. }else {
  38. log.info("交换机没有收到消息,消息的id为{},失败的原因为:{}",correlationData.getId(),cause);
  39. }
  40. }
  41. //回退方法,可以在消息传递过程中不可达目的地时将消息返回给生产者
  42. //只有消息不可达时,才会回退
  43. @Override
  44. public void returnedMessage(ReturnedMessage returned) {
  45. try {
  46. log.error("消息{},被交换机{}退回,退回原因:{},路由Key:{}",new String(returned.getMessage().getBody(),"UTF-8"),
  47. returned.getExchange(), returned.getReplyText(),returned.getRoutingKey());
  48. } catch (UnsupportedEncodingException e) {
  49. throw new RuntimeException(e);
  50. }
  51. }

 修改配置类,新增

spring.rabbitmq.publisher-returns=true

在生产者中将routing key修改为错误的,并进行测试

3、备份交换机

声明交换机和队列并绑定

  1. //备份交换机
  2. @Bean("backupExchange")
  3. public FanoutExchange backupExchange(){
  4. return new FanoutExchange(BACKUP_EXCHANGE_NAME,true,false);
  5. }
  6. //备份队列
  7. @Bean("backupQueue")
  8. public Queue backupQueue(){
  9. return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
  10. }
  11. //警告队列
  12. @Bean("warningQueue")
  13. public Queue waringQueue(){
  14. return QueueBuilder.durable(WARING_QUEUE_NAME).build();
  15. }
  16. //绑定交换机与队列
  17. @Bean
  18. public Binding backupBindingBackupExchange(@Qualifier("backupQueue")Queue backupQueue,
  19. @Qualifier("backupExchange")FanoutExchange backupExchange ){
  20. return BindingBuilder.bind(backupQueue).to(backupExchange);
  21. }
  22. //绑定交换机与队列
  23. @Bean
  24. public Binding waringBindingBackupExchange(@Qualifier("warningQueue")Queue warningQueue,
  25. @Qualifier("backupExchange")FanoutExchange backupExchange ){
  26. return BindingBuilder.bind(warningQueue).to(backupExchange);
  27. }

 在确认交换机配置备份交换机

  1. @Bean("confirmExchange")
  2. public DirectExchange confirmExchange(){
  3. //构建交换机
  4. return ExchangeBuilder.directExchange(EXCHANGE_NAME).durable(true)
  5. //配置备份交换机
  6. .withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME).build();
  7. }

消费者

  1. @Slf4j
  2. @Component
  3. public class warningConsumer {
  4. @RabbitListener(queues= ConfirmConfig.WARING_QUEUE_NAME)
  5. public void receiveD(Message message, Channel channel) throws UnsupportedEncodingException {
  6. String msg = new String(message.getBody(), "UTF-8");
  7. log.info("warningQueue已收到消息:{},时间为:{}",msg,new Date());
  8. }
  9. }

 生产者

  1. @GetMapping("/confirm/{message}")
  2. public void confirmTest(@PathVariable("message") String message) {
  3. CorrelationData correlationData = new CorrelationData("1");
  4. log.info("当前时间:{},发送一条消息:{}", new Date().toString(), message);
  5. rt.convertAndSend(ConfirmConfig.EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY+"1", "消息:" +
  6. message,correlationData);
  7. }

回退消息与备份交换机同时存在,备份交换机优先级更高

  测试结果:

 九、幂等性

用户对于同一操作发起的一次请求或多次请求的结果时一致的,不会因为多次点击而产生副作用。举个例子,支付,有时候因为网络异常,已经扣了钱,但当用户再次点击按钮,此时会进行第二次扣款,最终用户查询余额变少了,流水记录也变成了两条。在以前的但应用系统中,我们只需要把数据放入事务中即可,发生错误立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等。

消息被重复消费

解决思路

使用全局ID或者写个唯一标识比如时间戳或者UUID或者订单消费者消费MQ中的消息也可利用MQ的该id来判断,或者可按自己的规则生成一个全局唯一的id,每次消费前用该id先判断消息是否已消费过。

消息永远不会被消费多次,业界主流的幂等性有两种操作,唯一id+指纹码机制,利用数据库主键去重,b利用redis的原子性去实现

利用redis执行setnx命令,天然具有幂等性,从而实现不重复消费。

十、优先级队列

十一、惰性队列

消息保存在内存还是磁盘中,正常情况下消息是保存在内存中,而惰性队列消息时保存在磁盘中。

当消费者由于各种各样的原因(比如消费者下线、宕机亦或者由于维护而关闭等)而使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。

在队列声明时可以通过“x-queue-mode”参数来设置队列的模式,取值为“default”和“lazy”。

 十二、rabbitmq集群

1.克隆两台虚拟机并启动
2.修改机器的主机名称

vim /etc/hostname

三台机器分别命名为node1、node2、node3

3.配置各个节点的hosts文件,让各个节点都能互相识别对方

vim /etc/hosts

ip1 node1

ip2 node2

ip3 node3

4.确保各个节点使用的cookie文件使用的是同一个值

在node1上执行远程操作命令

scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie

scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie

5.启动RabbitMQ服务,顺带启动Erlang虚拟机和RabbitMQ应用服务(在三台节点上分别执行)

rabbitmq-server -detached

6.在克隆的两台虚拟机中执行如下命令:

rabbitmqctl stop_app

(rabbitmqctl stop会将Erlang虚拟机关闭,rabbitmqctl stop_app只关闭RabbitMQ服务)

rabbitmqctl reset

rabbitmqctl join_cluster rabbit@node1/第三台机器也可以执行:rabbitmqctl join_cluster rabbit@node2

rabbitmqctl start_app(只启动应用服务)

7.查看集群状态

rabbitmqctl cluster_status

8.创建集群用户账号

创建账号:rabbitmqctl add_user admin 123

设置角色:rabbitmqctl set_user_tags admin administrator

设置权限:rabbitmqctl set_permissions -p "/" admin "." ".*" ".*"

9.解除集群节点

rabbitmqctl stop_app

rabbitmqctl reset

rabbitmqctl start_app

rabbitmqctl cluster_status

rabbitmqctl forget_cluster_node rabbit@node2(node1机器上执行)

十三、镜像队列

如果RabbitMQ集群中只有一个Broker节点,那么该节点的失效将导致整体服务的临时性不可用,并且也可能会导致消息的丢失。可以将所有消息都设置持久化,并且对应队列的durable属性也设置为true,单消息发送之后和被写入磁盘并执行刷盘动作之间存在一个短暂却会产生问题的时间窗。通过publisherconfirm机制能够确保客户端知道哪些消息已经存入磁盘,尽管如此,一般不希望遇到因单点故障导致的服务不可用。

引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他Broker节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。

打开rabbitmq后台选择Admin-->policies

Haproxy高可用负载均衡

当我们有海量的请求时,如果我们仍然将请求发送到同一个服务器上时,服务器可能会崩溃,而有些服务器可能会因为没有收到相应而空闲,这时我们就需要用到Haproxy来帮助我们均衡的分配请求,从而使用户获得良好的体验,同时也避免服务器因大量请求而崩溃。

Federation Exchange联邦交换机

当我们的用户来自有不同的地区时,遥远的用户可能会因网络延时而体验变差,为了用户能够在不同地区体验到大致相同的体验,我们可以在不同的地区同时部署服务器。

而当我们在多个不同的地点部署服务器后,我们需要解决各个服务器的数据一致问题,这时我们就需要用到联邦交换机。

搭建步骤:

1.需要保证每台节点单独运行

2.哎每台机器上开启federation相关插件

rabbitmq-plugins enable rabbitmq_federtion

rabbitmq-plugins enable rabbitmq_federation_management

在从节点downstream(node2)上创建一个名为“fed_exchange”的交换机并声明队列

在downstream(node2)配置upstream(node1)

admin-->Federation Upstreams

Federation Queue

联邦队列可以额在多个Broker节点(或者集群)之间为单个队列提供均衡负载的功能。一个联邦队列可以连接一个或者多个上游队列(upstream queue),并从这些上游队列中获取消息以满足本地消费者消费消息的需求

搭建步骤

Shovel

首先在服务器中开启shovel支持

rabbitmq plugins enable rabbitmq_shovel

rabbitmq plugins enable rabbitmq_shovel_management

然后打开rabbitmq管理后台admin-->shovel management

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/558326
推荐阅读
相关标签
  

闽ICP备14008679号