当前位置:   article > 正文

RabbitMQ中五种常用的工作模式_rabbitmq消费模式

rabbitmq消费模式

1、简单队列模式(Simple Queue)

        只包含一个生产者以及一个消费者,生产者Producer将消息发送到队列中,消费者Consumer从该队列中接受消息,简单来说就是1v1的模式。

 上图中,P是我们的生产者,C是我们的消费者。

  1. /**
  2. * @description 获取RabbitMQ的连接工具类
  3. */
  4. public class MQConnecitonUtils {
  5. private static ConnectionFactory connectionFactory;
  6. static {
  7. connectionFactory = new ConnectionFactory();
  8. //我们把重量级资源通过单例模式加载
  9. connectionFactory.setHost("127.0.0.1");
  10. connectionFactory.setPort(5672);
  11. connectionFactory.setUsername("guest");
  12. connectionFactory.setPassword("guest");
  13. //设置虚拟主机
  14. connectionFactory.setVirtualHost("/test");
  15. }
  16. public static Connection getConnection() {
  17. try {
  18. return connectionFactory.newConnection();
  19. } catch (Exception e) {
  20. e.printStackTrace();
  21. }
  22. return null;
  23. }
  24. //定义关闭通道和关闭连接工具方法
  25. public static void closeConnectionAndChanel(Channel channel, Connection conn) {
  26. try {
  27. if (channel != null) {
  28. channel.close();
  29. }
  30. if (conn != null) {
  31. conn.close();
  32. }
  33. } catch (Exception e) {
  34. e.printStackTrace();
  35. }
  36. }
  37. }
【消息生产者】
  1. /**
  2. * TODO 描述:消费生产者
  3. */
  4. public class ProducerSIMPLE {
  5. private static final String SIMPLE_QUEUE_NAME = "MQ_SIMPLE_QUEUE";
  6. private static final String SIMPLE_QUEUE_MESSAGE = "简单消息123123!" + new Date();
  7. public static void main(String[] args) throws IOException {
  8. //获取MQ连接
  9. Connection connection = MQConnecitonUtils.getConnection();
  10. //创建通道 从连接中获取Channel通道对象
  11. Channel channel = connection.createChannel();
  12. //创建Queue队列
  13. //参数1:队列名称,参数2:是否持久化,参数3:是否独占队列 参数4:是否自动删除 参数5:其他属性
  14. channel.queueDeclare(SIMPLE_QUEUE_NAME, false, false, false, null);
  15. //发送消息到队列MQ_SIMPLE_QUEUE
  16. //basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
  17. channel.basicPublish("", SIMPLE_QUEUE_NAME, null, SIMPLE_QUEUE_MESSAGE.getBytes("UTF-8"));
  18. System.out.println("消息已发送!");
  19. MQConnecitonUtils.closeConnectionAndChanel(channel, connection);
  20. }
  21. }
【运行结果】

      再查看RabbitMQ控制台:

【消费者】
  1. /**
  2. * TODO 描述:1v1
  3. */
  4. public class ConsumerSIMPLE {
  5. private static final String SIMPLE_QUEUE_NAME = "MQ_SIMPLE_QUEUE";
  6. public static void main(String[] args) {
  7. //获取MQ连接对象
  8. Connection connection = MQConnecitonUtils.getConnection();
  9. Channel channel;
  10. try {
  11. //创建消息通道对象
  12. channel = connection.createChannel();
  13. //声明queue队列
  14. channel.queueDeclare(SIMPLE_QUEUE_NAME, false, false, false, null);
  15. //创建消费者对象7
  16. DefaultConsumer consumer = new DefaultConsumer(channel) {
  17. @Override
  18. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  19. //消息消费者获取消息
  20. String message = new String(body, "UTF-8");
  21. System.out.println("receive message: " + message);
  22. }
  23. };
  24. //监听消息队列
  25. channel.basicConsume(SIMPLE_QUEUE_NAME, true, consumer);
  26. } catch (IOException e) {
  27. e.printStackTrace();
  28. }
  29. }
  30. }
【运行结果】

2、工作队列模式(Work Queues)

        多个消费者绑定到同一个队列上,一条消息只能被一个消费者进行消费,工作队列有轮询分发和公平分发两种模式。有点类似于抢红包

  • 轮询分发:消费者依次轮着消费信息,直到消息消费完为止,按均分配。
  • 公平分发:根据消费能力进行分发,即处理快的消费就多,处理慢的就消费就少,能者多劳。
