赞
踩
在使用mq作为中间件做异步消息推送时,可能会遇到一个场景,就是消息在消费后执行一系列的逻辑到一半,突然遇到异常或者是断电等之类问题,这时消息在mq的队列中已经出队列,而正常逻辑没有执行完就异常终止,这样就可能会造成数据的缺失和数据的不完整,如何解决这个问题?其实挺简单的,就是在消息进入消费者消费的同时做一个记录,再在逻辑执行完成后再删除这条记录或者是改变这条记录的状态,同时,在项目初始化时或者是执行一个定时任务扫描这个记录表,如果存在则产生一条相同的记录发送到activemq中(相同的队列)。这样就能解决这个问题。然而,今天在开发中我又思考到一个问题,如图:
假设消息1没有消费完成,则消息1要重新进入队列,此时有一条消息4跟消息1操作的是同一条数据,如果消息1再进入队列,则4会在1之前消费掉,此时数据就会发生错乱,如果要保证消息1在消息4之前消费,则就需要对重新入队列的消息1进行一些操作来使得消息1优先消费。
activemq是提供对队列设置为具有优先级消息属性的功能,那么下面就要来实现具有优先级属性的消息队列:
第一步,进入activemq的conf文件夹下,修改activemq.xml配置文件,在<policyEntries>标签下插入一个配置:
<policyEntry queue="queue1" strictOrderDispatch="true" useCache="false" queuePrefetch="1" prioritizedMessages="true" />
完成后保存,并重启mq服务。这条配置就配置了对于queue1这个队列内的消息是具有优先级的属性的。
第二步,整合一个springboot+activemq的demo项目用于测试。
第三步,就是具体的代码实现了,首先我们来关注下springboot整合activemq的源码:
1.JmsMessagingTemplate这个类是实现发送消息的主要类,正常我们只需要在producer中注入这个类,调用其中发送消息的方法就能够实现消息的发送。具体代码如下:
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.jms.core.JmsMessagingTemplate;
- import org.springframework.stereotype.Service;
-
- import javax.jms.Destination;
-
- @Service
- public class TestProducer {
-
- @Autowired
- private JmsMessagingTemplate jmsMessagingTemplate;
-
- public void sendMsg(Destination destination, String text){
- jmsMessagingTemplate.convertAndSend(destination,text);
- }
-
- }

