赞
踩
发10条消息,到固定长度为6的普通队列,普通队列有哪些消息?死信队列有哪些消息?
普通队列满了之后继续添加消息,前面的消息会挤出到死信队列中。如: 普通队列长度为6,添加10个进去后,先进去的4个会到死信队列中,最后6个消息还在普通队列中。
发送的消息 | 普通队列的消息 | 死信队列的消息 |
9876543210 | 987654 | 3210 |
- public class RabbitmqUtils {
- private static Logger logger = LoggerFactory.getLogger(RabbitmqUtils.class);
-
- // 死信相关队列
- public static String NORMAL_EXCHANGE_NAME = "normal_exchange";
- public static String NORMAL_EXCHANGE_ROUTING_KEY = "normal_routing_key";
- public static String NORMAL_EXCHANGE_QUEUE_NAME = "normal_queue";
- public static String DEAD_EXCHANGE_NAME = "dead_exchange";
- public static String DEAD_EXCHANGE_QUEUE_NAME = "dead_queue";
- public static String DEAD_EXCHANGE_ROUTING_KEY = "dead_routing_key";
-
- // 队列持久化
- public static boolean DURABLE = true;
- public static Connection connection;
- public static Channel channel;
-
- static {
- logger.info("getChannel begin...");
- // 创建链接工厂
- ConnectionFactory factory = new ConnectionFactory();
- logger.info("getChannel factory:{}", factory.toString());
-
- factory.setHost("192.168.6.8");
- factory.setPort(5672);
- factory.setUsername("admin");
- factory.setPassword("123");
-
- // 创建链接
- try {
- connection = factory.newConnection();
- } catch (IOException e) {
- throw new RuntimeException(e);
- } catch (TimeoutException e) {
- throw new RuntimeException(e);
- }
- logger.info("getChannel connection:{}", connection.toString());
-
- try {
- channel = connection.createChannel();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- logger.info("getChannel channel:{}", channel.toString());
- logger.info("getChannel success!");
- }
-
- /**
- * 死信队列实战
- *
- * @return
- * @throws IOException
- */
- public static Channel createDeadExchangeAndQueue() throws IOException {
- // 删除已存在的交换机
- channel.exchangeDelete(RabbitmqUtils.NORMAL_EXCHANGE_NAME);
- channel.exchangeDelete(RabbitmqUtils.DEAD_EXCHANGE_NAME);
- // 删除已存在的队列
- channel.queueDelete(RabbitmqUtils.NORMAL_EXCHANGE_QUEUE_NAME);
- channel.queueDelete(RabbitmqUtils.DEAD_EXCHANGE_QUEUE_NAME);
-
- // 创建普通交换机
- channel.exchangeDeclare(
- RabbitmqUtils.NORMAL_EXCHANGE_NAME,
- BuiltinExchangeType.DIRECT,
- RabbitmqUtils.DURABLE);
- // 正常队列绑定死信队列信息并设置队列的长度
- HashMap<String, Object> arguments = new HashMap<>();
- arguments.put("x-dead-letter-exchange",RabbitmqUtils.DEAD_EXCHANGE_NAME);
- arguments.put("x-dead-letter-routing-key",RabbitmqUtils.DEAD_EXCHANGE_ROUTING_KEY);
- arguments.put("x-max-length",6);
-
- // 创建普通队列
- channel.queueDeclare(
- RabbitmqUtils.NORMAL_EXCHANGE_QUEUE_NAME,
- RabbitmqUtils.DURABLE,
- false,false,arguments);
- // 将不同队列绑定到普通交换机上
- channel.queueBind(
- RabbitmqUtils.NORMAL_EXCHANGE_QUEUE_NAME,
- RabbitmqUtils.NORMAL_EXCHANGE_NAME,
- RabbitmqUtils.NORMAL_EXCHANGE_ROUTING_KEY);
-
-
- // 创建死信交换机
- channel.exchangeDeclare(
- RabbitmqUtils.DEAD_EXCHANGE_NAME,
- BuiltinExchangeType.DIRECT,
- RabbitmqUtils.DURABLE);
- // 创建死信队列
- channel.queueDeclare(
- RabbitmqUtils.DEAD_EXCHANGE_QUEUE_NAME,
- RabbitmqUtils.DURABLE,
- false,false,null);
- // 将死信队列绑定到交换机上
- channel.queueBind(
- RabbitmqUtils.DEAD_EXCHANGE_QUEUE_NAME,
- RabbitmqUtils.DEAD_EXCHANGE_NAME,
- RabbitmqUtils.DEAD_EXCHANGE_ROUTING_KEY);
- return channel;
- }
-
- }
- public class DeadProducer {
- private static Logger logger = LoggerFactory.getLogger(DeadProducer.class);
- public static void main(String[] args) throws IOException {
-
- Channel channel = RabbitmqUtils.createDeadExchangeAndQueue();
-
- // 发送消息
- for (int i=0; i< 10; i++) {
- String message = "info"+i;
- channel.basicPublish(
- RabbitmqUtils.NORMAL_EXCHANGE_NAME,
- RabbitmqUtils.NORMAL_EXCHANGE_ROUTING_KEY,
- null,
- message.getBytes());
- logger.info("发送的消息内容为:{}",message);
- }
-
- }
- }
生产者输出日志:
- 发送的消息内容为:info0
- 发送的消息内容为:info1
- 发送的消息内容为:info2
- 发送的消息内容为:info3
- 发送的消息内容为:info4
- 发送的消息内容为:info5
- 发送的消息内容为:info6
- 发送的消息内容为:info7
- 发送的消息内容为:info8
- 发送的消息内容为:info9
生产者发送消息后的结果
- public class OrdinaryConsumer1 {
-
- private static Logger logger = LoggerFactory.getLogger(OrdinaryConsumer1.class);
-
- public static void main(String[] args) throws IOException {
- Channel channel = RabbitmqUtils.channel;
-
- DeliverCallback deliverCallback = (consumerTag, message) ->{
- String msg = new String(message.getBody(), "UTF-8");
- logger.info("普通队列接受到的消息是:{}", msg);
- };
- CancelCallback cancelCallback = (consumerTag)->{
- logger.info(consumerTag + "消息被中断");
- };
- channel.basicConsume(
- RabbitmqUtils.NORMAL_EXCHANGE_QUEUE_NAME,
- true,
- deliverCallback,
- cancelCallback);
- }
- }
普通消费者消费的日志:
- 普通队列接受到的消息是:info4
- 普通队列接受到的消息是:info5
- 普通队列接受到的消息是:info6
- 普通队列接受到的消息是:info7
- 普通队列接受到的消息是:info8
- 普通队列接受到的消息是:info9
- public class DeadConsumer2 {
-
- private static Logger logger = LoggerFactory.getLogger(DeadConsumer2.class);
-
- public static void main(String[] args) throws IOException {
- Channel channel = RabbitmqUtils.channel;
-
- DeliverCallback deliverCallback = (consumerTag, message) ->{
- String msg = new String(message.getBody(), "UTF-8");
- logger.info("死信队列接受到的消息是:{}", msg);
- };
- CancelCallback cancelCallback = (consumerTag)->{
- logger.info(consumerTag + "消息被中断");
- };
- channel.basicConsume(
- RabbitmqUtils.DEAD_EXCHANGE_QUEUE_NAME,
- true,
- deliverCallback,
- cancelCallback);
- }
- }
死信消费者消费的日志:
- 死信队列接受到的消息是:info0
- 死信队列接受到的消息是:info1
- 死信队列接受到的消息是:info2
- 死信队列接受到的消息是:info3
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。