2.1、轮询循分发方式(默认):
【消息生产者】
  1. /**
  2. * 说明:
  3. * 消费者1与消费者2处理的消息是均分的,而且消息是轮询分发的(轮询分发 round-robin)
  4. */
  5. public class ProducerWORK {
  6. private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";
  7. private static final String WORK_QUEUE_MESSAGE = "工作队列消息!! ------> " + new Date();
  8. public static void main(String[] args) throws IOException, InterruptedException {
  9. //获取MQ连接
  10. Connection connection = MQConnecitonUtils.getConnection();
  11. //从连接中获取Channel通道对象
  12. Channel channel = null;
  13. channel = connection.createChannel();
  14. //创建Queue队列
  15. channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
  16. //发送10条消息到工作队列
  17. for (int i = 1; i <= 10; i++) {
  18. //模拟延迟
  19. Thread.sleep(2000);
  20. StringBuilder msg = new StringBuilder(WORK_QUEUE_MESSAGE).append("---").append(i);
  21. //发送消息
  22. channel.basicPublish("", WORK_QUEUE_NAME, null, msg.toString().getBytes());
  23. System.out.println(msg);
  24. }
  25. System.out.println("=============工作队列消息发送完毕=============");
  26. MQConnecitonUtils.closeConnectionAndChanel(channel, connection);
  27. }
  28. }
【消费者A】
  1. /**
  2. * TODO 描述:消费者A
  3. *
  4. * @author ShuZL
  5. * @date 2023/9/15 0015 13:44
  6. */
  7. public class Consumer_A {
  8. private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";
  9. public static void main(String[] args) {
  10. //获取MQ连接对象
  11. Connection connection = MQConnecitonUtils.getConnection();
  12. Channel channel = null;
  13. try {
  14. //创建消息通道对象
  15. channel = connection.createChannel();
  16. //声明queue队列
  17. channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
  18. //创建消费者对象
  19. DefaultConsumer consumer = new DefaultConsumer(channel) {
  20. @Override
  21. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  22. //消息消费者获取消息
  23. String message = new String(body, "UTF-8");
  24. System.out.println("【CustomConsumer-A】receive message: " + message);
  25. try {
  26. //模拟延迟
  27. Thread.sleep(2000);
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. };
  33. //监听消息队列
  34. channel.basicConsume(WORK_QUEUE_NAME, true, consumer);
  35. } catch (IOException e) {
  36. e.printStackTrace();
  37. }
  38. }
  39. }
【消费者B】
  1. public class Consumer_B {
  2. private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";
  3. public static void main(String[] args) {
  4. //获取MQ连接对象
  5. Connection connection = MQConnecitonUtils.getConnection();
  6. Channel channel = null;
  7. try {
  8. //创建消息通道对象
  9. channel = connection.createChannel();
  10. //声明queue队列
  11. channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
  12. //创建消费者对象
  13. DefaultConsumer consumer = new DefaultConsumer(channel) {
  14. @Override
  15. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  16. //消息消费者获取消息
  17. String message = new String(body, "UTF-8");
  18. System.out.println("【CustomConsumer-B】receive message: " + message);
  19. try {
  20. //模拟延迟
  21. Thread.sleep(1000);
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. }
  26. };
  27. //监听消息队列
  28. channel.basicConsume(WORK_QUEUE_NAME, true, consumer);
  29. } catch (IOException e) {
  30. e.printStackTrace();
  31. }
  32. }
  33. }
【运行结果】

先运行消费者A和消费者B,再运行消息生产者

查看控制台已经被消费完了

由以上运行结果可知,消费者A和消费者B处理消费是均匀的,也就是消费条数是一样的,而且消息是轮询分发的,也就是说同一条消息只能被一个消费者消费。由此也引出了一个问题,上面的消费者A和消费者B处理消息的效率不同,但是最后两者接受的到的消息还是一样多,如果我们需要让工作效率高的消费者消费更多的消息,那可以使用公平分发,也就是多劳多得。

2.1、公平分发方式:

好处:防止消息堆积(多consumer(消费者)+能者多劳)

【消息生产者】
  1. public class ProducerWORK {
  2. private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";
  3. private static final String WORK_QUEUE_MESSAGE = "工作队列消息!! ------> " + new Date();
  4. public static void main(String[] args) throws IOException, InterruptedException {
  5. //获取MQ连接
  6. Connection connection = MQConnecitonUtils.getConnection();
  7. //从连接中获取Channel通道对象
  8. Channel channel = null;
  9. channel = connection.createChannel();
  10. //创建Queue队列
  11. channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
  12. //发送10条消息到工作队列
  13. for (int i = 1; i <= 20; i++) {
  14. //模拟延迟
  15. // Thread.sleep(2000);
  16. StringBuilder msg = new StringBuilder(WORK_QUEUE_MESSAGE).append("---").append(i);
  17. //发送消息
  18. channel.basicPublish("", WORK_QUEUE_NAME, null, msg.toString().getBytes());
  19. System.out.println(msg);
  20. }
  21. System.out.println("=============工作队列消息发送完毕=============");
  22. MQConnecitonUtils.closeConnectionAndChanel(channel, connection);
  23. }
  24. }
【消费者A】

使用basicQos方法prefetchCount = 1设置,使用公平分发必须关闭自动应答(autoAck:true自动返回结果,false手动返回),以上两个设置就是告诉RabbitMQ必须要让消费者处理并手动确认了前一个消息,才会发送新的消息。

  1. public class Consumer_A {
  2. private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";
  3. public static void main(String[] args) throws IOException {
  4. //获取MQ连接对象
  5. Connection connection = MQConnecitonUtils.getConnection();
  6. //创建消息通道对象
  7. final Channel channel = connection.createChannel();
  8. //声明queue队列
  9. channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
  10. channel.basicQos(1);
  11. //创建消费者对象
  12. DefaultConsumer consumer = new DefaultConsumer(channel) {
  13. @Override
  14. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  15. //消息消费者获取消息
  16. String message = new String(body, "UTF-8");
  17. System.out.println("【CustomConsumer-A】receive message: " + message);
  18. try {
  19. //模拟延迟
  20. Thread.sleep(1000);
  21. //消费完一条消息需要自动发送确认消息给MQ
  22. channel.basicAck(envelope.getDeliveryTag(), false);
  23. } catch (InterruptedException e) {
  24. e.printStackTrace();
  25. }
  26. }
  27. };
  28. //使用公平分发必须关闭自动应答(autoAck:true自动返回结果,false手动返回)
  29. boolean autoAck = false;
  30. //监听消息队列
  31. channel.basicConsume(WORK_QUEUE_NAME, autoAck, consumer);
  32. }
  33. }
