当前位置:   article > 正文

RabbitMq的基础及springAmqp的使用

RabbitMq的基础及springAmqp的使用

RabbitMq

官网:RabbitMQ: One broker to queue them all | RabbitMQ

什么是MQ?

mq就是消息队列,消息队列遵循这先入先出原则。一般用来解决应用解耦,异步消息,流量削峰等问题,实现高性能,高可用,可伸缩和最终一致性架构。

rabbitMq的四大核心

image20230424154420240.png

RabbitMq的安装

RabbitMQ是一个开源的遵循 AMQP协议实现的基于 Erlang语言编写,即需要先安装部署Erlang环境再安装RabbitMQ环境。

查看兼容关系:Erlang Version Requirements | RabbitMQ

百度云地址:

链接:百度网盘 请输入提取码 提取码:6666

本篇文章使用版本:3.8.8,liunx7-cenOs7

  1. #在存放位置执行以下指令
  2. rpm -ivh erlang-21.3-1.el7.x86_64.rpm
  3. #安装socat
  4. yum install socat -y
  5. #安装mq
  6. rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm

启动

  1. #开机自动启动
  2. chkconfig rabbitmq-server on
  3. #启动服务
  4. /sbin/service rabbitmq-serve start
  5. #查看启动
  6. /sbin/service rabbitmq-serve status
  7. #停止服务
  8. /sbin/service rabbitmq-serve stop

image20230424154409259.png

坑:执行以上指令无效,重新执行下面指令

  1. systemctl start rabbitmq-server.service #启动
  2. systemctl status rabbitmq-server.service#查看状态

安装可视化界面

  1. #尽量停止服务,在安装
  2. #安装可视化界面
  3. rabbitmq-plugins enable rabbitmq_management

访问地址:http://ip:15672/

如果访问不了,查看防火墙是够关闭 systemctl stop firewalld 关闭防火墙,访问成功后走rabbitmq的基本指令

卸载MQ:

  1. systemctl stop rabbitmq-server
  2. yum list | grep rabbitmq
  3. yum -y remove rabbitmq-server.noarch
  4. yum list | grep erlang
  5. yum -y remove erlang-*
  6. rm -rf /usr/lib64/erlang
  7. rm -rf /var/lib/rabbitmq
  8. rm -rf /usr/local/erlang
  9. rm -rf /usr/local/rabbitmq

docker安装

docker pull rabbitmq:3-management
  1. #运行
  2. docker run \
  3. -e RABBITMQ_DEFAULT_USER=itcast \
  4. -e RABBITMQ_DEFAULT_PASS=123321 \
  5. --name mq \
  6. --hostname mq1 \
  7. -p 15672:15672 \   #网页访问端口
  8. -p 5672:5672 \     #mq连接端口
  9. -d \
  10. rabbitmq:3-management

rabbitMq基本指令

  1. #查看用户
  2. rabbitmqctl list_users
  3. #添加用户
  4. rabbitmqctl add_user admin 123456
  5. #设置角色 (超级管理员)
  6. rabbitmqctl set_user_tags admin administrator
  7. #设置权限
  8. rabbitmqctl set_permissions -p '/' admin '.*' '.*' '.*'

image20230424160438952.png

登录后也可以在此界面添加用户

image20230424160714982.png

对接java(入门)

image20230506093856370.png

创建一个maven工程:

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <groupId>com.chen</groupId>
  7. <artifactId>mq</artifactId>
  8. <version>1.0-SNAPSHOT</version>
  9. <properties>
  10. <rabbitmq.version>5.8.0</rabbitmq.version>
  11. <common.version>2.6</common.version>
  12. </properties>
  13. <dependencies>
  14. <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
  15. <dependency>
  16. <groupId>com.rabbitmq</groupId>
  17. <artifactId>amqp-client</artifactId>
  18. <version>${rabbitmq.version}</version>
  19. </dependency>
  20. <dependency>
  21. <groupId>commons-io</groupId>
  22. <artifactId>commons-io</artifactId>
  23. <version>${common.version}</version>
  24. </dependency>
  25. </dependencies>
  26. <build>
  27. <plugins>
  28. <plugin>
  29. <groupId>org.apache.maven.plugins</groupId>
  30. <artifactId>maven-compiler-plugin</artifactId>
  31. <configuration>
  32. <source>8</source>
  33. <target>8</target>
  34. </configuration>
  35. </plugin>
  36. </plugins>
  37. </build>
  38. </project>

生产者:

  1. package com.chen.rabbitmq.one;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. /**
  6. * 生产者
  7. */
  8. public class production {
  9. private static final String MQ_KEY="holle";
  10. public static void main(String[] args)throws Exception {
  11. // 创建rabbitmq的工厂
  12. ConnectionFactory factory = new ConnectionFactory();
  13. // 连接地址ip
  14. factory.setHost("172.17.18.162");
  15. // 用户名
  16. factory.setUsername("admin");
  17. // 密码
  18. factory.setPassword("123456");
  19. // 创建连接
  20. Connection connection = factory.newConnection();
  21. // 创建通道
  22. Channel channel = connection.createChannel();
  23. // 生产队列
  24. // 参数一:队列名称
  25. // 参数二:持久性(默认为false)
  26. // 参数三:该队列是否可以有多个消费者,是否消息共享
  27. // 参数四:是否自动删除
  28. // 参数五:其他参数
  29. channel.queueDeclare(MQ_KEY,true,false,false,null);
  30. /**
  31. * 发送一个消费者
  32. * 1.发送到那个交换机
  33. * 2.路由的key值是哪个 本次是队列的名称
  34. * 3.其他参数
  35. * 4.发送消息的消息体
  36. */
  37. channel.basicPublish("",MQ_KEY,null,"holle word".getBytes());
  38. System.out.println("消息发送成功!");
  39. }
  40. }

