当前位置:   article > 正文

RabbitMQ 交换机类型

RabbitMQ 交换机类型

常用交换机

发布订阅(Publish/Subscribe)交换机

 一个生产者给多个队列发送消息,X 代表交换机。

交换机的作用:类似网络路由器,主要提供转发功能,解决怎么把消息转发到不同的队列中,让消费者从不同队列取然后消费。

绑定:交换机和队列关联起来

发布订阅交换机,队列进行持久化,生产者发布消息,所有消费者都能接收到消息。

生产者代码

  1. public class FanoutProducer{
  2. private static final String EXCHANGE_NAME = "fanout_exchange";
  3. public static void main(String[] argv) throws Exception {
  4. ConnectionFactory factory = new ConnectionFactory();
  5. factory.setHost("localhost");
  6. try (Connection connection = factory.newConnection();
  7. Channel channel = connection.createChannel()) {
  8. //创建交换机,参数:交换机名称,交换机类型
  9. channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  10. Scanner scanner = new Scanner(System.in);
  11. while(scanner.hasNext()){
  12. String message = scanner.nextLine();
  13. //第二个参数是路由规则
  14. channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
  15. System.out.println(" [x] Sent '" + message + "'");
  16. }
  17. }
  18. }
  19. }

消费者代码

  1. public class FanoutConsumer {
  2. private static final String EXCHANGE_NAME = "fanout_exchange";
  3. public static void main(String[] argv) throws Exception {
  4. ConnectionFactory factory = new ConnectionFactory();
  5. factory.setHost("localhost");
  6. Connection connection = factory.newConnection();
  7. Channel channel = connection.createChannel();
  8. //绑定交换机,以及设置绑定规则
  9. channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  10. String queueName1 = "xiaowang";
  11. String queueName2 = "xiaoli";
  12. channel.queueDeclare(queueName1, true, false, false, null);
  13. channel.queueDeclare(queueName2, true, false, false, null);
  14. //创建队列,不指定队列,随机分配
  15. //String queueName = channel.queueDeclare().getQueue();
  16. channel.queueBind(queueName1, EXCHANGE_NAME, "");
  17. System.out.println(" [xiaowang] Waiting for messages. To exit press CTRL+C");
  18. //交换机绑定队列
  19. channel.queueBind(queueName2, EXCHANGE_NAME, "");
  20. System.out.println(" [xiaoli] Waiting for messages. To exit press CTRL+C");
  21. DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
  22. String message = new String(delivery.getBody(), "UTF-8");
  23. System.out.println( " [xiaowang] Received '" + message + "'");
  24. };
  25. DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
  26. String message = new String(delivery.getBody(), "UTF-8");
  27. System.out.println( " [xiaoli] Received '" + message + "'");
  28. };
  29. channel.basicConsume(queueName1, true, deliverCallback1, consumerTag -> {
  30. });
  31. channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> {
  32. });
  33. }
  34. }

channel 频道:理解为操作消息队列的 Client,通过 channel 收发消息,提供了和消息对了 server 建立通信的传输方法

channel.queueDeclare 方法参数:

queue:这是一个字符串参数,代表要声明的队列的名称。如果队列不存在,则会自动创建一个新的队列。

durable:这是一个布尔值参数,表示队列是否持久化。如果设置为true,则队列会在服务器重启后仍然存在;如果设置为false,则队列在服务器重启后会被删除。默认值为false。

exclusive:这也是一个布尔值参数,表示队列是否为独占模式。如果设置为true,则只有当前连接可以访问该队列;如果设置为false,则其他连接也可以访问该队列。默认值为false。

autoDelete:这是另一个布尔值参数,表示队列是否自动删除。如果设置为true,则当最后一个消费者取消订阅时,队列将被删除;如果设置为false,则队列将一直存在,直到手动删除或服务器重启。默认值为false。

arguments:这是一个可选参数,用于设置队列的其他属性,比如消息的最大长度、最大优先级等。

channel.basicPublish 参数:

exchange:这是一个字符串参数,代表交换机的名称。如果不需要使用特定的交换机,可以传递一个空字符串("")。交换机是RabbitMQ中用于接收生产者发送的消息并根据绑定规则路由到队列的组件。

routingKey:这也是一个字符串参数,它指定了发布消息的队列。无论通道绑定到哪个队列,最终发布的消息都会包含这个指定的路由键。路由键是用来确定消息应该发送到哪个队列的重要信息。

message:这是要发布的消息本身,通常是字节数组的形式。

