赞
踩
可以说topic 包含 header , 所以header 粒度更小, 粒度小代表更灵活 ; 所以当业务是灵活多变的就用header, 如果没啥特殊要求,就直接用topic;
所谓“死信”, 从文字角度理解就是“消息死了,无声了”;rabbitmq作为一个消息中间件,什么情况下存在“需要消息死了,无声了”的场景;----- 消息延时,延时队列;说的没错,比如若有需求说,若订单一只未付款延时15分钟将订单自动取消、微信定时回复、钉钉定时日志等等都可以用延时队列。
消息被塞入到死信队列有2种方式:
上述情况消息不会回到原队列,而是进入到死信队列。
死信队列实现方式:
1)、首先定义死信队列的交换机 exchange 和队列 Queue , 然后再将其与正常的交换机绑定;
- @Configuration
- public class DeadLetterConfig {
-
- public static final String NORMAL_EXCHANGE = "normal-exchange" ;
- public static final String NORMAL_QUEUE = "normal-queue";
- public static final String NORMAL_ROUTING_KEY = "normal.#";
-
- public static final String DEAD_EXCHANGE = "dead-exchange" ;
- public static final String DEAD_QUEUE = "dead-queue";
- public static final String DEAD_ROUTING_KEY = "dead.#";
-
- @Bean
- public Exchange normalExchange(){
- return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).build();
- }
-
- @Bean
- public Queue normalQueue(){
- return QueueBuilder.durable(NORMAL_QUEUE)
- .deadLetterExchange(DEAD_EXCHANGE)
- .deadLetterRoutingKey("dead.abc")
- // .ttl(10000)
- .maxLength(1)
- .build();
- }
-
- @Bean
- public Binding normalBinding(Queue normalQueue, Exchange normalExchange){
- return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY).noargs();
- }
-
- @Bean
- public Exchange deadExchange(){
- return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).build();
- }
-
- @Bean
- public Queue deadQueue(){
- return QueueBuilder.durable(DEAD_QUEUE).build();
- }
-
- @Bean
- public Binding deadBinding(Queue deadQueue, Exchange deadExchange){
- return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
- }
-
-
- }
2)、定义消费者监听消息
- @Component
- public class DeadListener {
-
- @RabbitListener(queues = DeadLetterConfig.NORMAL_QUEUE)
- public void consume(String msg, Channel channel, Message message) throws IOException {
- System.out.println("接收到normal队列的消息为:" + msg);
- // channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
- channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
- }
3)、定义测试类,发送消息,按照上面说的2个条件自行测试
- @SpringBootTest
- public class DeadPublisherTest {
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- public void publish(){
- rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE, "normal.abc", "dead letter!");
- System.out.println("发送成功!");
- }
-
- @Test
- public void publishExpire(){
- rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE, "normal.abc", "dead letter!", message -> {
- message.getMessageProperties().setExpiration("5000");
- return message;
- });
- System.out.println("发送成功!");
- }
-
- }
4)、设置消息长度可以在RabbitMQ的控制台设置也可以在代码中声明队列的时候设置!
总结:
死信队列就是普通队列中一些设置项的兜底,可以把其当成一个正常的队列
RabbitMQ的整个发送消费结构如下图:
图中红色椭圆就是消息在整个发送过程中容易丢失的点,下面详细讲讲
(1)第一个点消息很有可能发出去了但是没有到达交换机就消失了,比如网络不好、比如交换机故障了没运行。解决方式是采用某些机制保证消息一定到达交换机(confirm机制,return机制这两种都可以,另一个是同步机制,没必要,原因在于我用MQ就是为了异步,解耦。)
- channel.confirmSelect();
-
- // 设置confirms异步回调
- channel.addConfirmListener(new ConfirmListener() {
- @Override
- public void handleAck(long deliveryTag, boolean multiple) throws IOException {
- System.out.println("消息成功的发送到Exchange");
- }
-
- @Override
- public void handleNack(long deliveryTag, boolean multiple) throws IOException {
- System.out.println("消息没有发送到Exchange, 尝试重试或者保存到数据库做其他补偿操作!");
- }
- });
进入handleAck代表消息一定到了交换机,反之Nack代表失败!
(2)第二个点是交换机出问题,比如发消息的时候服务重启了,然后服务运行完毕后发现全没了?解决方式是持久化交换机,可以在声明交换机的时候设置其durable属性
(3)第三个点是队列,同交换机,队列也有可能服务重启后队列都没了、也有可能队列里面的消息没了。所以这里我们既要设置队列的持久化(声明队列时的durable属性),也要持久化我们的消息
DeliveryMode设置消息持久化,1代表不持久化,2代表持久化
- AMQP.BasicProperties props = new AMQP.BasicProperties()
- .builder()
- .deliveryMode(2)
- .build();
(4)第四个点保证消费者正常消费消息。这点简单,也就是消费者受到消息后通知一下MQ就行,这里采用ack的方式,可以自动ack,手动ack,一般情况都是手动ack,毕竟会执行一段业务后再返回ack;
经过以上四个步骤,消息就非常可靠了;
集群搭建:
既然要保证高可用,大多数都是多机集群;单机多节点可以用于个人学习阶段,这里我将用两台阿里云的ecs模拟多节点集群的部署。
- docker run -d --hostname rabbitmq --add-host="rabbitmq":xxx.93.18.250 --add-host="tb-rabbitmq":xxx.40.130.124 --name myrabbitmq1 --restart=always -p "4369:4369" -p "5672:5672" -p "15672:15672" -p "25672:25672" -v /opt/rabbitmq:/var/lib/rabbitmq:z -e RABBITMQ_ERLANG_COOKIE='rabbitClusterCookie' rabbitmq:3.6.15-management
-
-
- docker run -d --hostname tb-rabbitmq --add-host="rabbitmq":xxx.93.18.250 --add-host="tb-rabbitmq":xxx.40.130.124 --name myrabbitmq2 --restart=always -p "4369:4369" -p "5672:5672" -p "15672:15672" -p "25672:25672" -v /opt/rabbitmq:/var/lib/rabbitmq:z -e RABBITMQ_ERLANG_COOKIE='rabbitClusterCookie' rabbitmq:3.6.15-management
- $ docker exec -it tb-rabbitmq bash
- $ rabbitmqctl stop_app
- $ rabbitmqctl reset
- $ rabbitmqctl join_cluster rabbit@rabbitmq
- $ rabbitmqctl start_app
- # 查看集群信息
- $ rabbitmqctl cluster_status
上面步骤容易出现的错误:
(1)云服务器一定要在安全组开放对应的端口,如果是虚拟机将防火墙关掉;
(2)保证双方节点能够互相 PING 通
(3)--add-host 不能掉,这个是保证双方docker节点能够互相找到的配置,相当于windows里面的hosts文件配置ip,域名。
搭建好后去访问(随便哪一个),会得到如下图,就说明ok了。
然后在xxx.40.130.124控制台创建一个队列a,此时发送一个消息,之后关闭这个mq节点,你会发现,另一个节点中队列a消失了。说明了什么?说明了并不是高可用啊!那我这个集群还能干嘛,所以现在我们得制定高可用战略:
使用mirror镜像模式,同步集群之间所有数据,保证高可用。
如下图步骤,创建mirror就ok
然后再去控制台创建一个队列,重复上面关闭节点的步骤,发现此时队列并没有消失,则说明镜像模式设置成功!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。