【消费者B】
  1. public class Consumer_B {
  2. private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";
  3. public static void main(String[] args) throws IOException {
  4. //获取MQ连接对象
  5. Connection connection = MQConnecitonUtils.getConnection();
  6. //创建消息通道对象
  7. final Channel channel = connection.createChannel();
  8. //声明queue队列
  9. channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
  10. channel.basicQos(1);
  11. //创建消费者对象
  12. DefaultConsumer consumer = new DefaultConsumer(channel) {
  13. @Override
  14. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  15. //消息消费者获取消息
  16. String message = new String(body, "UTF-8");
  17. System.out.println("【CustomConsumer-B】receive message: " + message);
  18. try {
  19. //模拟延迟
  20. Thread.sleep(3000);
  21. //消费完一条消息需要自动发送确认消息给MQ
  22. channel.basicAck(envelope.getDeliveryTag(), false);
  23. } catch (InterruptedException e) {
  24. e.printStackTrace();
  25. }
  26. }
  27. };
  28. //使用公平分发必须关闭自动应答(autoAck:true自动返回结果,false手动返回)
  29. boolean autoAck = false;
  30. //监听消息队列
  31. channel.basicConsume(WORK_QUEUE_NAME, autoAck, consumer);
  32. }
  33. }
【运行结果】

生产者运行结果如下:

消费者A运行结果如下: 

 消费者B运行结果如下:

由以上运行结果可见,消费者A的效率相对较高,所以消费者A消费消息比消费者B多一些,这样就可以 充分发挥处理消息的能力。也能一定的避免消息堆积问题

3、发布-订阅模式(Publish/Subscribe)

        生产者将消息发送到交换机,然后交换机绑定到多个队列,监听该队列的所以消费者消费消息。