properties:这是一个可选参数,用于设置消息的属性,比如消息的优先级、过期时间等。

在使用channel.basicPublish时,需要注意以下几点:

exchange和routingKey不能为空:在AMQImpl类中的实现要求这两个参数都不能为null,否则会抛出异常。

交换机类型:根据不同的需求,可以选择不同类型的交换机,如fanout、direct或topic。每种类型的交换机都有其特定的路由规则。

非命名队列:在某些情况下,比如日志系统,可以使用非命名队列,这样消费者可以接收到所有相关的日志消息,而不是特定的部分。

 channel.basicConsume 参数:

queue:这是一个字符串参数,代表要消费的队列的名称。如果队列不存在,则会抛出异常。
onMessage:这是一个回调函数,当有新的消息到达时会被调用。该函数需要接收两个参数:一个表示消息内容的Delivery对象和一个表示通道的Channel对象。
consumerTag:这是一个可选参数,用于标识消费者。如果没有指定,则会自动生成一个唯一的标识符。
autoAck:这是一个布尔值参数,表示是否自动确认消息。如果设置为true,则在消息被处理后会自动发送确认信息;如果设置为false,则需要手动发送确认信息。默认值为false。
arguments:这是一个可选参数,用于设置消费者的其他属性,比如消息的最大长度、最大优先级等。
在使用channel.basicConsume时,需要注意以下几点:

队列名称:队列名称应该是唯一的,否则会抛出异常。
消息处理:在onMessage回调函数中,需要对消息进行处理,并根据需要发送确认信息。
消费者标识符:可以通过设置consumerTag来标识消费者,以便在后续操作中进行识别和管理。
消费者属性:可以通过设置消费者的其他属性来控制消费者的行为,比如设置消息的最大长度、最大优先级等。

路由交换机 (Direct exchange)

和订阅发布的区别是在交换机和队列之间有一个路由键,用来控制消息发送到哪个队列中供消费者消费。生产者给交换机一个标识,让交换机给指定的队列转发消息。

生产者代码

  1. public class DirectProducer {
  2. private static final String EXCHANGE_NAME = "direct_exchange";
  3. public static void main(String[] argv) throws Exception {
  4. ConnectionFactory factory = new ConnectionFactory();
  5. factory.setHost("localhost");
  6. try (Connection connection = factory.newConnection();
  7. Channel channel = connection.createChannel()) {
  8. //创建交换机,交换机类型是 direct
  9. channel.exchangeDeclare(EXCHANGE_NAME, "direct");
  10. Scanner scanner = new Scanner(System.in);
  11. while (scanner.hasNext()){
  12. String userInput = scanner.nextLine();
  13. //输入的时候带着标识,标识就是路由键
  14. String[] strs = userInput.split(" ");
  15. if(strs.length<1){
  16. continue;
  17. }
  18. //消息
  19. String message = strs[0];
  20. //路由键
  21. String severity = strs[1];
  22. //发送消息时带着路由键
  23. channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
  24. System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
  25. }
  26. }
  27. }
  28. }

消费者代码

  1. public class DirectConsumer {
  2. private static final String EXCHANGE_NAME = "direct_exchange";
  3. public static void main(String[] argv) throws Exception {
  4. ConnectionFactory factory = new ConnectionFactory();
  5. factory.setHost("localhost");
  6. Connection connection = factory.newConnection();
  7. Channel channel = connection.createChannel();
  8. channel.exchangeDeclare(EXCHANGE_NAME, "direct");
  9. String queueName1 = "xiaohong";
  10. String queueName2 = "xiaobai";
  11. channel.queueDeclare(queueName1, true, false, false, null);
  12. channel.queueDeclare(queueName2, true, false, false, null);
  13. //交换机使用路由键绑定队列
  14. channel.queueBind(queueName1, EXCHANGE_NAME, "xiaohong");
  15. channel.queueBind(queueName2, EXCHANGE_NAME, "xiaobai");
  16. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  17. DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
  18. String message = new String(delivery.getBody(), "UTF-8");
  19. System.out.println(" [xiaohong] Received '" +
  20. delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
  21. };
  22. DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
  23. String message = new String(delivery.getBody(), "UTF-8");
  24. System.out.println(" [xiaobai] Received '" +
  25. delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
  26. };
  27. channel.basicConsume(queueName1, true, deliverCallback1, consumerTag -> {
  28. });
  29. channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> {
  30. });
  31. }
  32. }

主题交换机 (Topic exchange)

