当前位置:   article > 正文

java——spring boot集成RabbitMQ——如何实现手动ack——topic——路由模式——代码展示...

springboot rabbitmq 开启手动ack

发送方:

pom文件:

  <?xml version="1.0" encoding="UTF-8"?>
  <project xmlns="http://maven.apache.org/POM/4.0.0"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>

      <!-- Coordinates of the sender (producer) module. -->
      <groupId>org.example</groupId>
      <artifactId>springrmqtopicsender</artifactId>
      <version>1.0-SNAPSHOT</version>

      <!-- Compile for Java 8 source and bytecode level. -->
      <properties>
          <maven.compiler.source>8</maven.compiler.source>
          <maven.compiler.target>8</maven.compiler.target>
      </properties>

      <!-- The dependencies below declare no versions; they are expected to be
           managed by this Spring Boot parent. -->
      <parent>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-parent</artifactId>
          <version>2.4.5</version>
          <relativePath/>
      </parent>

      <dependencies>
          <!-- Web starter: used by the REST controller that triggers sending. -->
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-web</artifactId>
          </dependency>
          <!-- AMQP starter: provides the RabbitMQ integration (RabbitTemplate etc.). -->
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-amqp</artifactId>
          </dependency>
          <!-- Developer tooling; runtime-only and optional. -->
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-devtools</artifactId>
              <scope>runtime</scope>
              <optional>true</optional>
          </dependency>
      </dependencies>
  </project>

yml配置:

  # RabbitMQ broker connection settings for the sender application.
  # The nesting matters: these keys must resolve to spring.rabbitmq.*
  spring:
    rabbitmq:
      host: 127.0.0.1
      port: 5672
      username: guest
      password: guest

config配置:

  1. package org.example.config;
  2. import org.springframework.amqp.core.TopicExchange;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. /**
  6. * 主题交换机
  7. * topic策略可以根据routingKey的规则(通配符方式)进行去匹配队列进行转发规则为*.#.*
  8. */
  9. @Configuration
  10. public class RabbitTopicConfig
  11. {
  12. public final static String TOPIC_NAME = "amqp-topic";
  13. @Bean
  14. TopicExchange topicExchange()
  15. {
  16. return new TopicExchange(TOPIC_NAME,true,false);
  17. }
  18. }

发送代码:

  1. package org.example.sender;
  2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Component;
  5. /**
  6. * 消息生产者 发送消息
  7. */
  8. @Component
  9. public class MessageSender {
  10. @Autowired
  11. RabbitTemplate rabbitTemplate;
  12. /**
  13. * 发送消息
  14. * @param info
  15. */
  16. public void send(String info)
  17. {
  18. System.out.println("发送消息>>>"+info);
  19. rabbitTemplate.convertAndSend("amqp-topic","huawei.a",info);
  20. }
  21. }

启动服务发送:

  1. package org.example.controller;
  2. import org.example.sender.MessageSender;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.web.bind.annotation.RequestMapping;
  5. import org.springframework.web.bind.annotation.RestController;
  6. /**
  7. * @Auther: moerhai@qq.com
  8. * @Date: 2020/10/4 11:34
  9. */
  10. @RestController
  11. public class IndexController {
  12. @Autowired
  13. MessageSender messageSender;
  14. @RequestMapping("/index")
  15. public String index()
  16. {
  17. messageSender.send("中国——路由——华为");
  18. return "SUCCESS";
  19. }
  20. }

  1. package org.example;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class HelloWorldApplication
  6. {
  7. public static void main(String[] args)
  8. {
  9. SpringApplication.run(HelloWorldApplication.class, args);
  10. }
  11. }

 

 

==================================================================

接收方(手动ack):

pom文件:

  <?xml version="1.0" encoding="UTF-8"?>
  <project xmlns="http://maven.apache.org/POM/4.0.0"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>

      <!-- Coordinates of the receiver (consumer) module. -->
      <groupId>org.example</groupId>
      <artifactId>springrmqtopicreceiver</artifactId>
      <version>1.0-SNAPSHOT</version>

      <!-- Compile for Java 8 source and bytecode level. -->
      <properties>
          <maven.compiler.source>8</maven.compiler.source>
          <maven.compiler.target>8</maven.compiler.target>
      </properties>

      <!-- The dependencies below declare no versions; they are expected to be
           managed by this Spring Boot parent. -->
      <parent>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-parent</artifactId>
          <version>2.4.5</version>
          <relativePath/>
      </parent>

      <dependencies>
          <!-- Web starter. -->
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-web</artifactId>
          </dependency>
          <!-- AMQP starter: provides the RabbitMQ listener infrastructure. -->
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-amqp</artifactId>
          </dependency>
          <!-- Developer tooling; runtime-only and optional. -->
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-devtools</artifactId>
              <scope>runtime</scope>
              <optional>true</optional>
          </dependency>
      </dependencies>
  </project>

yml文件:

  # RabbitMQ settings for the receiver application.
  # The nesting matters: the last key must resolve to
  # spring.rabbitmq.listener.simple.acknowledge-mode for manual ack to apply.
  spring:
    rabbitmq:
      host: 127.0.0.1
      port: 5672
      username: guest
      password: guest
      # Turn off automatic acknowledgement; listeners acknowledge manually.
      listener:
        simple:
          acknowledge-mode: manual

配置文件:

  1. package org.example.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.amqp.core.TopicExchange;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. /**
  9. * 主题交换机
  10. * topic策略可以根据routingKey的规则(通配符方式)进行去匹配队列进行转发规则为*.#.*
  11. */
  12. @Configuration
  13. public class RabbitTopicConfig {
  14. public final static String TOPIC_NAME = "amqp-topic";
  15. @Bean
  16. TopicExchange topicExchange(){
  17. return new TopicExchange(TOPIC_NAME,true,false);
  18. }
  19. @Bean
  20. Queue xiaomi(){
  21. return new Queue("xiaomi");
  22. }
  23. @Bean
  24. Queue huawei(){
  25. return new Queue("huawei");
  26. }
  27. @Bean
  28. Binding xiaomiBinding(){
  29. //xiaomi.#:表示消息的routingKey是以xiaomi开头的就会路由到xiaomi的队列
  30. return BindingBuilder.bind(xiaomi()).to(topicExchange()).with("xiaomi.#");
  31. }
  32. @Bean
  33. Binding huaweiBinding(){
  34. return BindingBuilder.bind(huawei()).to(topicExchange()).with("huawei.#");
  35. }
  36. }

接收信息,并手动签收:

  1. package org.example.receiver;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. import java.io.IOException;
  7. /**
  8. */
  9. @Component
  10. public class TopicReceiver {
  11. //分别监听名称为xiaomi、huawei的队列
  12. @RabbitListener(queues = "xiaomi")
  13. public void handlerXM(Message message,String msg, Channel channel) throws IOException {
  14. System.out.println("小米:"+msg);
  15. //手动签收,不启动批量签收
  16. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  17. }
  18. @RabbitListener(queues = "huawei")
  19. public void handlerHW(Message message,String msg, Channel channel) throws IOException {
  20. System.out.println("华为:"+msg);
  21. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  22. }
  23. }

启动服务:

  1. package org.example;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class HelloWorldApplication
  6. {
  7. public static void main(String[] args)
  8. {
  9. SpringApplication.run(HelloWorldApplication.class, args);
  10. }
  11. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/127121
推荐阅读
相关标签