【生产者】
  1. /**
  2. * 说明:可实现一条消息被多个消费者消费
  3. * a. 一个生产者,多个消费者;
  4. * b. 每一个消费者都有自己的消息队列;
  5. * c. 生产者没有把消息发送到队列,而是发送到交换器exchange上;
  6. * d. 每个队列都需要绑定到交换机上;
  7. * e. 生产者生产的消息先经过交换机然后到达队列,一个消息可以被多个消费者消费;
  8. */
  9. public class Product {
  10. //定义交换机名称
  11. private static final String PUBLISH_SUBSCRIBE_EXCHANGE_NAME = "publish_subscribe_exchange_fanout";
  12. //类型:分发
  13. private static final String PUBLISH_SUBSCRIBE_EXCHANGE_TYPE = "fanout";
  14. public static void main(String[] args) throws IOException {
  15. //获取MQ连接
  16. Connection connection = MQConnecitonUtils.getConnection();
  17. //从连接中获取Channel通道对象
  18. Channel channel = connection.createChannel();
  19. //创建交换机对象publish_subscribe_exchange_fanout
  20. channel.exchangeDeclare(PUBLISH_SUBSCRIBE_EXCHANGE_NAME, PUBLISH_SUBSCRIBE_EXCHANGE_TYPE);
  21. //发送消息到交换机exchange上
  22. String msg = "hello world!!!";
  23. channel.basicPublish(PUBLISH_SUBSCRIBE_EXCHANGE_NAME, "", null, msg.getBytes());
  24. System.out.println("=============工作队列消息发送完毕=============");
  25. MQConnecitonUtils.closeConnectionAndChanel(channel, connection);
  26. }
  27. }
【消费者A】
  1. public class ConsumerA {
  2. private static final String PUBLIC_SUBSCRIBE_QUEUE_NAME = "public_subscribe_queue_name01";
  3. private static final String PUBLISH_SUBSCRIBE_EXCHANGE_NAME = "publish_subscribe_exchange_fanout";
  4. public static void main(String[] args) {
  5. //获取MQ连接对象
  6. Connection connection = MQConnecitonUtils.getConnection();
  7. try {
  8. //创建消息通道对象
  9. final Channel channel = connection.createChannel();
  10. //创建队列
  11. channel.queueDeclare(PUBLIC_SUBSCRIBE_QUEUE_NAME, false, false, false, null);
  12. //将队列绑定到交换机上
  13. channel.queueBind(PUBLIC_SUBSCRIBE_QUEUE_NAME, PUBLISH_SUBSCRIBE_EXCHANGE_NAME, "");
  14. //创建消费者对象
  15. DefaultConsumer consumer = new DefaultConsumer(channel) {
  16. @Override
  17. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  18. //消息消费者获取消息
  19. String message = new String(body, "UTF-8");
  20. System.out.println("【CustomConsumer-A】receive message: " + message);
  21. //消费完一条消息需要自动发送确认消息给MQ
  22. channel.basicAck(envelope.getDeliveryTag(), false);
  23. }
  24. };
  25. //监听消息队列
  26. channel.basicConsume(PUBLIC_SUBSCRIBE_QUEUE_NAME, false, consumer);
  27. } catch (IOException e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. }
【消费者B】
  1. public class ConsumerB {
  2. private static final String PUBLIC_SUBSCRIBE_QUEUE_NAME = "public_subscribe_queue_name02";
  3. private static final String PUBLISH_SUBSCRIBE_EXCHANGE_NAME = "publish_subscribe_exchange_fanout";
  4. public static void main(String[] args) {
  5. //获取MQ连接对象
  6. Connection connection = MQConnecitonUtils.getConnection();
  7. try {
  8. //创建消息通道对象
  9. final Channel channel = connection.createChannel();
  10. //创建队列
  11. channel.queueDeclare(PUBLIC_SUBSCRIBE_QUEUE_NAME, false, false, false, null);
  12. //将队列绑定到交换机上
  13. channel.queueBind(PUBLIC_SUBSCRIBE_QUEUE_NAME, PUBLISH_SUBSCRIBE_EXCHANGE_NAME, "");
  14. //创建消费者对象
  15. DefaultConsumer consumer = new DefaultConsumer(channel) {
  16. @Override
  17. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  18. //消息消费者获取消息
  19. String message = new String(body, "UTF-8");
  20. System.out.println("【CustomConsumer-B】receive message: " + message);
  21. //消费完一条消息需要自动发送确认消息给MQ
  22. channel.basicAck(envelope.getDeliveryTag(), false);
  23. }
  24. };
  25. //监听消息队列
  26. channel.basicConsume(PUBLIC_SUBSCRIBE_QUEUE_NAME, false, consumer);
  27. } catch (IOException e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. }
【运行结果】

由此可见,一条消息被两个消费者同时消费。

4、路由模式(Routing)

生产者将消息发送到direct交换机,它会把消息路由到那些binding key和routing key完全匹配的Queue中,这样就能实现消费者有选择性的去消费消息了。 

【生产者】
  1. public class ProducerRouting {
  2. //自定义交换机名称
  3. private static final String EXCHANGE_NAME = "publish_subscribe_exchange_direct";
  4. //交换机类型:direct
  5. private static final String EXCHANGE_TYPE = "direct";
  6. //指定一个routing key
  7. private static final String EXCHANGE_ROUTE_KEY = "info";
  8. public static void main(String[] args) throws IOException {
  9. //获取MQ连接
  10. Connection connection = MQConnecitonUtils.getConnection();
  11. //从连接中获取Channel通道对象
  12. Channel channel = connection.createChannel();
  13. //创建交换机对象
  14. channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
  15. //发送消息到交换机exchange上
  16. String msg = "路由模式消息111 !!!!";
  17. //指定routing key为info
  18. channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTE_KEY, null, msg.getBytes());
  19. System.out.println("=============路由模式消息发送完毕=============");
  20. MQConnecitonUtils.closeConnectionAndChanel(channel, connection);
  21. }
  22. }
【消费者A】
  1. public class Consumer_A {
  2. //定义队列名称
  3. private static final String QUEUE_NAME = "routing_direct_queue_name_1";
  4. //绑定生产者给定的交换机名称
  5. private static final String EXCHANGE_NAME = "publish_subscribe_exchange_direct";
  6. //binding key
  7. private static final String EXCHANGE_ROUTE_KEY = "error";
  8. public static void main(String[] args) {
  9. //获取MQ连接对象
  10. Connection connection = MQConnecitonUtils.getConnection();
  11. try {
  12. //创建消息通道对象
  13. final Channel channel = connection.createChannel();
  14. //创建队列
  15. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  16. //将队列绑定到交换机上,并且指定routing_key
  17. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY);
  18. //创建消费者对象
  19. DefaultConsumer consumer = new DefaultConsumer(channel) {
  20. @Override
  21. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  22. //消息消费者获取消息
  23. String message = new String(body, "UTF-8");
  24. System.out.println("【CustomConsumer-A】receive message: " + message);
  25. //消费完一条消息需要自动发送确认消息给MQ
  26. channel.basicAck(envelope.getDeliveryTag(), false);
  27. }
  28. };
  29. //监听消息队列
  30. channel.basicConsume(QUEUE_NAME, false, consumer);
  31. } catch (IOException e) {
  32. e.printStackTrace();
  33. }
  34. }
  35. }
