赞
踩
发布/订阅模式(Publish/Subscribe)是 RabbitMQ 中常见的一种消息传递模式,用于将消息广播给多个消费者。在这种模式中,消息发送者(发布者)将消息发送到一个交换机(exchange),交换机将消息广播到所有与之绑定的队列,然后消费者(订阅者)可以从这些队列中接收消息。
生产者发送消息: 生产者将消息发送到一个交换机,而不是直接发送到队列。
交换机将消息广播: 交换机接收到消息后,根据预定义的规则将消息广播到所有与之绑定的队列。这个过程称为消息路由。
多个消费者监听队列: 多个消费者可以分别监听不同的队列,或者监听同一个队列。
消息处理: 每个消费者接收到广播的消息后,进行相应的处理。每个消息只会被消费一次,但是可以被多个消费者同时处理。
发布/订阅模式适用于需要将消息广播给多个消费者的场景,例如实时通知、日志记录、事件处理等。
这里为了方便和速度就不配置yml文件中,直接编辑,这里配置两个队列
交换机名称: exchange_sub
队列一名称: queue_sub_01
队列一名称: queue_sub_02
在SubConfig文件中配置
这里方便区分,新建了文件SubConfig,每个工作模式创建队列和交换机的过程区分开,全都配置到RabbitmqConfig文件中也是可以的,同时也可以通过RabbitAdmin进行绑定(另一种方式)。
- package com.model.config;
-
- import org.springframework.amqp.core.*;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- /**
- * @Author: Haiven
- * @Time: 2024/4/19 16:29
- * @Description: TODO
- */
- @Configuration
- public class SubConfig {
-
- /**
- * 发布/订阅模式的交换机
- * @return exchange
- */
- @Bean(name = "subExchange")
- public Exchange getSubExchange(){
- return ExchangeBuilder
- .fanoutExchange("exchange_sub")
- .durable(true)
- .build();
- }
-
- /**
- * 发布/订阅模式的队列 1
- * @return 队列 1
- */
- @Bean(name = "subQueue01")
- public Queue getSubQueue01(){
- return QueueBuilder
- .durable("queue_sub_01")
- .build();
- }
-
- /**
- * 发布/订阅模式的队列 2
- * @return 队列 2
- */
- @Bean(name = "subQueue02")
- public Queue getSubQueue02(){
- return QueueBuilder
- .durable("queue_sub_02")
- .build();
- }
-
- /**
- * 绑定队列01
- * @return binding
- */
- @Bean
- public Binding getSubBinding01(){
- return BindingBuilder
- .bind(getSubQueue01())
- .to(getSubExchange())
- // 通配符模式 要匹配的路由键 此处为发布/订阅模式 填""就可以
- .with("")
- .noargs();
- }
-
- /**
- * 绑定队列02
- * @return binding
- */
- @Bean
- public Binding getSubBinding02(){
- return BindingBuilder
- .bind(getSubQueue02())
- .to(getSubExchange())
- // 通配符模式 要匹配的路由键 此处为发布/订阅模式 填""就可以
- .with("")
- .noargs();
- }
- }
SubConsumer
- package com.model.listener;
-
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- /**
- * @Author: Haiven
- * @Time: 2024/4/19 16:44
- * @Description: TODO
- */
- @Component
- public class SubConsumer {
-
- @RabbitListener(queues = {"queue_sub_01"})
- public void subConsumer01(String msg){
- System.out.println("消费者 -01- 接收消息:" + msg);
- }
-
- @RabbitListener(queues = {"queue_sub_02"})
- public void subConsumer02(String msg){
- System.out.println("消费者 -02- 接收消息:" + msg);
- }
- }
- package com.model.controller;
-
- import com.code.domain.Response;
- import com.model.service.RabbitService;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import javax.annotation.Resource;
-
-
- /**
- * @Author: Haiven
- * @Time: 2024/4/19 9:46
- * @Description: TODO
- */
- @RestController
- @RequestMapping("/producer")
- public class ProducerController {
-
- @Resource
- private RabbitService rabbitService;
-
- @GetMapping("/simple")
- public Response<Void> simple(String msg){
- boolean res = rabbitService.simple(msg);
- return res ? Response.success() : Response.fail();
- }
-
- @GetMapping("/work")
- public Response<Void> work(String msg){
- boolean res = rabbitService.work(msg);
- return res ? Response.success() : Response.fail();
- }
-
- @GetMapping("/sub")
- public Response<Void> sub(String msg){
- boolean res = rabbitService.sub(msg);
- return res ? Response.success() : Response.fail();
- }
- }
- package com.model.service.impl;
-
- import com.model.service.RabbitService;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Service;
-
- import javax.annotation.Resource;
-
- /**
- * @Author: Haiven
- * @Time: 2024/4/19 10:51
- * @Description: TODO
- */
- @Service
- @Slf4j
- public class RabbitServiceImpl implements RabbitService {
-
- @Resource
- private RabbitTemplate rabbitTemplate;
-
- @Value("${rabbitmq.simple.queue}")
- private String simpleQueue;
-
- @Value("${rabbitmq.work.queue}")
- private String workQueue;
-
- @Override
- public boolean simple(String msg) {
- try {
- rabbitTemplate.convertAndSend(simpleQueue, msg);
- return true;
- }catch (Exception e){
- e.printStackTrace();
- return false;
- }
- }
-
- @Override
- public boolean work(String msg) {
- try {
- rabbitTemplate.convertAndSend(workQueue, msg);
- return true;
- }catch (Exception e){
- e.printStackTrace();
- return false;
- }
- }
-
- @Override
- public boolean sub(String msg) {
- try {
- //路由模式就不能直接发送消息到队列了, 而是发送到交换机,由交换机进行广播, routingKey为路由Key 订阅模式给""
- rabbitTemplate.convertAndSend("exchange_sub","", msg);
- return true;
- }catch (Exception e){
- e.printStackTrace();
- return false;
- }
- }
- }
发送成功
可以发现,发布/订阅模式下,推送到交换机的消息,会被所有绑定了交换机的队列接收
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。