当前位置:   article > 正文

RabbitMQ的部分模式

RabbitMQ的部分模式

1发布订阅模式

发送者

  1. package org.example;
  2. import com.alibaba.fastjson.JSON;
  3. import com.rabbitmq.client.BuiltinExchangeType;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.Connection;
  6. import com.rabbitmq.client.ConnectionFactory;
  7. import java.io.IOException;
  8. import java.util.HashMap;
  9. import java.util.concurrent.TimeoutException;
  10. public class PublishProduct {
  11. public static void main(String[] args) {
  12. // 创建连接工厂
  13. ConnectionFactory factory = new ConnectionFactory();
  14. // 设置 RabbitMQ 服务器的地址,我用的redis,RabbitMQ,现在redis里安装RabbitMQ
  15. factory.setHost("192.168.74.75");
  16. Connection connection = null;
  17. Channel channel = null;
  18. try {
  19. connection = factory.newConnection();
  20. // 创建一个通道
  21. channel = connection.createChannel();
  22. //创建交换机
  23. channel.exchangeDeclare("qy172-fanout-exchange", BuiltinExchangeType.FANOUT, true);
  24. //创建队列,如果存在则不会创建
  25. channel.queueDeclare("qy172-publish-queue01", true, false, false, null);
  26. channel.queueDeclare("qy172-publish-queue02", true, false, false, null);
  27. //交互机和队列绑定
  28. channel.queueBind("qy172-publish-queue01", "qy172-fanout-exchange", "");
  29. channel.queueBind("qy172-publish-queue02", "qy172-fanout-exchange", "");
  30. // 创建消息内容
  31. HashMap<String, Object> map = new HashMap<>();
  32. map.put("name", "张三");
  33. map.put("age", "22");
  34. //把数据给交换机,让他分发给队列
  35. channel.basicPublish("qy172-fanout-exchange", "", null, JSON.toJSONBytes(map));
  36. System.out.println("发送成功");
  37. } catch (IOException e) {
  38. // 发生 IO 异常时抛出运行时异常
  39. throw new RuntimeException(e);
  40. } catch (TimeoutException e) {
  41. // 发生超时异常时抛出运行时异常
  42. throw new RuntimeException(e);
  43. } finally {
  44. if (channel != null) {
  45. try {
  46. // 关闭通道
  47. channel.close();
  48. } catch (IOException | TimeoutException e) {
  49. // 发生 IO 或超时异常时抛出运行时异常
  50. throw new RuntimeException(e);
  51. }
  52. }
  53. if (connection != null) {
  54. try {
  55. // 关闭连接
  56. connection.close();
  57. } catch (IOException e) {
  58. // 发生 IO 异常时抛出运行时异常
  59. throw new RuntimeException(e);
  60. }
  61. }
  62. }
  63. }
  64. }

2订阅个订阅者

订阅者1

  1. package org.example;
  2. import com.alibaba.fastjson.JSON;
  3. import com.rabbitmq.client.*;
  4. import java.io.IOException;
  5. import java.util.Map;
  6. public class Consumer01 {
  7. public static void main(String[] args) throws Exception {
  8. // 创建连接工厂对象
  9. ConnectionFactory factory = new ConnectionFactory();
  10. // 设置 RabbitMQ 服务器的主机地址为 "192.168.74.75"
  11. factory.setHost("192.168.74.75");
  12. Connection connection = factory.newConnection();
  13. // 创建一个 RabbitMQ 连接
  14. Channel channel = connection.createChannel();
  15. // 创建一个通道,用于与 RabbitMQ 之间的通信
  16. com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
  17. // 创建一个消费者对象,并重写其方法
  18. @Override
  19. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  20. // 消费消息的处理方法
  21. String json = new String(body);
  22. // 将消息内容转换为字符串
  23. Map map = JSON.parseObject(json, Map.class);
  24. // 使用 JSON 解析成 Map 对象
  25. System.out.println("消息内容Consumer01"+map);
  26. // 输出消息内容
  27. }
  28. };
  29. channel.basicConsume("qy172-publish-queue01",true,consumer);
  30. }
  31. }

