赞
踩
先执行业务操作,业务操作成功后执行消息发送,消息发送过程通过try catch 方式捕获异常, 在异常处理理的代码块中执行行回滚业务操作或者执行行重发操作等。这是一种最大努力确保的方式, 并无法保证100%绝对可靠,因为这里没有异常并不代表消息就一定投递成功。
boolean result = doBiz();
if(result){
try{
sendMsg();
}catch(Exception e){
//retrySene();
//delaySend();
rollbackBiz();
}
}
另外,可以通过spring.rabbitmq.template.retry.enabled=true 配置开启发送端的重试
没有捕获到异常并不能代表消息就一定投递成功了。
一直到事务提交后都没有异常,确实就说明消息是投递成功了。但是,这种方式在性能方面的开销比较大,一般也不推荐使用。
try{
//将channel设置为事务格式
channel.txSelect();
//发布消息到交换器,routingKey为空
channel.basicPublish(EXCHANGE_NAME, "",null,message.getBytes("UTF-8"));
//提交事务,只有消息成功被Broker接收了才能提交成功
channel.txCommit();
}catch(Exception e){
//事务回滚
channel.txRollback();
}
RabbitMQ后来引入了一种轻量量级的方式,叫发送方确认(publisher confirm)机制。生产者将信 道设置成confirm(确认)模式,一旦信道进入confirm 模式,所有在该信道上面面发布的消息都会被指派 一个唯一的ID(从1 开始),一旦消息被投递到所有匹配的队列之后(如果消息和队列是持久化的,那么 确认消息会在消息持久化后发出),RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一 ID),这样生产者就知道消息已经正确送达了。
RabbitMQ 回传给生产者的确认消息中的deliveryTag 字段包含了确认消息的序号,另外,通过设 置channel.basicAck方法中的multiple参数,表示到这个序号之前的所有消息是否都已经得到了处理 了。生产者投递消息后并不需要一直阻塞着,可以继续投递下一条消息并通过回调方式处理理ACK响 应。如果 RabbitMQ 因为自身内部错误导致消息丢失等异常情况发生,就会响应一条nack(Basic.Nack) 命令,生产者应用程序同样可以在回调方法中处理理该 nack 命令。
Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // Publisher Confirms channel.confirmSelect(); channel.exchangeDeclare(EX_PUBLISHER_CONFIRMS, BuiltinExchangeType.DIRECT); channel.queueDeclare(QUEUE_PUBLISHER_CONFIRMS, false, false, false, null); channel.queueBind(QUEUE_PUBLISHER_CONFIRMS, EX_PUBLISHER_CONFIRMS, QUEUE_PUBLISHER_CONFIRMS); String message = "hello"; channel.basicPublish(EX_PUBLISHER_CONFIRMS, QUEUE_PUBLISHER_CONFIRMS, null,message.getBytes()); try { channel.waitForConfirmsOrDie(5_000); System.out.println("消息被确认:message = " + message); } catch (IOException e) { e.printStackTrace(); System.err.println("消息被拒绝! message = " + message); } catch (InterruptedException e) { e.printStackTrace(); System.err.println("在不是Publisher Confirms的通道上使用该方法"); } catch (TimeoutException e) { e.printStackTrace(); System.err.println("等待消息确认超时! message = " + message); }
waitForConfirm方法有个重载的,可以自定义timeout超时时间,超时后会抛 TimeoutException。类似的有几个waitForConfirmsOrDie方法,Broker端在返回nack(Basic.Nack)之 后该方法会抛出java.io.IOException。需要根据异常类型来做区别处理理, TimeoutException超时是 属于第三状态(无法确定成功还是失败),而返回Basic.Nack抛出IOException这种是明确的失败。上 面的代码主要只是演示confirm机制,实际上还是同步阻塞模式的,性能并不不是太好。
实际上,我们也可以通过“批处理理”的方式来改善整体的性能(即批量量发送消息后仅调用一次 waitForConfirms方法)。正常情况下这种批量处理的方式效率会高很多,但是如果发生了超时或者 nack(失败)后那就需要批量量重发消息或者通知上游业务批量回滚(因为我们只知道这个批次中有消 息没投递成功,而并不知道具体是那条消息投递失败了,所以很难针对性处理),如此看来,批量重发 消息肯定会造成部分消息重复。另外,我们可以通过异步回调的方式来处理Broker的响应。 addConfirmListener 方法可以添加ConfirmListener 这个回调接口,这个 ConfirmListener 接口包含 两个方法:handleAck 和handleNack,分别用来处理 RabbitMQ 回传的 Basic.Ack 和 Basic.Nack。
public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setUri("amqp://root:123456@node1:5672/%2f"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); // 向RabbitMQ服务器发送AMQP命令,将当前通道标记为发送方确认通道 final AMQP.Confirm.SelectOk selectOk = channel.confirmSelect(); channel.queueDeclare("queue.pc", true, false, false, null); channel.exchangeDeclare("ex.pc", "direct", true, false, null); channel.queueBind("queue.pc", "ex.pc", "key.pc"); String message = "hello-"; // 批处理的大小 int batchSize = 10; // 用于对需要等待确认消息的计数 int outstrandingConfirms = 0; for (int i = 0; i < 103; i++) { channel.basicPublish("ex.pc", "key.pc", null, (message + i).getBytes()); outstrandingConfirms++; if (outstrandingConfirms == batchSize) { // 此时已经有一个批次的消息需要同步等待broker的确认消息 // 同步等待 channel.waitForConfirmsOrDie(5_000); System.out.println("消息已经被确认了"); outstrandingConfirms = 0; } } if (outstrandingConfirms > 0) { channel.waitForConfirmsOrDie(5_000); System.out.println("剩余消息已经被确认了"); } channel.close(); connection.close(); }
还可以使用回调方法:
public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setUri("amqp://root:123456@node1:5672/%2f"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); // 向RabbitMQ服务器发送AMQP命令,将当前通道标记为发送方确认通道 final AMQP.Confirm.SelectOk selectOk = channel.confirmSelect(); channel.queueDeclare("queue.pc", true, false, false, null); channel.exchangeDeclare("ex.pc", "direct", true, false, null); channel.queueBind("queue.pc", "ex.pc", "key.pc"); final ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap(); ConfirmCallback clearOutstandingConfirms = new ConfirmCallback() { @Override public void handle(long deliveryTag, boolean multiple) throws IOException { if (multiple) { System.out.println("编号小于等于 " + deliveryTag + " 的消息都已经被确认了"); final ConcurrentNavigableMap<Long, String> headMap = outstandingConfirms.headMap(deliveryTag, true); // 清空outstandingConfirms中已经被确认的消息信息 headMap.clear(); } else { // 移除已经被确认的消息 outstandingConfirms.remove(deliveryTag); System.out.println("编号为:" + deliveryTag + " 的消息被确认"); } } }; // 设置channel的监听器,处理确认的消息和不确认的消息 channel.addConfirmListener(clearOutstandingConfirms, new ConfirmCallback() { @Override public void handle(long deliveryTag, boolean multiple) throws IOException { if (multiple) { // 将没有确认的消息记录到一个集合中 // 此处省略实现 System.out.println("消息编号小于等于:" + deliveryTag + " 的消息 不确认"); } else { System.out.println("编号为:" + deliveryTag + " 的消息不确认"); } } }); String message = "hello-"; for (int i = 0; i < 1000; i++) { // 获取下一条即将发送的消息的消息ID final long nextPublishSeqNo = channel.getNextPublishSeqNo(); channel.basicPublish("ex.pc", "key.pc", null, (message + i).getBytes()); System.out.println("编号为:" + nextPublishSeqNo + " 的消息已经发送成功,尚未确认"); outstandingConfirms.put(nextPublishSeqNo, (message + i)); } // 等待消息被确认 Thread.sleep(10000); channel.close(); connection.close(); }
持久化是提高RabbitMQ可靠性的基础,否则当RabbitMQ遇到异常时(如:重启、断电、停机 等)数据将会丢失。主要从以下几个方面来保障消息的持久性:
public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setUri("amqp://root:123456@node1:5672/%2f"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); // durable:true表示是持久化消息队列 channel.queueDeclare("queue.persistent", true, false, false, null); // 持久化的交换器 channel.exchangeDeclare("ex.persistent", "direct", true, false, null); channel.queueBind("queue.persistent", "ex.persistent", "key.persistent"); final AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2) // 表示是持久化消息 .build(); channel.basicPublish("ex.persistent", "key.persistent", properties, // 设置消息的属性,此时消息是持久化消息 "hello world".getBytes()); channel.close(); connection.close(); }
RabbitMQ中的持久化消息都需要写入磁盘(当系统内存不不足时,非持久化的消息也会被刷盘处 理理),这些处理理动作都是在“持久层”中完成的。持久层是一个逻辑上的概念,实际包含两个部分:
[root@rpp 628WB79CIFDYO9LJI6DKMI09L]# pwd
/usr/local/rabbitmq/var/lib/rabbitmq/mnesia/rabbit@rpp/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L
[root@rpp 628WB79CIFDYO9LJI6DKMI09L]# ll
total 20
drwxr-xr-x 2 root root 4096 Oct 28 11:19 msg_store_persistent
drwxr-xr-x 2 root root 4096 Oct 28 11:19 msg_store_transient
drwxr-xr-x 4 root root 4096 Oct 28 18:30 queues
-rw-r--r-- 1 root root 5464 Oct 28 11:19 recovery.dets
$RABBITMQ_HOME/var/lib/rabbitmq/mnesia/rabbit@hostname/msg_stores/vhosts/$VHostId
这个路径下包含 queues、msg_store_persistent、 msg_store_transient 这 3 个目录,这是实际存储消息的位置。其中queues目录中保存着 rabbit_queue_index相关的数据,而msg_store_persistent保存着持久化消息数据, msg_store_transient保存着非非持久化相关的数据。另外,RabbitMQ通过配置queue_index_embed_msgs_below可以根据消息大小决定存储位置, 默认queue_index_embed_msgs_below是4096字节(包含消息体、属性及headers),小于该值的消息 存在rabbit_queue_index中。
如何保证消息被消费者成功消费?
前面我们讲了生产者发送确认机制和消息的持久化存储机制,然而这依然无法完全保证整个过程的可靠性,因为如果消息被消费过程中业务处理失败了但是消息却已经出列了(被标记为已消费了),我们又没有任何重试,那结果跟消息丢失没什么分别。
RabbitMQ在消费端会有Ack机制,即消费端消费消息后需要发送Ack确认报文给Broker端,告知自 己是否已消费完成,否则可能会一直重发消息直到消息过期(AUTO模式)。
这也是我们之前一直在讲的“最终一致性”、“可恢复性” 的基础。 一般而言,我们有如下处理手段:
在电商的秒杀活动中,活动一开始会有大量并发写请求到达服务端,需要对消息进行削峰处理,如何削峰?
当消息投递速度远快于消费速度时,随着时间积累就会出现“消息积压”。消息中间件本身是具备一 定的缓冲能力的,但这个能力是有容量限制的,如果长期运行并没有任何处理,最终会导致Broker崩溃,而分布式系统的故障往往会发生上下游传递,连锁反应那就会很悲剧… 下面我将从多个角度介绍QoS与限流,防止上面的悲剧发生。
3. RabbitMQ中有一种QoS保证机制,可以限制Channel上接收到的未被Ack的消息数量,如果 超过这个数量限制RabbitMQ将不会再往消费端推送消息。这是一种流控手段,可以防止大量 消息瞬时从Broker送达消费端造成消费端巨大压力(甚至压垮消费端)。比较值得注意的是 QoS机制仅对于消费端推模式有效,对拉模式无效。而且不支持NONE Ack模式。执行
channel.basicConsume 方法之前通过 channel.basicQoS 方法可以设置该数量。消息的发 送是异步的,消息的确认也是异步的。在消费者消费慢的时候,可以设置Qos的 prefetchCount,它表示broker在向消费者发送消息的时候,一旦发送了prefetchCount个消 息而没有一个消息确认的时候,就停止发送。消费者确认一个,broker就发送一个,确认两个就发送两个。换句话说,消费者确认多少,broker就发送多少,消费者等待处理的个数永 远限制在prefetchCount个。如果对于每个消息都发送确认,增加了网络流量,此时可以批量确认消息。如果设置了 multiple为true,消费者在确认的时候,比如说id是8的消息确认了,则在8之前的所有消息都 确认了。
生产者往往是希望自己产生的消息能快速投递出去,而当消息投递太快且超过了下游的消费速度时 就容易出现消息积压/堆积,所以,从上游来讲我们应该在生产端应用程序中也加入限流,应急开关等手段,避免超过Broker端的极限承载能力或者压垮下游消费者。
再看看下游,我们期望下游消费端能尽快消费完消息,而且还要防止瞬时大量消息压垮消费端(推模式),我们期望消费端处理速度是最快、最稳定而且还相对均匀(比较理想化)。
提升下游应用的吞吐量和缩短消费过程的耗时,优化主要以下几种方式:
消息可靠传输一般是业务系统接入消息中间件时首要考虑的问题,一般消息中间件的消息传输保障 分为三个层级:
RabbitMQ 支持其中的“最多一次”和“最少一次”。 其中“最少一次”投递实现需要考虑以下这个几个方面的内容:
“最多一次”的方式就无须考虑以上那些方面,生产者随意发送,消费者随意消费,不过这样很难确 保消息不会丢失。
“恰好一次”是RabbitMQ 目前无法保障 的。
考虑这样一种情况,消费者在消费完一条消息之后向RabbitMQ 发送确认Basic.Ack 命令,此时由 于网络断开或者其他原因造成RabbitMQ 并没有收到这个确认命令,那么RabbitMQ 不会将此条消息标 记删除。在重新建立连接之后,消费者还是会消费到这一条消息,这就造成了重复消费。
再考虑一种情况,生产者在使用publisher confirm机制的时候,发送完一条消息等待RabbitMQ返 回确认通知,此时网络断开,生产者捕获到异常情况,为了确保消息可靠性选择重新发送,这样 RabbitMQ 中就有两条同样的消息,在消费的时候消费者就会重复消费。
刚刚我们讲到,追求高性能就无法保证消息的顺序,而追求可靠性那么就可能产生重复消息,从而 导致重复消费…真是应证了那句老话:做架构就是权衡取舍。
RabbitMQ层面有实现“去重机制”来保证“恰好一次”吗?答案是并没有。而且这个在目前主流的消息 中间件都没有实现。
借用淘宝沈洵的一句话:最好的解决办法就是不去解决。当为了在基础的分布式中间件中实现某种 相对不太通用的功能,需要牺牲到性能、可靠性、扩展性时,并且会额外增加很多复杂度,最简单的办 法就是交给业务自己去处理。事实证明,很多业务场景下是可以容忍重复消息的。例如:操作日志收 集,而对一些金融类的业务则要求比较严苛。
一般解决重复消息的办法是,在消费端让我们消费消息的操作具备幂等性。
幂等性问题并不是消息系统独有,而是(分布式)系统中普遍存在的问题。例如:RPC框架调用超 后会重试,HTTP请求会重复发起(用户手抖多点了几下按钮)
幂等(Idempotence)是一个数学上的概念,它是这样定义的:
如果一个函数f(x) 满足:f(f(x)) = f(x),则函数f(x) 满足幂等性。这个概念被拓展到计算机领域,被用来描述一个操作、方法或者服务。
一个幂等操作的特点是,其任意多次执行所产生的影响均与一次执行的影响相同。一个幂等的方法,使用同样的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的。对于幂等的方法,不用担心重复执行会对系统造成任何改变。
业界对于幂等性的一些常见做法:
对于接口请求类的幂等性保证要相对更复杂,我们通常要求上游请求时传递一个类GUID的请求号 (或TOKEN),如果我们发现已经存在了并且上一次请求处理结果是成功状态的(有时候上游的重试请 求是正常诉求,我们不能将上一次异常/失败的处理结果返回或者直接提示“请求异常”,如果这样重试就 变得没意义了)则不继续往下执行,直接返回“重复请求”的提示和上次的处理结果(上游通常是由于请 求超时等未知情况才发起重试的,所以直接返回上次请求的处理结果就好了)。如果请求ID都不存在或 者上次处理结果是失败/异常的,那就继续处理流程,并最终记录最终的处理结果。这个请求序号由上 游自己生成,上游通用需要根据请求参数、时间间隔等因子来生成请求ID。同样也需要利用这个请求ID 做分布式锁的KEY实现排他。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。