赞
踩
目录
Message Queue (MQ)是一种跨进程的通信机制,用于在系统之间进行传递消息。MQ作为消息中间件,可以进行异步处理请求,从而减少请求响应时间和解耦
多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败。
应用间并发处理消息,相比串行处理,减少处理时间;
比如用户注册,系统需要发送注册邮件,验证短信。MQ作为消息中间件,可以进行异步处理请求,从而减少请求响应时间。
广泛应用于高并发系统中,避免流量过大导致处理请求阻塞的情况
RabbitMQ是支持多种消息协议,易于部署和使用的开源消息代理服务器,用于在分布式系统中存储转发消息。
由以高性能、健壮以及可伸缩性出名的Erlang语言编写; |
提供了成熟的高并发,高可用的解决方案 可以根据实际业务情况动态地扩展集群节点。 |
在集群中的机器上设置镜像,使得在部分节点出现问题的情况下仍然可用。 |
支持多种客户端语言,如:Python、Ruby、.NET、Java等,支持AJAX。 |
RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息、集群中的节点等。 |
MQ产品 | 语言支持 | 并发量 | 消息回溯支持 | 性能 | 协议 | 可靠性 |
---|---|---|---|---|---|---|
RocketMQ | Java, C++ | 较高 | 支持 | 良好 | 自定义TCP | 良好 |
Kafka | Java | 非常高 | 不支持 | 卓越 | 自定义TCP | 良好 |
ActiveMQ | Java | 万级 | 不支持 | 一般 | JMS规范 | 较差 |
1、Producer:生产者,即消息的提供者
2、Consumer:消费者,即消息的使用者
3、Message:消息,即进程之间进行通信的数据
4、Queue:队列,即消息存放的容器,消息以先进先出的方式进行存储
5、Vhost:虚拟主机,用于存储消息队列
虚拟主机vhost是AMQP概念的基础,RabbitMQ包含了开箱即用的默认虚拟主机"/“
vhost之间是绝对隔离,保障了队列和交换机的安全性。
RabbitMQ: easy to use, flexible messaging and streaming — RabbitMQhttp://www.rabbitmq.com/
建议直接使用启动
在RabbitMQ的文件路径打开cmd,启动RabbitMQ
net start RabbitMQ
关闭RabbitMQ
net stop RabbitMQ
注意:第一次使用RabbitMQ需要激活
- #进入sbin目录下打开cmd
- #启用管理控制台
- rabbitmq-plugins.bat enable rabbitmq_management
通过浏览器进行访问
- #管理控制台的端口是15672
- http://localhost:15672/
- #默认提供的管理员用户
- 用户名:guest 密码:guest
新版本的配置好像都叫 advanced.config,位置如下
C:\Users\你的用户名\AppData\Roaming\RabbitMQ
- [
- {rabbit, [
- {tcp_listeners, [{"127.0.0.1", 5672}]},
- {loopback_users, []}
- ]},
- {rabbitmq_management, [
- {listener, [
- {port, 15672},
- {ip, "0.0.0.0"}
- ]}
- ]}
- ].
建议新创建一个管理员账号在非本地登录
一个生产者对应一个消费者
一对多,每个消费者得到的消息唯一(work模式下最大发挥每个消费者的性能)
一对多,生产者将消息发送给交换机,交换机再将消息转发给与之绑定的队列,发布订阅模式对应的交换机类型是FANOUT
一对多,可以根据指定的路由key,将消息发送给不同的队列,交换机类型是DIRECT
一对多,在Routing路由模式的基础上,可以使用通配符定义路由key
* 匹配单个字符
# 匹配任意个字符
- spring:
- rabbitmq:
- virtual-host: /myhost
- port: 5672
- host: localhost
- username: admin
- password: admin
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.springframework.amqp</groupId>
- <artifactId>spring-rabbit-test</artifactId>
- <scope>test</scope>
- </dependency>
- @Configuration
- public class MqConfig {
- public static final String MSG_QUEUE = "msg-queue";
- public static final String MSG_EXCHANGE = "msg-exchange";
- public static final String MSG_ROUTE_KEY = "msg.key";
- /**
- * 声明队列
- */
- @Bean
- public Queue msgQueue() {
- return new Queue(MSG_QUEUE,true,false,false);
- }
- /**
- * 声明交换机
- */
- @Bean
- public TopicExchange msgExchage() {
- //参数2是否持久化,参数3是否自动删除
- return new TopicExchange(MSG_EXCHANGE,true,false);
- }
- /**
- * 将队列和交换机进行绑定
- */
- @Bean
- public Binding bindMsgQueue() {
- return BindingBuilder.bind(msgQueue()).to(msgExchage()).with(MSG_ROUTE_KEY);
- }
- }
- @Component
- public class MsgProducer {
- @Autowired
- private RabbitTemplate rabbitTemplate;
- public void sendMsg(String message) {
- rabbitTemplate.convertAndSend(MqConfig.MSG_EXCHANGE,MqConfig.MSG_ROUTE_KEY,message);
- }
- }
同样导入依赖、配置yml
- @Component
- public class MsgConsumer {
- public static final String MSG_QUEUE = "msg-queue";
- public static final String MSG_EXCHANGE = "msg-exchange";
- //消息处理的方法
- @RabbitHandler
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name=MSG_QUEUE,declare = "true",durable = "true",exclusive = "false",autoDelete = "false"),
- exchange = @Exchange(name = MSG_EXCHANGE,type = ExchangeTypes.TOPIC),
- key = "msg.*"
- ))
- //@Payload表示消费者处理的消息
- //@Headers注解表示接收的消息头将会被绑定到`headers`参数上
- public void receiveMsg(@Payload String msg, Channel channel, @Headers Map headers) {
- System.out.println("消费者处理消息:" + msg);
- Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
- try {
- channel.basicAck(tag,false);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
@RabbitHandler注解:表示该方法是一个消息处理方法
bindings属性用于指定队列和交换机的绑定关系。
@RabbitListener注解:表示该类是一个消息监听器,用于监听指定的队列
value属性用于指定队列的属性,包括队列的名称、是否需要声明、是否持久化、是否排他、是否自动删除等
exchange属性用于指定交换机的名称和类型
key属性用于指定消息的路由键
- spring:
- rabbitmq:
- virtual-host: /myhost
- port: 5672
- host: localhost
- username: admin
- password: admin
- publisher-confirm-type: correlated
- publisher-returns: true
ConfirmType | |
---|---|
NONE | 禁用发布确认模式,是默认值。 |
CORRELATED | 将消息成功发布到交换器后触发回调方法。 |
SIMPLE | 与CORRELATED相似,也会在将消息成功发布到交换器后触发回调方法。 |
- RabbitTemplate.ConfirmCallback callback = new RabbitTemplate.ConfirmCallback() {
- @Override
- public void confirm(CorrelationData correlationData, boolean isAck, String cause) {
- if (!isAck) {
- System.out.println("拒收的原因:" + cause);
- } else {
- if (correlationData != null) {
- System.out.println("broker接收消息自定义ID:" + correlationData.getId());
- }
- }
- }
- };
- @Bean
- public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
- RabbitTemplate template = new RabbitTemplate();
- template.setConnectionFactory(factory);
- template.setConfirmCallback(callback);
- return template;
- }
- @Component
- public class MsgProducer {
- @Autowired
- private RabbitTemplate rabbitTemplate;
- public void sendMsg(String message) {
- //自定义的附加数据
- CorrelationData data = new CorrelationData();
- data.setId("10001");
- rabbitTemplate.convertAndSend(MqConfig.MSG_EXCHANGE,MqConfig.MSG_ROUTE_KEY,message,data);
- }
- }
在配置文件中添加 publisher-returns: true 配置消息回退
- RabbitTemplate.ReturnsCallback returnsCallback = new RabbitTemplate.ReturnsCallback() {
- @Override
- public void returnedMessage(ReturnedMessage msg) {
- System.out.println("--------消息路由失败------------");
- System.out.println("消息主体:" + msg.getMessage());
- System.out.println("返回编码:" + msg.getReplyCode());
- System.out.println("描述信息:" + msg.getReplyText());
- System.out.println("交换机:" + msg.getExchange());
- System.out.println("路由key:" + msg.getExchange());
- System.out.println("------------------------------");
- }
- };
- @Bean
- public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
- RabbitTemplate template = new RabbitTemplate();
- template.setConnectionFactory(factory);
- template.setConfirmCallback(callback);
- template.setReturnsCallback(returnsCallback);
- // true 表示消息通过交换机无法路由到队列时候,会把消息返回给生产者
- // false 消息无法路由到队列就直接丢弃
- template.setMandatory(true);
- return template;
- }
常见情况 | |
---|---|
生产者消息没到交换机 | 生产者丢失消息 |
交换机没有把消息路由到队列 | 生产者丢失消息 |
RabbitMQ 宕机导致队列、队列中的消息丢失 | RabbitMQ 丢失消息 |
消费者消费出现异常,业务没执行 | 消费者丢失消息 |
解决办法为上面的异步监听confirm-type、publisher-returns,3.3.2.1的代码
设置持久化将消息写出磁盘,否则RabbitMQ重启后所有队列和消息都会丢失
消费者丢数据一般是因为采用了自动确认消息模式。MQ收到确认消息后会删除消息,如果这时消费者异常了,那消息就没了
使用ack机制,默认情况下自动应答,可以使用手动ack
- listener:
- simple:
- acknowledge-mode: manual #开启消费者手动确认模式 (channel.bacisAck)
如果消息消费失败,不执行消息确认代码,用channel的basicNack方法拒收
- public void receiveMsg(@Payload String msg, Channel channel, @Headers Map headers) {
- System.out.println("消费者处理消息:" + msg);
- Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
- try {
- System.out.println(2/0);
- //其中`tag`是消息的唯一标识,`false`表示只确认当前消息,不确认之前的所有未确认消息。
- channel.basicAck(tag,false);
- } catch (Exception e) {
- System.out.println("签收失败");
- try {
- //其中`tag`是消息的唯一标识,`false`表示只拒绝当前消息,`true`表示该消息将重新进入队列等待被消费。
- channel.basicNack(tag,false,true);
- } catch (IOException ex) {
- System.out.println("拒收失败");
- }
- }
- }
通常的代码报错并不能因为重试而解决,可能会造成死循环
解决办法: |
---|
当消费失败后将此消息存到 Redis,记录消费次数,如果消费了三次还是失败,就丢弃掉消息,记录日志落库保存 |
basicNack方法的参数3直接填 false ,不重回队列,记录日志、发送邮件等待开发手动处理 |
不启用手动 ack ,使用 SpringBoot 提供的消息重试 |
- listener:
- simple:
- retry:
- enabled: true
- max-attempts: 3 #重试次数
注意:要抛异常,因为SpringBoot 触发重试是根据方法中发生未捕捉的异常来决定的
- public void receiveMsg(@Payload String msg, Channel channel, @Headers Map headers) {
- System.out.println("消费者处理消息:" + msg);
- Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
- try {
- System.out.println(2/0);
- //channel.basicAck(tag,false);
- } catch (Exception e) {
- System.out.println("签收失败");
- //记录日志、发送邮件、保存消息到数据库,落库之前判断如果消息已经落库就不保存
- throw new RuntimeException(e);
- }
- }
使用手动恢复MQ解决了消息在消费者端丢失的问题,但是如果消费者处理消息成功后,由于网络波动导致手动回复MQ失败,该条消息还保存在消息队列中,由于MQ消息的重发机制,该消息会被重复消费,造成不好的后果
使用 redis 将消费过的消息唯一标识存储起来,然后在消费端业务执行之前判断 redis 中是否已经存在这个标识
如果消费端业务是新增操作,我们可以利用数据库的唯一键约束,比如优惠券流水表的优惠券编号,如果重复消费将会插入两条相同的优惠券编号记录,数据库会给我们报错,可以保证数据库数据不会插入两条
前面我们发送的消息都是字符串,如果想发送对象,就要用到消息转换器
消息转换器(Message Converter)是用于将消息在生产者和消费者之间进行序列化和反序列化的组件。在消息传递过程中,生产者将消息对象转换为字节流发送到消息队列,而消费者则将接收到的字节流转换回消息对象进行处理。
对象需要序列化
- @Component
- public class OrderProducer {
- @Autowired
- private RabbitTemplate rabbitTemplate;
- public void sendOrder(OrderDTO orderDTO) {
- rabbitTemplate.convertAndSend(MqConfig.ORDER_EXCHANGE,MqConfig.ORDER_KEY,orderDTO);
- }
- }
- @Test
- void testSendOrder() {
- OrderDTO orderDTO = new OrderDTO();
- orderDTO.setOrderSn(UUID.randomUUID().toString());
- orderDTO.setUsername("user");
- orderDTO.setAmount(new BigDecimal("200"));
- orderDTO.setCreateDate(new Date());
- orderProducer.sendOrder(orderDTO);
- }
控制台查看消息:
上面采用的是JDK序列化方式,可以看出虽然获得了对象,但是得到的数据体积大,可读性差,为了解决这个问题,我们可以通过SpringAMQP的MessageConverter来处理
Spring AMQP提供了多种消息转换器(Message Converter)这些消息转换器使得消息的发送和接收可以使用不同的消息格式,如JSON、XML等,从而更灵活地处理消息数据
- @Bean
- public MessageConverter messageConverter() {
- return new Jackson2JsonMessageConverter();
- }
- @Bean
- public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
- RabbitTemplate template = new RabbitTemplate();
- template.setConnectionFactory(factory);
- template.setConfirmCallback(callback);
- template.setReturnsCallback(returnsCallback);
- // true 表示消息通过交换机无法路由到队列时候,会把消息返回给生产者
- // false 消息无法路由到队列就直接丢弃
- template.setMandatory(true);
- template.setMessageConverter(messageConverter());
- return template;
- }
添加依赖
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
配置消息转换器
- @Configuration
- public class MqConfig implements RabbitListenerConfigurer {
- @Resource
- private ObjectMapper objectMapper;
- //将消息转换为JSON格式
- public MappingJackson2MessageConverter messageConverter(){
- MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
- converter.setObjectMapper(objectMapper);
- return converter;
- }
-
- @Bean
- public MessageHandlerMethodFactory messageHandlerMethodFactory(){
- DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
- factory.setMessageConverter(messageConverter());
- return factory;
- }
- @Override
- public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {
- rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
- }
- }
创建消费者处理消息方法
- @RabbitHandler
- @RabbitListener(queues = "msg-queue")
- public void receiveOrder(@Payload OrderDTO orderDTO, Channel channel, @Headers Map map){
- System.out.println("Order消息处理:"+orderDTO);
- Long tag = (Long)map.get(AmqpHeaders.DELIVERY_TAG);
- try {
- channel.basicAck(tag,false);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
延时队列就是用来存放需要在指定时间内被处理的消息的队列,是死信队列的一种
应用场景: |
---|
1、订单在十分钟之内未支付则自动取消 |
2、预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议 |
3、用户发起退款,如果三天内没有得到处理则通知 |
4、用户注册成功后,如果三天内没有登陆则进行短信提醒 |
死信队列(Dead Letter Queue,简称DLQ)是一种用于处理消息处理失败或被拒绝的消息的特殊队列。当消息在队列中满足一定条件时,例如消息被消费者拒绝、消息过期、消息处理超时等,这些消息将被发送到死信队列而不是直接被丢弃或忽略
TTL(Time To Live)指定消息在队列中存活的时间,超过指定的时间后如果消息还未被消费者消费,则该消息会被自动丢弃或转移到死信队列
- @Bean
- public Queue ttlQueue() {
- Map map = new HashMap();
- map.put("x-message-ttl",5000);
- return new Queue("ttl-queue",false,false,false,map);
- }
- @Bean
- public TopicExchange ttlExchange() {
- return new TopicExchange("ttl-exchange");
- }
- @Bean
- public Binding bindTllQueue() {
- return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl.*");
- }
- public void sendMessage() {
- rabbitTemplate.convertAndSend("ttl-exchange","ttl.msg","hello world");
- }
- @Test
- void testSendMessage() {
- msgProducer.sendMessage();;
- }
配置死信队列、交换机并把二者绑定,修改TTL队列失效时放到死信交换机中进而存到死信队列中,而不是直接销毁
- @Bean
- public Queue ttlQueue(){
- Map map = new HashMap<>();
- map.put("x-message-ttl",5000);
- map.put("x-dead-letter-exchange","dead-exchange"); //死信交换机
- map.put("x-dead-letter-routing-key","dead.msg"); //发送消息时携带路由key
- return new Queue("ttl-queue",false,false,false,map);
- }
- @Bean
- public TopicExchange deadExchange() {
- return new TopicExchange("dead-exchange");
- }
- @Bean
- public Queue deadQueue() {
- return new Queue("dead-queue");
- }
- @Bean
- public Binding bindDeadQueue() {
- return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead.#");
- }
- @RabbitHandler
- @RabbitListener(queues = {"dead-queue"})
- public void receiveDeadMsg(@Payload String msg, Channel channel, @Headers Map headers) {
- Long tag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
- System.out.println("处理了已经超时的消息:" + msg);
- try{
- channel.basicAck(tag,false);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
RabbitMQ实现延迟消息的方式有两种,一种是使用死信队列,另一种是使用延迟插件。
通过安装插件,自定义交换机,让交换机拥有延迟发送消息的能力,从而实现延迟消息,相较于死信队列延迟插件只需创建一个交换机和一个队列,使用起来简单
Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHubDelayed Messaging for RabbitMQ. Contribute to rabbitmq/rabbitmq-delayed-message-exchange development by creating an account on GitHub.https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases 将插件文件复制到RabbitMQ安装目录的plugins目录下,然后进入RabbitMQ安装目录的sbin目录下,使用如下命令启用延迟插件;
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- /**
- * 订单延迟插件消息队列所绑定的交换机
- */
- @Bean
- DirectExchange orderCancelExchange() {
- return ExchangeBuilder.directExchange("order-delay-exchange")
- .delayed().durable(true)
- .build();
- }
- /**
- * 订单延迟插件队列
- */
- @Bean
- public Queue orderCancelQueue() {
- return new Queue("order-delay-queue");
- }
- /**
- * 将订单延迟插件队列绑定到交换机
- */
- @Bean
- public Binding bindOrderCancelQueue() {
- return BindingBuilder.bind(orderCancelQueue())
- .to(orderCancelExchange()).with("delay.order.key");
- }
通过给消息设置x-delay头来设置消息从交换机发送到队列的延迟时间
- @Component
- public class CancelOrderSender {
- @Resource
- private RabbitTemplate rabbitTemplate;
- public void sendMessage(Long orderId,Long delayTime) {
- rabbitTemplate.convertAndSend("order-delay-exchange", "delay.order.key", orderId, new MessagePostProcessor() {
- @Override
- public Message postProcessMessage(Message message) throws AmqpException {
- //给消息设置延迟毫秒值
- message.getMessageProperties().setHeader("x-delay",delayTime);
- return message;
- }
- });
- }
- }
- @Service
- public class OrderServiceImpl {
- @Resource
- private CancelOrderSender cancelOrderSender;
- public void createOrder() {
- System.out.println("下单后生成订单ID");
- Long orderId = 1001L;
- sendDelayMessageCancelOrder(orderId);
- }
- public void sendDelayMessageCancelOrder(Long orderId) {
- //获取订单超时时间,假设为5秒
- long delayTimes = 5 * 1000;
- cancelOrderSender.sendMessage(orderId,delayTimes);
- }
- }
- @RestController
- @RequestMapping("/order")
- public class OrderController {
- @Resource
- private OrderServiceImpl orderService;
- @PostMapping
- public String create() {
- orderService.createOrder();
- return "success";
- }
- }
- @Service
- public class OrderServiceImpl {
- public void cancelOrder(Long orderId) {
- System.out.println("查询订单编号为:" + orderId + "订单状态,如果是待支付状态,则更新为已失效");
- }
- }
- @Component
- public class CancelOrderReceiver {
- @Autowired
- private OrderServiceImpl orderService;
- @RabbitHandler
- @RabbitListener(queues = {"order-delay-queue"})
- public void handle(@Payload Long orderId, Channel channel, @Headers Map headers) {
- orderService.cancelOrder(orderId);
- Long tag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
- try {
- channel.basicAck(tag,false);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。