测试是否发送成功:

image20230506094013364.png

消费者:

  1. package com.chen.rabbitmq.one;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. public class Consumption {
  5. private static final String MQ_KEY="holle";
  6. public static void main(String[] args) throws Exception {
  7. ConnectionFactory factory = new ConnectionFactory();
  8. factory.setHost("172.17.18.162");
  9. factory.setUsername("admin");
  10. factory.setPassword("123456");
  11. // 创建一个新的连接
  12. Connection connection = factory.newConnection();
  13. Channel channel = connection.createChannel();
  14. /*
  15. 参数:
  16. 1: 消费哪个队列
  17. 2.消费成功之后是否要自动应答, true 带边自动应答 false 手动
  18. 3.消费者未成功的回调
  19. 4.消费者取录成功的回调
  20. */
  21. channel.basicConsume(MQ_KEY, true,(DeliverCallback) (consumerTag, message) -> System.out.println(new String(message.getBody())),
  22. (CancelCallback) (consumerTag)-> System.out.println(consumerTag));
  23. }
  24. }

工作队列:

工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

image20230506103939702.png

1.线程轮询

类似nginx的负载均衡(轮询),线1一次,线2一次。

image20230506111137320.png

工具类:

  1. package com.chen.rabbitmq.tow.utils;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. public class RabbitUtils {
  6. public static Channel rabbitConnection() throws Exception{
  7. ConnectionFactory factory = new ConnectionFactory();
  8. factory.setHost("172.17.18.162");
  9. factory.setUsername("admin");
  10. factory.setPassword("123456");
  11. // 创建一个新的连接
  12. Connection connection = factory.newConnection();
  13. Channel channel = connection.createChannel();
  14. return channel;
  15. }
  16. }
  17. 生产者:
  18. package com.chen.rabbitmq.tow.test;
  19. import com.chen.rabbitmq.tow.utils.RabbitUtils;
  20. import com.rabbitmq.client.Channel;
  21. import java.util.Scanner;
  22. public class Production {
  23. private final static String MQ_KEY="word";
  24. // 生产者
  25. public static void production() throws Exception{
  26. Channel channel = RabbitUtils.rabbitConnection();
  27. Scanner scanner = new Scanner(System.in);
  28. //生产队列
  29. channel.queueDeclare(MQ_KEY,true,false,false,null);
  30. while (scanner.hasNext()){
  31. String next = scanner.next();
  32. channel.basicPublish("",MQ_KEY,null,next.getBytes());
  33. System.out.println("消息发布成功-> "+next);
  34. }
  35. }
  36. public static void main(String[] args) throws Exception{
  37. production();
  38. }
  39. }
  40. 消费者:
  41. package com.chen.rabbitmq.tow.test;
  42. import com.chen.rabbitmq.tow.utils.RabbitUtils;
  43. import com.rabbitmq.client.CancelCallback;
  44. import com.rabbitmq.client.Channel;
  45. import com.rabbitmq.client.DeliverCallback;
  46. public class Consumption {
  47. private final static String MQ_KEY="word";
  48. // 消费者
  49. public static void consumption() throws Exception{
  50. // 获取连接队列
  51. Channel channel = RabbitUtils.rabbitConnection();
  52. channel.basicConsume(MQ_KEY,true,(DeliverCallback)(consumerTag,message)->{
  53. System.out.println(new String(message.getBody()));
  54. },(CancelCallback)(tag)->{
  55. System.out.println(tag);
  56. System.out.println("中断了");
  57. });
  58. }
  59. public static void main(String[] args) throws Exception{
  60. consumption();
  61. }
  62. }

image20230506111456297.png

idea开启两个线程。

消息应答

1.自动应答

RabbitMQ 是一个广泛使用的开源消息代理,它支持多种消息协议,例如 AMQP、MQTT、STOMP 等。在 RabbitMQ 中,自动应答(Automatic Acknowledgement,Auto-ack)是一种消息确认机制,用于标记消息是否已被成功接收和处理。了解自动应答的概念,对于构建可靠、高效的消息传递系统非常重要。

当消费者接收并处理来自 RabbitMQ 的消息时,通常会使用消息确认(acknowledgements)机制来告知 RabbitMQ 该消息已经成功处理。这样一来,RabbitMQ 就可以确保消息不会意外丢失。然而,这种确认过程可能会导致一定的延迟和额外开销。为了解决这个问题,RabbitMQ 提供了自动应答机制。

在自动应答模式下,消费者接收到消息后,RabbitMQ 会立即将该消息标记为已处理。这意味着消费者不需要显式地发送确认(ack)消息给 RabbitMQ。这种机制可以降低延迟,提高消息传递的速度,但是也存在一定的风险。因为消息一旦被发送出去,RabbitMQ 就认为它已经成功处理,而实际上消费者可能还没有完成对消息的处理。如果消费者在处理消息时发生故障,那么这个消息可能会丢失。

2.手动应答

方法:

Channel.basicAck (用于肯定确认)

RabbitMQ已知道该消息并且成功的处理消息,可以将其丢弃了

Channel.basicNack(用于否定确认)

Channel.basicReject (用于否定确认)

