赞
踩
主要是基于springboot框架,配置好rabbitMQ服务器,创建两个微服务分别实现消费者和生产者。
1.交换机的创建
2.队列的创建
3.交换机与队列信息的绑定
前三步都是基于RabbitMQ的网页版管理页面实现,基本有手就行,但是创建队列与交换机时会有一些参数的选择,我来简述一下各个参数的含义
type:代表交换机的种类,主要区别就是在绑定队列,根据路由键选取队列传递信息上的区别。
直接交换机(Direct Exchange): 直接交换机将消息发送到与指定路由键完全匹配的队列。只有当消息的路由键与队列的绑定路由键完全相同时,消息才会被路由到队列。这是一种点对点的消息路由模式。
扇出交换机(Fanout Exchange): 扇出交换机将消息发送到与交换机绑定的所有队列,无论消息的路由键是什么。这种模式适用于广播消息给多个消费者的场景。
主题交换机(Topic Exchange): 主题交换机根据消息的路由键和绑定队列的通配符模式来进行匹配。这允许更复杂的消息路由策略,可以根据路由键的一部分进行匹配。例如,可以使用通配符*
(匹配一个单词)和#
(匹配零个或多个单词)来定义路由键的模式。
头交换机(Headers Exchange): 头交换机使用消息的属性(而不是路由键)来匹配队列。发送者在发送消息时可以定义一组键值对属性,这些属性将与队列的绑定条件进行匹配。这种模式适用于基于消息属性的路由。
Durability:是否持久化,如果你将 Durability
设置为 true
,那么创建的交换机会被标记为持久的(durable),这意味着它的定义会被保存到磁盘上。即使 RabbitMQ 服务器在创建交换机后重启,该交换机的定义也会被保留,不会丢失。这是一种防止定义丢失的机制。
4.定义生产者
生产者这里首先要开启微服务,利用springboot中application.yaml上的配置信息连接到exchange上,然后实现消息的序列化,回调函数的实现,以及消息发送。
5.定义消费者
消费者就比较简单了,连接到RabbitMQ服务,然后利用注解设立RabbitMQ消费者,连接到对应的队列,就可以实现消息的接收,连接到队列后,RabbitMQ控制端会显示出连接信息,包括消费者的地址信息,tag等。
后面实现信息消费时,会根据这个信息发送到对应的消费者。
- spring:
- rabbitmq:
- host: localhost
- port: 5672
- username: guest
- password: guest
- publisherConfirms: true
- publisherReturns: true
- template:
- mandatory: true
序列化信息以及获取回调信息
- @Bean
- public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
- final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
- rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
- rabbitTemplate.setMandatory(true);
- rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
- @Override
- public void returnedMessage(Message message, int replyCode, String replyText,
- String exchange, String routingKey) {
- try {
- System.out.println("--------收到无法路由回发的消息--------");
- System.out.println("ReturnCallback: " + "消息:" + message);
- System.out.println("ReturnCallback: " + "回应码:" + replyCode);
- System.out.println("ReturnCallback: " + "回应信息:" + replyText);
- System.out.println("ReturnCallback: " + "交换机:" + exchange);
- System.out.println("ReturnCallback: " + "路由键:" + routingKey);
- System.out.println("properties:" + message.getMessageProperties());
- System.out.println("body:" + new String(message.getBody(), "UTF-8"));
- } catch (UnsupportedEncodingException e) {
- e.printStackTrace();
- }
- }
- });
-
- rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- System.out.println("--------收到服务端异步确认--------");
- System.out.println("ConfirmCallback: " + "相关数据:" + correlationData);
- System.out.println("ConfirmCallback: " + "确认情况:" + ack);
- System.out.println("ConfirmCallback: " + "原因:" + cause);
- }
- });
- return rabbitTemplate;
- }
序列化主要是转为json实现消息的传递
returnedMessage:实现交换机无法接收信息后的处理逻辑
ConfirmCallback:服务器端异步确认的实现
这么设计主要由于RabbitMQ保证可靠性的机制,确保生产者已经将消息交给RabbitMQ:
发送消息:
- final String directExchange="exchange2";
-
- // 自定义的模板,所有的消息都会转换成JSON发送
- @Autowired
- AmqpTemplate amqpTemplate;
-
-
-
- public void send(String message) {
- // 发送JSON字符串
- ObjectMapper mapper = new ObjectMapper();
- String sycMsg = null;
- try {
- sycMsg = mapper.writeValueAsString(message);
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- }
- //System.out.println(sycMsg);
- amqpTemplate.convertAndSend(directExchange,"spring.#", sycMsg);
- }
convertAndSend方法的三个传参分别是,交换机名称,路由键,消息内容
消费者实现:
- @RabbitHandler
- @RabbitListener(queues = "queue4")
- public void Show2(Object message){
- System.out.println("queue4订阅的消息为:"+message);
- }
@RabbitListener
注解: 这个注解用于标记一个方法,表示它是一个消息监听器,会监听指定队列中的消息。在这个注解中,你可以指定队列的名称,表示这个监听器关注哪个队列。在你的例子中,@RabbitListener(queues = "queue4")
表示这个监听器关注名为 "queue4" 的队列。
@RabbitHandler
注解: 这个注解用于标记被 @RabbitListener
注解的方法中的具体处理方法。如果一个消息监听器监听的队列可能接收多种不同类型的消息,你可以在同一个监听器类中定义多个处理方法,并用 @RabbitHandler
标记每个方法来处理不同类型的消息。@RabbitHandler
可以根据消息的类型来选择调用哪个处理方法。
消费者处理消息的流程:
1.基于什么协议实现消息的传递
RabbitMQ基于AMQP(Advanced Message Queuing Protocol)协议实现消息的传递
2.RabbitMQ队列的种类有哪些
普通队列(Standard Queue): 普通队列是最基本的队列类型。消息会按照先进先出(FIFO)的顺序排列在队列中。多个消费者可以从队列中获取消息,但每条消息只会被一个消费者接收。
优先级队列(Priority Queue): 优先级队列允许消息设置不同的优先级。具有更高优先级的消息将在队列中排在具有较低优先级的消息之前,以确保高优先级消息尽快被处理。
延迟队列(Delayed Queue): 延迟队列允许将消息延迟发送到队列中。消息会在指定的延迟时间之后才可被消费者获取。这对于处理定时任务和调度任务非常有用。
死信队列(Dead Letter Queue,DLQ): 死信队列用于处理无法被消费者成功处理的消息。如果消息因为某种原因无法被消费,它们可以被重新路由到死信队列,以便后续处理或分析失败的消息。
绑定队列(Binding Queue): 绑定队列是一种特殊的队列,它不用于存储消息,而是用于表示队列和交换机之间的绑定关系。消息不会在绑定队列中排队等待,而是直接路由到与之绑定的队列。
镜像队列(Mirrored Queue): 镜像队列是一种高可用性队列,它将队列的数据镜像到多个节点,以防止单个节点的故障导致消息丢失。
自动删除队列(Auto-Delete Queue): 自动删除队列是一种在不再使用时自动删除自身的队列类型。当最后一个消费者断开连接后,队列会被自动删除。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。