赞
踩
官网:RabbitMQ: One broker to queue them all | RabbitMQ
mq就是消息队列,消息队列遵循这先入先出
原则。一般用来解决应用解耦,异步消息,流量削峰
等问题,实现高性能,高可用,可伸缩和最终一致性架构。
rabbitMq的四大核心
RabbitMQ是一个开源的遵循 AMQP协议实现的基于 Erlang语言编写,即需要先安装部署Erlang环境再安装RabbitMQ环境。
查看兼容关系:Erlang Version Requirements | RabbitMQ
百度云地址:
链接:百度网盘 请输入提取码 提取码:6666
本篇文章使用版本:3.8.8,liunx7-cenOs7
- #在存放位置执行以下指令
- rpm -ivh erlang-21.3-1.el7.x86_64.rpm
- #安装socat
- yum install socat -y
- #安装mq
- rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
启动
- #开机自动启动
- chkconfig rabbitmq-server on
- #启动服务
- /sbin/service rabbitmq-serve start
- #查看启动
- /sbin/service rabbitmq-serve status
- #停止服务
- /sbin/service rabbitmq-serve stop
坑:执行以上指令无效,重新执行下面指令
- systemctl start rabbitmq-server.service #启动
- systemctl status rabbitmq-server.service#查看状态
安装可视化界面
- #尽量停止服务,在安装
- #安装可视化界面
- rabbitmq-plugins enable rabbitmq_management
访问地址:http://ip:15672/
如果访问不了,查看防火墙是够关闭
systemctl stop firewalld 关闭防火墙,访问成功后走rabbitmq的基本指令
卸载MQ:
- systemctl stop rabbitmq-server
- yum list | grep rabbitmq
- yum -y remove rabbitmq-server.noarch
- yum list | grep erlang
- yum -y remove erlang-*
- rm -rf /usr/lib64/erlang
- rm -rf /var/lib/rabbitmq
- rm -rf /usr/local/erlang
- rm -rf /usr/local/rabbitmq
docker pull rabbitmq:3-management
- #运行
- docker run \
- -e RABBITMQ_DEFAULT_USER=itcast \
- -e RABBITMQ_DEFAULT_PASS=123321 \
- --name mq \
- --hostname mq1 \
- -p 15672:15672 \ #网页访问端口
- -p 5672:5672 \ #mq连接端口
- -d \
- rabbitmq:3-management
- #查看用户
- rabbitmqctl list_users
- #添加用户
- rabbitmqctl add_user admin 123456
- #设置角色 (超级管理员)
- rabbitmqctl set_user_tags admin administrator
- #设置权限
- rabbitmqctl set_permissions -p '/' admin '.*' '.*' '.*'
登录后也可以在此界面添加用户
创建一个maven工程:
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>com.chen</groupId>
- <artifactId>mq</artifactId>
- <version>1.0-SNAPSHOT</version>
-
- <properties>
- <rabbitmq.version>5.8.0</rabbitmq.version>
- <common.version>2.6</common.version>
- </properties>
- <dependencies>
- <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>${rabbitmq.version}</version>
- </dependency>
-
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- <version>${common.version}</version>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>8</source>
- <target>8</target>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </project>
- package com.chen.rabbitmq.one;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- /**
- * 生产者
- */
- public class production {
-
-
- private static final String MQ_KEY="holle";
-
-
- public static void main(String[] args)throws Exception {
- // 创建rabbitmq的工厂
- ConnectionFactory factory = new ConnectionFactory();
- // 连接地址ip
- factory.setHost("172.17.18.162");
- // 用户名
- factory.setUsername("admin");
- // 密码
- factory.setPassword("123456");
-
- // 创建连接
- Connection connection = factory.newConnection();
- // 创建通道
- Channel channel = connection.createChannel();
- // 生产队列
- // 参数一:队列名称
- // 参数二:持久性(默认为false)
- // 参数三:该队列是否可以有多个消费者,是否消息共享
- // 参数四:是否自动删除
- // 参数五:其他参数
- channel.queueDeclare(MQ_KEY,true,false,false,null);
-
- /**
- * 发送一个消费者
- * 1.发送到那个交换机
- * 2.路由的key值是哪个 本次是队列的名称
- * 3.其他参数
- * 4.发送消息的消息体
- */
- channel.basicPublish("",MQ_KEY,null,"holle word".getBytes());
- System.out.println("消息发送成功!");
- }
-
- }
测试是否发送成功:
- package com.chen.rabbitmq.one;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
-
- public class Consumption {
-
-
- private static final String MQ_KEY="holle";
-
- public static void main(String[] args) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("172.17.18.162");
- factory.setUsername("admin");
- factory.setPassword("123456");
- // 创建一个新的连接
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- /*
- 参数:
- 1: 消费哪个队列
- 2.消费成功之后是否要自动应答, true 带边自动应答 false 手动
- 3.消费者未成功的回调
- 4.消费者取录成功的回调
- */
- channel.basicConsume(MQ_KEY, true,(DeliverCallback) (consumerTag, message) -> System.out.println(new String(message.getBody())),
- (CancelCallback) (consumerTag)-> System.out.println(consumerTag));
- }
-
- }
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
类似nginx的负载均衡(轮询),线1一次,线2一次。
工具类:
- package com.chen.rabbitmq.tow.utils;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- public class RabbitUtils {
- public static Channel rabbitConnection() throws Exception{
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("172.17.18.162");
- factory.setUsername("admin");
- factory.setPassword("123456");
- // 创建一个新的连接
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- return channel;
- }
- }
- 生产者:
-
- package com.chen.rabbitmq.tow.test;
-
- import com.chen.rabbitmq.tow.utils.RabbitUtils;
- import com.rabbitmq.client.Channel;
-
- import java.util.Scanner;
-
- public class Production {
- private final static String MQ_KEY="word";
- // 生产者
- public static void production() throws Exception{
- Channel channel = RabbitUtils.rabbitConnection();
- Scanner scanner = new Scanner(System.in);
- //生产队列
- channel.queueDeclare(MQ_KEY,true,false,false,null);
- while (scanner.hasNext()){
- String next = scanner.next();
- channel.basicPublish("",MQ_KEY,null,next.getBytes());
- System.out.println("消息发布成功-> "+next);
- }
- }
- public static void main(String[] args) throws Exception{
- production();
- }
- }
- 消费者:
-
- package com.chen.rabbitmq.tow.test;
-
-
- import com.chen.rabbitmq.tow.utils.RabbitUtils;
- import com.rabbitmq.client.CancelCallback;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.DeliverCallback;
-
- public class Consumption {
-
- private final static String MQ_KEY="word";
-
- // 消费者
- public static void consumption() throws Exception{
- // 获取连接队列
- Channel channel = RabbitUtils.rabbitConnection();
- channel.basicConsume(MQ_KEY,true,(DeliverCallback)(consumerTag,message)->{
- System.out.println(new String(message.getBody()));
- },(CancelCallback)(tag)->{
- System.out.println(tag);
- System.out.println("中断了");
- });
- }
-
- public static void main(String[] args) throws Exception{
- consumption();
- }
- }
idea开启两个线程。
RabbitMQ 是一个广泛使用的开源消息代理,它支持多种消息协议,例如 AMQP、MQTT、STOMP 等。在 RabbitMQ 中,自动应答(Automatic Acknowledgement,Auto-ack)是一种消息确认机制,用于标记消息是否已被成功接收和处理。了解自动应答的概念,对于构建可靠、高效的消息传递系统非常重要。
当消费者接收并处理来自 RabbitMQ 的消息时,通常会使用消息确认(acknowledgements)机制来告知 RabbitMQ 该消息已经成功处理。这样一来,RabbitMQ 就可以确保消息不会意外丢失。然而,这种确认过程可能会导致一定的延迟和额外开销。为了解决这个问题,RabbitMQ 提供了自动应答机制。
在自动应答模式下,消费者接收到消息后,RabbitMQ 会立即将该消息标记为已处理。这意味着消费者不需要显式地发送确认(ack)消息给 RabbitMQ。这种机制可以降低延迟,提高消息传递的速度,但是也存在一定的风险。因为消息一旦被发送出去,RabbitMQ 就认为它已经成功处理,而实际上消费者可能还没有完成对消息的处理。如果消费者在处理消息时发生故障,那么这个消息可能会丢失。
方法:
Channel.basicAck
(用于肯定确认)
RabbitMQ
已知道该消息并且成功的处理消息,可以将其丢弃了
Channel.basicNack
(用于否定确认)
Channel.basicReject
(用于否定确认)
与 Channel.basicNack
相比少一个参数 不处理该消息了直接拒绝,可以将其丢弃了
Multiple
- //源码
- public void basicAck(long deliveryTag, boolean multiple) throws IOException {
- this.delegate.basicAck(deliveryTag, multiple);
- }
multiple 的 true 和 false 代表不同意思:
true 代表批量应答 channel 上未应答的消息
比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时5-8 的这些还未应答的消息都会被确认收到消息应答
2.false 只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答
为了解决消息丢失问题。
具体代码:
生产者:
- package com.chen.rabbitmq.three;
- import com.chen.rabbitmq.tow.utils.RabbitUtils;
- import com.rabbitmq.client.Channel;
- import java.nio.charset.StandardCharsets;
- import java.util.Scanner;
- public class Pro {
-
- private static final String MQ_KEY="mqkey";
-
- public static void pro() throws Exception{
- Channel channel = RabbitUtils.rabbitConnection();
- channel.queueDeclare(MQ_KEY,true,false,false,null);
-
- Scanner scanner = new Scanner(System.in);
- while (scanner.hasNext()){
- String next = scanner.next();
- channel.basicPublish("",MQ_KEY,null,scanner.next().getBytes());
- System.out.println("消息发布成功-> "+next);
- }
- }
- public static void main(String[] args) throws Exception {
- pro();
- }
- }
消费者1:
- package com.chen.rabbitmq.three;
-
- import com.chen.rabbitmq.tow.utils.RabbitUtils;
- import com.rabbitmq.client.CancelCallback;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.DeliverCallback;
-
- //消费者1
- public class Word1 {
-
- public static final String MQ_KEY="mqkey";
-
-
- public static void word() throws Exception{
- Channel channel = RabbitUtils.rabbitConnection();
- channel.basicConsume(MQ_KEY,false,(DeliverCallback) (consumerTag,message)->{
- // 睡眠1s
- try {
- Thread.sleep(1*1000);
- System.out.println("Word1接收到消息->"+new String(message.getBody()));
- // 参数一:tag标记 参数二:是否批量
- channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- },(CancelCallback) e->{
- System.out.println("消息中断"+e);
- } );
-
- }
-
- public static void main(String[] args) throws Exception {
- word();
- }
-
- }
消费者2:
- package com.chen.rabbitmq.three;
-
- import com.chen.rabbitmq.tow.utils.RabbitUtils;
- import com.rabbitmq.client.CancelCallback;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.DeliverCallback;
-
- //消费者1
- public class Word2 {
-
- public static final String MQ_KEY="mqkey";
-
-
- public static void word() throws Exception{
- Channel channel = RabbitUtils.rabbitConnection();
- channel.basicConsume(MQ_KEY,false,(DeliverCallback) (consumerTag,message)->{
- // 睡眠10s
- try {
- Thread.sleep(10*1000);
- System.out.println("Word2接收到消息->"+new String(message.getBody()));
- // 参数一:tag标记 参数二:是否批量
- channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- },(CancelCallback) e->{
- System.out.println("消息中断"+e);
- } );
-
- }
-
- public static void main(String[] args) throws Exception{
- word();
- }
-
- }
经测试会发现,消费者1为第一个接收到消息,接下来当生产者在生产出一条消息,应到消费者2接收到消息,但是此时消费者2突然出现宕机
,使用了应答机制,消息则会重新打到消费者1;
作用:当rabbitmq宕机后,重启队列依然存在
- //创建队列时的第二个参数为设置持久化
- channel.queueDeclare(MQ_KEY,true,false,false,null);
作用:当rabbitmq宕机了重新启动,发送的消息依然存在。
下面的方法不是绝对的能保证消息的持久化
- //生产者
- private static final String MQ_KEY="mqkey";
- public static void pro() throws Exception{
- Channel channel = RabbitUtils.rabbitConnection();
- channel.queueDeclare(MQ_KEY,true,false,false,null);
-
- Scanner scanner = new Scanner(System.in);
- while (scanner.hasNext()){
- String next = scanner.next();
- //MessageProperties.PERSISTENT_TEXT_PLAIN 消息持久化
- channel.basicPublish("",MQ_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,scanner.next().getBytes());
- System.out.println("消息发布成功-> "+next);
- }
- }
完成以上两步还不足以持久化,要把发布确认加上。
- //默认是不开启的
- Channel channel = RabbitUtils.rabbitConnection();
- channel.confirmSelect();//开启发布确认
1.单个确认发布
这个发布确认是同步的,需等待确认一次在发布下一次,一手交钱一手交货原则
缺点:发布速度特别慢
- //单个确认
- public static void one() throws Exception{
- Channel channel = RabbitUtils.rabbitConnection();
- //开启发布确认
- channel.confirmSelect();
- String uuid = UUID.randomUUID().toString();
-
- //创建队列
- channel.queueDeclare(uuid,true,false,false,null);
- //开始时间
- long begin = System.currentTimeMillis();
-
- for (Integer i = 0; i < COUNT; i++) {
- String message = i + "";
- channel.basicPublish("",uuid,null,message.getBytes());
- //发布确认
- boolean flag = channel.waitForConfirms();
- if(flag){
- System.out.println("消息确认成功!");
- }
- }
- long last = System.currentTimeMillis();
- System.out.println("耗时:"+(last-begin));
- }
2.批量确认发布
发布速度相对单个发布确认要快,但是当其中一条消息出现异常,将无法查找到那个消息丢失 。
- //批量
- public static void batch() throws Exception{
- Channel channel = RabbitUtils.rabbitConnection();
- String uuid = UUID.randomUUID().toString();
- //开启消息确认
- channel.confirmSelect();
- //创建队列
- channel.queueDeclare(uuid,true,false,false,null);
- //这个这个变量用记录发布值
- Integer messageCount=100;
- Integer record =0;
- //开始时间
- long begin = System.currentTimeMillis();
-
- for (Integer i = 0; i < COUNT; i++) {
- record++;
- String message=i+"";
- //发布消息
- channel.basicPublish("",uuid,null,message.getBytes());
- if(messageCount.equals(record)){
- channel.waitForConfirms();
- record=0;
- }
- }
- long last = System.currentTimeMillis();
- System.out.println("耗时"+(last-begin));
- }
3.异步确认发布
(推荐使用)
异步确认虽然比上的两个代码复杂,但同时也解决了上面两种方式遗留下来的问题。
- public static void asyn() throws Exception{
-
- Channel channel = RabbitUtils.rabbitConnection();
- //开启发布确认
- channel.confirmSelect();
- String uuid = UUID.randomUUID().toString();
-
- //创建队列
- channel.queueDeclare(uuid,true,false,false,null);
- //开始时间
- long begin = System.currentTimeMillis();
-
- // 创建一个线程的ListMap用于记录 ----》处理异步未确认的消息
- ConcurrentSkipListMap<Long, String> map = new ConcurrentSkipListMap<>();
-
- // 监听消息
- channel.addConfirmListener((deliveryTag, multiple)->{
- if(multiple){
- ConcurrentNavigableMap<Long, String> longStringConcurrentNavigableMap =
- map.headMap(deliveryTag);
- longStringConcurrentNavigableMap.clear();
- }else{
- map.remove(deliveryTag);
- }
- System.out.println("确认消息:"+deliveryTag);
-
- },(deliveryTag, multiple)->{
- String message = map.get(deliveryTag);
- System.out.println("发送失败的数据是:"+message+"未确认消息:"+deliveryTag+"-----失败");
- });
- for (Integer i = 0; i < COUNT; i++) {
- String message=""+i;
- channel.basicPublish("",uuid,null,message.getBytes());
- //获取信道的标识,存入消息
- map.put(channel.getNextPublishSeqNo(),message);
- }
- long last = System.currentTimeMillis();
- System.out.println("耗时:"+(last-begin));
- }
能者多劳原则
)在上面中的所有例子都是尊寻这轮询的规则去执行的,问题:当其中的一台服务响应特别慢时就会影响到整体的效率。
channel.basicQos(1);
- //消费者
- public static void word() throws Exception{
- Channel channel = RabbitUtils.rabbitConnection();
- //设置不公平分发
- channel.basicQos(1);
- channel.basicConsume(MQ_KEY,false,(DeliverCallback) (consumerTag,message)->{
- try {
- //模拟虚拟机延迟
- Thread.sleep(1*1000);
- System.out.println("Word2接收到消息->"+new String(message.getBody()));
- channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- },(CancelCallback) e->{
- System.out.println("消息中断"+e);
- } );
- }
也可以用来设置预期值!
- //消费者1
- public static void word2() throws Exception{
- Channel channel = RabbitUtils.rabbitConnection();
- //设置预期值
- channel.basicQos(3);
- channel.basicConsume(MQ_KEY,false,(DeliverCallback) (consumerTag,message)->{
- try {
- //模拟虚拟机延迟
- Thread.sleep(1*1000);
- System.out.println("Word2接收到消息->"+new String(message.getBody()));
- channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- },(CancelCallback) e->{
- System.out.println("消息中断"+e);
- } );
- }
- //消费者2
- public static void word2() throws Exception{
- Channel channel = RabbitUtils.rabbitConnection();
- //设置预期值
- channel.basicQos(5);
- channel.basicConsume(MQ_KEY,false,(DeliverCallback) (consumerTag,message)->{
- try {
- //模拟虚拟机延迟
- Thread.sleep(10*1000);
- System.out.println("Word2接收到消息->"+new String(message.getBody()));
- channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- },(CancelCallback) e->{
- System.out.println("消息中断"+e);
- } );
- }
在RabbitMQ中,生产者发送消息不会直接将消息投递到队列中,而是先将消息投递到交换机中, 在由交换机转发到具体的队列, 队列再将消息以推送或者拉取方式给消费者进行消费
与交换机产生关系,并且能有routekey控制发送消息给哪个队列。
扇形交换机是最基本的交换机类型,它所能做的事清非常简单广播消息。扇形交换机会把能接收到的消息全部发送给绑定在自己身上的队列。因为广播不需要'思考”,所以扇形交换机处理消息的速度也是所有的交换机类型里面最快的。
- //消费者
- public class Word {
- // 交换机名称
- private static String EXCHANGE_NAME="logs";
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitUtils.rabbitConnection();
- // 声明一个交换机
- channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
- // 声明一个队列 临时队列
- String queue = channel.queueDeclare().getQueue();
- // 绑定交换机与队列
- channel.queueBind(queue,EXCHANGE_NAME,"");
- System.out.println("等待消息~");
-
- //消费者取消消息时回调接口
- channel.basicConsume(queue,true, (consumerTag,message)->{
- System.out.println("word1控制台打印接收消息:"+new String(message.getBody(),"UTF-8"));
- },cancelCallback->{});
- }
- }
-
- public class Word2 {
- // 交换机名称
- private static String EXCHANGE_NAME="logs";
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitUtils.rabbitConnection();
- // 声明一个交换机
- channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
- // 声明一个队列 临时队列
- String queue = channel.queueDeclare().getQueue();
- // 绑定交换机与队列
- channel.queueBind(queue,EXCHANGE_NAME,"");
- System.out.println("等待消息~");
- //消费者取消消息时回调接口
- channel.basicConsume(queue,true, (consumerTag,message)->{
- System.out.println("word2控制台打印接收消息:"+new String(message.getBody(),"UTF-8"));
- },cancelCallback->{});
- }
- }
- //生产者
- public class send {
- // 交换机名称
- private static String EXCHANGE_NAME="logs";
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitUtils.rabbitConnection();
-
- Scanner scanner = new Scanner(System.in);
- while (scanner.hasNext()){
- String next = scanner.next();
- channel.basicPublish(EXCHANGE_NAME,"",null,next.getBytes("UTF-8"));
- System.out.println("生产者发送消息:"+next);
- }
- }
- }
直连交换机的路由算法非常简单: 将消息推送到binding key与该消息的routing key相同的队列。
代码几乎类型fanout交换机,只需要指定routerkey即可。
发送到主题交换机的 消息不能有任意的 routing key, 必须是由点号分开的一串单词,这些单词可以是任意的,但通常是与消息相关的一些特征。
如以下是几个有效的routing key:
"stock.usd.nyse", "nyse.vmw", "quick.orange.rabb 代", routing key的单词可以 有很多,最大限制是255 bytes。
Topic 交换机的 逻辑与 direct 交换机有点 相似 使用特定路由键发送的消息 将被发送到所有使用匹配绑定键绑定的队列 ,然而 ,绑定键有两个特殊的情况:
*表示匹配任意一个单词
#表示匹配任意—个或多个单词
比如上图:
发送routerkey为:ws.orange.rabbit
那么对应的就是Q1,Q2
发送routerkey为:lazy.orange.elephant
那么对应的就是Q1,Q2
- //消费者
- public class word1 {
-
- private static final String EXCHANGE_NAME="topic_logs";
-
- private static final String QUEUE_NAME="Q1";
-
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitUtils.rabbitConnection();
- // 创建交换机
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
- // 创建队列
- channel.queueDeclare(QUEUE_NAME,true,true,false,null);
- // 绑定队列
- channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.orange.*");
- // 接收消息
- channel.basicConsume(QUEUE_NAME,true,(consumerTag,message)->{
- System.out.println("接收到的消息:"+new String(message.getBody()));
- },cancelCallback->{});
- System.out.println("等下消息~");
- }
- }
- public class word2 {
-
- private static final String EXCHANGE_NAME="topic_logs";
-
- private static final String QUEUE_NAME="Q2";
-
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitUtils.rabbitConnection();
- // 创建交换机
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
- // 创建队列
- channel.queueDeclare(QUEUE_NAME,true,true,false,null);
- // 绑定队列
- channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.*.rabbit");
- channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"lazy.#");
- // 接收消息
- channel.basicConsume(QUEUE_NAME,true,(consumerTag,message)->{
- System.out.println("接收到的消息:"+new String(message.getBody()));
- },cancelCallback->{});
- System.out.println("等下消息~");
- }
- }
- //生产者
- public class send {
- private static final String EXCHANGE_NAME="topic_logs";
- public static void main(String[] args) throws Exception{
- Channel channel = RabbitUtils.rabbitConnection();
- Scanner scanner = new Scanner(System.in);
- while (true){
- System.out.println("请输入routerkey:");
- String key = scanner.next();
- System.out.println("请输入消息内容:");
- String message = scanner.next();
- channel.basicPublish(EXCHANGE_NAME,key,null,message.getBytes());
- }
- }
- }
顾名思义:无法被消费的消息,一般来说,producer将消息投递broker或者直接到queue里了,consumer(消费者)从queue取出消息进行消费,但某些时间由特定原因导致queue中的某些消息无法被消费
,这样如果没有后续的处理,就变成了死信。
应用场景:为了确保订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息被消息时发生了异常,这是就将消息存到死信中,还比如说:用户商城下单成功,并且点击支付后在指定时间支付时自动失效。
- //生产者
- public class send {
- private static final String NORMAL_EXCHANGE="normal_exchange";
- public static final String NORMAL_QUEUE="normal_queue";
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitUtils.rabbitConnection();
- // 设置死信时间
- AMQP.BasicProperties basicProperties =
- new AMQP.BasicProperties().builder()
- .expiration("10000").build();
- for (int i = 0; i < 11; i++) {
- String msg="info"+i;
- channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",basicProperties,msg.getBytes());
- }
- }
- }
- //消费者1
- public class C1 {
- private static final String NORMAL_EXCHANGE="normal_exchange";
- private static final String DEAD_EXCHANGE="dead_exchange";
- public static final String NORMAL_QUEUE="normal_queue";
- public static final String DEAD_QUEUE="dead_queue";
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitUtils.rabbitConnection();
- // 创建c1交换机
- channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
- channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.TOPIC);
- // 声明普通队列
- HashMap<String, Object> map = new HashMap<>();
- // 设置过期时间 10s 单位ms 这里有消费整去做控制
- // map.put("x-message-ttl",100000);
- // 正常队列设置死信交换机
- map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
- // 设置死信的routerKey
- map.put("x-dead-letter-routing-key","lisi");
- // 创建普通队列
- channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
- //创建死信队列
- channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
- // 绑定
- channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
- channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
- channel.basicConsume(NORMAL_QUEUE,true,(consumerTag, message) -> {
- System.out.println("C1消息为:"+message.getBody());
- },cancelCallback->{
- });
- }
- }
- //消费者2
- public class C2 {
- public static final String DEAD_QUEUE="dead_queue";
- private static final String DEAD_EXCHANGE="dead_exchange";
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitUtils.rabbitConnection();
- channel.basicConsume(DEAD_QUEUE,true,(consumerTag,message)->{
- System.out.println("消息为:"+new String(message.getBody()));
- },cancelCallback->{});
- }
- }
根据c1做修改,测试报错先删除原来的队列与交换机
//设置正常队列长度的限制 map.put("x-max-length",6);
添加手动应答拒接。
- public class C1 {
- private static final String NORMAL_EXCHANGE="normal_exchange";
- private static final String DEAD_EXCHANGE="dead_exchange";
- public static final String NORMAL_QUEUE="normal_queue";
- public static final String DEAD_QUEUE="dead_queue";
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitUtils.rabbitConnection();
- // 创建c1交换机
- channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
- channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.TOPIC);
- // 声明普通队列
- HashMap<String, Object> map = new HashMap<>();
- // 设置过期时间 10s 单位ms
- // map.put("x-message-ttl",100000);
- // 正常队列设置死信交换机
- map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
- // 设置死信的routerKey
- map.put("x-dead-letter-routing-key","lisi");
- // 设置正常队列长度的限制
- // map.put("x-max-length",6);
- // 创建普通队列
- channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
- //创建死信队列
- channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
- // 绑定
- channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
- channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
- channel.basicConsume(NORMAL_QUEUE,false,(consumerTag, message) -> {
- String msg = new String(message.getBody());
- System.out.println("C1消息为:"+msg);
- // 拒接对应消息
- if(msg.equals("info2")){
- // deliveryTag
- channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
- }else{
- channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
- }
- },cancelCallback->{
- });
- }
- }
官网地址:Spring AMQP
Spring AMQP 是 Spring 框架中的一个模块,它提供了基于 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)标准的抽象层,用于简化在 Spring 应用程序中使用消息队列的过程。Spring AMQP 不仅简化了与消息代理(如 RabbitMQ)的集成,还提供了一套高度可配置的模板类来生产、消费消息,并管理AMQP基础设施组件,如队(Queue)、交换机(Exchange)和绑定(Binding)。
使用
- <!--AMQP依赖,包含RabbitMQ-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
生产者
- logging:
- pattern:
- dateformat: MM-dd HH:mm:ss:SSS
- spring:
- rabbitmq:
- host: 38.6.217.70
- port: 5672
- username: itcast
- password: 123321
- virtual-host: /
- package cn.itcast.mq.spring;
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.boot.test.context.SpringBootTest;
- import org.springframework.test.context.junit4.SpringRunner;
- import javax.annotation.Resource;
- @RunWith(SpringRunner.class)
- @SpringBootTest
- public class SpringAmqpTest {
- @Resource
- private RabbitTemplate rabbitTemplate;
- @Test
- public void testSendMessage2SimpleQueue() throws InterruptedException {
- // 1.发送消息
- String message = "Hello, Spring Amqp!";
- rabbitTemplate.convertAndSend("simple.queue", message);
- }
- }
消费者
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- //监听机制
- @Component
- public class SpringRabbitListener {
- @RabbitListener(queues = "simple.queue")
- public void listenSimpleQueueMessage(String msg) {
- System.out.println("spring接收到的消息是:" + msg);
- }
- }
案例:将50条消息在一秒内分类交给两个消费者消费。
- //生成者
- @Test
- public void testSendWordSimpleQueue() throws InterruptedException {
- // 1.发送消息
- String key ="simple.queue";
- String message = "Hello, Spring Amqp____";
- for (int i = 0; i < 49; i++) {
- rabbitTemplate.convertAndSend(key, message+i);
- Thread.sleep(20);
- }
- }
- //消费者
- @Component
- public class SpringRabbitListener {
- @RabbitListener(queues = "simple.queue")
- public void listenSimpleQueueMessage(String msg) throws InterruptedException {
- System.out.println("spring接收到的消息是:" + msg+"___"+ LocalDateTime.now());
- Thread.sleep(20);
- }
-
- @RabbitListener(queues = "simple.queue")
- public void listenFanoutQueue1(String msg) throws InterruptedException {
- System.err.println("FanoutQueue1接收到的消息是:" + msg+"___"+ LocalDateTime.now());
- Thread.sleep(200); //模拟性能
- }
- }
通过执行结果我们可以看出listenFanoutQueue1这个监听器执行的是奇数,而listenSimpleQueueMessage则是偶数。且时间超出了1秒。为什么呢?
因为在生产者发送到队列中时,消费者会预取消息,在默认情况下进行平分机制,在上面代码中我们可以看到我们使用了线程睡眠的方式模拟了性能,在平分的情况下,睡眠200的执行了25条,所以导致了超出了1s。 如何调整呢?
- logging:
- pattern:
- dateformat: MM-dd HH:mm:ss:SSS
- spring:
- rabbitmq:
- host: 38.6.217.70
- port: 5672
- username: itcast
- password: 123321
- virtual-host: /
- listener: #设置预取
- simple:
- prefetch: 1 #每次只取一条
- #这段配置的作用是在使用 RabbitMQ 的时候,配置消费者监听器的简单模式,并设置消息预取值为 1。这意味着每次只会从队列中取出一条消息进行处理,处理完后再去取下一条消息。这种方式可以保证消息的顺序处理。
这种交换机需要进行绑定对应的队列,绑定对应的队列后,生产者将消息推送给交换机,交换机会将消息分别都发给绑定的消息队列。
实现
- //消费者配置
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.FanoutExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- @Configuration
- public class FanoutExchangeConfig {
-
- // 创建队列1 fanout.queue1
- @Bean
- public Queue queue1(){
- return new Queue("fanout.queue1");
- }
- // 创建交换机 fanoutExchange
- @Bean
- public FanoutExchange fanoutExchange(){
- return new FanoutExchange("fanoutExchange");
- }
- // 队列1绑定交换机
- @Bean
- public Binding bindingExchange1(){
- return BindingBuilder.bind(queue1()).to(fanoutExchange());
- }
- // 创建队列1 fanout.queue2
- @Bean
- public Queue queue2(){
- return new Queue("fanout.queue2");
- }
- // 队列2绑定交换机
- @Bean
- public Binding bindingExchange2(){
- return BindingBuilder.bind(queue2()).to(fanoutExchange());
- }
- }
消费者
- @Component
- public class SpringRabbitListener {
- @RabbitListener(queues = "fanout.queue1")
- public void listenFanoutQueue1QueueMessage(String msg) throws InterruptedException {
- System.out.println("fanout.queue1接收到的消息是:" + msg+"___"+ LocalDateTime.now());
- }
- @RabbitListener(queues = "fanout.queue2")
- public void listenFanoutQueue2QueueMessage(String msg) throws InterruptedException {
- System.out.println("fanout.queue2接收到的消息是:" + msg+"___"+ LocalDateTime.now());
- }
- }
生产者
- @Test
- public void testSendMessageFanoutQueue() {
- // 1.发送消息
- String message = "Hello, testSendMessageFanoutQueue !";
- // 交换机名称
- String exchange = "fanoutExchange";
- rabbitTemplate.convertAndSend(exchange,"",message);
- }
这种交换机需要指定一个key进行发送,通过可以区别发送到那个队列,同时这些队列也可以绑定相同的key
,那么也就是实现了fanout的效果。
实现
- //消费者
- @Component
- public class DirectExchangeListener {
-
- // 可以通过@bena的方式进注入,这里我们采用@RabbitListenner的方式
-
- @RabbitListener(
- bindings = @QueueBinding(
- value = @Queue(name = "direct.queue1"),//绑定的队列
- exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT),//绑定的交换机
- key = {"red", "blue"} //绑定的key
- )
- )
- public void listenDirectQueue1(String msg) throws InterruptedException {
- System.out.println("listenDirectQueue1接收到的消息是:" + msg);
- }
-
-
- @RabbitListener(
- bindings = @QueueBinding(
- value = @Queue(name = "direct.queue2"),//绑定的队列
- exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT),//绑定的交换机
- key = {"red", "yellow"} //绑定的key
- )
- )
- public void listenDirectQueue2(String msg) throws InterruptedException {
- System.out.println("listenDirectQueue2接收到的消息是:" + msg);
- }
- }
- //生产者
- @Test
- public void testSendMessageDirectQueue() {
- String routingKey = "yellow";
- // 1.发送消息
- String message = "Hello, testSendMessageFanoutQueue !"+"__"+routingKey;
- // 交换机名称
- String exchange = "direct.exchange";
- rabbitTemplate.convertAndSend(exchange,routingKey,message);
- }
这种交换机其实和direct类型的交换机差不错,只不过它是使用通配符的方式。
使用
- //消费者
- @Component
- public class TopicExchangeListener {
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "topic.queue1"),
- exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
- key = {"china.#"}
- ))
- public void listenTopicQueue1(String msg) throws InterruptedException {
- System.out.println("topic.queue1接收到消息:" + msg);
- }
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "topic.queue2"),
- exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
- key = {"#.news"}
- ))
- public void listenTopicQueue2(String msg) throws InterruptedException {
- System.out.println("topic.queue2接收到消息:" + msg);
- }
- }
- //生产者
- @Test
- public void testSendMessageTopicQueue() {
- String routingKey = "news";
- // 1.发送消息
- String message = "Hello, testSendMessageTopicQueue !"+"__"+routingKey;
- // 交换机名称
- String exchange = "topic.exchange";
- rabbitTemplate.convertAndSend(exchange,routingKey,message);
- }
例子:
- //我们声明一个objQueue
- @Bean
- public Queue objQueue(){
- return new Queue("obj.queue");
- }
-
- //发送消息
- @Test
- public void testSendMessageobjQueue() {
- Map<String, Object> map = new HashMap<>();
- map.put("name","test");
- map.put("age",18);
- rabbitTemplate.convertAndSend("obj.queue",map);
- }
我们重rabbitmq的ui界面中我们可以发现消息是基于JDK完成的序列化。
缺点:这样不能很直接的看出消息的结果,并且占用大量内存,所以下面我们使用jdckson进行json序列化。
发送者
依赖:
- <dependency>
- <groupId>com.fasterxml.jackson.dataformat</groupId>
- <artifactId>jackson-dataformat-xml</artifactId>
- </dependency>
配置bean
- //生产者配置
- import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
- import org.springframework.amqp.support.converter.MessageConverter;
- @Bean
- public MessageConverter messageConverter() {
- return new Jackson2JsonMessageConverter();
- }
消费者
- <dependency>
- <groupId>com.fasterxml.jackson.dataformat</groupId>
- <artifactId>jackson-dataformat-xml</artifactId>
- </dependency>
- //销售者配置
- import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
- import org.springframework.amqp.support.converter.MessageConverter;
- @Bean
- public MessageConverter messageConverter() {
- return new Jackson2JsonMessageConverter();
- }
- @RabbitListener(queues = "obj.queue")
- public void listenObjQueueMessage( Map<String, Object> msg) throws InterruptedException {
- System.out.println("obj.queue接收到的消息是:" + msg);
- }
后续会更新使用MQ做的具体案例:秒杀、订单业务处理等。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。