赞
踩
消息队列是一种应用之间的通信方式,消息发送后可以立即返回,由消息系统保证消息的可靠传递。
消息(Message)是指应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以很复杂,可以嵌入对象。
消息队列是一种应用之间的通信方式,消息发送后可以立即返回,由消息系统保证消息的可靠传递。消息发布者只管把消息发布到MQ中而不管是谁来取,消息使用者只管从消息队列中取消息而不管是谁发布的。这样发布者和使用者都不知道对方的存在。
特点
Active MQ | Rabbit MQ | Kafka | Rocket MQ | |
---|---|---|---|---|
发布订阅 | 支持 | 支持 | 支持 | 支持 |
轮询分发 | 支持 | 支持 | 支持 | / |
公平分发 | / | 支持 | 支持 | / |
重发 | 支持 | 支持 | / | 支持 |
消息拉去 | / | 支持 | 支持 | 支持 |
# 添加erlang solutions源 wget https://packages.erlang-solutions.com/erlang-solutions-2.0-1.noarch.rpm # 也可直接从官网下载 sudo rpm -Uvh erlang-solutions-2.0-1.noarch.rpm yum install -y erlang -------------------------------------------------------------------------------- # 直接安装对应版本 [root@localhost otp_src_22.1]# mv otp_src_22.1.tar.gz /usr/local [root@localhost otp_src_22.1]# cd /usr/local [root@localhost local]# tar -zxvf otp_src_22.1.tar.gz [root@localhost local]# mv otp_src_22.1 erlang [root@localhost local]# cd erlang [root@localhost erlang]# ./configure --prefix=/usr/local/erlang [root@localhost erlang]# make & make install #设置环境变量(按a输入,按esc 输入:wq保存,q不保存) vim /etc/profile #加入一条: export PATH=$JAVA_HOME/bin/:$PATH:/usr/local/erlang/otp_src_22.1/bin:$PATH #让配置生效 source /etc/profile --------------------------------------------------------------------------- #查看erlang是否安装成功 erl # 安装rabbit mq 注意版本对应 yum install -y socat rpm -Uvh rabbitmq-server-3.8.16-1.el7.noarch.rpm yum install -y rabbitmq-server ---------------------------------------------------------------- # 删除 # 查看rabbitmq安装的相关列表 $ yum list | grep rabbitmq # 卸载rabbitmq已安装的相关内容 $ yum -y remove rabbitmq-server.noarch # 查看erlang安装的相关列表 $ yum list | grep erlang # 卸载erlang已安装的相关内容 $ yum -y remove erlang-* $ yum remove erlang.x86_64(当卸载不干净时执行这个) # 部分相关文件夹大家看着删除,我就是记录一下 $ rm -rf /usr/lib64/erlang $ rm -rf /var/lib/rabbitmq $ rm -rf /usr/local/erlang $ rm -rf /usr/local/rabbitmq
# 启动rabbitmq [root@slave01 opt]# systemctl start rabbitmq-server [root@slave01 opt]# systemctl status rabbitmq-server.service ● rabbitmq-server.service - RabbitMQ broker Loaded: loaded (/usr/lib/systemd/system/rabbitmq-server.service; disabled; vendor preset: disabled) Active: active (running) since 二 2021-05-11 21:33:15 CST; 1min 45s ago Main PID: 8128 (beam.smp) Status: "Initialized" CGroup: /system.slice/rabbitmq-server.service ├─8128 /usr/lib64/erlang/erts-11.2/bin/beam.smp -W w -MBas ageffcbf -MHas ageffcbf -MBlmbcs 5... ├─8143 erl_child_setup 32768 ├─8167 /usr/lib64/erlang/erts-11.2/bin/epmd -daemon ├─8187 inet_gethost 4 └─8221 inet_gethost 4 5月 11 21:32:41 slave01 rabbitmq-server[8128]: ########## Licensed under the MPL 2.0. Website: http...com 5月 11 21:32:41 slave01 rabbitmq-server[8128]: Doc guides: https://rabbitmq.com/documentation.html 5月 11 21:32:41 slave01 rabbitmq-server[8128]: Support: https://rabbitmq.com/contact.html 5月 11 21:32:41 slave01 rabbitmq-server[8128]: Tutorials: https://rabbitmq.com/getstarted.html 5月 11 21:32:41 slave01 rabbitmq-server[8128]: Monitoring: https://rabbitmq.com/monitoring.html 5月 11 21:32:41 slave01 rabbitmq-server[8128]: Logs: /var/log/rabbitmq/rabbit@slave01.log 5月 11 21:32:41 slave01 rabbitmq-server[8128]: /var/log/rabbitmq/rabbit@slave01_upgrade.log 5月 11 21:32:41 slave01 rabbitmq-server[8128]: Config file(s): (none) 5月 11 21:33:15 slave01 rabbitmq-server[8128]: Starting broker... completed with 0 plugins. 5月 11 21:33:15 slave01 systemd[1]: Started RabbitMQ broker. Hint: Some lines were ellipsized, use -l to show in full. [root@slave01 opt]# systemctl enable rabbitmq-server Created symlink from /etc/systemd/system/multi-user.target.wants/rabbitmq-server.service to /usr/lib/systemd/system/rabbitmq-server.service.
# 配置图形化界面 rabbitmq-plugins enable rabbitmq_management # 出现问题 {:query, :rabbit@slave01, {:badrpc, :timeout}} # 需要设置主机 vi /etc/hosts # 先通过hostname查到自己的主机名 127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4 slave01 ::1 localhost localhost.localdomain localhost6 localhost6.localdomain6 192.168.169.129 slave01(对应的主机名称) # 安装成功 Enabling plugins on node rabbit@slave01: rabbitmq_management The following plugins have been configured: rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatch Applying plugin configuration to rabbit@slave01... Plugin configuration unchanged. # 可以访问 ip:15672 账号:gust 密码:gust 远程访问需要授权 [root@slave01 rabbitmq]# rabbitmqctl add_user admin admin Adding user "admin" ... Done. Don't forget to grant the user permissions to some virtual hosts! See 'rabbitmqctl help set_permissions' to learn more. [root@slave01 rabbitmq]# rabbitmqctl set_user_tags admin administrator Setting tags for user "admin" to [administrator] ... # 授予权限 rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
# docker 安装rabbit mq
[root@master opt]# docker -v
Docker version 20.10.6, build 370c289
# 拉取镜像
[root@master opt]# docker pull rabbitmq:management
[root@master opt]# docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
a45bac2ef4cefcad3bca364eb1682bdeb6e4eb04bb47ec5bfd9e9f42b68e759f
# 查看
docker ps -a
# 启动
docker start a45bac2ef4ce
none、management、policymaker、monitoring、administrator
RabbitMQ各类角色描述:
package com.zhj.rabbitmq.simple; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; /** * @author zhj */ public class Producer { public static void main(String[] args) { // 基于 amqp 协议 // 1. 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.169.131"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { // 2. 创建连接对象 connection = connectionFactory.newConnection("生产者"); // 3. 获取通道 channel = connection.createChannel(); // 4. 通过通道创建交换机,声明队列,绑定关系,路由key,发送消息和接收消息 String queueName01 = "queue01"; /** * @params1 队列名称 * @params2 是否需要持久化 * @params3 排他性,是否独占 * @params4 是否自动删除 * @params5 附带参数 */ channel.queueDeclare(queueName01, false, false, false, null); // 5. 准备消息内容 String message = "hello rabbit mq"; // 6. 发送消息给队列queue channel.basicPublish("", queueName01, null, message.getBytes(StandardCharsets.UTF_8)); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 7. 关闭连接 if (channel != null && channel.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } // 8. 关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } } } } // ------------------------------------------------------------------------ package com.zhj.rabbitmq.simple; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author zhj */ public class Consumer { public static void main(String[] args) { // 基于 amqp 协议 // 1. 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.169.131"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { // 2. 创建连接对象 connection = connectionFactory.newConnection("生产者"); // 3. 获取通道 channel = connection.createChannel(); // 4. 通过通道创建交换机,声明队列,绑定关系,路由key,发送消息和接收消息 String queueName01 = "queue01"; channel.basicConsume(queueName01, true, new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { System.out.println("收到消息是:" + new String(delivery.getBody(), "UTF-8")); } }, new CancelCallback() { @Override public void handle(String s) throws IOException { System.out.println("接收消息失败!"); } }); // 5. 阻断 System.out.println("开始接收消息"); System.in.read(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 7. 关闭连接 if (channel != null && channel.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } // 8. 关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } } } }
AMQP全称: Advanced Message Queuing Protocol(高级消息队列协议)。是应用层协议的一个开发标准,为面向消息的中间件设计。
工作过程
深入理解
https://www.rabbitmq.com/getstarted.html
简单模式Simple
工作模式Work 公平模式(根据服务器性能读取 【basicQos 一次读多少条】)
发布订阅模式 fanout
发送给所有订阅的队列 没有路由key 是一种广播机制
路由模式 routing (direct) 有routing - key
主题模式 Topic
模糊匹配 符号“#”匹配路由键的一个或多个词或没有,符号“*”匹配路由键的一个词
参数模式
导入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency>
配置
server:
port: 8080
# 配置rabbit mq
spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
host: 192.168.169.131
port: 5672
config
/** * @author zhj */ @Configuration public class RabbitMqConfiguration { // 1. 声明fanout模式交换机 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanout_order_exchange",true,false); } // 2. 声明队列 sms email wechat @Bean public Queue smsQueue() { return new Queue("sms.fanout.queue",true); } @Bean public Queue emailQueue() { return new Queue("email.fanout.queue",true); } @Bean public Queue wechatQueue() { return new Queue("wechat.fanout.queue",true); } // 3. 完成绑定关系 @Bean public Binding smsBinding() { return BindingBuilder.bind(smsQueue()).to(fanoutExchange()); } @Bean public Binding emailBinding() { return BindingBuilder.bind(emailQueue()).to(fanoutExchange()); } @Bean public Binding wechatBinding() { return BindingBuilder.bind(wechatQueue()).to(fanoutExchange()); } }
// 消息发送
String exchangeName = "fanout_order_exchange";
String routingKey = "";
rabbitTemplate.convertAndSend(exchangeName,routingKey,"发送信息");
@Service
@RabbitListener(queues = "email.fanout.queue")
public class FanoutEmailService {
@RabbitHandler
public void receiveMessage(String message) {
System.out.println("邮件接收到订单信息:" + message);
}
}
给队列设置过期时间,超过时间会转移消息(过期队列可以到死信队列),默认会直接删除
消息过期时间和过期队列同时存在,以时间短的为主
// 过期队列
@Bean
public DirectExchange ttlDirectExchange() {
return new DirectExchange("ttl_direct_exchange",true,false);
}
@Bean
public Queue ttlDirectQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 5000);
return new Queue("ttl.direct.queue",true,false,false,args);
}
@Bean
public Binding ttlBinding() {
return BindingBuilder.bind(ttlDirectQueue()).to(ttlDirectExchange()).with("ttl");
}
// 消息过期时间
// 消息发送
String exchangeName = "ttl_direct_exchange";
String routingKey = "ttl-message";
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("8000");
message.getMessageProperties().setContentEncoding("UTF-8");
return message;
}
};
rabbitTemplate.convertAndSend(exchangeName,routingKey,order.getId(),messagePostProcessor);
消息拒绝,消息过期等会转移如死信队列
// 消息过期移入死信队列 @Configuration public class TTLRabbitMqConfiguration { @Bean public DirectExchange ttlDirectExchange() { return new DirectExchange("ttl_direct_exchange",true,false); } @Bean public Queue ttlDirectQueue() { Map<String, Object> args = new HashMap<>(); args.put("x-message-ttl", 5000); // 长度限制 args.put("x-max-length", 5); args.put("x-dead-letter-exchange","dead_direct_exchange"); args.put("x-dead-letter-routing-key","dead"); return new Queue("ttl.direct.queue",true,false,false,args); } @Bean public Queue ttlMessageDirectQueue() { return new Queue("ttl.message.direct.queue",true); } @Bean public Binding ttlBinding() { return BindingBuilder.bind(ttlDirectQueue()).to(ttlDirectExchange()).with("ttl"); } @Bean public Binding ttlMessageBinding() { return BindingBuilder.bind(ttlMessageDirectQueue()).to(ttlDirectExchange()).with("ttl-message"); } } // 死信队列 @Configuration public class DeadRabbitMqConfiguration { @Bean public DirectExchange deadDirectExchange() { return new DirectExchange("dead_direct_exchange",true,false); } @Bean public Queue deadDirectQueue() { return new Queue("dead.direct.queue",true); } @Bean public Binding deadBinding() { return BindingBuilder.bind(deadDirectQueue()).to(deadDirectExchange()).with("dead"); } }
首先需要架设几台服务器做集群使用,初始环境配置各个节点的主机名,与id映射 注意开防防火墙
# vim /etc/hostname 修改主机名
# vim /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4 slave01
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.169.131 master
192.168.169.132 slave01
192.168.169.133 slave03
192.168.169.134 slave04
192.168.169.135 slave05
192.168.169.136 slave06
将 rabbit-slave01 上的 .erlang.cookie 文件拷贝到其他两台主机上。该 cookie 文件相当于密钥令牌,集群中的 RabbitMQ 节点需要通过交换密钥令牌以获得相互认证,因此处于同一集群的所有节点需要具有相同的密钥令牌,否则在搭建过程中会出现 Authentication Fail 错误。
RabbitMQ 服务启动时,erlang VM 会自动创建该 cookie 文件,默认的存储路径为 /var/lib/rabbitmq/.erlang.cookie 或 $HOME/.erlang.cookie,该文件是一个隐藏文件,需要使用 ls -al 命令查看。(拷贝.cookie时,各节点都必须停止MQ服务):
停止所有服务,构建Erlang的集群环境
systemctl stop rabbitmq-server
scp /var/lib/rabbitmq/.erlang.cookie root@slave02:/var/lib/rabbitmq/
scp /var/lib/rabbitmq/.erlang.cookie root@slave03:/var/lib/rabbitmq/
由于你可能在三台主机上使用不同的账户进行操作,为避免后面出现权限不足的问题,这里建议将 cookie 文件原来的 400 权限改为 600,命令如下:
chmod 600 /var/lib/rabbitmq/.erlang.cookie
注:cookie 中的内容就是一行随机字符串,可以使用 cat 命令查看。
在三台主机上均执行以下命令,启动 RabbitMQ 服务:
systemctl start rabbitmq-server
开通 EPMD 端口
epmd进程使用的端口。用于RabbitMQ节点和CLI工具的端点发现服务。
开启防火墙 4369 端口
firewall-cmd --zone=public --add-port=4369/tcp --permanent
重启
systemctl restart firewalld.service
RabbitMQ 集群的搭建需要选择其中任意一个节点为基准,将其它节点逐步加入。这里我们以 rabbit-slave01 为基准节点,将 rabbit-slave02 和 rabbit-slave03 加入集群。在 rabbit-slave02 和rabbit-slave03 上执行以下命令:
# 1.停止服务
rabbitmqctl stop_app
# 2.重置状态
rabbitmqctl reset
# 3.节点加入, 在一个node加入cluster之前,必须先停止该node的rabbitmq应用,即先执行stop_app
# rabbit-slave02加入slave01, rabbit-salave03加入slave02
rabbitmqctl join_cluster rabbit@rabbit-node1
# 4.启动服务
rabbitmqctl start_app
join_cluster 命令有一个可选的参数 --ram ,该参数代表新加入的节点是内存节点,默认是磁盘节点。如果是内存节点,则所有的队列、交换器、绑定关系、用户、访问权限和 vhost 的元数据都将存储在内存中,如果是磁盘节点,则存储在磁盘中。内存节点可以有更高的性能,但其重启后所有配置信息都会丢失,因此RabbitMQ 要求在集群中至少有一个磁盘节点,其他节点可以是内存节点。当内存节点离开集群时,它可以将变更通知到至少一个磁盘节点;然后在其重启时,再连接到磁盘节点上获取元数据信息。除非是将 RabbitMQ 用于 RPC 这种需要超低延迟的场景,否则在大多数情况下,RabbitMQ 的性能都是够用的,可以采用默认的磁盘节点的形式。这里为了演示, rabbit-slave03 我就设置为内存节点。
另外,如果节点以磁盘节点的形式加入,则需要先使用 reset 命令进行重置,然后才能加入现有群集,重置节点会删除该节点上存在的所有的历史资源和数据。采用内存节点的形式加入时可以略过 reset 这一步,因为内存上的数据本身就不是持久化的。
在 rabbit-slave03 和 3 上执行以上命令后,集群就已经搭建成功,此时可以在任意节点上使用 rabbitmqctl cluster_status 命令查看集群状态,输出如下:
[root@slave01 rabbitmq]# rabbitmqctl cluster_status
可以看到 nodes 下显示了全部节点的信息,其中 rabbit-slave02 和 rabbit-slave03 上的节点都是 disc 类型,即磁盘节点;而 rabbit-slave03 上的节点为 ram,即内存节点。此时代表集群已经搭建成功,默认的 cluster_name 名字为 rabbit@slave01,如果你想进行修改,可以使用以下命令:
rabbitmqctl set_cluster_name my_rabbitmq_cluster
开启镜像队列
这里我们为所有队列开启镜像配置,其语法如下:
rabbitmqctl set_policy ha-all “^” ‘{“ha-mode”:“all”}’
复制系数
在上面我们指定了 ha-mode 的值为 all ,代表消息会被同步到所有节点的相同队列中。这里我们之所以这样配置,因为我们本身只有三个节点,因此复制操作的性能开销比较小。如果你的集群有很多节点,那么此时复制的性能开销就比较大,此时需要选择合适的复制系数。通常可以遵循过半写原则,即对于一个节点数为 n 的集群,只需要同步到 n/2+1 个节点上即可。此时需要同时修改镜像策略为 exactly,并指定复制系数 ha-params,示例命令如下:
rabbitmqctl set_policy ha-two “^” ‘{“ha-mode”:“exactly”,“ha-params”:2,“ha-sync-mode”:“automatic”}’
除此之外,RabbitMQ 还支持使用正则表达式来过滤需要进行镜像操作的队列,示例如下:
rabbitmqctl set_policy ha-all “^ha.” ‘{“ha-mode”:“all”}’
此时只会对 ha 开头的队列进行镜像。更多镜像队列的配置说明,可以参考官方文档:Highly Available (Mirrored) Queues
查看镜像状态
配置完成后,可以通过 Web UI 界面查看任意队列的镜像状态,情况如下:
没有一个直接的命令可以关闭整个集群,需要逐一进行关闭。但是需要保证在重启时,最后关闭的节点最先被启动。如果第一个启动的不是最后关闭的节点,那么这个节点会等待最后关闭的那个节点启动,默认进行 10 次连接尝试,超时时间为 30 秒,如果依然没有等到,则该节点启动失败。
这带来的一个问题是,假设在一个三节点的集群当中,关闭的顺序为 node1,node2,node3,如果 node1 因为故障暂时没法恢复,此时 node2 和 node3 就无法启动。想要解决这个问题,可以先将 node1 节点进行剔除,命令如下:
rabbitmqctl forget_cluster_node rabbit@slave01 --offline
此时需要加上 -offline 参数,它允许节点在自身没有启动的情况下将其他节点剔除。
重置当前节点 # 1.停止服务 rabbitmqctl stop_app # 2.重置集群状态 rabbitmqctl reset # 3.重启服务 rabbitmqctl start_app 重新加入集群 # 1.停止服务 rabbitmqctl stop_app # 2.重置状态 rabbitmqctl reset # 3.节点加入 rabbitmqctl join_cluster rabbit@rabbit-node1 # 4.重启服务 rabbitmqctl start_app # 完成后重新检查 RabbitMQ 集群状态 rabbitmqctl cluster_status # 除了在当前节点重置集群外,还可在集群其他正常节点将节点踢出集群 rabbitmqctl forget_cluster_node rabbit@rabbit-node3 - 变更节点类型 我们可以将节点的类型从RAM更改为Disk,反之亦然。假设我们想要反转rabbit@rabbit-node2和rabbit@rabbit-node1的类型,将前者从RAM节点转换为磁盘节点,而后者从磁盘节点转换为RAM节点。为此,我们可以使用change_cluster_node_type命令。必须首先停止节点。 - 停止服务 rabbitmqctl stop_app - 变更类型 ram disc rabbitmqctl change_cluster_node_type disc - 重启服务 rabbitmqctl start_app 清除 RabbitMQ 节点配置 - 如果遇到不能正常退出直接kill进程 systemctl stop rabbitmq-server - 查看进程 ps aux|grep rabbitmq - 清楚节点rabbitmq配置 rm -rf /var/lib/rabbitmq/mnesia
分布式事务可以保证数据的最终一致性
在分布式服务调用过程中,虽然双方都加了事务控制,但是只有其中一个服务出现异常,就会造成事务的单方面回滚。一个服务回滚一个服务不回滚
引入Rabbit MQ解决分布式事务
可靠生产和可靠消费
增加冗余数据 保证可靠生产
设置消息队列确认机制
增加配置
可靠消费
解决消息重试的方案
控制重发次数
不要加try catch
try+catch + 手动ack
try+catch + 手动ack + 死信队列 +人工干预
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。