在路由交换机的基础上,消息会具有一个模糊的路由键转发给指定的对俄(一系列的路由键、一类的路由键)

1. (*)标识匹配一个单词,比如 *.orange 表示 a.orange b.orange 都能匹配

2. (#)表示 0 个或多个单词,比如 a,#, a.a, a.b 都可以

生产者代码

  1. public class TopicProduce {
  2. private static final String EXCHANGE_NAME = "topic_exchange1";
  3. public static void main(String[] argv) throws Exception {
  4. ConnectionFactory factory = new ConnectionFactory();
  5. factory.setHost("localhost");
  6. try (Connection connection = factory.newConnection();
  7. Channel channel = connection.createChannel()) {
  8. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
  9. Scanner scanner = new Scanner(System.in);
  10. while (scanner.hasNext()) {
  11. String userInput = scanner.nextLine();
  12. String[] strs = userInput.split(" ");
  13. if (strs.length < 1) {
  14. continue;
  15. }
  16. //消息
  17. String message = strs[0];
  18. //路由键
  19. String severity = strs[1];
  20. channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
  21. System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
  22. }
  23. }
  24. }
  25. }

消费者代码

  1. public class TopicConsumer {
  2. private static final String EXCHANGE_NAME = "topic_exchange1";
  3. public static void main(String[] argv) throws Exception {
  4. ConnectionFactory factory = new ConnectionFactory();
  5. factory.setHost("localhost");
  6. Connection connection = factory.newConnection();
  7. Channel channel = connection.createChannel();
  8. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
  9. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
  10. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
  11. String queueName1 = "xiaohei";
  12. String queueName2 = "xiaolv";
  13. String queueName3 = "xiaohuang";
  14. channel.queueDeclare(queueName1, true, false, false, null);
  15. channel.queueDeclare(queueName2, true, false, false, null);
  16. channel.queueDeclare(queueName3, true, false, false, null);
  17. //交换机使用路由键绑定队列,路由键绑定在第三个参数
  18. channel.queueBind(queueName1, EXCHANGE_NAME, "#.前端.#");
  19. channel.queueBind(queueName2, EXCHANGE_NAME, "#.后端.#");
  20. channel.queueBind(queueName2, EXCHANGE_NAME, "#.产品.#");
  21. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  22. //收到消息后如何处理
  23. DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
  24. String message = new String(delivery.getBody(), "UTF-8");
  25. System.out.println(" [xiaohei] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
  26. };
  27. DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
  28. String message = new String(delivery.getBody(), "UTF-8");
  29. System.out.println(" [xiaolv] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
  30. };
  31. DeliverCallback deliverCallback3 = (consumerTag, delivery) -> {
  32. String message = new String(delivery.getBody(), "UTF-8");
  33. System.out.println(" [xiaohuang] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
  34. };
  35. channel.basicConsume(queueName1, true, deliverCallback1, consumerTag -> {
  36. });
  37. channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> {
  38. });
  39. channel.basicConsume(queueName3, true, deliverCallback3, consumerTag -> {
  40. });
  41. }
  42. }

核心机制

消息过期机制

官方文档:Preventing Unbounded Buffers with RabbitMQ | RabbitMQ

每个消息指定一个有效期,一段时间内没有被消费者处理,就过期了。

比如消费者挂了,消息一直不被处理,订单就会失效。

可以清理过期的数据,模拟延迟队列的实现。

给每条消息都设置过期时间:

  1. Map<String, Object> args = new HashMap<String, Object>();
  2. args.put("x-message-ttl", 60000);
  3. channel.queueDeclare(QUEUE_NAME, false, false, false, args);

给队列设置过期时间,设置在生产者中

  1. AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
  2. .expiration("1000")
  3. .build();

生产者代码

  1. public class TtlProducer {
  2. private final static String QUEUE_NAME = "Tll_queue";
  3. public static void main(String[] argv) throws Exception {
  4. //创建连接
  5. ConnectionFactory factory = new ConnectionFactory();
  6. //设置了本地连接,如果修改了用户名和密码,需要设置
  7. /*factory.setPassword();
  8. factory.setUsername();*/
  9. factory.setHost("localhost");
  10. //建立连接、创建频道
  11. //频道,类似客户端,用于调用server
  12. Connection connection = factory.newConnection();
  13. Channel channel = connection.createChannel();
  14. String message = "Hello World!";
  15. //发消息设置过期时间
  16. AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
  17. .expiration("1000")
  18. .build();
  19. //发送消息
  20. channel.basicPublish("",QUEUE_NAME,properties,message.getBytes(StandardCharsets.UTF_8));
  21. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  22. }
  23. }

消费者代码

  1. public class TtlConsumer {
  2. private final static String QUEUE_NAME = "Tll_queue";
  3. public static void main(String[] argv) throws Exception {
  4. ConnectionFactory factory = new ConnectionFactory();
  5. factory.setHost("localhost");
  6. Connection connection = factory.newConnection();
  7. Channel channel = connection.createChannel();
  8. //声明队列,同一个消息队列参数必须一致
  9. Map<String, Object> args = new HashMap<String, Object>();
  10. args.put("x-message-ttl", 60000);
  11. channel.queueDeclare(QUEUE_NAME, false, false, false, args);
  12. //定义了如何处理消息
  13. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  14. String message = new String(delivery.getBody(), "UTF-8");
  15. System.out.println(" [x] Received '" + message + "'");
  16. };
  17. //接收、消费消息 第二个参数 autoAck
  18. channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
  19. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  20. }
  21. }

