当前位置:   article > 正文

使用队列ActiveMQ处理超时订单_activemq 超时

activemq 超时
背景:

在商城项目中,有这样的需求:针对用户已下单但是一直未去付款的订单做超时处理,因为如果一个商品有库存数量的概念,在用户每次下单时做库存减操作,在用户取消订单时做库存返还操作。这时候为了防止恶意刷库存,就需要针对已下单但是长时间未支付的订单做超时处理,返还库存的操作,此时,就要涉及到了订单超时的处理。

解决方式:

方式一:定时任务
思路:
使用定时任务每分钟轮询数据库,查询出超时的订单,进行update

代码示例:

// 在Spirng中以注解的方式创建定时任务:
@Scheduled(cron = "0 */1 * * * ?") // 每分钟执行一次
// 业务代码
Date date = new Date();
Date dateyq = new Date(date.getTime() - (1000 * 60 * 15));//推迟15分钟
String dateyqStr = TimeUtil.nowDate2ToString1(dateyq);
Example example = new Example(ConsumptionOrder.class);
example.createCriteria().andEqualTo("orderState", BaseConstant.USER_CONSU_ORDER_AWAIT_PAY)//订单状态 1:待付款 
      .andLessThan("createTime", dateyqStr);
List<ConsumptionOrder> list = consumptionOrderMapper.selectByExample(example);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

方式二:使用队列ActiveMQ处理超时订单

思路:
使用activeMQ的延时投递功能,在创建订单成功之后,向队列中存入订单id,设置延时发送,延时时长为15分钟,此时订单id将会在15分钟后存入到activeMq的消息队列中,然后mq监听器监听到消息,拿到队列中的订单id进行处理,拿到订单id以后去查询该订单是否完成付款,如果未付款,则更新为订单超时交易关闭。

Property nametypedescription
AMQ_SCHEDULED_DELAYlong延迟投递的时间
AMQ_SCHEDULED_PERIODlong重复投递的时间间隔
AMQ_SCHEDULED_REPEATint重复投递次数
AMQ_SCHEDULED_CRONStringCron表达式

1、修改activemq配.xml配置文件,启用延时投递。
在这里插入图片描述

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">
  • 1

2、项目中配置 applicationContext-activemq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:util="http://www.springframework.org/schema/util"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
       http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- 自动扫描MQ包 ,将带有注解的类 纳入spring容器管理 -->
    <context:component-scan base-package="com.pzq.haisong.order.service.impl.activeMq"/>

    <!-- 配置MQ生产者 -->
    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <!-- 真正可以产生Connection的ConnectionFactory,由对应的JMS服务厂商提供 -->
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://192.168.1.93:61616"></property>
            </bean>
        </property>
    </bean>

    <!-- 这个是队列 -->
    <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="spring-active-queue"></constructor-arg>
    </bean>

    <!-- 这个是主题 -->
    <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg index="0" value="spring-active-topic"></constructor-arg>
    </bean>

    <!-- Spring提供的JMS工具类,它可以进行消息发送,接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsFactory"></property>
        <property name="defaultDestination" ref="destinationQueue"></property>
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"></bean>
        </property>
    </bean>

     <!--配置监听程序-->
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="jmsFactory"></property>
        <property name="destination" ref="destinationQueue"></property>
        <!-- 需要实现一个myMessageListener接口 -->
        <property name="messageListener" ref="myMessageListener"></property>
    </bean>

</beans>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52

使用的这里需要把mq配置文件加入spring管理
在这里插入图片描述
3、创建activeMq生产者和消息监听器

/**
 * 方法描述:activeMq消息生产者
 * 创建时间:2019-11-30 15:01:27
 */
@Service("delayOrderService")
public class DelayOrderServiceImpl implements DelayOrderService {
    private static final Logger LOGGER = Logger.getLogger(DelayOrderServiceImpl.class);

    private static class CreateMessage implements MessageCreator {

        private JSONObject vipOrder;// vip订单实体
        private long expireTime;// 超时时间
        public CreateMessage(JSONObject vipOrder, long expireTime) {
            super();
            this.vipOrder = vipOrder;
            this.expireTime = expireTime;
        }