订阅者2

  1. package com.aaa;
  2. import com.alibaba.fastjson.JSON;
  3. import com.rabbitmq.client.*;
  4. import java.io.IOException;
  5. import java.util.Map;
  6. import java.util.concurrent.TimeoutException;
  7. public class Consumer02 {
  8. public static void main(String[] args) {
  9. ConnectionFactory factory = new ConnectionFactory();
  10. factory.setHost("192.168.74.75");
  11. try {
  12. Connection connection = factory.newConnection();
  13. Channel channel = connection.createChannel();
  14. com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
  15. @Override
  16. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  17. String json = new String(body);
  18. Map map = JSON.parseObject(json, Map.class);
  19. System.out.println("消息内容Consumer02" + map);
  20. }
  21. };
  22. //订阅者2
  23. channel.basicConsume("qy172-publish-queue02",true,consumer);
  24. } catch (IOException | TimeoutException e) {
  25. // 处理连接、通道创建或消费消息时可能抛出的异常
  26. e.printStackTrace();
  27. }
  28. }
  29. }

2路由模式

发送者

  1. package org.example;
  2. import com.alibaba.fastjson.JSON;
  3. import com.rabbitmq.client.BuiltinExchangeType;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.Connection;
  6. import com.rabbitmq.client.ConnectionFactory;
  7. import java.io.IOException;
  8. import java.util.HashMap;
  9. import java.util.concurrent.TimeoutException;
  10. public class PublishProduct {
  11. public static void main(String[] args) {
  12. // 创建连接工厂
  13. ConnectionFactory factory = new ConnectionFactory();
  14. // 设置 RabbitMQ 服务器的地址,我用的redis,RabbitMQ,现在redis里安装RabbitMQ
  15. factory.setHost("192.168.74.75");
  16. Connection connection = null;
  17. Channel channel = null;
  18. try {
  19. connection = factory.newConnection();
  20. // 创建一个通道
  21. channel = connection.createChannel();
  22. //创建交换机,
  23. channel.exchangeDeclare("qy172-router-exchange", BuiltinExchangeType.DIRECT, true);
  24. //创建队列,如果存在则不会创建
  25. channel.queueDeclare("qy172-router-queue01", true, false, false, null);
  26. channel.queueDeclare("qy172-router-queue02", true, false, false, null);
  27. //交互机和队列绑定
  28. channel.queueBind("qy172-router-queue01", "qy172-router-exchange", "error");
  29. channel.queueBind("qy172-router-queue02", "qy172-router-exchange", "error");
  30. channel.queueBind("qy172-router-queue02", "qy172-router-exchange", "info");
  31. channel.queueBind("qy172-router-queue02", "qy172-router-exchange", "warning");
  32. // 创建消息内容
  33. HashMap<String, Object> map = new HashMap<>();
  34. map.put("name", "张三");
  35. map.put("age", "22");
  36. //把数据给交换机,让他分发给队列
  37. channel.basicPublish("qy172-router-exchange","error",null,JSON.toJSONBytes(map));
  38. // channel.basicPublish("qy172-router-exchange","info",null,JSON.toJSONBytes(map));
  39. System.out.println("发送成功");
  40. } catch (IOException e) {
  41. // 发生 IO 异常时抛出运行时异常
  42. throw new RuntimeException(e);
  43. } catch (TimeoutException e) {
  44. // 发生超时异常时抛出运行时异常
  45. throw new RuntimeException(e);
  46. } finally {
  47. if (channel != null) {
  48. try {
  49. // 关闭通道
  50. channel.close();
  51. } catch (IOException | TimeoutException e) {
  52. // 发生 IO 或超时异常时抛出运行时异常
  53. throw new RuntimeException(e);
  54. }
  55. }
  56. if (connection != null) {
  57. try {
  58. // 关闭连接
  59. connection.close();
  60. } catch (IOException e) {
  61. // 发生 IO 异常时抛出运行时异常
  62. throw new RuntimeException(e);
  63. }
  64. }
  65. }
  66. }
  67. }

