当前位置:   article > 正文

Rabbitmq的高级应用(过期时间、死信队列、延迟队列、消息追踪、消息日志)_rabbitmq 死信队列做日志

rabbitmq 死信队列做日志

过期消息

解释

消息队列中,有些消息可能一直不会被消费,可以设置过期时间,防止消息队列堵塞,可以通过两种方式设置,一种是xml配置的方式,一种是代码的方式,这里写一下代码配置的方式

/**
过期消息
* 该消息投递任何交换机或队列中的时候;如果到了过期时间则将从该队列中删除
*/
@Test
public void ttlMessageTest(){
MessageProperties messageProperties = new MessageProperties();
//设置消息的过期时间,5 秒
messageProperties.setExpiration("5000");
Message message = new Message("测试过期消息,5 秒钟过期".getBytes(), messageProperties);
//路由键与队列同名
rabbitTemplate.convertAndSend("my_ttl_queue", message);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

如果xml和代码中同时配置了过期时间,则两者中较小的那个才会起作用。
如果是xml的配置方式需要再启动类中注入配置文件
在这里插入图片描述

<rabbit:queue id="my_ttl_queue" name="my_ttl_queue" auto-declare="true">
<rabbit:queue-arguments>
<!--投递到该队列的消息如果没有消费都将在 6 秒之后被删除-->
<entry key="x-message-ttl" value-type="long" value="6000"/>
</rabbit:queue>
  • 1
  • 2
  • 3
  • 4
  • 5

死信队列

上面一种是直接设置消息在队列中存在多久以后直接删除,但是有些需求不能删除,需要做特殊处理,咋办?这就诞生了死信队列,死信队列是别人不要的丢到里面,然后进行特殊的处理。

常见两种丢到死信队列的情况

第一种:在规定时间内,没有被消费掉
第二种:消息队列太长了,设置一定的长度,达到长度以后,就会把先前的消息丢到死信队列,
配置如下:

    <!--定义定向交换机中的持久化死信队列,不存在则自动创建-->
    <rabbit:queue id="my_dlx_queue" name="my_dlx_queue" auto-declare="true"/>

    <!--定义广播类型交换机;并绑定上述两个队列-->
    <rabbit:direct-exchange id="my_dlx_exchange" name="my_dlx_exchange" auto-declare="true">
        <rabbit:bindings>
            <!--绑定路由键my_ttl_dlx、my_max_dlx,可以将过期的消息转移到my_dlx_queue队列-->
            <rabbit:binding key="my_ttl_dlx" queue="my_dlx_queue"/>
            <rabbit:binding key="my_max_dlx" queue="my_dlx_queue"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!--定义过期队列及其属性,不存在则自动创建-->
    <rabbit:queue id="my_ttl_dlx_queue" name="my_ttl_dlx_queue" auto-declare="true">
        <rabbit:queue-arguments>
            <!--投递到该队列的消息如果没有消费都将在6秒之后被投递到死信交换机-->
            <entry key="x-message-ttl" value-type="long" value="6000"/>
            <!--设置当消息过期后投递到对应的死信交换机-->
            <entry key="x-dead-letter-exchange" value="my_dlx_exchange"/>
        </rabbit:queue-arguments>
    </rabbit:queue>

    <!--定义限制长度的队列及其属性,不存在则自动创建-->
    <rabbit:queue id="my_max_dlx_queue" name="my_max_dlx_queue" auto-declare="true">
        <rabbit:queue-arguments>
            <!--投递到该队列的消息最多2个消息,如果超过则最早的消息被删除投递到死信交换机-->
            <entry key="x-max-length" value-type="long" value="2"/>
            <!--设置当消息过期后投递到对应的死信交换机-->
            <entry key="x-dead-letter-exchange" value="my_dlx_exchange"/>
        </rabbit:queue-arguments>
    </rabbit:queue>

    <!--定义定向交换机 根据不同的路由key投递消息-->
    <rabbit:direct-exchange id="my_normal_exchange" name="my_normal_exchange" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding key="my_ttl_dlx" queue="my_ttl_dlx_queue"/>
            <rabbit:binding key="my_max_dlx" queue="my_max_dlx_queue"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

在这里插入图片描述
测试代码:

    /**
     * 过期消息投递到死信队列
     * 投递到一个正常的队列,但是该队列有设置过期时间,到过期时间之后消息会被投递到死信交换机(队列)
     */
    @Test
    public void dlxTTLMessageTest(){
        rabbitTemplate.convertAndSend(
                "my_normal_exchange",
                "my_ttl_dlx",
                "测试过期消息;6秒过期后会被投递到死信交换机2222");
    }


    /**
     * 消息长度超过2,会投递到死信队列中
     */
    @Test
    public void dlxMaxMessageTest(){
        rabbitTemplate.convertAndSend(
                "my_normal_exchange",
                "my_max_dlx",
                "发送消息4:消息长度超过2,会被投递到死信队列中!");


        rabbitTemplate.convertAndSend(
                "my_normal_exchange",
                "my_max_dlx",
                "发送消息5:消息长度超过2,会被投递到死信队列中!");

        rabbitTemplate.convertAndSend(
                "my_normal_exchange",
                "my_max_dlx",
                "发送消息6:消息长度超过2,会被投递到死信队列中!");



    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
流程图

在这里插入图片描述

延迟队列

有时候不想让消费者立马拿到消息消费,就涉及到延迟队列的使用,使用场景比如典型的付款,提交订单以后,用户如果迟迟没有支付,设置一个过期时间半小时,半小时后进入死信队列进行处理,由系统自动取消订单,所以延迟队列的实现可以使用消息过期+死信队列的方式来实现。

消息确认机制

确认并且保证消息被送达,提供了两种方式:发布确认和事务。(两者不可同时使用)在 channel 为事务时,不可引入确认模式;同样 channel 为确认模式下,不可使用事务。

发布确认

几个需要用到的配置
第一:连接配置

   publisher-confirms="true" 事务是否发送出去确认
    publisher-returns="true"  事务处理成功或者失败返回
    -->
    <rabbit:connection-factory id="connectionFactory" host="192.168.126.131"
                               port="5672"
                               username="huali"
                               password="huali"
                               virtual-host="huali"
                                publisher-confirms="true"
    							publisher-returns="true"

    />
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

第二:消息处理类配置

 <!-- 消息回调处理类 -->
    <bean id="confirmCallback" class="com.huali.rabbitmq.MsgSendConfirmCallBack"/>
    <!-- 消息失败回调类 -->
    <bean id="sendReturnCallback" class="com.huali.rabbitmq.MsgSendReturnCallback"/>
  • 1
  • 2
  • 3
  • 4

第三:整合配置

   <!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
    <!-- confirm-callback="confirmCallback" 表示:消息失败回调 -->
    <!-- return-callback="sendReturnCallback" 表示:消息失败回调 ,同时需配置mandatory="true",否则消息则丢失-->
    <!-- channel-transacted="true" 表示:支持事务操作 -->
    <rabbit:template id="rabbitTemplate"
                     connection-factory="connectionFactory"
                     confirm-callback="confirmCallback"
                     return-callback="sendReturnCallback"
                     mandatory="true"
    />
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

第四:处理类

package com.huali.rabbitmq;

import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/**
 * 消息确认与回退,确认消息是否发出去了
 */
public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("消息确认成功....");
        } else {
            //处理丢失的消息
            System.out.println("消息确认失败," + cause);
        }
    }
}

