当前位置:   article > 正文

Rabbitmq 核心知识、集群部署、springboot集成MQ集群实现高可用_rabbitmq 高可用springboot

rabbitmq 高可用springboot

MQ核心知识如下:

1、MQ的基本使用(hello world ,pubsub , topic , header , direct , rpc ),可以先去官网或者相关平台学习如何使用,这里跳过;

2、topic 和 header 方式的区别

可以说topic 包含 header , 所以header 粒度更小, 粒度小代表更灵活 ; 所以当业务是灵活多变的就用header, 如果没啥特殊要求,就直接用topic;

3、RabbitMQ死信队列

所谓“死信”, 从文字角度理解就是“消息死了,无声了”;rabbitmq作为一个消息中间件,什么情况下存在“需要消息死了,无声了”的场景;----- 消息延时,延时队列;说的没错,比如若有需求说,若订单一只未付款延时15分钟将订单自动取消、微信定时回复、钉钉定时日志等等都可以用延时队列。

消息被塞入到死信队列有2种方式:

  • 消费者拒绝消费或者NACK的方式驳回消息、超时未消费队列消息(包括发送消息时设置生存时间 / 队列消息生存时间)
  • 生产者生产的消息大于队列设置的最大长度,或者消息载体大于设置的最大长度

上述情况消息不会回到原队列,而是进入到死信队列。

死信队列实现方式:

1)、首先定义死信队列的交换机 exchange 和队列 Queue , 然后再将其与正常的交换机绑定;

  1. @Configuration
  2. public class DeadLetterConfig {
  3. public static final String NORMAL_EXCHANGE = "normal-exchange" ;
  4. public static final String NORMAL_QUEUE = "normal-queue";
  5. public static final String NORMAL_ROUTING_KEY = "normal.#";
  6. public static final String DEAD_EXCHANGE = "dead-exchange" ;
  7. public static final String DEAD_QUEUE = "dead-queue";
  8. public static final String DEAD_ROUTING_KEY = "dead.#";
  9. @Bean
  10. public Exchange normalExchange(){
  11. return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).build();
  12. }
  13. @Bean
  14. public Queue normalQueue(){
  15. return QueueBuilder.durable(NORMAL_QUEUE)
  16. .deadLetterExchange(DEAD_EXCHANGE)
  17. .deadLetterRoutingKey("dead.abc")
  18. // .ttl(10000)
  19. .maxLength(1)
  20. .build();
  21. }
  22. @Bean
  23. public Binding normalBinding(Queue normalQueue, Exchange normalExchange){
  24. return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY).noargs();
  25. }
  26. @Bean
  27. public Exchange deadExchange(){
  28. return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).build();
  29. }
  30. @Bean
  31. public Queue deadQueue(){
  32. return QueueBuilder.durable(DEAD_QUEUE).build();
  33. }
  34. @Bean
  35. public Binding deadBinding(Queue deadQueue, Exchange deadExchange){
  36. return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
  37. }
  38. }

2)、定义消费者监听消息

  1. @Component
  2. public class DeadListener {
  3. @RabbitListener(queues = DeadLetterConfig.NORMAL_QUEUE)
  4. public void consume(String msg, Channel channel, Message message) throws IOException {
  5. System.out.println("接收到normal队列的消息为:" + msg);
  6. // channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
  7. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
  8. }

3)、定义测试类,发送消息,按照上面说的2个条件自行测试

  1. @SpringBootTest
  2. public class DeadPublisherTest {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. @Test
  6. public void publish(){
  7. rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE, "normal.abc", "dead letter!");
  8. System.out.println("发送成功!");
  9. }
  10. @Test
  11. public void publishExpire(){
  12. rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE, "normal.abc", "dead letter!", message -> {
  13. message.getMessageProperties().setExpiration("5000");
  14. return message;
  15. });
  16. System.out.println("发送成功!");
  17. }
  18. }

4)、设置消息长度可以在RabbitMQ的控制台设置也可以在代码中声明队列的时候设置!

总结:

死信队列就是普通队列中一些设置项的兜底,可以把其当成一个正常的队列

4、如何保证RabbitMQ消息的可靠性?

RabbitMQ的整个发送消费结构如下图:

图中红色椭圆就是消息在整个发送过程中容易丢失的点,下面详细讲讲