接收者1

  1. package org.example;
  2. import com.alibaba.fastjson.JSON;
  3. import com.rabbitmq.client.*;
  4. import java.io.IOException;
  5. import java.util.Map;
  6. public class Consumer01 {
  7. public static void main(String[] args) throws Exception {
  8. // 创建连接工厂对象
  9. ConnectionFactory factory = new ConnectionFactory();
  10. // 设置 RabbitMQ 服务器的主机地址为 "192.168.74.75"
  11. factory.setHost("192.168.74.75");
  12. Connection connection = factory.newConnection();
  13. // 创建一个 RabbitMQ 连接
  14. Channel channel = connection.createChannel();
  15. // 创建一个通道,用于与 RabbitMQ 之间的通信
  16. com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
  17. // 创建一个消费者对象,并重写其方法
  18. @Override
  19. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  20. // 消费消息的处理方法
  21. String json = new String(body);
  22. // 将消息内容转换为字符串
  23. Map map = JSON.parseObject(json, Map.class);
  24. // 使用 JSON 解析成 Map 对象
  25. System.out.println("消息内容Consumer01"+map);
  26. // 输出消息内容
  27. }
  28. };
  29. channel.basicConsume("qy172-router-queue01",true,consumer);
  30. }
  31. }

接收者2

  1. package org.example;
  2. import com.alibaba.fastjson.JSON;
  3. import com.rabbitmq.client.*;
  4. import java.io.IOException;
  5. import java.util.Map;
  6. public class Consumer01 {
  7. public static void main(String[] args) throws Exception {
  8. // 创建连接工厂对象
  9. ConnectionFactory factory = new ConnectionFactory();
  10. // 设置 RabbitMQ 服务器的主机地址为 "192.168.74.75"
  11. factory.setHost("192.168.74.75");
  12. Connection connection = factory.newConnection();
  13. // 创建一个 RabbitMQ 连接
  14. Channel channel = connection.createChannel();
  15. // 创建一个通道,用于与 RabbitMQ 之间的通信
  16. com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
  17. // 创建一个消费者对象,并重写其方法
  18. @Override
  19. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  20. // 消费消息的处理方法
  21. String json = new String(body);
  22. // 将消息内容转换为字符串
  23. Map map = JSON.parseObject(json, Map.class);
  24. // 使用 JSON 解析成 Map 对象
  25. System.out.println("消息内容Consumer01"+map);
  26. // 输出消息内容
  27. }
  28. };
  29. channel.basicConsume("qy172-router-queue01",true,consumer);
  30. }
  31. }

3主题模式