Channel.basicNack 相比少一个参数 不处理该消息了直接拒绝,可以将其丢弃了

Multiple

  1. //源码
  2. public void basicAck(long deliveryTag, boolean multiple) throws IOException {
  3. this.delegate.basicAck(deliveryTag, multiple);
  4. }

multiple 的 true 和 false 代表不同意思:

  1. true 代表批量应答 channel 上未应答的消息

    比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时5-8 的这些还未应答的消息都会被确认收到消息应答

    image20230506145530499.png

    2.false 只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答

    image20230506145536088.png

消息重新入队

为了解决消息丢失问题。

image20230506150713801.png

具体代码:

生产者:

  1. package com.chen.rabbitmq.three;
  2. import com.chen.rabbitmq.tow.utils.RabbitUtils;
  3. import com.rabbitmq.client.Channel;
  4. import java.nio.charset.StandardCharsets;
  5. import java.util.Scanner;
  6. public class Pro {
  7. private static final String MQ_KEY="mqkey";
  8. public static void pro() throws Exception{
  9. Channel channel = RabbitUtils.rabbitConnection();
  10. channel.queueDeclare(MQ_KEY,true,false,false,null);
  11. Scanner scanner = new Scanner(System.in);
  12. while (scanner.hasNext()){
  13. String next = scanner.next();
  14. channel.basicPublish("",MQ_KEY,null,scanner.next().getBytes());
  15. System.out.println("消息发布成功-> "+next);
  16. }
  17. }
  18. public static void main(String[] args) throws Exception {
  19. pro();
  20. }
  21. }

