赞
踩
在项目开发设计过程中,我们经常会遇到异步推送的场景,比如下单和扣减库存异步执行。常用的异步消息中间件(MQ)有很多,我们这里以RabbitMQ为例进行讲解。我们知道,只要是涉及异步场景的问题,就有成功和失败之分。如果生产者发送的消息一直处理失败,要怎么做呢?这里我们就用到了死信队列来实现失败消息的处理。
流程图如下:
废话不多说直接上代码:
RabbitMQ配置类:
package com.mq.test.config;

import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

/**
 * Created by Administrator on 2018/6/7.
 *
 * <p>Holds a single shared RabbitMQ connection and hands out channels opened on it.
 * The broker address and credentials are hard-coded constants of this class.
 */
public class RabbitmqConfig {

    private final static String IP = "172.16.11.119";
    private final static int PORT = 5672;
    private final static String USERNAME = "root";
    private final static String PASSWORD = "root";

    /** Broker endpoints used for every connection attempt, including retries. */
    private final static Address[] ADDRESSES = new Address[]{new Address(IP, PORT)};

    private static ConnectionFactory factory = null;
    private static Connection connection = null;

    static {
        factory = new ConnectionFactory();
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        try {
            connection = connect();
        } catch (IOException e) {
            // Do not fail class initialisation: getChannel() retries the connection later.
            e.printStackTrace();
        }
    }

    /**
     * Opens a connection to the configured broker address.
     *
     * <p>Depending on the client version, {@code newConnection} may declare checked
     * exceptions other than {@link IOException}; they are wrapped so that callers only
     * ever see {@link IOException}.
     *
     * @return a newly opened connection
     * @throws IOException if the connection cannot be established
     */
    private static Connection connect() throws IOException {
        try {
            return factory.newConnection(ADDRESSES);
        } catch (IOException e) {
            throw e;
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new IOException("Unable to connect to RabbitMQ at " + IP + ":" + PORT, e);
        }
    }

    /**
     * Returns a new channel on the shared connection.
     *
     * <p>If the initial connection attempt failed, this retries against the same
     * configured address and keeps the resulting connection for later calls, instead of
     * opening a fresh, unreferenced connection to the factory's default host each time.
     *
     * @return a new channel opened on the shared connection
     * @throws IOException if the connection or the channel cannot be opened
     */
    public static synchronized Channel getChannel() throws IOException {
        if (connection == null) {
            connection = connect();
        }
        return connection.createChannel();
    }
}
生产者类:
package com.mq.test;

import com.mq.test.config.RabbitmqConfig;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;

/**
 * Created by Administrator on 2018/6/7.
 *
 * <p>Declares the normal and dead-letter exchanges and publishes one persistent text
 * message to the normal exchange. This class declares no queue and no binding.
 */
public class MqProducerTest {

    private final static String DLX_EXCHANGE_NAME = "dlx_exchange";
    private final static String NORMAL_EXCHANGE_NAME = "normal_exchange";
    private final static String NORMAL_ROUTINGKEY = "normal";

    /**
     * Publishes the test message and then closes the connection.
     *
     * @param args unused
     * @throws IOException if declaring, publishing or closing fails
     */
    public static void main(String[] args) throws IOException {
        Channel channel = RabbitmqConfig.getChannel();
        try {
            // Both exchanges are durable direct exchanges.
            channel.exchangeDeclare(NORMAL_EXCHANGE_NAME, "direct", true);
            channel.exchangeDeclare(DLX_EXCHANGE_NAME, "direct", true);
            // NOTE(review): nothing here binds a queue to NORMAL_ROUTINGKEY; presumably the
            // message is dropped if no binding exists yet at publish time -- confirm.
            channel.basicPublish(NORMAL_EXCHANGE_NAME, NORMAL_ROUTINGKEY,
                    MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx test...".getBytes("UTF-8"));
        } finally {
            // Close the connection (and with it the channel) so this one-shot producer
            // does not leave the connection open after publishing.
            channel.getConnection().close();
        }
    }
}
消费者类:
package com.mq.test;

import com.mq.test.config.RabbitmqConfig;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by Administrator on 2018/6/7.
 *
 * <p>Declares the normal queue (configured with a dead-letter exchange, a dead-letter
 * routing key and a message TTL) and the dead-letter queue, binds both, and then consumes
 * from the normal queue with manual acknowledgement.
 */
public class MqConsumerTest {

    private final static String DLX_EXCHANGE_NAME = "dlx_exchange";
    private final static String NORMAL_EXCHANGE_NAME = "normal_exchange";
    private final static String DLX_ROUTINGKEY = "dlx";
    private final static String NORMAL_ROUTINGKEY = "normal";
    private final static String DLX_QUEUE = "dlx";
    private final static String NORMAL_QUEUE = "normal";

    /**
     * Declares and binds the queues, then starts the consumer on the normal queue.
     *
     * @param args unused
     * @throws IOException if a declare, bind, qos or consume call fails
     */
    public static void main(String[] args) throws IOException {
        final Channel channel = RabbitmqConfig.getChannel();

        // Create and bind the normal queue, with dead-letter arguments.
        Map<String, Object> arguments = new HashMap<String, Object>();
        arguments.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
        arguments.put("x-dead-letter-routing-key", DLX_ROUTINGKEY);
        // Set a TTL to get delayed-queue behaviour: a message that stays in the queue this
        // long without being consumed is automatically sent to the dead-letter queue.
        // Note that this means "not consumed", not "not acked".
        arguments.put("x-message-ttl", 6000);
        channel.queueDeclare(NORMAL_QUEUE, true, false, false, arguments);
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE_NAME, NORMAL_ROUTINGKEY);

        // Create and bind the dead-letter queue.
        channel.queueDeclare(DLX_QUEUE, true, false, false, null);
        channel.queueBind(DLX_QUEUE, DLX_EXCHANGE_NAME, DLX_ROUTINGKEY);

        channel.basicQos(64);
        // autoAck is false, so every delivery must be settled explicitly below.
        channel.basicConsume(NORMAL_QUEUE, false, "myConsumerTag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                long deliveryTag = envelope.getDeliveryTag();
                try {
                    // Stand-in for slow message processing.
                    Thread.sleep(8000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of swallowing the interruption.
                    Thread.currentThread().interrupt();
                } finally {
                    // Reject without requeue so the delivery is settled and the broker can
                    // dead-letter it; otherwise it stays unacknowledged and counts against
                    // the prefetch limit set above.
                    channel.basicReject(deliveryTag, false);
                }
            }
        });
    }
}
延时或是死信队列实现:
package com.mq.test;

import com.mq.test.config.RabbitmqConfig;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * Created by Administrator on 2018/6/7.
 *
 * <p>Consumes the "dlx" queue with automatic acknowledgement and prints each message
 * body to standard output.
 */
public class MqdeplayQueueTest {

    private final static String DLX_QUEUE = "dlx";

    /**
     * Starts the consumer on the dead-letter queue.
     *
     * @param args unused
     * @throws IOException if the channel cannot be obtained or the consume call fails
     */
    public static void main(String[] args) throws IOException {
        final Channel channel = RabbitmqConfig.getChannel();

        // Only deliveries matter here; every other consumer callback stays a no-op.
        Consumer printer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] payload)
                    throws IOException {
                // Decoded with the platform default charset.
                String text = new String(payload);
                System.out.println("这是延迟队列收到的消息:" + text);
            }
        };

        // Second argument true = autoAck, so no explicit ack is sent from the callback.
        channel.basicConsume(DLX_QUEUE, true, "myDlxConsumer", printer);
    }
}
这就是全部的代码,欢迎大家交流!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。