赞
踩
org.springframework.amqp.rabbit.core.RabbitTemplate#convertAndSend方法
- public void convertAndSend(String exchange, String routingKey, final Object message,
- final MessagePostProcessor messagePostProcessor, CorrelationData correlationData) throws AmqpException {
- Message messageToSend = convertMessageIfNecessary(message);
- messageToSend = messagePostProcessor.postProcessMessage(messageToSend);
- send(exchange, routingKey, messageToSend, correlationData);
- }
参数CorrelationData 的属性
- public class CorrelationData {
-
- private String id;
- 。。。。。。。
- }
发送的时候会带上自定义的CorrelationData.id 这样rabbitmq服务器回调给客户端的时候会带上这个值。客户端就可以一一对应代码如下
- // RabbitTemplate的方法,设置投递给exchange的回调函数
- public void setConfirmCallback(ConfirmCallback confirmCallback) {
- Assert.state(this.confirmCallback == null || this.confirmCallback == confirmCallback,
- "Only one ConfirmCallback is supported by each RabbitTemplate");
- this.confirmCallback = confirmCallback;
- }
-
-
-
- public interface ConfirmCallback {
-
- /**
- * Confirmation callback.
- * @param correlationData correlation data for the callback.
- * @param ack true for ack, false for nack
- * @param cause An optional cause, for nack, when available, otherwise null.
- */
- void confirm(CorrelationData correlationData, boolean ack, String cause);
-
- }
-
-
- // RabbitTemplate的方法,设置投递的消息从exchange转queue回调的方法
- public void setReturnCallback(ReturnCallback returnCallback) {
- Assert.state(this.returnCallback == null || this.returnCallback == returnCallback,
- "Only one ReturnCallback is supported by each RabbitTemplate");
- this.returnCallback = returnCallback;
- }
-
- public interface ReturnCallback {
-
- /**
- * Returned message callback.
- * @param message the returned message.
- * @param replyCode the reply code.
- * @param replyText the reply text.
- * @param exchange the exchange.
- * @param routingKey the routing key.
- */
- void returnedMessage(Message message, int replyCode, String replyText,
- String exchange, String routingKey);
- }
-
发送失败的用定时任务扫描重新投递,需要做一个次数限制。防止无限次重发
rabbitmq进行集群部署 队列持久化。
队列进行镜像备份 这样就可以实现 RabbitMQ 的 HA 高可用性
- RabbitMQ 的 Cluster 集群模式一般分为两种,普通模式和镜像模式。
-
- 普通模式:默认的集群模式,以两个节点(rabbit01、rabbit02)为例来进行说明。对于 Queue 来说,
- 消息实体只存在于其中一个节点 rabbit01(或者 rabbit02),rabbit01 和 rabbit02 两个节点
- 仅有相同的元数据,即队列的结构。当消息进入 rabbit01 节点的 Queue 后,consumer
- 从 rabbit02 节点消费时,RabbitMQ 会临时在 rabbit01、rabbit02 间进行消息传输,
- 把 A 中的消息实体取出并经过 B 发送给 consumer。所以 consumer 应尽量连接每一个节点,
- 从中取消息。即对于同一个逻辑队列,要在多个节点建立物理 Queue。否则无论 consumer
- 连 rabbit01 或 rabbit02,出口总在 rabbit01,会产生瓶颈。当 rabbit01 节点故障后,
- rabbit02 节点无法取到 rabbit01 节点中还未消费的消息实体。如果做了消息持久化,
- 那么得等 rabbit01 节点恢复,然后才可被消费;如果没有持久化的话,就会产生消息丢失的现象。
- 镜像模式:将需要消费的队列变为镜像队列,存在于多个节点,这样就可以实现 RabbitMQ 的 HA 高可用性。作
- 用就是消息实体会主动在镜像节点之间实现同步,而不是像普通模式那样,在 consumer 消费数据时临时读取。
- 缺点就是,集群内部的同步通讯会占用大量的网络带宽。
- 镜像队列实现了 RabbitMQ 的高可用性(HA),具体的实现策略如下所示:
-
- ha-mode ha-params 功能
- all 空 镜像队列将会在整个集群中复制。当一个新的节点加入后,也会在这 个节点上复制一份。
- exactly count 镜像队列将会在集群上复制 count 份。如果集群数量少于 count 时候,队列会复制到所有节点上。如果大于 Count 集群,有一个节点 crash 后,新进入节点也不会做新的镜像。
- nodes node name 镜像队列会在 node name 中复制。如果这个名称不是集群中的一个,这不会触发错误。如果在这个 node list 中没有一个节点在线,那么这个 queue 会被声明在 client 连接的节点。
- 实例列举:
-
- queue_args("x-ha-policy":"all") //定义字典来设置额外的队列声明参数
- channel.queue_declare(queue="hello-queue",argument=queue_args)
- 如果需要设定特定的节点(以rabbit@localhost为例),再添加一个参数:
-
- queue_args("x-ha-policy":"nodes",
- "x-ha-policy-params":["rabbit@localhost"])
- channel.queue_declare(queue="hello-queue",argument=queue_args)
- 可以通过命令行查看那个主节点进行了同步:
RabbitMQ中有三种类型Exchange,分别是fanout、direct、topic;
队列消费的时候,进行幂等性判断;防止重复投递带来的重复消费
- if (success) { //消费成功,则ack;否则消息还会在rabbitmq队列中
-
- channel.basicAck(tag, false);// 消费确认
- } else {
- channel.basicNack(tag, false, true);
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。