当前位置:   article > 正文

【java】RabbitMQ-路由模式RoutingKey

【java】RabbitMQ-路由模式RoutingKey

生产者发送消息到交换机并指定一个路由key,消费者队列绑定到交换机时要制定路由key(key匹配就能接受消息,key不匹配就不能接受消息)

例如:我们可以把路由key设置为insert ,那么消费者队列key指定包含insert才可以接收消息,消费者队列key定义为update或者delete就不能接收消息。很好的控制了更新,插入和删除的操作。

采用交换机direct模式

生产者代码如下:

  1. package com.rabbitmqdemo.Producer;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmqdemo.Utils.MQConnectionUtils;
  5. import java.util.concurrent.TimeoutException;
  6. import java.io.IOException;
  7. public class DirectProducter {
  8. private static final String EXCHANGE_NAME = "direct_exchange";
  9. public static void main(String[] args) throws IOException, TimeoutException {
  10. // 1.创建新的连接
  11. Connection connection = MQConnectionUtils.newConnection();
  12. // 2.创建通道
  13. Channel channel = connection.createChannel();
  14. // 3.绑定的交换机 参数1交互机名称 参数2 exchange类型
  15. channel.exchangeDeclare(EXCHANGE_NAME, "direct");
  16. String routingKey = "info";
  17. String msg = "direct_exchange_msg" + routingKey;
  18. // 4.发送消息
  19. channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
  20. System.out.println("生产者发送msg:" + msg);
  21. // // 5.关闭通道、连接
  22. // channel.close();
  23. // connection.close();
  24. // 注意:如果消费没有绑定交换机和队列,则消息会丢失
  25. }
  26. }

 

短信消费者代码:

  1. package com.rabbitmqdemo.consumer;
  2. import com.rabbitmq.client.*;
  3. import com.rabbitmqdemo.Utils.MQConnectionUtils;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. public class DirectSMSConsumer {
  7. private static final String QUEUE_NAME = "consumer_direct_sms";
  8. private static final String EXCHANGE_NAME = "direct_exchange";
  9. public static void main(String[] args) throws IOException, TimeoutException {
  10. System.out.println("短信消费者启动");
  11. // 1.创建新的连接
  12. Connection connection = MQConnectionUtils.newConnection();
  13. // 2.创建通道
  14. Channel channel = connection.createChannel();
  15. // 3.消费者关联队列
  16. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  17. // 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
  18. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
  19. DefaultConsumer consumer = new DefaultConsumer(channel) {
  20. @Override
  21. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
  22. throws IOException {
  23. String msg = new String(body, "UTF-8");
  24. System.out.println("消费者获取生产者消息:" + msg);
  25. }
  26. };
  27. // 5.消费者监听队列消息
  28. channel.basicConsume(QUEUE_NAME, true, consumer);
  29. }
  30. }

 

邮件消费者代码:

  1. package com.rabbitmqdemo.consumer;
  2. import com.rabbitmq.client.*;
  3. import com.rabbitmqdemo.Utils.MQConnectionUtils;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. public class DirectEmailConsumer {
  7. private static final String QUEUE_NAME = "consumer_direct_email";
  8. private static final String EXCHANGE_NAME = "direct_exchange";
  9. public static void main(String[] args) throws IOException, TimeoutException {
  10. System.out.println("邮件消费者启动");
  11. // 1.创建新的连接
  12. Connection connection = MQConnectionUtils.newConnection();
  13. // 2.创建通道
  14. Channel channel = connection.createChannel();
  15. // 3.消费者关联队列
  16. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  17. // 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
  18. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
  19. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
  20. DefaultConsumer consumer = new DefaultConsumer(channel) {
  21. @Override
  22. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
  23. throws IOException {
  24. String msg = new String(body, "UTF-8");
  25. System.out.println("消费者获取生产者消息:" + msg);
  26. }
  27. };
  28. // 5.消费者监听队列消息
  29. channel.basicConsume(QUEUE_NAME, true, consumer);
  30. }
  31. }

 

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/507487
推荐阅读
相关标签
  

闽ICP备14008679号