赞
踩
public enum QueueEnum { /** * 各种异步消息频道 */ TEST(1,"test","队列频道"), DELAY_TEST(2,"delay_test","延迟延迟频道"), ; private Integer code; private String channel; private String desc; QueueEnum(Integer code, String channel, String desc) { this.code = code; this.channel = channel; this.desc = desc; } public Integer getCode() { return code; } public void setCode(Integer code) { this.code = code; } public String getChannel() { return channel; } public void setChannel(String channel) { this.channel = channel; } public String getDesc() { return desc; } public void setDesc(String desc) { this.desc = desc; } public static String findChannelByCode(Integer code) { QueueEnum[] queueEnums = QueueEnum.values(); for (QueueEnum queueEnum : queueEnums) { if (code == queueEnum.getCode()) { return queueEnum.getChannel(); } } return ""; } }
import java.io.Serializable; import java.time.LocalDate; /** * * 队列消息 * * 注意:涉及序列化问题,请勿将此类移动与修改 * @author linjianhui */ public class QueueMessage implements Serializable { private static final long serialVersionUID = 1L; //自定义的队列枚举 private QueueEnum queueEnum; private String activityId; /** * 任务日期- yyyy-MM-dd * 任务日期- yyyy-MM-dd HH:mm:ss */ private String taskDate; private String msgId; public String getActivityId() { return activityId; } public String getTaskDate() { return taskDate==null? LocalDate.now().toString():taskDate; } public void setQueueEnum(QueueEnum queueEnum) { this.queueEnum = queueEnum; } public void setActivityId(String activityId) { this.activityId = activityId; } public void setTaskDate(String taskDate) { this.taskDate = taskDate; } public String getMsgId() { return msgId; } public void setMsgId(String msgId) { this.msgId = msgId; } public QueueEnum getQueueEnum() { return queueEnum; } public QueueMessage() { } public QueueMessage(QueueEnum queueEnum, String activityId) { this.queueEnum = queueEnum; this.activityId = activityId; } public QueueMessage(QueueEnum queueEnum, String activityId,String msgId) { this.queueEnum = queueEnum; this.activityId = activityId; this.msgId=msgId; } @Override public String toString() { final StringBuilder sb = new StringBuilder("QueueMessage{"); sb.append("queueEnum=").append(queueEnum); sb.append(", activityId='").append(activityId).append('\''); sb.append(", taskDate='").append(taskDate).append('\''); sb.append(", mgsId='").append(msgId).append('\''); sb.append('}'); return sb.toString(); }
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.annotation.Order; import java.util.HashMap; @Configuration //保证队列的创建优先于监听队列 @Order(1) public class TestRabbitConfig { @Bean("testQueue") public Queue testQueue() { return new Queue(QueueEnum.TEST.getChannel()); } @Bean("testExchange") public DirectExchange testExchange() { return new DirectExchange(QueueEnum.TEST.getChannel()); } /** * 将队列绑定到exchange,使用指定的路由key * @return */ @Bean Binding bindingtestQueueToExchange(@Qualifier("testQueue") Queue testQueue, @Qualifier("testExchange")DirectExchange testExchange) { return BindingBuilder.bind(testQueue).to(testExchange).with(QueueEnum.TEST.getChannel()); } /** * 描述:定义延迟更新队列【死信队列】 * 当队列到期后就会通过死信交换机和路由key,路由到指定队列 * x-message-ttl 消息定时时间 * x-max-length 队列最大长度 * x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange * x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送 * @param * @return */ @Bean("delayTestQueue") public Queue delayTestQueue() { HashMap<String, Object> arguments = new HashMap<>(4); //设置延15天 // arguments.put("x-message-ttl", 15*24*6*10*60*1000);//需要时可以打开 // x-message-ttl这个设置对队列中所有的消息有效【属于队列级别】 //如果你想要【为每个消息动态设置过期时间】,你需要在【消息级别】设置Time To Live (TTL)。在Spring AMQP中,你可以通过设置MessageProperties的expiration属性来实现这一点: //在convertAndSend传入MessagePostProcessor实现类,覆盖其方法postProcessMessage(Message message),使用message.getMessageProperties().setExpiration(delayInMs+"");来为消息单独设置过期时间 // arguments.put("x-message-ttl", 10*60*1000);//10分钟 arguments.put("x-max-length", 500000); arguments.put("x-dead-letter-exchange", QueueEnum.TEST.getChannel()); arguments.put("x-dead-letter-routing-key", QueueEnum.TEST.getChannel()); return new Queue(QueueEnum.DELAY_TEST.getChannel(), true, false, false, arguments); } /** * 描述:定义延迟更新队列交换机 * @param * @return */ @Bean("delayTestExchange") public DirectExchange delayTestExchange() { return new DirectExchange(QueueEnum.DELAY_TEST.getChannel()); } /** * 描述:绑定延迟更新队列到exchange * @param * @return */ @Bean Binding bindingDelayTestQueueToExchange(@Qualifier("delayTestQueue")Queue delayTestQueue, @Qualifier("delayTestExchange")DirectExchange delayTestExchange) { return BindingBuilder.bind(delayTestQueue).to(delayTestExchange).with(QueueEnum.DELAY_TEST.getChannel()); }
import com.alibaba.fastjson.JSON; import com.project.utils.StringUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.time.Duration; import java.time.LocalDateTime; /** * 描述:发送消息 */ @Component @Slf4j(topic = "sendMqTask") public class SendMqMessage { @Autowired RabbitTemplate rabbitTemplate; public void sendTestMessage(QueueMessage queueMessage) { String messageId = StringUtil.getUniqueId("mq-"); queueMessage.setMsgId(messageId); rabbitTemplate.convertAndSend(queueMessage.getQueueEnum().getChannel(), queueMessage.getQueueEnum().getChannel(), queueMessage, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 计算时间差 long delayInMs = Duration.between(LocalDateTime.now(), DateTimeUtil.fromString2LocalDateTime(queueMessage.getTaskDate())).toMillis(); //如果你想要为每个消息动态设置过期时间,你需要在【消息级别:更加细粒度控制】设置Time To Live (TTL)。在Spring AMQP中,你可以通过设置MessageProperties的expiration属性来实现这一点: //在convertAndSend传入MessagePostProcessor实现类,覆盖其方法postProcessMessage(Message message),使用message.getMessageProperties().setExpiration(delayInMs+"");来为消息单独设置过期时间 //这里,expiration属性的值是以毫秒为单位的过期时间戳。当这个时间戳过去后,消息就会变为死信 //这样每条消息都有自己的过期时间,不用受死信队列的x-message-ttl的影响,死信队列的x-message-ttl这个设置对队列中所有的消息有效【队列级别】 //在RabbitMQ中,如果同时在队列级别和消息级别设置了TTL(x-message-ttl 和 expiration 属性),那么将会遵循以下原则: // 1. 消息级别的TTL(expiration)优先:如果消息自身携带了TTL属性,那么即使队列设置了x-message-ttl,也会以消息本身的TTL为准。消息过期后,会被当作死信处理。 // 2. 队列级别的TTL(x-message-ttl)作为默认值:只有当消息没有携带TTL属性时,才会使用队列级别的x-message-ttl作为消息的过期时间。 // 因此,在你的场景中,如果同时设置了队列级别的x-message-ttl和消息级别的message.getMessageProperties().setExpiration(delayInMs+""),那么将会以消息级别的TTL为准。 //设置消息多长时间后过期 message.getMessageProperties().setExpiration(delayInMs+""); return message; } }); } }
import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.apache.ibatis.exceptions.PersistenceException; import org.mybatis.spring.MyBatisSystemException; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.core.annotation.Order; import org.springframework.jdbc.CannotGetJdbcConnectionException; import org.springframework.stereotype.Component; import java.io.IOException; import java.time.LocalDateTime; import java.util.Arrays; import java.util.concurrent.TimeUnit; /** * 描述:消息消费监听 */ @Component @Order(2) @Slf4j(topic = "receiveMqTask") public class ReceiveMqMessage { // private static final Logger MQ_LOG = LoggerFactory.getLogger("mqTask"); @Value("${spring.profiles.active}") private String active; /** * 判断是否是正式环境 * * @return */ private boolean isProdEnv() { return "prod".equals(active); } /** * 判断是否是测试环境 * * @return */ private boolean isTestEnv() { return "test".equals(active); } /** * 监听消息队列 * @param queueMessage * @param message : org.springframework.amqp.core.Message * @param channel : com.rabbitmq.client.Channel */ @RabbitListener(queues = ApiConstants.TEST) @RabbitHandler public void test(QueueMessage queueMessage, Message message, Channel channel) { String env=isProdEnv()?"正式":isTestEnv()?"测试":active; log.info("====={}== test Mq Message={}",env, queueMessage); // String consumerTag = message.getMessageProperties().getConsumerTag(); long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { System.out.println("发送时间是:"+ queueMessage.getTaskDate()); System.out.println("当前时间是:"+ LocalDateTime.now().toLocalDate()+" "+LocalDateTime.now().toLocalTime()); // 手动ACK try { channel.basicAck(deliveryTag, false); } catch (IOException e) { log.error("MQ手动ACK错误: ", e); } } catch (Exception e) { log.error("test queue 失败"); } } }
/** * 日期工具类 */ public class DateTimeUtil { /** * yyyy-MM-dd HH:mm:ss */ public static final String FORMAT_DATETIME = "yyyy-MM-dd HH:mm:ss"; /** * discription: */ public static String getLocalDateTime(LocalDateTime localDateTime) { DateTimeFormatter df = DateTimeFormatter.ofPattern(DateTimeUtil.FORMAT_DATETIME); if (localDateTime != null) { String localTime = df.format(localDateTime); return localTime; } return null; } }
@RestController @RequestMapping(value = "/test") public class TestController { @Autowired private SendMqMessage sendMqMessage; @RequestMapping(value = "/testMqMessage", method = RequestMethod.GET) public ResultEntity testMqMessage(@RequestParam(value = "second",defaultValue = "20",required = false) Long second){ QueueMessage queueMessage = new QueueMessage(QueueEnum.DELAY_TEST,"123"); //设置20秒后更新【默认】 queueMessage.setTaskDate(DateTimeUtil.getLocalDateTime(LocalDateTime.now().plusSeconds(second))); sendMqMessage.sendTestMessage(queueMessage); return "发送成功"; } }
//如果你想要为每个消息动态设置过期时间,你需要在【消息级别:更加细粒度控制】设置Time To Live (TTL)。在Spring AMQP中,你可以通过设置MessageProperties的expiration属性来实现这一点:
//在convertAndSend传入MessagePostProcessor实现类,覆盖其方法postProcessMessage(Message message),使用message.getMessageProperties().setExpiration(delayInMs+"");来为消息单独设置过期时间
//这里,expiration属性的值是以毫秒为单位的过期时间戳。当这个时间戳过去后,消息就会变为死信
//这样每条消息都有自己的过期时间,不用受死信队列的x-message-ttl的影响,死信队列的x-message-ttl这个设置对队列中所有的消息有效【队列级别】
//在RabbitMQ中,如果同时在队列级别和消息级别设置了TTL(x-message-ttl 和 expiration 属性),那么将会遵循以下原则:
// 1. 消息级别的TTL(expiration)优先:如果消息自身携带了TTL属性,那么即使队列设置了x-message-ttl,也会以消息本身的TTL为准。消息过期后,会被当作死信处理。
// 2. 队列级别的TTL(x-message-ttl)作为默认值:只有当消息没有携带TTL属性时,才会使用队列级别的x-message-ttl作为消息的过期时间。
// 因此,在你的场景中,如果同时设置了队列级别的x-message-ttl和消息级别的message.getMessageProperties().setExpiration(delayInMs+""),那么将会以消息级别的TTL为准。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。