当前位置:   article > 正文

扩展自定义mq组件,使用rabbitmq_delayed_message_exchange延迟组件,完善消息延迟消息精度问题

rabbitmq_delayed_message_exchange

一 引言

最近在项目中发现消息的延迟消费是通过redis的过期消息监听,存在很大的安全问题,由于服务的宕机或其他问题会导致消息的丢失,本想系采用延迟队列和死信队列完成消息的延迟消费,但这种方案存在一定的局限性,当队列中第一个消息未过期时,后面过期了的消息也不会投递到死信队列中,这样会造成消息的阻塞,这种方案对时间精度要求不是很高时,可以采用,但时间精度要求比较高时就会存在一定的局限性
rabbitmq官方给我提供了rabbitmq_delayed_message_exchange用于解决消息阻塞问题,本文重点介绍基于自定义springboot组件扩展mq实现延迟消息的消费问题.自定义mq组件扩展可以参考笔者的这篇文章自定义springboot组件–基于模板模式对原生springboot的rabbitmq组件进行扩展

二 引入mq延迟插件

2.1 下载插件

登录mq管理平台查看对应的版本号
在这里插入图片描述

在 RabbitMQ 的 3.5.7 版本之后,提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列 ,同时需保证 Erlang/OPT 版本为 18.0 之后。
我这里 MQ 的版本是 3.9.11,现在去 GitHub 上根据版本号下载插件
点击下载插件
根据自己的版本号自行下载即可
在这里插入图片描述

2.2 制作包含延迟插件的镜像

将下载好的插件安装mq,这里我们采用Dockerfile制作相应的镜像,避免每次启动docker时都去拷贝插件进行相应的插件部署

FROM rabbitmq:management
COPY ./rabbitmq_delayed_message_exchange-3.9.0.ez /plugins
#开启插件
RUN rabbitmq-plugins enable rabbitmq_delayed_message_exchange
#开启webUI
RUN rabbitmq-plugins enable rabbitmq_management
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

发布到自己的镜像仓库
在这里插入图片描述

三 扩展mq组件,集成延迟插件

申明自定义元数据

package com.cncloud.cncloud.common.mq.metadata;

import com.cncloud.cncloud.common.mq.common.ExchangeTypeEnum;

/**
 * @author likun
 * @date 2022年11月21日 17:21
 */
public abstract class CustomMessageMetadata implements MessageMetadata{

	@Override
	public ExchangeTypeEnum getExchangeType() {
		return ExchangeTypeEnum.CUSTOM;
	}

	/**
	 * 获取交换机名
	 * @return
	 */
	public abstract String getExchange();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

申明解析器

package com.cncloud.cncloud.common.mq.metadata.resolver;

import com.cncloud.cncloud.common.mq.common.ExchangeTypeEnum;
import com.cncloud.cncloud.common.mq.metadata.CustomMessageMetadata;
import com.cncloud.cncloud.common.mq.metadata.MessageMetadata;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;

import java.util.HashMap;
import java.util.Map;

/**
 * @author likun
 * @date 2022年11月21日 17:23
 */
public class CustomMessageMetadataResolver extends AbstractMessageMetadataResolver{
	public CustomMessageMetadataResolver(RabbitAdmin rabbitAdmin) {
		super(rabbitAdmin);
	}

	@Override
	public boolean isSupport(MessageMetadata messageMetadata) {
		return ExchangeTypeEnum.CUSTOM.equals(messageMetadata.getExchangeType());
	}