(1)第一个点消息很有可能发出去了但是没有到达交换机就消失了,比如网络不好、比如交换机故障了没运行。解决方式是采用某些机制保证消息一定到达交换机(confirm机制,return机制这两种都可以,另一个是同步机制,没必要,原因在于我用MQ就是为了异步,解耦。)

  1. channel.confirmSelect();
  2. // 设置confirms异步回调
  3. channel.addConfirmListener(new ConfirmListener() {
  4. @Override
  5. public void handleAck(long deliveryTag, boolean multiple) throws IOException {
  6. System.out.println("消息成功的发送到Exchange");
  7. }
  8. @Override
  9. public void handleNack(long deliveryTag, boolean multiple) throws IOException {
  10. System.out.println("消息没有发送到Exchange, 尝试重试或者保存到数据库做其他补偿操作!");
  11. }
  12. });

进入handleAck代表消息一定到了交换机,反之Nack代表失败!

(2)第二个点是交换机出问题,比如发消息的时候服务重启了,然后服务运行完毕后发现全没了?解决方式是持久化交换机,可以在声明交换机的时候设置其durable属性

(3)第三个点是队列,同交换机,队列也有可能服务重启后队列都没了、也有可能队列里面的消息没了。所以这里我们既要设置队列的持久化(声明队列时的durable属性),也要持久化我们的消息

DeliveryMode设置消息持久化,1代表不持久化,2代表持久化

  1. AMQP.BasicProperties props = new AMQP.BasicProperties()
  2. .builder()
  3. .deliveryMode(2)
  4. .build();

(4)第四个点保证消费者正常消费消息。这点简单,也就是消费者受到消息后通知一下MQ就行,这里采用ack的方式,可以自动ack,手动ack,一般情况都是手动ack,毕竟会执行一段业务后再返回ack;

经过以上四个步骤,消息就非常可靠了;

6、rabbitmq部署集群(单机多节点集群,多机集群)

集群搭建:

既然要保证高可用,大多数都是多机集群;单机多节点可以用于个人学习阶段,这里我将用两台阿里云的ecs模拟多节点集群的部署。

  • 准备两台阿里云ecs服务器(配置无所谓),也可以是两台虚拟机,都安装好对应的linux系统。
  • 两个都安装docker。
  • 执行pull镜像的命令,我这里用的是3.6.15。
  1. docker run -d --hostname rabbitmq --add-host="rabbitmq":xxx.93.18.250 --add-host="tb-rabbitmq":xxx.40.130.124 --name myrabbitmq1 --restart=always -p "4369:4369" -p "5672:5672" -p "15672:15672" -p "25672:25672" -v /opt/rabbitmq:/var/lib/rabbitmq:z -e RABBITMQ_ERLANG_COOKIE='rabbitClusterCookie' rabbitmq:3.6.15-management
  2. docker run -d --hostname tb-rabbitmq --add-host="rabbitmq":xxx.93.18.250 --add-host="tb-rabbitmq":xxx.40.130.124 --name myrabbitmq2 --restart=always -p "4369:4369" -p "5672:5672" -p "15672:15672" -p "25672:25672" -v /opt/rabbitmq:/var/lib/rabbitmq:z -e RABBITMQ_ERLANG_COOKIE='rabbitClusterCookie' rabbitmq:3.6.15-management
  • 将xxx.40.130.124这个节点加入到节点xxx.93.18.250
  1. $ docker exec -it tb-rabbitmq bash
  2. $ rabbitmqctl stop_app
  3. $ rabbitmqctl reset
  4. $ rabbitmqctl join_cluster rabbit@rabbitmq
  5. $ rabbitmqctl start_app
  6. # 查看集群信息
  7. $ rabbitmqctl cluster_status
  • 访问节点,密码默认都是guest.

上面步骤容易出现的错误:

(1)云服务器一定要在安全组开放对应的端口,如果是虚拟机将防火墙关掉;

(2)保证双方节点能够互相 PING 通

(3)--add-host 不能掉,这个是保证双方docker节点能够互相找到的配置,相当于windows里面的hosts文件配置ip,域名。

搭建好后去访问(随便哪一个),会得到如下图,就说明ok了。

然后在xxx.40.130.124控制台创建一个队列a,此时发送一个消息,之后关闭这个mq节点,你会发现,另一个节点中队列a消失了。说明了什么?说明了并不是高可用啊!那我这个集群还能干嘛,所以现在我们得制定高可用战略:

使用mirror镜像模式,同步集群之间所有数据,保证高可用。

如下图步骤,创建mirror就ok

 然后再去控制台创建一个队列,重复上面关闭节点的步骤,发现此时队列并没有消失,则说明镜像模式设置成功!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/126820?site
推荐阅读
相关标签
  

闽ICP备14008679号