赞
踩
导入Rabbit依赖包
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.dataformat</groupId>
- <artifactId>jackson-dataformat-xml</artifactId>
- <version>2.9.10</version>
- </dependency>
配置文件进行配置Rabbit
- spring:
- rabbitmq:
- host: # 主机名
- port: # 端口
- virtual-host: / # 虚拟主机
- username: # 用户名
- password: # 密码
- listener:
- simple:
- prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
-
- # publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
- # publisher-returns: true #确认消息已发送到队列(Queue)
- # template:
- # mandatory: true
- listener:
- simple:
- prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
- # acknowledge-mode: auto
- # retry:
- # enabled: true # 开启消费者失败重试
- # initial-interval: 1000 # 初识的失败等待时长为1秒
- # multiplier: 3 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
- # max-attempts: 3 # 最大重试次数
- # stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
- # concurrency: 2000
先配置rabbit,设置队列,交换机,key
- @Configuration
- public class RabbitConfig {
-
- //创建队列
-
- @Bean
-
- public Queue queueMessage() {
-
- return new Queue("Queue1",false,false,false,null);
-
- }
-
- //创建队列
-
- @Bean
-
- public Queue queueMessages() {
-
- return new Queue("Queue2",false,false,false,null);
-
- }
-
- //创建交换器
-
- @Bean
-
- TopicExchange exchange() {
-
- return new TopicExchange("topicExchange",false,false,null);
-
- }
-
- //对列绑定并关联到ROUTINGKEY
-
- @Bean
-
- Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
-
- return BindingBuilder.bind(queueMessage).to(exchange).with("key.#");
-
- }
-
- //对列绑定并关联到ROUTINGKEY
-
- @Bean
-
- Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
-
- return BindingBuilder.bind(queueMessages).to(exchange).with("key2.#");//*表示一个词,#表示零个或多个词
-
- }
-
- }
项目发送消息队列
- @Autowired
- private RabbitTemplate rabbitTemplate;
- //"topicExchange"为交换机,key1为key,msg为发送的消息
- rabbitTemplate.convertAndSend("topicExchange","key1",msg);
消费者进行监听
- @Component
- public class TopicListener {
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "Queue1"),
- exchange = @Exchange(name = "topicExchange", type = ExchangeTypes.TOPIC),
- key = "key1"
- ))
- public void listenTopicQueue1(String msg){
- System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
- }
-
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "Queue2"),
- exchange = @Exchange(name = "topicExchange", type = ExchangeTypes.TOPIC),
- key = "key2"
- ))
- public void listenTopicQueue2(String msg){
- System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
- }
- }
完成 localhost:15672/#/exchanges 账号密码一般都为 guest(个人设定)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。