消费者1:

  1. package com.chen.rabbitmq.three;
  2. import com.chen.rabbitmq.tow.utils.RabbitUtils;
  3. import com.rabbitmq.client.CancelCallback;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.DeliverCallback;
  6. //消费者1
  7. public class Word1 {
  8. public static final String MQ_KEY="mqkey";
  9. public static void word() throws Exception{
  10. Channel channel = RabbitUtils.rabbitConnection();
  11. channel.basicConsume(MQ_KEY,false,(DeliverCallback) (consumerTag,message)->{
  12. // 睡眠1s
  13. try {
  14. Thread.sleep(1*1000);
  15. System.out.println("Word1接收到消息->"+new String(message.getBody()));
  16. // 参数一:tag标记 参数二:是否批量
  17. channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. },(CancelCallback) e->{
  22. System.out.println("消息中断"+e);
  23. } );
  24. }
  25. public static void main(String[] args) throws Exception {
  26. word();
  27. }
  28. }

消费者2:

  1. package com.chen.rabbitmq.three;
  2. import com.chen.rabbitmq.tow.utils.RabbitUtils;
  3. import com.rabbitmq.client.CancelCallback;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.DeliverCallback;
  6. //消费者1
  7. public class Word2 {
  8. public static final String MQ_KEY="mqkey";
  9. public static void word() throws Exception{
  10. Channel channel = RabbitUtils.rabbitConnection();
  11. channel.basicConsume(MQ_KEY,false,(DeliverCallback) (consumerTag,message)->{
  12. // 睡眠10s
  13. try {
  14. Thread.sleep(10*1000);
  15. System.out.println("Word2接收到消息->"+new String(message.getBody()));
  16. // 参数一:tag标记 参数二:是否批量
  17. channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. },(CancelCallback) e->{
  22. System.out.println("消息中断"+e);
  23. } );
  24. }
  25. public static void main(String[] args) throws Exception{
  26. word();
  27. }
  28. }

image20230506160401052.png

经测试会发现,消费者1为第一个接收到消息,接下来当生产者在生产出一条消息,应到消费者2接收到消息,但是此时消费者2突然出现宕机,使用了应答机制,消息则会重新打到消费者1;

持久化设置

1.队列持久化

作用:当rabbitmq宕机后,重启队列依然存在

  1. //创建队列时的第二个参数为设置持久化
  2. channel.queueDeclare(MQ_KEY,true,false,false,null);

image20230506161649268.png

2.消息持久化

作用:当rabbitmq宕机了重新启动,发送的消息依然存在。

下面的方法不是绝对的能保证消息的持久化

  1. //生产者
  2. private static final String MQ_KEY="mqkey";
  3. public static void pro() throws Exception{
  4. Channel channel = RabbitUtils.rabbitConnection();
  5. channel.queueDeclare(MQ_KEY,true,false,false,null);
  6. Scanner scanner = new Scanner(System.in);
  7. while (scanner.hasNext()){
  8. String next = scanner.next();
  9. //MessageProperties.PERSISTENT_TEXT_PLAIN 消息持久化
  10. channel.basicPublish("",MQ_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,scanner.next().getBytes());
  11. System.out.println("消息发布成功-> "+next);
  12. }
  13. }

3.发布确认

完成以上两步还不足以持久化,要把发布确认加上。

  1. //默认是不开启的
  2. Channel channel = RabbitUtils.rabbitConnection();
  3. channel.confirmSelect();//开启发布确认
发布确认的策略:

1.单个确认发布

这个发布确认是同步的,需等待确认一次在发布下一次,一手交钱一手交货原则

缺点:发布速度特别慢

  1. //单个确认
  2. public static void one() throws Exception{
  3. Channel channel = RabbitUtils.rabbitConnection();
  4. //开启发布确认
  5. channel.confirmSelect();
  6. String uuid = UUID.randomUUID().toString();
  7. //创建队列
  8. channel.queueDeclare(uuid,true,false,false,null);
  9. //开始时间
  10. long begin = System.currentTimeMillis();
  11. for (Integer i = 0; i < COUNT; i++) {
  12. String message = i + "";
  13. channel.basicPublish("",uuid,null,message.getBytes());
  14. //发布确认
  15. boolean flag = channel.waitForConfirms();
  16. if(flag){
  17. System.out.println("消息确认成功!");
  18. }
  19. }
  20. long last = System.currentTimeMillis();
  21. System.out.println("耗时:"+(last-begin));
  22. }

2.批量确认发布

发布速度相对单个发布确认要快,但是当其中一条消息出现异常,将无法查找到那个消息丢失 。

  1. //批量
  2. public static void batch() throws Exception{
  3. Channel channel = RabbitUtils.rabbitConnection();
  4. String uuid = UUID.randomUUID().toString();
  5. //开启消息确认
  6. channel.confirmSelect();
  7. //创建队列
  8. channel.queueDeclare(uuid,true,false,false,null);
  9. //这个这个变量用记录发布值
  10. Integer messageCount=100;
  11. Integer record =0;
  12. //开始时间
  13. long begin = System.currentTimeMillis();
  14. for (Integer i = 0; i < COUNT; i++) {
  15. record++;
  16. String message=i+"";
  17. //发布消息
  18. channel.basicPublish("",uuid,null,message.getBytes());
  19. if(messageCount.equals(record)){
  20. channel.waitForConfirms();
  21. record=0;
  22. }
  23. }
  24. long last = System.currentTimeMillis();
  25. System.out.println("耗时"+(last-begin));
  26. }

3.异步确认发布(推荐使用)

异步确认虽然比上的两个代码复杂,但同时也解决了上面两种方式遗留下来的问题。

image20230511104153138.png

  1. public static void asyn() throws Exception{
  2. Channel channel = RabbitUtils.rabbitConnection();
  3. //开启发布确认
  4. channel.confirmSelect();
  5. String uuid = UUID.randomUUID().toString();
  6. //创建队列
  7. channel.queueDeclare(uuid,true,false,false,null);
  8. //开始时间
  9. long begin = System.currentTimeMillis();
  10. // 创建一个线程的ListMap用于记录 ----》处理异步未确认的消息
  11. ConcurrentSkipListMap<Long, String> map = new ConcurrentSkipListMap<>();
  12. // 监听消息
  13. channel.addConfirmListener((deliveryTag, multiple)->{
  14. if(multiple){
  15. ConcurrentNavigableMap<Long, String> longStringConcurrentNavigableMap =
  16. map.headMap(deliveryTag);
  17. longStringConcurrentNavigableMap.clear();
  18. }else{
  19. map.remove(deliveryTag);
  20. }
  21. System.out.println("确认消息:"+deliveryTag);
  22. },(deliveryTag, multiple)->{
  23. String message = map.get(deliveryTag);
  24. System.out.println("发送失败的数据是:"+message+"未确认消息:"+deliveryTag+"-----失败");
  25. });
  26. for (Integer i = 0; i < COUNT; i++) {
  27. String message=""+i;
  28. channel.basicPublish("",uuid,null,message.getBytes());
  29. //获取信道的标识,存入消息
  30. map.put(channel.getNextPublishSeqNo(),message);
  31. }
  32. long last = System.currentTimeMillis();
  33. System.out.println("耗时:"+(last-begin));
  34. }

不公平分发原则(能者多劳原则)

在上面中的所有例子都是尊寻这轮询的规则去执行的,问题:当其中的一台服务响应特别慢时就会影响到整体的效率。

channel.basicQos(1);

  1. //消费者
  2. public static void word() throws Exception{
  3. Channel channel = RabbitUtils.rabbitConnection();
  4. //设置不公平分发
  5. channel.basicQos(1);
  6. channel.basicConsume(MQ_KEY,false,(DeliverCallback) (consumerTag,message)->{
  7. try {
  8. //模拟虚拟机延迟
  9. Thread.sleep(1*1000);
  10. System.out.println("Word2接收到消息->"+new String(message.getBody()));
  11. channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. },(CancelCallback) e->{
  16. System.out.println("消息中断"+e);
  17. } );
  18. }

也可以用来设置预期值!

  1. //消费者1
  2. public static void word2() throws Exception{
  3. Channel channel = RabbitUtils.rabbitConnection();
  4. //设置预期值
  5. channel.basicQos(3);
  6. channel.basicConsume(MQ_KEY,false,(DeliverCallback) (consumerTag,message)->{
  7. try {
  8. //模拟虚拟机延迟
  9. Thread.sleep(1*1000);
  10. System.out.println("Word2接收到消息->"+new String(message.getBody()));
  11. channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. },(CancelCallback) e->{
  16. System.out.println("消息中断"+e);
  17. } );
  18. }
  19. //消费者2
  20. public static void word2() throws Exception{
  21. Channel channel = RabbitUtils.rabbitConnection();
  22. //设置预期值
  23. channel.basicQos(5);
  24. channel.basicConsume(MQ_KEY,false,(DeliverCallback) (consumerTag,message)->{
  25. try {
  26. //模拟虚拟机延迟
  27. Thread.sleep(10*1000);
  28. System.out.println("Word2接收到消息->"+new String(message.getBody()));
  29. channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. }
  33. },(CancelCallback) e->{
  34. System.out.println("消息中断"+e);
  35. } );
  36. }

交换机

在RabbitMQ中,生产者发送消息不会直接将消息投递到队列中,而是先将消息投递到交换机中, 在由交换机转发到具体的队列, 队列再将消息以推送或者拉取方式给消费者进行消费

绑定(bindings)

与交换机产生关系,并且能有routekey控制发送消息给哪个队列。

fanout交换机(扇形)

扇形交换机是最基本的交换机类型,它所能做的事清非常简单广播消息。扇形交换机会把能接收到的消息全部发送给绑定在自己身上的队列。因为广播不需要'思考”,所以扇形交换机处理消息的速度也是所有的交换机类型里面最快的。

  1. //消费者
  2. public class Word {
  3. // 交换机名称
  4. private static String EXCHANGE_NAME="logs";
  5. public static void main(String[] args) throws Exception {
  6. Channel channel = RabbitUtils.rabbitConnection();
  7. // 声明一个交换机
  8. channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
  9. // 声明一个队列 临时队列
  10. String queue = channel.queueDeclare().getQueue();
  11. // 绑定交换机与队列
  12. channel.queueBind(queue,EXCHANGE_NAME,"");
  13. System.out.println("等待消息~");
  14. //消费者取消消息时回调接口
  15. channel.basicConsume(queue,true, (consumerTag,message)->{
  16. System.out.println("word1控制台打印接收消息:"+new String(message.getBody(),"UTF-8"));
  17. },cancelCallback->{});
  18. }
  19. }
  20. public class Word2 {
  21. // 交换机名称
  22. private static String EXCHANGE_NAME="logs";
  23. public static void main(String[] args) throws Exception {
  24. Channel channel = RabbitUtils.rabbitConnection();
  25. // 声明一个交换机
  26. channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
  27. // 声明一个队列 临时队列
  28. String queue = channel.queueDeclare().getQueue();
  29. // 绑定交换机与队列
  30. channel.queueBind(queue,EXCHANGE_NAME,"");
  31. System.out.println("等待消息~");
  32. //消费者取消消息时回调接口
  33. channel.basicConsume(queue,true, (consumerTag,message)->{
  34. System.out.println("word2控制台打印接收消息:"+new String(message.getBody(),"UTF-8"));
  35. },cancelCallback->{});
  36. }
  37. }
  38. //生产者
  39. public class send {
  40. // 交换机名称
  41. private static String EXCHANGE_NAME="logs";
  42. public static void main(String[] args) throws Exception {
  43. Channel channel = RabbitUtils.rabbitConnection();
  44. Scanner scanner = new Scanner(System.in);
  45. while (scanner.hasNext()){
  46. String next = scanner.next();
  47. channel.basicPublish(EXCHANGE_NAME,"",null,next.getBytes("UTF-8"));
  48. System.out.println("生产者发送消息:"+next);
  49. }
  50. }
  51. }

直连交换机: Direct exchange

直连交换机的路由算法非常简单: 将消息推送到binding key与该消息的routing key相同的队列。

代码几乎类型fanout交换机,只需要指定routerkey即可。

主题交换机: Topic exchange

发送到主题交换机的 消息不能有任意的 routing key, 必须是由点号分开的一串单词,这些单词可以是任意的,但通常是与消息相关的一些特征。

如以下是几个有效的routing key:

"stock.usd.nyse", "nyse.vmw", "quick.orange.rabb 代", routing key的单词可以 有很多,最大限制是255 bytes。

Topic 交换机的 逻辑与 direct 交换机有点 相似 使用特定路由键发送的消息 将被发送到所有使用匹配绑定键绑定的队列 ,然而 ,绑定键有两个特殊的情况:

*表示匹配任意一个单词

#表示匹配任意—个或多个单词

image20230525104709697.png

比如上图:

发送routerkey为:ws.orange.rabbit那么对应的就是Q1,Q2

发送routerkey为:lazy.orange.elephant那么对应的就是Q1,Q2

  1. //消费者
  2. public class word1 {
  3. private static final String EXCHANGE_NAME="topic_logs";
  4. private static final String QUEUE_NAME="Q1";
  5. public static void main(String[] args) throws Exception {
  6. Channel channel = RabbitUtils.rabbitConnection();
  7. // 创建交换机
  8. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
  9. // 创建队列
  10. channel.queueDeclare(QUEUE_NAME,true,true,false,null);
  11. // 绑定队列
  12. channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.orange.*");
  13. // 接收消息
  14. channel.basicConsume(QUEUE_NAME,true,(consumerTag,message)->{
  15. System.out.println("接收到的消息:"+new String(message.getBody()));
  16. },cancelCallback->{});
  17. System.out.println("等下消息~");
  18. }
  19. }
  20. public class word2 {
  21. private static final String EXCHANGE_NAME="topic_logs";
  22. private static final String QUEUE_NAME="Q2";
  23. public static void main(String[] args) throws Exception {
  24. Channel channel = RabbitUtils.rabbitConnection();
  25. // 创建交换机
  26. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
  27. // 创建队列
  28. channel.queueDeclare(QUEUE_NAME,true,true,false,null);
  29. // 绑定队列
  30. channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.*.rabbit");
  31. channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"lazy.#");
  32. // 接收消息
  33. channel.basicConsume(QUEUE_NAME,true,(consumerTag,message)->{
  34. System.out.println("接收到的消息:"+new String(message.getBody()));
  35. },cancelCallback->{});
  36. System.out.println("等下消息~");
  37. }
  38. }
  39. //生产者
  40. public class send {
  41. private static final String EXCHANGE_NAME="topic_logs";
  42. public static void main(String[] args) throws Exception{
  43. Channel channel = RabbitUtils.rabbitConnection();
  44. Scanner scanner = new Scanner(System.in);
  45. while (true){
  46. System.out.println("请输入routerkey:");
  47. String key = scanner.next();
  48. System.out.println("请输入消息内容:");
  49. String message = scanner.next();
  50. channel.basicPublish(EXCHANGE_NAME,key,null,message.getBytes());
  51. }
  52. }
  53. }

image20230525112943356.png

image20230525112959575.png

死信队列

顾名思义:无法被消费的消息,一般来说,producer将消息投递broker或者直接到queue里了,consumer(消费者)从queue取出消息进行消费,但某些时间由特定原因导致queue中的某些消息无法被消费,这样如果没有后续的处理,就变成了死信。

应用场景:为了确保订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息被消息时发生了异常,这是就将消息存到死信中,还比如说:用户商城下单成功,并且点击支付后在指定时间支付时自动失效。

image20230525143950674.png

消息TTL过期时间测试:

  1. //生产者
  2. public class send {
  3. private static final String NORMAL_EXCHANGE="normal_exchange";
  4. public static final String NORMAL_QUEUE="normal_queue";
  5. public static void main(String[] args) throws Exception {
  6. Channel channel = RabbitUtils.rabbitConnection();
  7. // 设置死信时间
  8. AMQP.BasicProperties basicProperties =
  9. new AMQP.BasicProperties().builder()
  10. .expiration("10000").build();
  11. for (int i = 0; i < 11; i++) {
  12. String msg="info"+i;
  13. channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",basicProperties,msg.getBytes());
  14. }
  15. }
  16. }
  17. //消费者1
  18. public class C1 {
  19. private static final String NORMAL_EXCHANGE="normal_exchange";
  20. private static final String DEAD_EXCHANGE="dead_exchange";
  21. public static final String NORMAL_QUEUE="normal_queue";
  22. public static final String DEAD_QUEUE="dead_queue";
  23. public static void main(String[] args) throws Exception {
  24. Channel channel = RabbitUtils.rabbitConnection();
  25. // 创建c1交换机
  26. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
  27. channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.TOPIC);
  28. // 声明普通队列
  29. HashMap<String, Object> map = new HashMap<>();
  30. // 设置过期时间 10s 单位ms 这里有消费整去做控制
  31. // map.put("x-message-ttl",100000);
  32. // 正常队列设置死信交换机
  33. map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
  34. // 设置死信的routerKey
  35. map.put("x-dead-letter-routing-key","lisi");
  36. // 创建普通队列
  37. channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
  38. //创建死信队列
  39. channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
  40. // 绑定
  41. channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
  42. channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
  43. channel.basicConsume(NORMAL_QUEUE,true,(consumerTag, message) -> {
  44. System.out.println("C1消息为:"+message.getBody());
  45. },cancelCallback->{
  46. });
  47. }
  48. }
  49. //消费者2
  50. public class C2 {
  51. public static final String DEAD_QUEUE="dead_queue";
  52. private static final String DEAD_EXCHANGE="dead_exchange";
  53. public static void main(String[] args) throws Exception {
  54. Channel channel = RabbitUtils.rabbitConnection();
  55. channel.basicConsume(DEAD_QUEUE,true,(consumerTag,message)->{
  56. System.out.println("消息为:"+new String(message.getBody()));
  57. },cancelCallback->{});
  58. }
  59. }

正常队列长度的限制:

根据c1做修改,测试报错先删除原来的队列与交换机

//设置正常队列长度的限制
map.put("x-max-length",6);

拒接消息:

添加手动应答拒接。

  1. public class C1 {
  2. private static final String NORMAL_EXCHANGE="normal_exchange";
  3. private static final String DEAD_EXCHANGE="dead_exchange";
  4. public static final String NORMAL_QUEUE="normal_queue";
  5. public static final String DEAD_QUEUE="dead_queue";
  6. public static void main(String[] args) throws Exception {
  7. Channel channel = RabbitUtils.rabbitConnection();
  8. // 创建c1交换机
  9. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
  10. channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.TOPIC);
  11. // 声明普通队列
  12. HashMap<String, Object> map = new HashMap<>();
  13. // 设置过期时间 10s 单位ms
  14. // map.put("x-message-ttl",100000);
  15. // 正常队列设置死信交换机
  16. map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
  17. // 设置死信的routerKey
  18. map.put("x-dead-letter-routing-key","lisi");
  19. // 设置正常队列长度的限制
  20. // map.put("x-max-length",6);
  21. // 创建普通队列
  22. channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
  23. //创建死信队列
  24. channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
  25. // 绑定
  26. channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
  27. channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
  28. channel.basicConsume(NORMAL_QUEUE,false,(consumerTag, message) -> {
  29. String msg = new String(message.getBody());
  30. System.out.println("C1消息为:"+msg);
  31. // 拒接对应消息
  32. if(msg.equals("info2")){
  33. // deliveryTag
  34. channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
  35. }else{
  36. channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
  37. }
  38. },cancelCallback->{
  39. });
  40. }
  41. }

SpringAMQP

官网地址:Spring AMQP

Spring AMQP 是 Spring 框架中的一个模块,它提供了基于 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)标准的抽象层,用于简化在 Spring 应用程序中使用消息队列的过程。Spring AMQP 不仅简化了与消息代理(如 RabbitMQ)的集成,还提供了一套高度可配置的模板类来生产、消费消息,并管理AMQP基础设施组件,如队(Queue)、交换机(Exchange)和绑定(Binding)。