但是,此消息是不具有优先级的,接着往下看源码。
2.在JmsMessagingTemplate类中实际上是封装了一个JmsTemplate这个类,实际上这个类才是真正实现消息发送的类,它只是将它进一步封装而已。
- package org.springframework.jms.core;
-
- import java.util.Map;
- import javax.jms.ConnectionFactory;
- import javax.jms.Destination;
- import javax.jms.JMSException;
- import javax.jms.Session;
- import org.springframework.beans.factory.InitializingBean;
- import org.springframework.jms.InvalidDestinationException;
- import org.springframework.jms.JmsException;
- import org.springframework.jms.support.converter.MessageConverter;
- import org.springframework.jms.support.converter.MessagingMessageConverter;
- import org.springframework.jms.support.destination.DestinationResolutionException;
- import org.springframework.lang.Nullable;
- import org.springframework.messaging.Message;
- import org.springframework.messaging.MessagingException;
- import org.springframework.messaging.converter.MessageConversionException;
- import org.springframework.messaging.core.AbstractMessagingTemplate;
- import org.springframework.messaging.core.MessagePostProcessor;
- import org.springframework.util.Assert;
-
- public class JmsMessagingTemplate extends AbstractMessagingTemplate<Destination> implements JmsMessageOperations, InitializingBean {
- @Nullable
- private JmsTemplate jmsTemplate;
- private MessageConverter jmsMessageConverter = new MessagingMessageConverter();
- private boolean converterSet;
- @Nullable
- private String defaultDestinationName;
-
- public JmsMessagingTemplate() {
- }
-
- public JmsMessagingTemplate(ConnectionFactory connectionFactory) {
- this.jmsTemplate = new JmsTemplate(connectionFactory);
- }
-
- public JmsMessagingTemplate(JmsTemplate jmsTemplate) {
- Assert.notNull(jmsTemplate, "JmsTemplate must not be null");
- this.jmsTemplate = jmsTemplate;
- }
-
- public void setConnectionFactory(ConnectionFactory connectionFactory) {
- if (this.jmsTemplate != null) {
- this.jmsTemplate.setConnectionFactory(connectionFactory);
- } else {
- this.jmsTemplate = new JmsTemplate(connectionFactory);
- }
-
- }
-
- @Nullable
- public ConnectionFactory getConnectionFactory() {
- return this.jmsTemplate != null ? this.jmsTemplate.getConnectionFactory() : null;
- }
-
- public void setJmsTemplate(@Nullable JmsTemplate jmsTemplate) {
- this.jmsTemplate = jmsTemplate;
- }
-
- @Nullable
- public JmsTemplate getJmsTemplate() {
- return this.jmsTemplate;
- }
-
- public void setJmsMessageConverter(MessageConverter jmsMessageConverter) {
- Assert.notNull(jmsMessageConverter, "MessageConverter must not be null");
- this.jmsMessageConverter = jmsMessageConverter;
- this.converterSet = true;
- }
-
- public MessageConverter getJmsMessageConverter() {
- return this.jmsMessageConverter;
- }
-
- public void setDefaultDestinationName(@Nullable String defaultDestinationName) {
- this.defaultDestinationName = defaultDestinationName;
- }
-
- @Nullable
- public String getDefaultDestinationName() {
- return this.defaultDestinationName;
- }
-
- public void afterPropertiesSet() {
- Assert.notNull(this.jmsTemplate, "Property 'connectionFactory' or 'jmsTemplate' is required");
- if (!this.converterSet && this.jmsTemplate.getMessageConverter() != null) {
- ((MessagingMessageConverter)this.jmsMessageConverter).setPayloadConverter(this.jmsTemplate.getMessageConverter());
- }
-
- }
-
- private JmsTemplate obtainJmsTemplate() {
- Assert.state(this.jmsTemplate != null, "No JmsTemplate set");
- return this.jmsTemplate;
- }
-
- public void send(Message<?> message) {
- Destination defaultDestination = (Destination)this.getDefaultDestination();
- if (defaultDestination != null) {
- this.send(defaultDestination, message);
- } else {
- this.send(this.getRequiredDefaultDestinationName(), message);
- }
-
- }
-
- public void convertAndSend(Object payload) throws MessagingException {
- this.convertAndSend((Object)payload, (MessagePostProcessor)null);
- }
-
- public void convertAndSend(Object payload, @Nullable MessagePostProcessor postProcessor) throws MessagingException {
- Destination defaultDestination = (Destination)this.getDefaultDestination();
- if (defaultDestination != null) {
- this.convertAndSend((Object)defaultDestination, payload, (MessagePostProcessor)postProcessor);
- } else {
- this.convertAndSend(this.getRequiredDefaultDestinationName(), payload, postProcessor);
- }
-
- }
-
- public void send(String destinationName, Message<?> message) throws MessagingException {
- this.doSend(destinationName, message);
- }
-
- public void convertAndSend(String destinationName, Object payload) throws MessagingException {
- this.convertAndSend(destinationName, payload, (Map)null);
- }
-
- public void convertAndSend(String destinationName, Object payload, @Nullable Map<String, Object> headers) throws MessagingException {
- this.convertAndSend(destinationName, payload, headers, (MessagePostProcessor)null);
- }
-
- public void convertAndSend(String destinationName, Object payload, @Nullable MessagePostProcessor postProcessor) throws MessagingException {
- this.convertAndSend(destinationName, payload, (Map)null, postProcessor);
- }
-
- public void convertAndSend(String destinationName, Object payload, @Nullable Map<String, Object> headers, @Nullable MessagePostProcessor postProcessor) throws MessagingException {
- Message<?> message = this.doConvert(payload, headers, postProcessor);
- this.send(destinationName, message);
- }
-
- @Nullable
- public Message<?> receive() {
- Destination defaultDestination = (Destination)this.getDefaultDestination();
- return defaultDestination != null ? this.receive(defaultDestination) : this.receive(this.getRequiredDefaultDestinationName());
- }
-
- @Nullable
- public <T> T receiveAndConvert(Class<T> targetClass) {
- Destination defaultDestination = (Destination)this.getDefaultDestination();
- return defaultDestination != null ? this.receiveAndConvert(defaultDestination, targetClass) : this.receiveAndConvert(this.getRequiredDefaultDestinationName(), targetClass);
- }
-
- @Nullable
- public Message<?> receive(String destinationName) throws MessagingException {
- return this.doReceive(destinationName);
- }
-
- @Nullable
- public <T> T receiveAndConvert(String destinationName, Class<T> targetClass) throws MessagingException {
- Message<?> message = this.doReceive(destinationName);
- return message != null ? this.doConvert(message, targetClass) : null;
- }
-
- @Nullable
- public Message<?> sendAndReceive(Message<?> requestMessage) {
- Destination defaultDestination = (Destination)this.getDefaultDestination();
- return defaultDestination != null ? this.sendAndReceive(defaultDestination, requestMessage) : this.sendAndReceive(this.getRequiredDefaultDestinationName(), requestMessage);
- }
-
- @Nullable
- public Message<?> sendAndReceive(String destinationName, Message<?> requestMessage) throws MessagingException {
- return this.doSendAndReceive(destinationName, requestMessage);
- }
-
- @Nullable
- public <T> T convertSendAndReceive(String destinationName, Object request, Class<T> targetClass) throws MessagingException {
- return this.convertSendAndReceive(destinationName, request, (Map)null, (Class)targetClass);
- }
-
- @Nullable
- public <T> T convertSendAndReceive(Object request, Class<T> targetClass) {
- return this.convertSendAndReceive((Object)request, (Class)targetClass, (MessagePostProcessor)null);
- }
-
- @Nullable
- public <T> T convertSendAndReceive(String destinationName, Object request, @Nullable Map<String, Object> headers, Class<T> targetClass) throws MessagingException {
- return this.convertSendAndReceive(destinationName, request, headers, targetClass, (MessagePostProcessor)null);
- }
-
- @Nullable
- public <T> T convertSendAndReceive(Object request, Class<T> targetClass, @Nullable MessagePostProcessor postProcessor) {
- Destination defaultDestination = (Destination)this.getDefaultDestination();
- return defaultDestination != null ? this.convertSendAndReceive((Object)defaultDestination, request, (Class)targetClass, (MessagePostProcessor)postProcessor) : this.convertSendAndReceive(this.getRequiredDefaultDestinationName(), request, targetClass, postProcessor);
- }
-
- @Nullable
- public <T> T convertSendAndReceive(String destinationName, Object request, Class<T> targetClass, @Nullable MessagePostProcessor requestPostProcessor) throws MessagingException {
- return this.convertSendAndReceive(destinationName, request, (Map)null, targetClass, requestPostProcessor);
- }
-
- @Nullable
- public <T> T convertSendAndReceive(String destinationName, Object request, @Nullable Map<String, Object> headers, Class<T> targetClass, @Nullable MessagePostProcessor postProcessor) {
- Message<?> requestMessage = this.doConvert(request, headers, postProcessor);
- Message<?> replyMessage = this.sendAndReceive(destinationName, requestMessage);
- return replyMessage != null ? this.getMessageConverter().fromMessage(replyMessage, targetClass) : null;
- }
-
- protected void doSend(Destination destination, Message<?> message) {
- try {
- this.obtainJmsTemplate().send(destination, this.createMessageCreator(message));
- } catch (JmsException var4) {
- throw this.convertJmsException(var4);
- }
- }
-
- protected void doSend(String destinationName, Message<?> message) {
- try {
- this.obtainJmsTemplate().send(destinationName, this.createMessageCreator(message));
- } catch (JmsException var4) {
- throw this.convertJmsException(var4);
- }
- }
-
- @Nullable
- protected Message<?> doReceive(Destination destination) {
- try {
- javax.jms.Message jmsMessage = this.obtainJmsTemplate().receive(destination);
- return this.convertJmsMessage(jmsMessage);
- } catch (JmsException var3) {
- throw this.convertJmsException(var3);
- }
- }
-
- @Nullable
- protected Message<?> doReceive(String destinationName) {
- try {
- javax.jms.Message jmsMessage = this.obtainJmsTemplate().receive(destinationName);
- return this.convertJmsMessage(jmsMessage);
- } catch (JmsException var3) {
- throw this.convertJmsException(var3);
- }
- }
-
- @Nullable
- protected Message<?> doSendAndReceive(Destination destination, Message<?> requestMessage) {
- try {
- javax.jms.Message jmsMessage = this.obtainJmsTemplate().sendAndReceive(destination, this.createMessageCreator(requestMessage));
- return this.convertJmsMessage(jmsMessage);
- } catch (JmsException var4) {
- throw this.convertJmsException(var4);
- }
- }
-
- @Nullable
- protected Message<?> doSendAndReceive(String destinationName, Message<?> requestMessage) {
- try {
- javax.jms.Message jmsMessage = this.obtainJmsTemplate().sendAndReceive(destinationName, this.createMessageCreator(requestMessage));
- return this.convertJmsMessage(jmsMessage);
- } catch (JmsException var4) {
- throw this.convertJmsException(var4);
- }
- }
-
- private JmsMessagingTemplate.MessagingMessageCreator createMessageCreator(Message<?> message) {
- return new JmsMessagingTemplate.MessagingMessageCreator(message, this.getJmsMessageConverter());
- }
-
- protected String getRequiredDefaultDestinationName() {
- String name = this.getDefaultDestinationName();
- if (name == null) {
- throw new IllegalStateException("No 'defaultDestination' or 'defaultDestinationName' specified. Check configuration of JmsMessagingTemplate.");
- } else {
- return name;
- }
- }
-
- @Nullable
- protected Message<?> convertJmsMessage(@Nullable javax.jms.Message message) {
- if (message == null) {
- return null;
- } else {
- try {
- return (Message)this.getJmsMessageConverter().fromMessage(message);
- } catch (Exception var3) {
- throw new MessageConversionException("Could not convert '" + message + "'", var3);
- }
- }
- }
-
- protected MessagingException convertJmsException(JmsException ex) {
- if (!(ex instanceof DestinationResolutionException) && !(ex instanceof InvalidDestinationException)) {
- return (MessagingException)(ex instanceof org.springframework.jms.support.converter.MessageConversionException ? new MessageConversionException(ex.getMessage(), ex) : new MessagingException(ex.getMessage(), ex));
- } else {
- return new org.springframework.messaging.core.DestinationResolutionException(ex.getMessage(), ex);
- }
- }
-
- private static class MessagingMessageCreator implements MessageCreator {
- private final Message<?> message;
- private final MessageConverter messageConverter;
-
- public MessagingMessageCreator(Message<?> message, MessageConverter messageConverter) {
- this.message = message;
- this.messageConverter = messageConverter;
- }
-
- public javax.jms.Message createMessage(Session session) throws JMSException {
- try {
- return this.messageConverter.toMessage(this.message, session);
- } catch (Exception var3) {
- throw new MessageConversionException("Could not convert '" + this.message + "'", var3);
- }
- }
- }
- }