【消费者B】
  1. public class Consumer_B {
  2. //定义队列名称
  3. private static final String QUEUE_NAME = "routing_direct_queue_name_2";
  4. //绑定生产者给定的交换机名称
  5. private static final String EXCHANGE_NAME = "publish_subscribe_exchange_direct";
  6. //binding key
  7. //binding key
  8. private static final String EXCHANGE_ROUTE_KEY01 = "error";
  9. private static final String EXCHANGE_ROUTE_KEY02 = "info";
  10. private static final String EXCHANGE_ROUTE_KEY03 = "warning";
  11. public static void main(String[] args) {
  12. //获取MQ连接对象
  13. Connection connection = MQConnecitonUtils.getConnection();
  14. try {
  15. //创建消息通道对象
  16. final Channel channel = connection.createChannel();
  17. //创建队列
  18. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  19. //将队列绑定到交换机上,并且指定routing_key
  20. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY01);
  21. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY02);
  22. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY03);
  23. //创建消费者对象
  24. DefaultConsumer consumer = new DefaultConsumer(channel) {
  25. @Override
  26. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  27. //消息消费者获取消息
  28. String message = new String(body, "UTF-8");
  29. System.out.println("【CustomConsumer-B】receive message: " + message);
  30. //消费完一条消息需要自动发送确认消息给MQ
  31. channel.basicAck(envelope.getDeliveryTag(), false);
  32. }
  33. };
  34. //监听消息队列
  35. channel.basicConsume(QUEUE_NAME, false, consumer);
  36. } catch (IOException e) {
  37. e.printStackTrace();
  38. }
  39. }
  40. }
【运行结果】

        因为生产者发布消息的时候指定了routing key为info, 消费者A绑定队列的时候指定的binding key 为error,显然消费者A接收不到此消息,因为消费者B绑定队列的时候指定了binding key为error、info、warning,所以消费者B能够成功接收该消息进行消费。

5、主题(Topic)模式

类似于正则表达式匹配的一种模式。主要使用#、*进行匹配。