使用

  1. <!--AMQP依赖,包含RabbitMQ-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>

生产者

  1. logging:
  2. pattern:
  3. dateformat: MM-dd HH:mm:ss:SSS
  4. spring:
  5. rabbitmq:
  6. host: 38.6.217.70
  7. port: 5672
  8. username: itcast
  9. password: 123321
  10. virtual-host: /
  1. package cn.itcast.mq.spring;
  2. import org.junit.Test;
  3. import org.junit.runner.RunWith;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.boot.test.context.SpringBootTest;
  6. import org.springframework.test.context.junit4.SpringRunner;
  7. import javax.annotation.Resource;
  8. @RunWith(SpringRunner.class)
  9. @SpringBootTest
  10. public class SpringAmqpTest {
  11. @Resource
  12. private RabbitTemplate rabbitTemplate;
  13. @Test
  14. public void testSendMessage2SimpleQueue() throws InterruptedException {
  15. // 1.发送消息
  16. String message = "Hello, Spring Amqp!";
  17. rabbitTemplate.convertAndSend("simple.queue", message);
  18. }
  19. }

 

消费者

  1. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2. import org.springframework.stereotype.Component;
  3. //监听机制
  4. @Component
  5. public class SpringRabbitListener {
  6. @RabbitListener(queues = "simple.queue")
  7. public void listenSimpleQueueMessage(String msg) {
  8. System.out.println("spring接收到的消息是:" + msg);
  9. }
  10. }