我们可以跟踪方法的执行,进入到JmsTemplate这个类中,发现里面336行封装着一个方法。
- protected void doSend(MessageProducer producer, Message message) throws JMSException {
- if (this.deliveryDelay >= 0L) {
- producer.setDeliveryDelay(this.deliveryDelay);
- }
-
- if (this.isExplicitQosEnabled()) {
- producer.send(message, this.getDeliveryMode(), this.getPriority(), this.getTimeToLive());
- } else {
- producer.send(message);
- }
-
- }
这里就可以看到producer这个对象调用的send方法中有设置几个参数:
(1)、message不用说了,就是要发送的消息内容;
(2)、Delivery Mode翻译过来就是发送模式发送模式;
(3)、priority!这就是我们要找的优先级,传入0-9整数,数字越大优先级越高;
(4)、timeToLive延时发送时间;
Ok,到这里我们就可以看到,如果要执行这个方法就只需要isExplicitQosEnabled()这个方法返回值为true。
再看代码的第175行的方法:
- public void setQosSettings(QosSettings settings) {
- Assert.notNull(settings, "Settings must not be null");
- this.setExplicitQosEnabled(true);
- this.setDeliveryMode(settings.getDeliveryMode());
- this.setPriority(settings.getPriority());
- this.setTimeToLive(settings.getTimeToLive());
- }
这个方法就是设置上面几个参数的设置,只需传入QosSettings这个对象就行。
来看JmsTemplate这个类的构造方法:
- public JmsTemplate() {
- this.transactionalResourceFactory = new JmsTemplate.JmsTemplateResourceFactory();
- this.messageIdEnabled = true;
- this.messageTimestampEnabled = true;
- this.pubSubNoLocal = false;
- this.receiveTimeout = 0L;
- this.deliveryDelay = -1L;
- this.explicitQosEnabled = false;
- this.deliveryMode = 2;
- this.priority = 4;
- this.timeToLive = 0L;
- this.initDefaultStrategies();
- }
可以看到几个参数的默认值,也就是没有执行任何操作时如果将explicitQosEnabled设置为true则该消息发送出去优先级默认为4。
3.所以我们将生产者代码进行修改:
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.jms.core.JmsMessagingTemplate;
- import org.springframework.jms.core.JmsTemplate;
- import org.springframework.jms.support.QosSettings;
- import org.springframework.stereotype.Service;
-
- import javax.jms.DeliveryMode;
- import javax.jms.Destination;
-
- @Service
- public class TestProducer {
-
- @Autowired
- private JmsMessagingTemplate jmsMessagingTemplate;
-
- public void sendMsg(Destination destination, String text, int priority){
- //获取jmsTemplate对象
- JmsTemplate jmsTemplate = jmsMessagingTemplate.getJmsTemplate();
- //创建QosSettings对象
- QosSettings settings = new QosSettings();
- //设置优先级
- settings.setPriority(priority);
- //设置发送模式
- settings.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- //设置延时发送时间
- settings.setTimeToLive(1000L);
- //将设置传入
- jmsTemplate.setQosSettings(settings);
- //发送消息
- jmsMessagingTemplate.convertAndSend(destination,text);
- }
-
- }