死信队列

官方文档:Dead Letter Exchanges | RabbitMQ

为了保证消息的可靠性,比如每条消息都成功消费,需要提供一个容错机制,即:失效的消息怎么办?

死信:过期的消息、拒收的消息、消息队列满了、处理失败的消息的统称。

死信队列:处理死信的队列。

死信交换机:专门给死信队列转发消息的,存在路由绑定关系

实际就是设置一个普通的队列,专门将死信发送到这个队列中处理。

1. 创建死信交换机和死信队列

  1. //声明死信交换机
  2. String queueName = "laoban_dlx_queue";
  3. channel.queueDeclare(queueName, true, false, false, null);
  4. channel.queueBind(queueName, EXCHANGE_NAME, "laoban");

2. 给失败之后需要容错处理的队列绑定死信交换机

args2.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);

3. 绑定交换机到死信队列

args2.put("x-dead-letter-routing-key", "waibao");

生产者代码

  1. public class DLXDirectProducer {
  2. private static final String EXCHANGE_NAME = "direct2_exchange";
  3. private static final String DEAD_EXCHANGE_NAME = "dlx_direct2_exchange";
  4. public static void main(String[] argv) throws Exception {
  5. ConnectionFactory factory = new ConnectionFactory();
  6. factory.setHost("localhost");
  7. try (Connection connection = factory.newConnection();
  8. Channel channel = connection.createChannel()) {
  9. //声明死信交换机
  10. channel.exchangeDeclare(DEAD_EXCHANGE_NAME, "direct");
  11. String queueName = "laoban_dlx_queue";
  12. channel.queueDeclare(queueName, true, false, false, null);
  13. channel.queueBind(queueName, DEAD_EXCHANGE_NAME, "laoban");
  14. String queueName2 = "waibao_dlx_queue";
  15. channel.queueDeclare(queueName2, true, false, false, null);
  16. channel.queueBind(queueName2, DEAD_EXCHANGE_NAME, "waibao");
  17. channel.exchangeDeclare(EXCHANGE_NAME, "direct");
  18. DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
  19. String message = new String(delivery.getBody(), "UTF-8");
  20. System.out.println(" [laoban] Received '" +
  21. delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
  22. };
  23. DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
  24. String message = new String(delivery.getBody(), "UTF-8");
  25. System.out.println(" [waibao] Received '" +
  26. delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
  27. };
  28. channel.basicConsume(queueName, false, deliverCallback1, consumerTag -> {
  29. });
  30. channel.basicConsume(queueName2, false, deliverCallback2, consumerTag -> {
  31. });
  32. Scanner scanner = new Scanner(System.in);
  33. while (scanner.hasNext()){
  34. String userInput = scanner.nextLine();
  35. String[] strs = userInput.split(" ");
  36. if(strs.length<1){
  37. continue;
  38. }
  39. //消息
  40. String message = strs[0];
  41. //路由键
  42. String severity = strs[1];
  43. channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
  44. System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
  45. }
  46. }
  47. }
  48. }