预取限制

案例:将50条消息在一秒内分类交给两个消费者消费。

  1. //生成者
  2. @Test
  3. public void testSendWordSimpleQueue() throws InterruptedException {
  4. // 1.发送消息
  5. String key ="simple.queue";
  6. String message = "Hello, Spring Amqp____";
  7. for (int i = 0; i < 49; i++) {
  8. rabbitTemplate.convertAndSend(key, message+i);
  9. Thread.sleep(20);
  10. }
  11. }
  12. //消费者
  13. @Component
  14. public class SpringRabbitListener {
  15. @RabbitListener(queues = "simple.queue")
  16. public void listenSimpleQueueMessage(String msg) throws InterruptedException {
  17. System.out.println("spring接收到的消息是:" + msg+"___"+ LocalDateTime.now());
  18. Thread.sleep(20);
  19. }
  20. @RabbitListener(queues = "simple.queue")
  21. public void listenFanoutQueue1(String msg) throws InterruptedException {
  22. System.err.println("FanoutQueue1接收到的消息是:" + msg+"___"+ LocalDateTime.now());
  23. Thread.sleep(200); //模拟性能
  24. }
  25. }

通过执行结果我们可以看出listenFanoutQueue1这个监听器执行的是奇数,而listenSimpleQueueMessage则是偶数。且时间超出了1秒。为什么呢?