package com.huali.rabbitmq;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

//消息发送回调-确认处理成功或者失败
public class MsgSendReturnCallback implements RabbitTemplate.ReturnCallback {
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
        String msgJson  = new String(message.getBody());
        System.out.println("Returned Message:"+msgJson);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

第五:测试

    /**
     * 消息确认,测试消息发送确认代码是否正确
     */
    @Test
    public void queueTest(){
        //路由键与队列同名
        rabbitTemplate.convertAndSend("spring_queue", "只发队列spring_queue的消息。");
    }

//消息回调测试
    @Test
    public void testFailQueueTest() throws InterruptedException {
        //exchange 正确,queue 错误 ,confirm被回调, ack=true; return被回调 replyText:NO_ROUTE
        rabbitTemplate.convertAndSend("test_fail_exchange", "", "测试消息发送失败进行确认应答。");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

事务支持

场景:业务处理伴随消息的发送,业务处理失败(事务回滚)后要求消息不发送。rabbitmq 使用调用者的外部事务,通常是首选,因为它是非侵入性的(低耦合)。方法比较简单,在上面配置的接触上,去掉第一的 publisher-confirms=“true” publisher-returns=“true”,因为二者不能共存,然后在第三 整合配置的时候加上支持事务, channel-transacted=“true”,再加平台事务管理。然后在代码方法上加上事务注解即可、

第一:配置

  <rabbit:connection-factory id="connectionFactory" host="192.168.126.131"
                               port="5672"
                               username="huali"
                               password="huali"
                               virtual-host="huali"

    />

    <!-- 消息回调处理类 -->
    <bean id="confirmCallback" class="com.huali.rabbitmq.MsgSendConfirmCallBack"/>
    <!-- 消息失败回调类 -->
    <bean id="sendReturnCallback" class="com.huali.rabbitmq.MsgSendReturnCallback"/>

    <!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
    <!-- confirm-callback="confirmCallback" 表示:消息失败回调 -->
    <!-- return-callback="sendReturnCallback" 表示:消息失败回调 ,同时需配置mandatory="true",否则消息则丢失-->
    <!-- channel-transacted="true" 表示:支持事务操作 -->
    <rabbit:template id="rabbitTemplate"
                     connection-factory="connectionFactory"
                     confirm-callback="confirmCallback"
                     return-callback="sendReturnCallback"
                     mandatory="true"
                     channel-transacted="true"
    />

    <!--平台事务管理器-->
    <bean id="transactionManager" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

第二:测试类

   @Test
    @Transactional //开启事务
    //@Rollback(false)//在测试的时候,需要手动的方式制定回滚的策略
    public void queueTest2(){
        //路由键与队列同名
        rabbitTemplate.convertAndSend("spring_queue", "只发队列spring_queue的消息--02222222222222222222。");
        //System.out.println("----------------dosoming:可以是数据库的操作,也可以是其他业务类型的操作---------------");
        //模拟业务处理失败
        //System.out.println(1/0);
        //rabbitTemplate.convertAndSend("spring_queue", "只发队列spring_queue的消息--02。");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

消息追踪

消息中心的消息追踪需要使用 Trace 实现,Trace 是 Rabbitmq 用于记录每一次发送的消息,方便使用 Rabbitmq 的开发者调试、排错。可通过插件形式提供可视化界面。Trace 启动后会自动创建系统 Exchange:amq.rabbitmq.trace ,每个队列会自动绑定该 Exchange,绑定后发送到队列的消息都会记录到 Trace 日志。

消息开启

需要先 用rabbitmq 启用插件,再打开开关才能使用

命令集描述
rabbitmq-plugins list查看插件列表
rabbitmq-plugins enable rabbitmq_tracingrabbitmq 启用 trace 插件
rabbitmqctl trace_on打开 trace 的开关
rabbitmqctl trace_on -p huali打开 trace 的开关(huali为需要日志追踪的 vhost)
rabbitmqctl trace_off关闭 trace 的开关
rabbitmq-plugins disable rabbitmq_tracingrabbitmq 关闭 Trace 插件
rabbitmqctl set_user_tags heima administrator只有 administrator 的角色才能查看日志界面

安装插件并开启 trace_on 之后,会发现多个 exchange:amq.rabbitmq.trace ,类型为:topic。
 在这里插入图片描述
在这里插入图片描述
手动文件log文件,然后发送一条消息

测试

发送一条消息

rabbitTemplate.convertAndSend("spring_queue", "只发队列 spring_queue 的消息--01。");
  • 1

在这里插入图片描述
即可看到刚刚发送的消息,这里乱码了,暂时不管。

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号