	@Override
	public boolean doResolve(MessageMetadata messageMetadata) {
		CustomMessageMetadata customMessageMetadata = (CustomMessageMetadata) messageMetadata;
		Map<String, Object> args = messageMetadata.getQueueArgs();
		if (args==null){
			args=new HashMap();
			args.put("x-delayed-type", "direct");
		}

		CustomExchange exchange = new CustomExchange(customMessageMetadata.getExchange(), "x-delayed-message", true, false, args);
		Queue queue = new Queue(customMessageMetadata.getQueue(), true, false, false, customMessageMetadata.getQueueArgs());

		Binding binding = BindingBuilder.bind(queue).to(exchange).with(messageMetadata.getQueue()).noargs();
		declareQueue(queue);
		declareExchange(exchange);
		declareBinding(binding);
		return true;
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

申明消息发射器

package com.cncloud.cncloud.common.mq.producer;

import com.alibaba.fastjson.JSON;
import com.cncloud.cncloud.admin.api.entity.MqSendLog;
import com.cncloud.cncloud.admin.api.feign.RemoteUserService;
import com.cncloud.cncloud.common.core.constant.SecurityConstants;
import com.cncloud.cncloud.common.core.message.base.DelayMessage;
import com.cncloud.cncloud.common.core.util.SpringContextHolder;
import com.cncloud.cncloud.common.core.util.UniqueIdUtil;
import com.cncloud.cncloud.common.mq.common.Constant;
import com.cncloud.cncloud.common.mq.common.ExchangeTypeEnum;
import com.cncloud.cncloud.common.mq.metadata.CustomMessageMetadata;
import com.cncloud.cncloud.common.mq.metadata.MessageMetadata;
import com.cncloud.cncloud.common.mq.metadata.resolver.MessageMetadataResolver;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.lang.NonNull;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;

/**
 * @author likun
 * @date 2021/6/23 15:22
 */
public class CustomMessageDelivery extends AbstractMessageDelivery<CustomMessageMetadata> {

    public CustomMessageDelivery(@NonNull MessageMetadataResolver messageMetadataResolver, @NonNull RabbitTemplate rabbitTemplate) {
        super(messageMetadataResolver, rabbitTemplate);
    }

    public CustomMessageDelivery(@NonNull MessageMetadataResolver messageMetadataResolver, @NonNull RabbitTemplate rabbitTemplate, @NonNull MessageNotDeliveredCallback messageNotDeliveredCallback, boolean autoDeclareMQ) {
        super(messageMetadataResolver, rabbitTemplate, messageNotDeliveredCallback, autoDeclareMQ);
    }

    @Override
    public boolean isSupport(MessageMetadata messageMetadata) {
        return ExchangeTypeEnum.CUSTOM.equals(messageMetadata.getExchangeType());
    }

    @Override
    protected <T> boolean doDeliver(CustomMessageMetadata messageMetadata, T data){
    	// 保存消息投递记录
		Message message = createMessage(data);
		String msgClsName = data.getClass().getName();
		String msgContent = JSON.toJSONString(data);
		MqSendLog mqSendLog = new MqSendLog();
		mqSendLog.setId(UniqueIdUtil.genId());
		mqSendLog.setExchange(messageMetadata.getExchange());
		mqSendLog.setMsgContent(msgContent);
		mqSendLog.setMsgClsName(msgClsName);
		mqSendLog.setMsgid(message.getMessageProperties().getCorrelationId());
		mqSendLog.setQueue(messageMetadata.getQueue());

		Date date = new Date(System.currentTimeMillis() + 1000 * 60 * Constant.MSG_TIMEOUT);
		Instant instant = date.toInstant();
		ZoneId zoneId = ZoneId.systemDefault();
		LocalDateTime localDateTime = instant.atZone(zoneId).toLocalDateTime();
		mqSendLog.setTrytime(localDateTime);
		RemoteUserService remoteUserService = SpringContextHolder.getBean(RemoteUserService.class);
		remoteUserService.saveMqSendLog(mqSendLog, SecurityConstants.FROM_IN);
		rabbitTemplate.convertAndSend(messageMetadata.getExchange(), messageMetadata.getQueue(),message,new CorrelationData(message.getMessageProperties().getCorrelationId()));

		return true;
    }

    @Override
	public Message createMessage(Object data){
		String msgId = String.valueOf(UniqueIdUtil.genId());
		SimpleMessageConverter simpleMessageConverter = new SimpleMessageConverter();
		MessageProperties messageProperties = new MessageProperties();
		messageProperties.setCorrelationId(msgId);
		// 自定义MessageProperties扩展
		if (data instanceof DelayMessage){
			DelayMessage delayMessage = (DelayMessage) data;
			Message message = simpleMessageConverter.toMessage(data, messageProperties);
			message.getMessageProperties().setHeader("x-delay", delayMessage==null?0L:delayMessage.getDelayTime());
			return message;
		}
		Message message = simpleMessageConverter.toMessage(data, messageProperties);
		return message;
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88

完善相应的静态代理

在这里插入图片描述

四 客户端实现消息的延迟消费

申明自定义message,必须实现DelayMessage, Serializable这两个接口

DelayMessage 用于组件中获得延迟时间,Serializable接口便于消息投递时消息的序列化

package com.cncloud.cncloud.common.core.message.base;

/**
 * @author likun
 * @date 2022年11月21日 17:43
 */
public interface DelayMessage {
	/**
	 * 获得延迟执行时间
	 * @return
	 */
	Long getDelayTime();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
package com.cncloud.cncloud.common.core.message.dto;

import com.cncloud.cncloud.common.core.message.base.DelayMessage;
import lombok.Data;

import java.io.Serializable;

/**
 * @author likun
 * @date 2022年11月21日 17:47
 */
@Data
public class TestDelayMessage implements DelayMessage, Serializable {

	private static final long serialVersionUID = -516414641899843714L;

	/**
	 * 消息
	 */
	private String msg;

	/**
	 * 延迟时间
	 */
	private Long millisecond;

	@Override
	public Long getDelayTime() {
		return this.millisecond;
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

申明队列的元数据中响应的配置

public class DelayMessageMetadata extends CustomMessageMetadata {
	@Override
	public String getExchange() {
		return MessageMetadataConstant.DelayMessageMetadataConstant.EXCHANGE;
	}

	@Override
	public String getQueue() {
		return MessageMetadataConstant.DelayMessageMetadataConstant.QUEUE;
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
package com.cncloud.cncloud.admin.message.config;


import com.cncloud.cncloud.admin.producer.matadata.CallDeadMessageMetadata;
import com.cncloud.cncloud.admin.producer.matadata.CallTtlMessageMetadata;
import com.cncloud.cncloud.admin.producer.matadata.DelayMessageMetadata;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author likun
 * @date 2021/6/23 16:14
 */
@Configuration
public class MessageConfig {
	/**
	 * 通话延迟队列元数据
	 * @return
	 */
	@Bean
	public CallTtlMessageMetadata callTtlMessageMetadata(){
		return new CallTtlMessageMetadata();
	}

	/**
	 * 通话死信队列元数据
	 * @return
	 */
	@Bean
	public CallDeadMessageMetadata callDeadMessageMetadata(){
		return new CallDeadMessageMetadata();
	}

	@Bean
	public DelayMessageMetadata delayMessageMetadata(){
		return new DelayMessageMetadata();
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

申明消息监听器

package com.cncloud.cncloud.admin.message.consum;

import com.cncloud.cncloud.admin.producer.matadata.DelayMessageMetadata;
import com.cncloud.cncloud.common.core.message.dto.TestDelayMessage;
import com.cncloud.cncloud.common.mq.consumer.SimpleDynamicMessageListener;
import com.cncloud.cncloud.common.mq.metadata.resolver.MessageMetadataResolver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * @author likun
 * @date 2022年11月21日 17:54
 */
@Component
@Slf4j
public class DelayMessageListener extends SimpleDynamicMessageListener<TestDelayMessage> {
	public DelayMessageListener(MessageMetadataResolver messageMetadataResolver) {
		super(new DelayMessageMetadata(), messageMetadataResolver);
	}

	@Override
	public void onMessage(TestDelayMessage testDelayMessage) {
		log.info("监听到客户端消息:{},延迟时间为:{}",testDelayMessage.getMsg(),testDelayMessage.getDelayTime());
	}
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

五 延迟消息测试

package com.cncloud.cncloud.admin.controller;


import com.cncloud.cncloud.admin.producer.matadata.DelayMessageMetadata;
import com.cncloud.cncloud.common.core.message.dto.TestDelayMessage;
import com.cncloud.cncloud.common.core.util.R;
import com.cncloud.cncloud.common.mq.producer.MessageDelivery;
import com.cncloud.cncloud.common.security.annotation.Inner;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;


@RestController
@RequestMapping("/demo/message")
public class MessageDemoController {

    @Autowired
    private MessageDelivery messageDelivery;

    @Autowired
	private DelayMessageMetadata delayMessageMetadata;

	@GetMapping("/message")
	@Inner(value = false)
	public R message(@RequestParam("message") String message) throws ClassNotFoundException {
		TestDelayMessage testDelayMessage1 = new TestDelayMessage();
		testDelayMessage1.setMsg("数据测试1");
		testDelayMessage1.setMillisecond(10000L);
		TestDelayMessage testDelayMessage2 = new TestDelayMessage();
		testDelayMessage2.setMsg("数据测试2");
		testDelayMessage2.setMillisecond(5000L);
		messageDelivery.deliver(delayMessageMetadata,testDelayMessage1);
		messageDelivery.deliver(delayMessageMetadata,testDelayMessage2);
		return R.ok();
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

测试结果:
在这里插入图片描述
10s的延迟消息先投递5s的延迟消息后投递,但是客户端先收到5s的延迟消息,说明消息并没有发生相应的阻塞

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/126970
推荐阅读
相关标签
  

闽ICP备14008679号