因为在生产者发送到队列中时,消费者会预取消息,在默认情况下进行平分机制,在上面代码中我们可以看到我们使用了线程睡眠的方式模拟了性能,在平分的情况下,睡眠200的执行了25条,所以导致了超出了1s。 如何调整呢?

  1. logging:
  2. pattern:
  3. dateformat: MM-dd HH:mm:ss:SSS
  4. spring:
  5. rabbitmq:
  6. host: 38.6.217.70
  7. port: 5672
  8. username: itcast
  9. password: 123321
  10. virtual-host: /
  11. listener: #设置预取
  12. simple:
  13. prefetch: 1 #每次只取一条
  14. #这段配置的作用是在使用 RabbitMQ 的时候,配置消费者监听器的简单模式,并设置消息预取值为 1。这意味着每次只会从队列中取出一条消息进行处理,处理完后再去取下一条消息。这种方式可以保证消息的顺序处理。

发布与订阅

 

fanoutExchange

这种交换机需要进行绑定对应的队列,绑定对应的队列后,生产者将消息推送给交换机,交换机会将消息分别都发给绑定的消息队列。

实现

  1. //消费者配置
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.FanoutExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. @Configuration
  9. public class FanoutExchangeConfig {
  10. // 创建队列1 fanout.queue1
  11. @Bean
  12. public Queue queue1(){
  13. return new Queue("fanout.queue1");
  14. }
  15. // 创建交换机 fanoutExchange
  16. @Bean
  17. public FanoutExchange fanoutExchange(){
  18. return new FanoutExchange("fanoutExchange");
  19. }
  20. // 队列1绑定交换机
  21. @Bean
  22. public Binding bindingExchange1(){
  23. return BindingBuilder.bind(queue1()).to(fanoutExchange());
  24. }
  25. // 创建队列1 fanout.queue2
  26. @Bean
  27. public Queue queue2(){
  28. return new Queue("fanout.queue2");
  29. }
  30. // 队列2绑定交换机
  31. @Bean
  32. public Binding bindingExchange2(){
  33. return BindingBuilder.bind(queue2()).to(fanoutExchange());
  34. }
  35. }

 

消费者

  1. @Component
  2. public class SpringRabbitListener {
  3. @RabbitListener(queues = "fanout.queue1")
  4. public void listenFanoutQueue1QueueMessage(String msg) throws InterruptedException {
  5. System.out.println("fanout.queue1接收到的消息是:" + msg+"___"+ LocalDateTime.now());
  6. }
  7. @RabbitListener(queues = "fanout.queue2")
  8. public void listenFanoutQueue2QueueMessage(String msg) throws InterruptedException {
  9. System.out.println("fanout.queue2接收到的消息是:" + msg+"___"+ LocalDateTime.now());
  10. }
  11. }