通配符规则:

' # ':匹配一个或多个词

' * ':匹配不多不少恰好1个词

例如:

`audit.#`:能够匹配`audit.irs.corporate` 或者 `audit.irs`

`audit.*`:只能匹配`audit.irs`

【生产者】
  1. /**
  2. * 说明:
  3. * #: 代表一个或者多个
  4. * *: 代表一个
  5. * <p>
  6. * 举例:
  7. * 比如发送消息的时候指定了routing key为news.insert,
  8. * 如果消费者指定binding key 为news.* 或者news.#都能接收到该消息;
  9. */
  10. public class Producer {
  11. private static final String EXCHANGE_NAME = "exchange_topic";
  12. //交换机类型:topic 类似正则匹配模式
  13. private static final String EXCHANGE_TYPE = "topic";
  14. //指定routing key
  15. private static final String EXCHANGE_ROUTE_KEY = "news.insert";
  16. public static void main(String[] args) throws IOException {
  17. //获取MQ连接
  18. Connection connection = MQConnecitonUtils.getConnection();
  19. //从连接中获取Channel通道对象
  20. Channel channel = connection.createChannel();
  21. //创建交换机对象
  22. channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
  23. //发送消息到交换机exchange上
  24. String msg = "主题(Topic)!!!";
  25. channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTE_KEY, null, msg.getBytes());
  26. System.out.println("=============主题模式消息发送完毕=============");
  27. MQConnecitonUtils.closeConnectionAndChanel(channel, connection);
  28. }
  29. }
【消费者A】
  1. public class Consumer_A {
  2. //定义队列名称
  3. private static final String QUEUE_NAME = "topic_queue_name1";
  4. //交换机名称
  5. private static final String EXCHANGE_NAME = "exchange_topic";
  6. //binding key
  7. private static final String EXCHANGE_ROUTE_KEY = "news.*";
  8. public static void main(String[] args) {
  9. //获取MQ连接对象
  10. Connection connection = MQConnecitonUtils.getConnection();
  11. try {
  12. //创建消息通道对象
  13. final Channel channel = connection.createChannel();
  14. //创建队列
  15. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  16. //将队列绑定到交换机上,并且指定routing_key
  17. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY);
  18. //创建消费者对象
  19. DefaultConsumer consumer = new DefaultConsumer(channel) {
  20. @Override
  21. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  22. //消息消费者获取消息
  23. String message = new String(body, "UTF-8");
  24. System.out.println("【CustomConsumer01】receive message: " + message);
  25. //消费完一条消息需要自动发送确认消息给MQ
  26. channel.basicAck(envelope.getDeliveryTag(), false);
  27. }
  28. };
  29. //监听消息队列
  30. channel.basicConsume(QUEUE_NAME, false, consumer);
  31. } catch (IOException e) {
  32. e.printStackTrace();
  33. }
  34. }
  35. }
【消费者B】
  1. public class Consumer_B {
  2. private static final String QUEUE_NAME = "topic_queue_name2";
  3. private static final String EXCHANGE_NAME = "exchange_topic";
  4. //binding key
  5. private static final String EXCHANGE_ROUTE_KEY = "news.#";
  6. public static void main(String[] args) {
  7. //获取MQ连接对象
  8. Connection connection = MQConnecitonUtils.getConnection();
  9. try {
  10. //创建消息通道对象
  11. final Channel channel = connection.createChannel();
  12. //创建队列
  13. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  14. //将队列绑定到交换机上,并且指定routing_key
  15. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY);
  16. //创建消费者对象
  17. DefaultConsumer consumer = new DefaultConsumer(channel) {
  18. @Override
  19. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  20. //消息消费者获取消息
  21. String message = new String(body, "UTF-8");
  22. System.out.println("【CustomConsumer02】receive message: " + message);
  23. //消费完一条消息需要自动发送确认消息给MQ
  24. channel.basicAck(envelope.getDeliveryTag(), false);
  25. }
  26. };
  27. //监听消息队列
  28. channel.basicConsume(QUEUE_NAME, false, consumer);
  29. } catch (IOException e) {
  30. e.printStackTrace();
  31. }
  32. }
  33. }
【运行结果】

        生产者发送消息绑定的routing key 为news.insert;消费者A监听的队列和交换器binding key 为news.insert;消费者B监听的队列和交换器bindingkey为news.#,很显然,两个消费者都将接收到该消息。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/949089
推荐阅读
相关标签
  

闽ICP备14008679号