赞
踩
最近在写Android应用时,使用到了RabbitMQ实现评论消息与聊天消息的实时收发,这两种消息分别通过两个阻塞线程来接收,两个线程监听同一个队列(每个用户拥有一个队列),根据发送消息的exchange进行区分。在接收消息并保存至本地数据库后会手动应答,阻塞线程销毁时会关闭Connection连接。代码如下:
@Override private void consumeMessage(final String queueName){ Thread thread = new Thread() { @Override public void run() { super.run(); try { connection = factory.newConnection(); channel=connection.createChannel(); } catch (Exception e){ e.printStackTrace(); } while (true) { //阻塞线程,持续监听RabbitMQ消息,注意:Service销毁并不会销毁该线程 try { channel.basicConsume(queueName, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { Log.e("TAG", "handleDelivery()方法执行"); String messageJson = new String(body); String exchange = envelope.getExchange(); HashMap<String, Object> headers = (HashMap<String, Object>) properties.getHeaders(); Object obj = headers.get("messageType"); String messageType = String.valueOf(obj); Message msg = handler.obtainMessage(); if (exchange.equals("myforum_interaction_exchange")){ if ("praise".equals(messageType)){ //点赞消息 Log.e("TAG", "监听到点赞消息,Json格式为"+messageJson); msg.what=1; msg.obj=messageJson; handler.sendMessage(msg); }else if ("comment".equals(messageType)){ //评论消息 Log.e("TAG", "监听到评论消息,Json格式为:"+messageJson); msg.what=2; msg.obj=messageJson; handler.sendMessage(msg); } //消息应答应该在if判断里面,否则可能会应答聊天消息,造成消息丢失 channel.basicAck(envelope.getDeliveryTag(), false); } } }); sleep(500); if (isServiceDestroy) { //监听Service销毁标志位,当Service销毁时跳出阻塞循环,线程随之结束 if (isThreadCompleteTheLastWork){ try { if (channel!=null){ channel.close(); } if (connection!=null){ connection.close(); } } catch (Exception e) { e.printStackTrace(); } break; }else { isThreadCompleteTheLastWork=true; } } } catch (Exception e) { e.printStackTrace(); } } } }; thread.start(); }
private void consumeMessage(final String queueName){ Thread thread = new Thread() { @Override public void run() { super.run(); try { connection = factory.newConnection(); channel=connection.createChannel(); } catch (Exception e){ e.printStackTrace(); } while (true) { //阻塞线程,持续监听RabbitMQ消息,注意:Service销毁并不会销毁该线程 if (isServiceDestroy) { //监听Service销毁标志位,当Service销毁时跳出阻塞循环,线程随之结束 try { if (channel!=null){ channel.close(); } if (connection!=null){ connection.close(); } } catch (Exception e) { e.printStackTrace(); } break; } try { channel.basicConsume(queueName, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String exchange = envelope.getExchange(); if ("myforum_communication_exchange".equals(exchange)){ String chatMessageJson = new String(body); Message message = handler.obtainMessage(); message.obj=chatMessageJson; message.what=1; handler.sendMessage(message); channel.basicAck(envelope.getDeliveryTag(), false); } } }); sleep(500); } catch (Exception e) { e.printStackTrace(); } } } }; thread.start(); }
但是,RabbitMQ Manager界面时不时就会显示评论消息Unacked,Android端表现为收不到消息,没有UI显示,且重启后便能接收到消息 (RabbitMQ未应答的消息在断开连接后会重新入队)
由于两个阻塞线程监听同一队列,它们会竞争处理消息,当接收聊天消息的线程获取到评论消息时,虽然会进入handleDelivery()方法进行消费,但是方法中检查到该消息并不是来自聊天exchange,不会应答。也就是说消息虽然被送达到客户端,但是被不对应的消费者线程处理而被丢弃,最终造成未应答。
将两个阻塞线程合二为一,即一个消费者线程监听一个队列,在线程内部通过逻辑对消息进行区分
private void consumeMessage(final String queueName){ Thread thread = new Thread() { @Override public void run() { super.run(); try { connection = factory.newConnection(); channel=connection.createChannel(); } catch (Exception e){ e.printStackTrace(); } while (true) { //阻塞线程,持续监听RabbitMQ消息,注意:Service销毁并不会销毁该线程 try { channel.basicConsume(queueName, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { Message msg = handler.obtainMessage(); String messageJson = new String(body); String exchange = envelope.getExchange(); if ("myforum_communication_exchange".equals(exchange)){ //聊天消息 String chatMessageJson = new String(body); Message message = handler.obtainMessage(); message.obj=chatMessageJson; message.what=3; handler.sendMessage(message); channel.basicAck(envelope.getDeliveryTag(), false); }else { //互动消息 HashMap<String, Object> headers = (HashMap<String, Object>) properties.getHeaders(); Object obj = headers.get("messageType"); String messageType = String.valueOf(obj); if (exchange.equals("myforum_interaction_exchange")){ if ("praise".equals(messageType)){ //点赞消息 msg.what=1; msg.obj=messageJson; handler.sendMessage(msg); }else if ("comment".equals(messageType)){ //评论消息 msg.what=2; msg.obj=messageJson; handler.sendMessage(msg); } //消息应答应该在if判断里面,否则可能会应答聊天消息,造成消息丢失 channel.basicAck(envelope.getDeliveryTag(), false); } } } }); sleep(500); if (isServiceDestroy) { //监听Service销毁标志位,当Service销毁时跳出阻塞循环,线程随之结束 try { if (channel!=null){ channel.close(); } if (connection!=null){ connection.close(); } } catch (Exception e) { e.printStackTrace(); } break; } } catch (Exception e) { e.printStackTrace(); } } } }; thread.start(); }
两个消费者监听同一队列时会存在竞争,即使调用了消费消息的回调方法,也并不代表者消费成功,RabbitMQ只会在消息应答后才会确认消息消费成功。
不要图省事,直接使用消息自动应答,自动应答会在消费者调用回调方法时进行应答。假如在本案例使用了自动应答,那么在聊天消费者线程接收到了消息的那一刻,RabbitMQ就会接收到消息应答从而删除消息,但是该消息在客户端已被丢弃,从而造成消息丢失。而使用手动应答,即使聊天消费者线程丢弃了消息,RabbitMQ由于未接收到消息应答,会将消息重新入队直到接收到消息应答为止 (即聊天消费者线程接收到聊天消息并应答)。换句话说,本案例未修改前虽然有UI上的不友好(多次重新启动才可能接收到消息),但是保证了消息不丢失
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。