生产者

  1. @Test
  2. public void testSendMessageFanoutQueue() {
  3. // 1.发送消息
  4. String message = "Hello, testSendMessageFanoutQueue !";
  5. // 交换机名称
  6. String exchange = "fanoutExchange";
  7. rabbitTemplate.convertAndSend(exchange,"",message);
  8. }
DirectExchange

这种交换机需要指定一个key进行发送,通过可以区别发送到那个队列,同时这些队列也可以绑定相同的key,那么也就是实现了fanout的效果。 

实现

  1. //消费者
  2. @Component
  3. public class DirectExchangeListener {
  4. // 可以通过@bena的方式进注入,这里我们采用@RabbitListenner的方式
  5. @RabbitListener(
  6. bindings = @QueueBinding(
  7. value = @Queue(name = "direct.queue1"),//绑定的队列
  8. exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT),//绑定的交换机
  9. key = {"red", "blue"} //绑定的key
  10. )
  11. )
  12. public void listenDirectQueue1(String msg) throws InterruptedException {
  13. System.out.println("listenDirectQueue1接收到的消息是:" + msg);
  14. }
  15. @RabbitListener(
  16. bindings = @QueueBinding(
  17. value = @Queue(name = "direct.queue2"),//绑定的队列
  18. exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT),//绑定的交换机
  19. key = {"red", "yellow"} //绑定的key
  20. )
  21. )
  22. public void listenDirectQueue2(String msg) throws InterruptedException {
  23. System.out.println("listenDirectQueue2接收到的消息是:" + msg);
  24. }
  25. }
  26. //生产者
  27. @Test
  28. public void testSendMessageDirectQueue() {
  29. String routingKey = "yellow";
  30. // 1.发送消息
  31. String message = "Hello, testSendMessageFanoutQueue !"+"__"+routingKey;
  32. // 交换机名称
  33. String exchange = "direct.exchange";
  34. rabbitTemplate.convertAndSend(exchange,routingKey,message);
  35. }

 

TopicExchange

这种交换机其实和direct类型的交换机差不错,只不过它是使用通配符的方式。

使用

  1. //消费者
  2. @Component
  3. public class TopicExchangeListener {
  4. @RabbitListener(bindings = @QueueBinding(
  5. value = @Queue(name = "topic.queue1"),
  6. exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
  7. key = {"china.#"}
  8. ))
  9. public void listenTopicQueue1(String msg) throws InterruptedException {
  10. System.out.println("topic.queue1接收到消息:" + msg);
  11. }
  12. @RabbitListener(bindings = @QueueBinding(
  13. value = @Queue(name = "topic.queue2"),
  14. exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
  15. key = {"#.news"}
  16. ))
  17. public void listenTopicQueue2(String msg) throws InterruptedException {
  18. System.out.println("topic.queue2接收到消息:" + msg);
  19. }
  20. }
  21. //生产者
  22. @Test
  23. public void testSendMessageTopicQueue() {
  24. String routingKey = "news";
  25. // 1.发送消息
  26. String message = "Hello, testSendMessageTopicQueue !"+"__"+routingKey;
  27. // 交换机名称
  28. String exchange = "topic.exchange";
  29. rabbitTemplate.convertAndSend(exchange,routingKey,message);
  30. }

消息转换器

例子:

  1. //我们声明一个objQueue
  2. @Bean
  3. public Queue objQueue(){
  4. return new Queue("obj.queue");
  5. }
  6. //发送消息
  7. @Test
  8. public void testSendMessageobjQueue() {
  9. Map<String, Object> map = new HashMap<>();
  10. map.put("name","test");
  11. map.put("age",18);
  12. rabbitTemplate.convertAndSend("obj.queue",map);
  13. }

我们重rabbitmq的ui界面中我们可以发现消息是基于JDK完成的序列化。

缺点:这样不能很直接的看出消息的结果,并且占用大量内存,所以下面我们使用jdckson进行json序列化。

发送者

依赖:

  1. <dependency>
  2. <groupId>com.fasterxml.jackson.dataformat</groupId>
  3. <artifactId>jackson-dataformat-xml</artifactId>
  4. </dependency>

配置bean

  1. //生产者配置
  2. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  3. import org.springframework.amqp.support.converter.MessageConverter;
  4. @Bean
  5. public MessageConverter messageConverter() {
  6. return new Jackson2JsonMessageConverter();
  7. }

 

消费者

  1. <dependency>
  2. <groupId>com.fasterxml.jackson.dataformat</groupId>
  3. <artifactId>jackson-dataformat-xml</artifactId>
  4. </dependency>
  5. //销售者配置
  6. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  7. import org.springframework.amqp.support.converter.MessageConverter;
  8. @Bean
  9. public MessageConverter messageConverter() {
  10. return new Jackson2JsonMessageConverter();
  11. }
  12. @RabbitListener(queues = "obj.queue")
  13. public void listenObjQueueMessage( Map<String, Object> msg) throws InterruptedException {
  14. System.out.println("obj.queue接收到的消息是:" + msg);
  15. }

后续会更新使用MQ做的具体案例:秒杀、订单业务处理等。 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/酷酷是懒虫/article/detail/781781
推荐阅读
相关标签
  

闽ICP备14008679号