当前位置:   article > 正文

RabbitMQ之主题模式(topic)_rabbitmq topic模式

rabbitmq topic模式

RabbitMQ之主题模式(topic)简介

主题(Topic):可理解为消息的key,用于匹配某条消息的生产者和消费者

将路由键和某种匹配模式一起使用

#表示多个

*表示1个

 

代码参考

生产者

  1. /**
  2. * topic 队列
  3. *
  4. * 将路由键和某种匹配模式一起使用
  5. *
  6. * #表示多个 *表示1个
  7. *
  8. * @author zhang
  9. *
  10. */
  11. public class TopicSend {
  12. public static final String TOPIC_NAME = "topic_test";
  13. public static void main(String[] args) throws IOException, TimeoutException {
  14. // 获取链接
  15. Connection connection = ConnectHelp.getConnect();
  16. // 建立通道
  17. Channel channel = connection.createChannel();
  18. // 建立转换机 指定类型
  19. channel.exchangeDeclare(TOPIC_NAME, "topic");
  20. // 数据的定义
  21. String msg = "send msg test &&";
  22. // 发送数据
  23. channel.basicPublish(TOPIC_NAME, "food.add", null, msg.getBytes());
  24. System.out.println("main :" + msg);
  25. // 关闭
  26. channel.close();
  27. connection.close();
  28. }
  29. }

消费者1

  1. /**
  2. * Topic 消费者1
  3. *
  4. * @author zhang
  5. *
  6. */
  7. public class TopicRevc1 {
  8. public static final String TOPIC_NAME = "topic_test";
  9. public static final String QUEUE_NAME = "topic_queue_1";
  10. public static void main(String[] args) throws IOException, TimeoutException {
  11. // 建立连接
  12. Connection connection = ConnectHelp.getConnect();
  13. // 建立通道
  14. final Channel channel = connection.createChannel();
  15. // 声明队列
  16. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  17. // 绑定队列
  18. channel.queueBind(QUEUE_NAME, TOPIC_NAME, "food.select");
  19. // 每次只接受一个数据
  20. channel.basicQos(1);
  21. // 获取数据
  22. DefaultConsumer consumer = new DefaultConsumer(channel) {
  23. @Override
  24. public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
  25. throws IOException {
  26. String msg = new String(body, "utf-8");
  27. System.out.println("TOP1:" + msg);
  28. // 可以接受下一个消息
  29. channel.basicAck(envelope.getDeliveryTag(), false);
  30. }
  31. };
  32. // 监听队列
  33. channel.basicConsume(QUEUE_NAME, false, consumer);
  34. }
  35. }

消费者2

  1. /**
  2. * Topic 消费者1
  3. *
  4. * @author zhang
  5. *
  6. */
  7. public class TopicRevc2 {
  8. public static final String TOPIC_NAME = "topic_test";
  9. public static final String QUEUE_NAME = "topic_queue_2";
  10. public static void main(String[] args) throws IOException, TimeoutException {
  11. // 建立连接
  12. Connection connection = ConnectHelp.getConnect();
  13. // 建立通道
  14. final Channel channel = connection.createChannel();
  15. // 声明队列
  16. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  17. // 绑定队列
  18. channel.queueBind(QUEUE_NAME, TOPIC_NAME, "food.#");
  19. // 每次只接受一个数据
  20. channel.basicQos(1);
  21. // 获取数据
  22. DefaultConsumer consumer = new DefaultConsumer(channel) {
  23. @Override
  24. public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
  25. throws IOException {
  26. String msg = new String(body, "utf-8");
  27. System.out.println("TOP2:" + msg);
  28. // 可以接受下一个消息
  29. channel.basicAck(envelope.getDeliveryTag(), false);
  30. }
  31. };
  32. // 监听队列
  33. channel.basicConsume(QUEUE_NAME, false, consumer);
  34. }
  35. }

 

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号