当前位置:   article > 正文

RabbitMQ 确保消息不丢失_only one confirmcallback is supported by each rabb

only one confirmcallback is supported by each rabbittemplate
  • 生产端保证消息不丢失

org.springframework.amqp.rabbit.core.RabbitTemplate#convertAndSend方法

  1. public void convertAndSend(String exchange, String routingKey, final Object message,
  2. final MessagePostProcessor messagePostProcessor, CorrelationData correlationData) throws AmqpException {
  3. Message messageToSend = convertMessageIfNecessary(message);
  4. messageToSend = messagePostProcessor.postProcessMessage(messageToSend);
  5. send(exchange, routingKey, messageToSend, correlationData);
  6. }

参数CorrelationData 的属性

  1. public class CorrelationData {
  2. private String id;
  3. 。。。。。。。
  4. }

发送的时候会带上自定义的CorrelationData.id     这样rabbitmq服务器回调给客户端的时候会带上这个值。客户端就可以一一对应代码如下

  1. // RabbitTemplate的方法,设置投递给exchange的回调函数
  2. public void setConfirmCallback(ConfirmCallback confirmCallback) {
  3. Assert.state(this.confirmCallback == null || this.confirmCallback == confirmCallback,
  4. "Only one ConfirmCallback is supported by each RabbitTemplate");
  5. this.confirmCallback = confirmCallback;
  6. }
  7. public interface ConfirmCallback {
  8. /**
  9. * Confirmation callback.
  10. * @param correlationData correlation data for the callback.
  11. * @param ack true for ack, false for nack
  12. * @param cause An optional cause, for nack, when available, otherwise null.
  13. */
  14. void confirm(CorrelationData correlationData, boolean ack, String cause);
  15. }
  16. // RabbitTemplate的方法,设置投递的消息从exchange转queue回调的方法
  17. public void setReturnCallback(ReturnCallback returnCallback) {
  18. Assert.state(this.returnCallback == null || this.returnCallback == returnCallback,
  19. "Only one ReturnCallback is supported by each RabbitTemplate");
  20. this.returnCallback = returnCallback;
  21. }
  22. public interface ReturnCallback {
  23. /**
  24. * Returned message callback.
  25. * @param message the returned message.
  26. * @param replyCode the reply code.
  27. * @param replyText the reply text.
  28. * @param exchange the exchange.
  29. * @param routingKey the routing key.
  30. */
  31. void returnedMessage(Message message, int replyCode, String replyText,
  32. String exchange, String routingKey);
  33. }

 发送失败的用定时任务扫描重新投递,需要做一个次数限制。防止无限次重发

  • rabbitmq保证消息不丢失

rabbitmq进行集群部署    队列持久化。

队列进行镜像备份           这样就可以实现 RabbitMQ 的 HA 高可用性

  1. RabbitMQ 的 Cluster 集群模式一般分为两种,普通模式和镜像模式。
  2. 普通模式:默认的集群模式,以两个节点(rabbit01、rabbit02)为例来进行说明。对于 Queue 来说,
  3. 消息实体只存在于其中一个节点 rabbit01(或者 rabbit02),rabbit01 和 rabbit02 两个节点
  4. 仅有相同的元数据,即队列的结构。当消息进入 rabbit01 节点的 Queue 后,consumer
  5. 从 rabbit02 节点消费时,RabbitMQ 会临时在 rabbit01、rabbit02 间进行消息传输,
  6. 把 A 中的消息实体取出并经过 B 发送给 consumer。所以 consumer 应尽量连接每一个节点,
  7. 从中取消息。即对于同一个逻辑队列,要在多个节点建立物理 Queue。否则无论 consumer
  8. 连 rabbit01 或 rabbit02,出口总在 rabbit01,会产生瓶颈。当 rabbit01 节点故障后,
  9. rabbit02 节点无法取到 rabbit01 节点中还未消费的消息实体。如果做了消息持久化,
  10. 那么得等 rabbit01 节点恢复,然后才可被消费;如果没有持久化的话,就会产生消息丢失的现象。
  11. 镜像模式:将需要消费的队列变为镜像队列,存在于多个节点,这样就可以实现 RabbitMQ 的 HA 高可用性。作
  12. 用就是消息实体会主动在镜像节点之间实现同步,而不是像普通模式那样,在 consumer 消费数据时临时读取。
  13. 缺点就是,集群内部的同步通讯会占用大量的网络带宽。
  14. 镜像队列实现了 RabbitMQ 的高可用性(HA),具体的实现策略如下所示:
  15. ha-mode ha-params 功能
  16. all 空 镜像队列将会在整个集群中复制。当一个新的节点加入后,也会在这 个节点上复制一份。
  17. exactly count 镜像队列将会在集群上复制 count 份。如果集群数量少于 count 时候,队列会复制到所有节点上。如果大于 Count 集群,有一个节点 crash 后,新进入节点也不会做新的镜像。
  18. nodes node name 镜像队列会在 node name 中复制。如果这个名称不是集群中的一个,这不会触发错误。如果在这个 node list 中没有一个节点在线,那么这个 queue 会被声明在 client 连接的节点。
  19. 实例列举:
  20. queue_args("x-ha-policy":"all") //定义字典来设置额外的队列声明参数
  21. channel.queue_declare(queue="hello-queue",argument=queue_args)
  22. 如果需要设定特定的节点(以rabbit@localhost为例),再添加一个参数:
  23. queue_args("x-ha-policy":"nodes",
  24. "x-ha-policy-params":["rabbit@localhost"])
  25. channel.queue_declare(queue="hello-queue",argument=queue_args)
  26. 可以通过命令行查看那个主节点进行了同步:
RabbitMQ中有三种类型Exchange,分别是fanout、direct、topic;

  • 消费端保证消息不丢失

队列消费的时候,进行幂等性判断;防止重复投递带来的重复消费

  1. if (success) { //消费成功,则ack;否则消息还会在rabbitmq队列中
  2. channel.basicAck(tag, false);// 消费确认
  3. } else {
  4. channel.basicNack(tag, false, true);
  5. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/1019009
推荐阅读
相关标签
  

闽ICP备14008679号