消费者代码

  1. public class DLXDirectConsumer {
  2. private static final String EXCHANGE_NAME = "direct2_exchange";
  3. private static final String DEAD_EXCHANGE_NAME = "dlx_direct2_exchange";
  4. public static void main(String[] argv) throws Exception {
  5. ConnectionFactory factory = new ConnectionFactory();
  6. factory.setHost("localhost");
  7. Connection connection = factory.newConnection();
  8. Channel channel = connection.createChannel();
  9. channel.exchangeDeclare(EXCHANGE_NAME, "direct");
  10. //死信交换机绑定工作队列,当信息错误就从工作队列发送到死信交换机
  11. Map<String, Object> args1 = new HashMap<String, Object>();
  12. //指定绑定哪个交换机
  13. args1.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
  14. //死信要发送到哪个队列
  15. args1.put("x-dead-letter-routing-key", "laoban");
  16. String queueName1 = "doghuang";
  17. channel.queueDeclare(queueName1, true, false, false, args1);
  18. channel.queueBind(queueName1, EXCHANGE_NAME, "doghuang");
  19. //绑定cat 队列
  20. String queueName2 = "catbai";
  21. Map<String, Object> args2 = new HashMap<String, Object>();
  22. args2.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
  23. args2.put("x-dead-letter-routing-key", "waibao");
  24. channel.queueDeclare(queueName2, true, false, false, args2);
  25. channel.queueBind(queueName2, EXCHANGE_NAME, "catbai");
  26. //交换机使用路由键绑定队列
  27. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  28. DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
  29. String message = new String(delivery.getBody(), "UTF-8");
  30. channel.basicNack(delivery.getEnvelope().getDeliveryTag(),false,false);
  31. System.out.println(" [doghuang] Received '" +
  32. delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
  33. };
  34. DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
  35. String message = new String(delivery.getBody(), "UTF-8");
  36. System.out.println(" [catbai] Received '" +
  37. delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
  38. };
  39. channel.basicConsume(queueName1, false, deliverCallback1, consumerTag -> {
  40. });
  41. channel.basicConsume(queueName2, false, deliverCallback2, consumerTag -> {
  42. });
  43. }
  44. }

项目实战

项目中使用可以选择两种方法

1. 官方的客户端,兼容性好,灵活,需要自己维护管理

2. 使用封装好的客户端,比如 Spring Boot RabbitMQ Starter

优点:简单易用

缺点:不够灵活,被框架限制

小项目使用封装好的足够

1. 依赖引入

引入和自己 Spring Boot 版本相同的依赖,避免出现不能运行的错误

  1. <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. <version>2.6.13</version>
  6. </dependency>

2. 引入配置

  1. rabbitmq:
  2. host: localhost
  3. port: 5672
  4. password: guest
  5. username: guset

3. 创建交换机和消息队列,这个只需要启动一次创建即可

  1. /**
  2. * 只启动一次,创建交换机和消息队列
  3. */
  4. public class MqInitMain {
  5. private static final String EXCHANGE_NAME = "code_exchange";
  6. public static void main(String[] args) throws Exception {
  7. ConnectionFactory factory = new ConnectionFactory();
  8. factory.setHost("localhost");
  9. Connection connection = factory.newConnection();
  10. Channel channel = connection.createChannel();
  11. channel.exchangeDeclare(EXCHANGE_NAME, "direct");
  12. //绑定一个队列
  13. String queueName = "code_queue";
  14. channel.queueDeclare(queueName,true,false,false,null);
  15. channel.queueBind(queueName,EXCHANGE_NAME,"BI_routingKey");
  16. }
  17. }

4. 生产者

  1. /**
  2. * 生产者
  3. */
  4. @Component
  5. public class MyMessageProducer {
  6. @Resource
  7. private RabbitTemplate rabbitTemplate;
  8. //1.交换机名称2. 交换机路由键3.发送的消息
  9. public void sendMessage(String exchange, String routingKey,String message){
  10. rabbitTemplate.convertAndSend(exchange,routingKey,message);
  11. }
  12. }

5. 消费者

  1. /**
  2. * 消费者
  3. */
  4. @Component
  5. @Slf4j
  6. public class MessageConsumer {
  7. @RabbitListener(queues = {"code_queue"},ackMode = "MANUAL")
  8. public void receiveMessage(String message, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long delivery){
  9. log.info("receiveMessage message={}",message);
  10. try {
  11. channel.basicAck(delivery,false);
  12. } catch (IOException e) {
  13. throw new RuntimeException(e);
  14. }
  15. }
  16. }

6. 测试

  1. @SpringBootTest
  2. class MyMessageProducerTest {
  3. @Resource
  4. private MyMessageProducer myMessageProducer;
  5. @Test
  6. void sendMessage() {
  7. myMessageProducer.sendMessage("code_exchange","BI_routingKey","你好吗");
  8. }
  9. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/597462
推荐阅读
相关标签
  

闽ICP备14008679号