发送者

  1. package org.example;
  2. import com.alibaba.fastjson.JSON;
  3. import com.rabbitmq.client.BuiltinExchangeType;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.Connection;
  6. import com.rabbitmq.client.ConnectionFactory;
  7. import java.io.IOException;
  8. import java.util.HashMap;
  9. import java.util.concurrent.TimeoutException;
  10. public class PublishProduct {
  11. public static void main(String[] args) {
  12. // 创建连接工厂
  13. ConnectionFactory factory = new ConnectionFactory();
  14. // 设置 RabbitMQ 服务器的地址,我用的redis,RabbitMQ,现在redis里安装RabbitMQ
  15. factory.setHost("192.168.74.75");
  16. Connection connection = null;
  17. Channel channel = null;
  18. try {
  19. connection = factory.newConnection();
  20. // 创建一个通道
  21. channel = connection.createChannel();
  22. //创建交换机,
  23. channel.exchangeDeclare("qy172-topic-exchange", BuiltinExchangeType.TOPIC, true);
  24. //创建队列,如果存在则不会创建
  25. channel.queueDeclare("qy172-topic-queue01", true, false, false, null);
  26. channel.queueDeclare("qy172-topic-queue02", true, false, false, null);
  27. //交互机和队列绑定
  28. //主题匹配给这个
  29. channel.queueBind("qy172-topic-queue01", "qy172-topic-exchange", "*.orange.*");
  30. //主题,也匹配给这个
  31. channel.queueBind("qy172-topic-queue02", "qy172-topic-exchange", "*.*.rabbit");
  32. channel.queueBind("qy172-topic-queue02", "qy172-topic-exchange", "lazy.#");
  33. // 创建消息内容
  34. HashMap<String, Object> map = new HashMap<>();
  35. map.put("name", "张三");
  36. map.put("age", "22");
  37. //把数据给交换机,让他分发给队列
  38. channel.basicPublish("qy172-topic-exchange","lazy.orange.rabbit",null,JSON.toJSONBytes(map));
  39. System.out.println("发送成功");
  40. } catch (IOException e) {
  41. // 发生 IO 异常时抛出运行时异常
  42. throw new RuntimeException(e);
  43. } catch (TimeoutException e) {
  44. // 发生超时异常时抛出运行时异常
  45. throw new RuntimeException(e);
  46. } finally {
  47. if (channel != null) {
  48. try {
  49. // 关闭通道
  50. channel.close();
  51. } catch (IOException | TimeoutException e) {
  52. // 发生 IO 或超时异常时抛出运行时异常
  53. throw new RuntimeException(e);
  54. }
  55. }
  56. if (connection != null) {
  57. try {
  58. // 关闭连接
  59. connection.close();
  60. } catch (IOException e) {
  61. // 发生 IO 异常时抛出运行时异常
  62. throw new RuntimeException(e);
  63. }
  64. }
  65. }
  66. }
  67. }

接收者1

  1. package org.example;
  2. import com.alibaba.fastjson.JSON;
  3. import com.rabbitmq.client.*;
  4. import java.io.IOException;
  5. import java.util.Map;
  6. public class Consumer01 {
  7. public static void main(String[] args) throws Exception {
  8. // 创建连接工厂对象
  9. ConnectionFactory factory = new ConnectionFactory();
  10. // 设置 RabbitMQ 服务器的主机地址为 "192.168.74.75"
  11. factory.setHost("192.168.74.75");
  12. Connection connection = factory.newConnection();
  13. // 创建一个 RabbitMQ 连接
  14. Channel channel = connection.createChannel();
  15. // 创建一个通道,用于与 RabbitMQ 之间的通信
  16. com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
  17. // 创建一个消费者对象,并重写其方法
  18. @Override
  19. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  20. // 消费消息的处理方法
  21. String json = new String(body);
  22. // 将消息内容转换为字符串
  23. Map map = JSON.parseObject(json, Map.class);
  24. // 使用 JSON 解析成 Map 对象
  25. System.out.println("消息内容Consumer01"+map);
  26. // 输出消息内容
  27. }
  28. };
  29. channel.basicConsume("qy172-topic-queue01",true,consumer);
  30. }
  31. }

接收者2

  1. package com.aaa;
  2. import com.alibaba.fastjson.JSON;
  3. import com.rabbitmq.client.*;
  4. import java.io.IOException;
  5. import java.util.Map;
  6. import java.util.concurrent.TimeoutException;
  7. public class Consumer02 {
  8. public static void main(String[] args) {
  9. ConnectionFactory factory = new ConnectionFactory();
  10. factory.setHost("192.168.74.75");
  11. try {
  12. Connection connection = factory.newConnection();
  13. Channel channel = connection.createChannel();
  14. com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
  15. @Override
  16. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  17. String json = new String(body);
  18. Map map = JSON.parseObject(json, Map.class);
  19. System.out.println("消息内容Consumer02" + map);
  20. }
  21. };
  22. //订阅者2
  23. channel.basicConsume("qy172-topic-queue02",true,consumer);
  24. } catch (IOException | TimeoutException e) {
  25. // 处理连接、通道创建或消费消息时可能抛出的异常
  26. e.printStackTrace();
  27. }
  28. }
  29. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/349283
推荐阅读
相关标签
  

闽ICP备14008679号