发送方:
pom文件:
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <groupId>org.example</groupId>
- <artifactId>springrmqtopicsender</artifactId>
- <version>1.0-SNAPSHOT</version>
-
- <properties>
- <maven.compiler.source>8</maven.compiler.source>
- <maven.compiler.target>8</maven.compiler.target>
- </properties>
-
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.4.5</version>
- <relativePath/>
- </parent>
-
-
- <dependencies>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-devtools</artifactId>
- <scope>runtime</scope>
- <optional>true</optional>
- </dependency>
-
- </dependencies>
-
- </project>
yml配置:
- spring:
- rabbitmq:
- host: 127.0.0.1
- port: 5672
- username: guest
- password: guest
config配置:
- package org.example.config;
-
- import org.springframework.amqp.core.TopicExchange;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- /**
- * 主题交换机
- * topic策略可以根据routingKey的规则(通配符方式)进行去匹配队列进行转发规则为*.#.*
- */
- @Configuration
- public class RabbitTopicConfig
- {
-
-
- public final static String TOPIC_NAME = "amqp-topic";
-
-
- @Bean
- TopicExchange topicExchange()
- {
-
- return new TopicExchange(TOPIC_NAME,true,false);
-
- }
-
-
- }
发送代码:
- package org.example.sender;
-
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- /**
- * 消息生产者 发送消息
- */
- @Component
- public class MessageSender {
-
- @Autowired
- RabbitTemplate rabbitTemplate;
-
- /**
- * 发送消息
- * @param info
- */
- public void send(String info)
- {
-
- System.out.println("发送消息>>>"+info);
-
-
- rabbitTemplate.convertAndSend("amqp-topic","huawei.a",info);
- }
-
- }
启动服务发送:
- package org.example.controller;
-
- import org.example.sender.MessageSender;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- /**
- * @Auther: moerhai@qq.com
- * @Date: 2020/10/4 11:34
- */
- @RestController
- public class IndexController {
-
- @Autowired
- MessageSender messageSender;
-
- @RequestMapping("/index")
- public String index()
- {
-
- messageSender.send("中国——路由——华为");
- return "SUCCESS";
- }
-
- }
- package org.example;
-
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
-
- @SpringBootApplication
- public class HelloWorldApplication
- {
-
- public static void main(String[] args)
- {
-
- SpringApplication.run(HelloWorldApplication.class, args);
-
- }
- }
==================================================================
手动ack:
pom文件:
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <groupId>org.example</groupId>
- <artifactId>springrmqtopicreceiver</artifactId>
- <version>1.0-SNAPSHOT</version>
-
- <properties>
- <maven.compiler.source>8</maven.compiler.source>
- <maven.compiler.target>8</maven.compiler.target>
- </properties>
-
-
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.4.5</version>
- <relativePath/>
- </parent>
-
-
- <dependencies>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-devtools</artifactId>
- <scope>runtime</scope>
- <optional>true</optional>
- </dependency>
-
- </dependencies>
-
- </project>
yml文件:
- spring:
- rabbitmq:
- host: 127.0.0.1
- port: 5672
- username: guest
- password: guest
- #关闭自动ack,设置为手动ack
- listener:
- simple:
- acknowledge-mode: manual
配置文件:
- package org.example.config;
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.core.TopicExchange;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- /**
- * 主题交换机
- * topic策略可以根据routingKey的规则(通配符方式)进行去匹配队列进行转发规则为*.#.*
- */
- @Configuration
- public class RabbitTopicConfig {
-
- public final static String TOPIC_NAME = "amqp-topic";
-
- @Bean
- TopicExchange topicExchange(){
- return new TopicExchange(TOPIC_NAME,true,false);
- }
-
- @Bean
- Queue xiaomi(){
- return new Queue("xiaomi");
- }
-
- @Bean
- Queue huawei(){
- return new Queue("huawei");
- }
-
- @Bean
- Binding xiaomiBinding(){
- //xiaomi.#:表示消息的routingKey是以xiaomi开头的就会路由到xiaomi的队列
- return BindingBuilder.bind(xiaomi()).to(topicExchange()).with("xiaomi.#");
- }
-
- @Bean
- Binding huaweiBinding(){
- return BindingBuilder.bind(huawei()).to(topicExchange()).with("huawei.#");
- }
- }
接收信息,并手动签收:
- package org.example.receiver;
-
- import com.rabbitmq.client.Channel;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- import java.io.IOException;
-
- /**
- */
- @Component
- public class TopicReceiver {
-
- //分别监听名称为xiaomi、huawei的队列
- @RabbitListener(queues = "xiaomi")
- public void handlerXM(Message message,String msg, Channel channel) throws IOException {
-
- System.out.println("小米:"+msg);
-
- //手动签收,不启动批量签收
- channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
- }
-
-
- @RabbitListener(queues = "huawei")
- public void handlerHW(Message message,String msg, Channel channel) throws IOException {
-
-
- System.out.println("华为:"+msg);
-
- channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
- }
-
-
- }
启动服务:
- package org.example;
-
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
-
- @SpringBootApplication
- public class HelloWorldApplication
- {
-
- public static void main(String[] args)
- {
-
- SpringApplication.run(HelloWorldApplication.class, args);
-
- }
- }