这样生产者就写完了。
4.测试
首先,先写一个消费者,监听队列queue1:
- import org.springframework.jms.annotation.JmsListener;
- import org.springframework.stereotype.Component;
-
- @Component
- public class TestConsumer {
-
- @JmsListener(destination = "queue1")
- public void receiveTest(String text){
- System.out.println("接收到queue1发送的消息:"+text);
- }
- }
再写一个controller类可以通过前端调用发送消息的方法:
- import com.example.springboot.mq.demo.producer.TestProducer;
- import org.apache.activemq.command.ActiveMQQueue;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import javax.jms.Destination;
- import java.util.Random;
-
- @RestController
- public class TestController {
-
- @Autowired
- private TestProducer producer;
-
- @RequestMapping("/test")
- public String test(){
- Destination destination = new ActiveMQQueue("queue1");
- Random random = new Random(10);
- for (int i=0;i < 100;i++){
- //随机产生优先级数字
- int priority = random.nextInt(9);
- producer.sendMsg(destination,"消息No."+i+"优先级为:"+priority,priority);
- }
- return "发送成功";
- }
- }

然后启动项目进行测试,测试结果:
- 发送消息优先级为:0
- 发送消息优先级为:6
- 发送消息优先级为:6
- 发送消息优先级为:0
- 发送消息优先级为:4
- 发送消息优先级为:1
- 发送消息优先级为:1
- 发送消息优先级为:4
- 发送消息优先级为:3
- 发送消息优先级为:1
- 发送消息优先级为:1
- 发送消息优先级为:2
- 发送消息优先级为:6
- 发送消息优先级为:6
- 发送消息优先级为:3
- 发送消息优先级为:3
- 发送消息优先级为:1
- 发送消息优先级为:1
- 发送消息优先级为:4
- 发送消息优先级为:7
- 发送消息优先级为:8
- 发送消息优先级为:7
- 发送消息优先级为:6
- 发送消息优先级为:8
- 发送消息优先级为:5
- 发送消息优先级为:1
- 发送消息优先级为:5
- 发送消息优先级为:5
- 发送消息优先级为:8
- 发送消息优先级为:8
- 发送消息优先级为:2
- 发送消息优先级为:8
- 发送消息优先级为:6
- 发送消息优先级为:8
- 发送消息优先级为:7
- 发送消息优先级为:1
- 发送消息优先级为:7
- 发送消息优先级为:1
- 发送消息优先级为:0
- 发送消息优先级为:4
- 发送消息优先级为:8
- 发送消息优先级为:1
- 发送消息优先级为:8
- 发送消息优先级为:0
- 发送消息优先级为:7
- 发送消息优先级为:5
- 发送消息优先级为:6
- 发送消息优先级为:0
- 发送消息优先级为:2
- 发送消息优先级为:4

