赞
踩
RabbitMQ是一个接收,存储和转发消息数据的消息中间件。主要有四大核心部分,生产者、交换机、队列和消费者,工作原理如下图:
Broker:接收和分发消息的应用
Connection:publisher/consumer和broker之间的TCP连接
Channel:在connection内部建立的逻辑连接,作为轻量级的connection极大减少了操作系统建立TCP connection的开销
Exchange:交换机,message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去,常用的类型有:direct(point-to-point)、topic(publish-subscribe)和fanout(multicast)
Queue:消息最终被送到这里等待consumer取走
Binding:exchange和queue之间的虚拟连接,binding中科院包含routing key,Binding信息被保存到exchange中的查询表中,用于message的分发依据
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
application:
name: springboot-rabbitmq
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
rabbitmq:
host: xx.xx.xx.xx
port: 5672
virtual-host: /
publisher-confirm-type: correlated #发布消息成功到交换机后会触发回调方法
publisher-returns: true #确认消息发送到队列
username: admin
password: 123456
@Configuration public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class); @PostConstruct private void init() { //true:交换机无法将消息进行路由时,会将该消息返回给生产者;false:如果发现消息无法进行路由,则直接丢弃 rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String id = correlationData != null ? correlationData.getId() : ""; if (ack) { logger.info("交换机接收到id为:{}的消息确认成功!", id); } else { logger.info("id为:{}的消息未成功投递到交换机,原因是:{}", id, cause); } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { logger.error("消息:{}被交换机:{}退回,退回码是:{},退回原因是:{},路由是:{}", message.getBody(), exchange, replyCode, replyText, routingKey); } }
交换机一方面接收来自生产者的消息,另一方面将消息推入队列,交换机必须明确应该把这些消息放入特定队列还是放入多个队列,或者直接丢弃。
交换机的类型有:直接(direct)、主题(topic)、标题(headers)、扇出(fanout)
消息只会投递到绑定的routingKey队列之中,如下图:
X交换机绑定了两个队列,队列Q1绑定键为orange,队列Q2绑定键为black和green。在这种绑定情况下,生产者发布消息到交换机,绑定键位orange的消息会被发布到队列Q1,绑定键位black和green的消息会被发布到队列Q2,其他消息则会被直接丢弃。
如果绑定多个队列的键都相同,则跟fanout类似,跟广播一样
@Configuration public class RabbitDirectConfig { private static final String DIRECT_EXCHANGE_NAME = "direct-exchange"; private static final String DIRECT_QUEUE_NAME = "direct-queue"; private static final String ROUTING_KEY = "direct.route.key"; /** * 声明Exchange * @return */ @Bean(name = "directExchange") public DirectExchange directExchange() { return new DirectExchange(DIRECT_EXCHANGE_NAME); } /** * 声明队列 * @return */ @Bean(name = "directQueue") public Queue directQueue() { return QueueBuilder.durable(DIRECT_QUEUE_NAME).build(); } /** * 声明确认队列绑定关系 * @param queue * @param exchange * @return */ @Bean public Binding queueBinding(@Qualifier("directQueue") Queue queue, @Qualifier("directExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY); }
@RestController @RequestMapping("/direct") public class DirectProducerController { private static final String DIRECT_EXCHANGE_NAME = "direct-exchange"; private static final String ROUTING_KEY = "direct.route.key"; @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("sendMessage/{message}") public void sendMessage(@PathVariable String message) { String uuid = UUID.randomUUID().toString(); CorrelationData correlationData = new CorrelationData(uuid); rabbitTemplate.convertAndSend(DIRECT_EXCHANGE_NAME, ROUTING_KEY, message + ROUTING_KEY, correlationData); } }
@Component
public class DirectConsumer {
private static final String DIRECT_QUEUE_NAME = "direct-queue";
private static Logger logger = LoggerFactory.getLogger(ConfirmConsumer.class);
@RabbitListener(queues = DIRECT_QUEUE_NAME)
public void receiveMsg(Message message) {
String msg = new String(message.getBody());
logger.info("接收到队列direct.queue消息:{}", msg);
}
}
topic交换机的routingKey必须满足一定的规则,即以点号分隔开的单词列表,最多不超过255个字节。
(星号)代替一个单词
#(井号)代替零个或多个单词
例如Q1绑定的routingKey是(topic.*.*),Q2绑定的routingKey是(*.*.rabbit)
当routingKey中以topic开头的消息都会被Q1接收,同理,以rabbit结尾的消息都会被Q2接收。当一个队列绑定键routingKey是#时,那么这个队列将接收所有数据,类似于fanout,如果队列绑定routingKey中没有#和,那么该队列绑定类型就是direct.
@Configuration public class RabbitTopicConfig { private static final String TOPIC_EXCHANGE_NAME = "topic-exchange"; private static final String TOPIC_QUEUE_NAME_A = "topic-queue-A"; private static final String TOPIC_QUEUE_NAME_B = "topic-queue-B"; private static final String ROUTING_KEY_A = "topic.*.*"; private static final String ROUTING_KEY_B = "*.*.rabbit"; /** * 声明Exchange * @return */ @Bean(name = "topicExchange") public TopicExchange topicExchange() { return new TopicExchange(TOPIC_EXCHANGE_NAME); } /** * 声明队列 * @return */ @Bean(name = "topicQueueA") public Queue topicQueueA() { return QueueBuilder.durable(TOPIC_QUEUE_NAME_A).build(); } /** * 声明队列 * @return */ @Bean(name = "topicQueueB") public Queue topicQueueB() { return QueueBuilder.durable(TOPIC_QUEUE_NAME_B).build(); } /** * 声明确认队列绑定关系 * @param queue * @param exchange * @return */ @Bean public Binding topicQueueBindingA(@Qualifier("topicQueueA") Queue queue, @Qualifier("topicExchange") TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_A); } /** * 声明确认队列绑定关系 * @param queue * @param exchange * @return */ @Bean public Binding topicQueueBindingB(@Qualifier("topicQueueB") Queue queue, @Qualifier("topicExchange") TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_B); } }
@RestController @RequestMapping("/topic") public class TopicProducerController { private static final String TOPIC_EXCHANGE_NAME = "topic-exchange"; private static final String ROUTING_KEY_A = "topic.*.*"; private static final String ROUTING_KEY_B = "*.*.rabbit"; @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("sendMessage/{message}") public void sendMessage(@PathVariable String message) { String uuid = UUID.randomUUID().toString(); CorrelationData correlationData = new CorrelationData(uuid); rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_A, message, correlationData); } }
@Component public class TopicConsumer { private static final String TOPIC_QUEUE_NAME_A = "topic-queue-A"; private static final String TOPIC_QUEUE_NAME_B = "topic-queue-B"; private static Logger logger = LoggerFactory.getLogger(TopicConsumer.class); @RabbitListener(queues = TOPIC_QUEUE_NAME_A) public void receiveMsgA(Message message) { String msg = new String(message.getBody()); logger.info("接收到队列topic.queueA消息:{}", msg); } @RabbitListener(queues = TOPIC_QUEUE_NAME_B) public void receiveMsgB(Message message) { String msg = new String(message.getBody()); logger.info("接收到队列topic.queueB消息:{}", msg); } }
接收到所有消息广播到它知道的所有队列中
@Configuration public class RabbitFanoutConfig { private static final String FANOUT_EXCHANGE_NAME = "fanout-exchange"; private static final String FANOUT_QUEUE_NAME_A = "fanout-queue-A"; private static final String FANOUT_QUEUE_NAME_B = "fanout-queue-B"; private static final String FANOUT_QUEUE_NAME_C = "fanout-queue-C"; /** * 声明Exchange * @return */ @Bean(name = "fanoutExchange") public FanoutExchange fanoutExchange() { return new FanoutExchange(FANOUT_EXCHANGE_NAME); } /** * 声明队列 * @return */ @Bean(name = "fanoutQueueA") public Queue fanoutQueueA() { return QueueBuilder.durable(FANOUT_QUEUE_NAME_A).build(); } /** * 声明队列 * @return */ @Bean(name = "fanoutQueueB") public Queue fanoutQueueB() { return QueueBuilder.durable(FANOUT_QUEUE_NAME_B).build(); } /** * 声明队列 * @return */ @Bean(name = "fanoutQueueC") public Queue fanoutQueueC() { return QueueBuilder.durable(FANOUT_QUEUE_NAME_C).build(); } /** * 声明确认队列绑定关系 * @param queue * @param exchange * @return */ @Bean public Binding queueBindingA(@Qualifier("fanoutQueueA") Queue queue, @Qualifier("fanoutExchange") FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } /** * 声明确认队列绑定关系 * @param queue * @param exchange * @return */ @Bean public Binding queueBindingB(@Qualifier("fanoutQueueB") Queue queue, @Qualifier("fanoutExchange") FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } /** * 声明确认队列绑定关系 * @param queue * @param exchange * @return */ @Bean public Binding queueBindingC(@Qualifier("fanoutQueueC") Queue queue, @Qualifier("fanoutExchange") FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } }
@RestController @RequestMapping("/fanout") public class FanoutProducerController { private static final String FANOUT_EXCHANGE_NAME = "fanout-exchange"; @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("sendMessage/{message}") public void sendMessage(@PathVariable String message) { String uuid = UUID.randomUUID().toString(); CorrelationData correlationData = new CorrelationData(uuid); rabbitTemplate.convertAndSend(FANOUT_EXCHANGE_NAME, null, message, correlationData); } }
@Component public class FanoutConsumer { private static final String FANOUT_QUEUE_NAME_A = "fanout-queue-A"; private static final String FANOUT_QUEUE_NAME_B = "fanout-queue-B"; private static final String FANOUT_QUEUE_NAME_C = "fanout-queue-C"; private static Logger logger = LoggerFactory.getLogger(ConfirmConsumer.class); @RabbitListener(queues = FANOUT_QUEUE_NAME_A) public void receiveMsgA(Message message) { String msg = new String(message.getBody()); logger.info("接收到队列fanout.queueA消息:{}", msg); } @RabbitListener(queues = FANOUT_QUEUE_NAME_B) public void receiveMsgB(Message message) { String msg = new String(message.getBody()); logger.info("接收到队列fanout.queueB消息:{}", msg); } @RabbitListener(queues = FANOUT_QUEUE_NAME_C) public void receiveMsgC(Message message) { String msg = new String(message.getBody()); logger.info("接收到队列fanout.queueC消息:{}", msg); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。