        public Message createMessage(Session session) throws JMSException {
            Message message = session.createObjectMessage(vipOrder);
            // ScheduledMessage.AMQ_SCHEDULED_DELAY 设置消息延迟发送,expireTime:延迟时间
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, expireTime);
            return message;
        }
    }

    /**
     * 方法描述:创建订单时调用此方法,将要处理的订单id存入消息队列
     * 创建时间:2019-11-30 15:02:11
     */
    @Override
    public void addDelayOrder(JSONObject vipOrder, long expireTime) {

        LOGGER.info("订单超时时长:" + expireTime/1000 + "秒,将被发送给消息队列,订单id:" + vipOrder.getLong("orderId"));

        // 第一步:初始化一个spring容器
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("spring/applicationContext-activemq.xml");
        // 第二步:从容器中获得JMSTemplate对象。
        JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
        // 第三步:从容器中获得一个Destination对象
        Queue queue = (Queue) applicationContext.getBean("destinationQueue");

        jmsTemplate.send(queue, new DelayOrderServiceImpl.CreateMessage(vipOrder, expireTime));

        LOGGER.info("存入消息队列成功");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
/**
 * 类描述:ActiveMQ 消息监听器
 * @date:2019/11/30 11:16
 */
@Service
public class MyMessageListener implements MessageListener {
    private static final Logger LOGGER = Logger.getLogger(SpringMQ_Produce.class);

//    @Resource
//    private OrderService orderService;

    @Override
    public void onMessage(Message message) {
        try {
            JSONObject object = (JSONObject)((ObjectMessage) message).getObject();
            Long orderId = object.getLong("orderId");
            Integer orderType = object.getInteger("orderType");
            LOGGER.info("MQ接收到的消息:" + orderId + " + " + TimeUtil.nowTime());
            OrderService orderService = SpringContextHolder.getBean("orderService");
            orderService.updateDelayOrder(orderId, orderType);
        } catch (Exception e) {
            LOGGER.error("MessageListener异常:" + MyPubUtil.getError(e));
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

	/**
	 * 业务代码:更新延时订单状态
	 * @param orderId
	 * @param orderType
	 * @return
	 */
	@Override
	public Map<String, Object> updateDelayOrder(Long orderId, Integer orderType) {
		if (1 == orderType) {
			VipOrder vipOrder = vipOrderMapper.selectByPrimaryKey(orderId);
			if (null != vipOrder && BaseConstant.USER_CONSU_ORDER_AWAIT_PAY.equals(vipOrder.getOrderState())) {
				vipOrder.setOrderState(BaseConstant.USER_CONVER_ORDER_FINISH_CLOSE);// 7-交易关闭
				vipOrder.setUpdateTime(TimeUtil.nowTime());
				vipOrderMapper.updateByPrimaryKey(vipOrder);
				LOGGER.info("处理完成超时订单[VIP订单],订单id:" + orderId);
			}
		} else if (2 == orderType) {
			ConsumptionOrder consumptionOrder = consumptionOrderMapper.selectByPrimaryKey(orderId);
			if (consumptionOrder != null && BaseConstant.USER_CONSU_ORDER_AWAIT_PAY.equals(consumptionOrder.getOrderState())) {
				updateStock(consumptionOrder);// 修改库存,调用你自己修改库存的方法,这里就不贴上来了
				consumptionOrder.setOrderState(BaseConstant.USER_CONSU_ORDER_FINISH_CLOSE);// 7-交易关闭
				consumptionOrder.setUpdateTime(TimeUtil.nowTime());
				consumptionOrderMapper.updateByPrimaryKey(consumptionOrder);
				LOGGER.info("处理完成超时订单[消费订单],订单id:" + orderId);
			}
		}
		return ApiConstant.putok();
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

这里就是mq的配置文件和生产者和监听器的代码,以下记录下遇到的问题:
在mq的监听器中,通过@Resource和@Autowired注入service实例失败,获取不到bean。
spring容器中根本没有完成对注解bean的扫描,因为dispatcher.xml中配置的注解bean的优先级肯定没有框架中的contextListener的优先级高,所以这里获取的实例是null。

这里是通过一篇文章找到的解决方案:

解决方案,贴上链接

在applicationContext.xml声明一个bean:
在这里插入图片描述

public class SpringContextHolder implements ApplicationContextAware {
    private static ApplicationContext applicationContext;

    /**
     * 实现ApplicationContextAware接口的context注入函数, 将其存入静态变量.
     */
    public void setApplicationContext(ApplicationContext applicationContext) {
        SpringContextHolder.applicationContext = applicationContext; // NOSONAR
    }

    /**
     * 取得存储在静态变量中的ApplicationContext.
     */
    public static ApplicationContext getApplicationContext() {
        checkApplicationContext();
        return applicationContext;
    }

    /**
     * 从静态变量ApplicationContext中取得Bean, 自动转型为所赋值对象的类型.
     */
    @SuppressWarnings("unchecked")
    public static <T> T getBean(String name) {
        checkApplicationContext();
        return (T) applicationContext.getBean(name);
    }

    /**
     * 从静态变量ApplicationContext中取得Bean, 自动转型为所赋值对象的类型.
     */
    @SuppressWarnings("unchecked")
    public static <T> T getBean(Class<T> clazz) {
        checkApplicationContext();
        return (T) applicationContext.getBeansOfType(clazz);
    }

    /**
     * 清除applicationContext静态变量.
     */
    public static void cleanApplicationContext() {
        applicationContext = null;
    }

    private static void checkApplicationContext() {
        if (applicationContext == null) {
            throw new IllegalStateException("applicaitonContext未注入,请在applicationContext.xml中定义SpringContextHolder");
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

这里是用这种方式来获取bean。

以上。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/451989
推荐阅读
相关标签
  

闽ICP备14008679号