接收到的消息:
- 消息内容:1优先级为:6
- 消息内容:20优先级为:8
- 消息内容:23优先级为:8
- 消息内容:28优先级为:8
- 消息内容:29优先级为:8
- 消息内容:31优先级为:8
- 消息内容:33优先级为:8
- 消息内容:40优先级为:8
- 消息内容:42优先级为:8
- 消息内容:19优先级为:7
- 消息内容:21优先级为:7
- 消息内容:34优先级为:7
- 消息内容:36优先级为:7
- 消息内容:44优先级为:7
- 消息内容:2优先级为:6
- 消息内容:12优先级为:6
- 消息内容:13优先级为:6
- 消息内容:22优先级为:6
- 消息内容:32优先级为:6
- 消息内容:46优先级为:6
- 消息内容:24优先级为:5
- 消息内容:26优先级为:5
- 消息内容:27优先级为:5
- 消息内容:45优先级为:5
- 消息内容:4优先级为:4
- 消息内容:7优先级为:4
- 消息内容:18优先级为:4
- 消息内容:39优先级为:4
- 消息内容:49优先级为:4
- 消息内容:8优先级为:3
- 消息内容:14优先级为:3
- 消息内容:15优先级为:3
- 消息内容:11优先级为:2
- 消息内容:30优先级为:2
- 消息内容:48优先级为:2
- 消息内容:5优先级为:1
- 消息内容:6优先级为:1
- 消息内容:9优先级为:1
- 消息内容:10优先级为:1
- 消息内容:16优先级为:1
- 消息内容:17优先级为:1
- 消息内容:25优先级为:1
- 消息内容:35优先级为:1
- 消息内容:37优先级为:1
- 消息内容:41优先级为:1
- 消息内容:0优先级为:0
- 消息内容:3优先级为:0
- 消息内容:38优先级为:0
- 消息内容:43优先级为:0
- 消息内容:47优先级为:0

结果显示,除第一条外,其他消息的接收全部是按消息的优先级出队列。所以大功告成,至于第一个消息为什么没有按优先级出队列后续我再研究一下。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。