当前位置:   article > 正文

spring集成rabbitMQ配置实现及绑定死信队列_consumer thread error, thread abort.

consumer thread error, thread abort.

环境:

jdk:1.7;

系统:win7;

需求:在既有的webService项目中集成rabbitMQ;

首先,pom.xml中添加依赖:

注意:这里的spring-rabbit版本需要对应的jdk版本支持。目前没找到有对应关系表;

因为我们使用的jdk是1.7,一开始我使用2.x的rabbit依赖,项目运行会报错;

  1. <!-- RabbitMQ -->
  2. <dependency>
  3. <groupId>com.rabbitmq</groupId>
  4. <artifactId>amqp-client</artifactId>
  5. <version>3.5.1</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.springframework.amqp</groupId>
  9. <artifactId>spring-rabbit</artifactId>
  10. <version>1.4.5.RELEASE</version>
  11. </dependency>

接下来,主要是xml配置文件;

在resource文件夹下添加application-mq.xml文件;

里面的“http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd”这个版本最好是跟spring-rabbit版本一致,否则每次启动项目都会去网络上加载这个文件,网络有时会有波动,导致加载这个网络文件失败;

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  5. xmlns:context="http://www.springframework.org/schema/context"
  6. xmlns:tx="http://www.springframework.org/schema/tx"
  7. xmlns:util="http://www.springframework.org/schema/util"
  8. xsi:schemaLocation="http://www.springframework.org/schema/rabbit
  9. http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
  10. http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
  11. http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
  12. <context:property-placeholder location="classpath*:/META-INF/env/*.properties" />
  13. <!--<util:properties id="appConfig" location="classpath:/META-INF/env/rabbit.properties"></util:properties>-->
  14. <!--配置connection-factory,指定连接rabbit server参数-->
  15. <!-- virtual-host="/"是默认的虚拟机路径-->
  16. <!-- channel-cache-size,channel的缓存数量,默认值为25 -->
  17. <!-- cache-mode,缓存连接模式,默认值为CHANNEL(单个connection连接,连接之后关闭,自动销毁) -->
  18. <rabbit:connection-factory id="connectionFactory" username="guest" password="guest" host="localhost" port="5672" virtual-host="/" channel-cache-size="25"/>
  19. <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成-->
  20. <rabbit:admin connection-factory="connectionFactory"/>
  21. <!-- 参数介绍:
  22. name:queue的名字。
  23. durable:是否为持久的。默认是true,RabbitMQ重启后queue依然存在。
  24. auto-delete:表示消息队列没有在使用时将被自动删除。默认是false。
  25. exclusive:表示该消息队列是否只在当前connection生效。默认false。
  26. -->
  27. <rabbit:queue id="METHOD_EVENT_DIRECT_QUEUE" name="METHOD_EVENT_DIRECT_QUEUE" durable="true" auto-delete="false" exclusive="false">
  28. <!-- 对应的死信队列 -->
  29. <rabbit:queue-arguments>
  30. <!-- 超时时间为30分钟 单位是毫秒-->
  31. <entry key="x-message-ttl" value="1800000" value-type="java.lang.Long" />
  32. <entry key="x-dead-letter-exchange" value="dlx.exchange" />
  33. <entry key="x-dead-letter-routing-key" value="routingKey" />
  34. </rabbit:queue-arguments>
  35. </rabbit:queue>
  36. <!-- 声明死信队列 -->
  37. <rabbit:queue id="dlq.queue" name="dlq.queue" durable="true" auto-delete="false" exclusive="false"/>
  38. <!--绑定队列,
  39. rabbitmq的exchangeType常用的三种模式:direct,fanout,topic三种,
  40. 我们用direct模式,即rabbit:direct-exchange标签,
  41. Direct交换器很简单,如果是Direct类型,就会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等,则发送到该Binding对应的Queue中。
  42. 有一个需要注意的地方:如果找不到指定的exchange,就会报错。
  43. 但routing key找不到的话,不会报错,这条消息会直接丢失,所以此处要小心,
  44. auto-delete:自动删除,如果为Yes,则该交换机所有队列queue删除后,自动删除交换机,默认为false -->
  45. <!-- 参数介绍:
  46. name:exchange的名字。
  47. durable:是否为持久的,默认为true,RabbitMQ重启后exhange依然存在。
  48. auto-delete:表示exchange在未被使用时是否自动删除,默认是false。
  49. key:queue在该direct-exchange中的key值。当消息发送给该direct-exchange中指定key为设置值时,消息将会转发给queue参数指定的消息队列。(可以理解为就是routingkey)
  50. -->
  51. <!-- 定义direct exchange,绑定METHOD_EVENT_DIRECT_EXCHANGE queue -->
  52. <rabbit:direct-exchange name="METHOD_EVENT_DIRECT_EXCHANGE" durable="true" auto-delete="false">
  53. <rabbit:bindings>
  54. <rabbit:binding queue="METHOD_EVENT_DIRECT_QUEUE" key="METHOD_EVENT_DIRECT_ROUTING_KEY"></rabbit:binding>
  55. </rabbit:bindings>
  56. </rabbit:direct-exchange>
  57. <!-- 死信队列绑定 -->
  58. <rabbit:direct-exchange name="dlx.exchange" durable="true" auto-delete="false">
  59. <rabbit:bindings>
  60. <rabbit:binding queue="dlq.queue" key="bindingKey"></rabbit:binding>
  61. </rabbit:bindings>
  62. </rabbit:direct-exchange>
  63. <!-- 定义fanout-exchange,绑定 queue -->
  64. <!-- <rabbit:fanout-exchange name="">
  65. <rabbit:bindings>
  66. <rabbit:binding></rabbit:binding>
  67. </rabbit:bindings>
  68. </rabbit:fanout-exchange> -->
  69. <!--定义rabbit template用于数据的接收和发送-->
  70. <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="METHOD_EVENT_DIRECT_EXCHANGE" />
  71. <!-- 配置线程池 -->
  72. <bean id ="taskExecutor" class ="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" >
  73. <!-- 线程池维护线程的最少数量,核心线程数 -->
  74. <property name ="corePoolSize" value ="5" />
  75. <!-- 线程池维护线程所允许的空闲时间 单位:秒-->
  76. <property name ="keepAliveSeconds" value ="60" />
  77. <!-- 线程池维护线程的最大数量 -->
  78. <property name ="maxPoolSize" value ="100" />
  79. <!-- 线程池所使用的缓冲队列 -->
  80. <property name ="queueCapacity" value ="50" />
  81. </bean>
  82. <!-- 消息接收者 -->
  83. <bean id="msgConsumer" class="com.mq.consumer.MsgConsumer"/>
  84. <!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象
  85. acknowledge:auto 自动确认(默认), manual手动确认
  86. concurrency:并发数量 ,设置的是对每个listener在初始化的时候设置的并发消费者的个数
  87. prefetch 是每次从一次性从broker里面取的待消费的消息的个数
  88. -->
  89. <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" task-executor="taskExecutor" concurrency="5" prefetch="5">
  90. <rabbit:listener queues="METHOD_EVENT_DIRECT_QUEUE" ref="msgConsumer"/>
  91. </rabbit:listener-container>
  92. </beans>

在web.xml文件中引入application-mq.xml文件

  1. <context-param>
  2. <param-name>contextConfigLocation</param-name>
  3. <param-value>classpath:application-context.xml classpath:application-mq.xml</param-value>
  4. </context-param>

生产者:

  1. package com.gtmc.mq.producer;
  2. import java.util.Map;
  3. import javax.annotation.Resource;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.amqp.core.AmqpTemplate;
  7. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  8. import org.springframework.context.support.AbstractApplicationContext;
  9. import org.springframework.context.support.ClassPathXmlApplicationContext;
  10. import org.springframework.stereotype.Service;
  11. /**
  12. *
  13. * @author wangxinyang
  14. * @date 2019年4月18日
  15. * @version 1.0.0
  16. *
  17. */
  18. @Service
  19. public class MsgProducer {
  20. private final Logger logger = LoggerFactory.getLogger(this.getClass());
  21. //两种获取amqpTemplate方法;一种是注入;另一种是通过加载application-mq.xml文件;
  22. @Resource
  23. private AmqpTemplate amqpTemplate;
  24. //第二种获取amqpTemplate方法;
  25. // AbstractApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:application-mq.xml");
  26. //getBean
  27. // AmqpTemplate amqpTemplate = ctx.getBean(AmqpTemplate.class);
  28. public void sendMsg(String content) {
  29. logger.info("要发送的消息 :" + content);
  30. amqpTemplate.convertAndSend("METHOD_EVENT_DIRECT_EXCHANGE", "METHOD_EVENT_DIRECT_ROUTING_KEY", content);
  31. logger.info("消息发送成功");
  32. }
  33. }

这里获取amqpTemplate,也可以使用rabbitTemplate,都可以去实例化。

生产者的具体使用,可以使用接口中去注入生产者,然后调用发送消息的方法;

这里有个问题,就是如果你要使用spring的注入方式,那就这个接口的一整套都要使用注入方式。

如果你使用new的方式,那就只能用new的方式。

不能既用new的方式,又用注入的方式,这种情况,会出现注入失败的问题;

--------------------------------------------------

我们本次项目使用的是加载配置文件的方式去发送消息的,原本是写在基类是去加载配置文件,后来发现每次调用都要去加载,特别耗时;

后来我写了一个applicationUtil类,使用了单例模式,去加载xml配置文件,这样就不用每次调用都去加载配置文件了。

  1. package com.wxy.util;
  2. import org.springframework.context.support.AbstractApplicationContext;
  3. import org.springframework.context.support.ClassPathXmlApplicationContext;
  4. /**
  5. * application-mq.xml实例化工具
  6. * @author wangxinyang
  7. * @date 2019年5月22日
  8. * @version 1.0.0
  9. *
  10. */
  11. final public class ApplicationContextMqUtil {
  12. // private static AbstractApplicationContext applicationContext = null;
  13. private ApplicationContextMqUtil() {}
  14. private static class SingletonInstance {
  15. private static final AbstractApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/application-mq.xml");
  16. }
  17. public static AbstractApplicationContext getInstance() {
  18. return SingletonInstance.applicationContext;
  19. }
  20. }

接下来是消费者实现:

  1. package com.mq.consumer;
  2. import java.io.UnsupportedEncodingException;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.amqp.core.Message;
  6. import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
  7. import org.springframework.stereotype.Service;
  8. import com.rabbitmq.client.Channel;
  9. /**
  10. * mq消费者
  11. * @author wangxinyang
  12. * @date 2019年4月18日
  13. * @version 1.0.0
  14. *
  15. */
  16. @Service
  17. public class MsgConsumer implements ChannelAwareMessageListener {
  18. private final Logger logger = LoggerFactory.getLogger(MsgConsumer.class);
  19. @Override
  20. public void onMessage(Message message, Channel channel) throws Exception {
  21. String context = "";
  22. try {
  23. context = new String(message.getBody(), "utf-8");
  24. } catch (UnsupportedEncodingException e) {
  25. e.printStackTrace();
  26. }
  27. logger.info("message:" + message);
  28. logger.info("接收处理当前监听队列当中的消息: " + context + "\n 当前线程name:" + Thread.currentThread().getName() + "\n 当前线程id:"
  29. + Thread.currentThread().getId());
  30. // 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
  31. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  32. // nack返回false,并重新回到队列,就是重发机制,通常存在于消费失败后处理中;
  33. //第三个参数与拒绝消息方法的第二个参数同理。即true重新进入队列,false则丢弃;
  34. // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
  35. // 拒绝消息,即丢弃消息,消息不会重新回到队列,后面的参数为true则重入队列;为false则丢弃;
  36. // channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
  37. }
  38. }

 

接下来分享一下使用过程中遇到的问题:

由于使用的是spring 的amqpTelplate,还有一个是rabbitTemplate,这两个的区别不大,rabbitTemplate是也是遵循amqp协议的;

 

当消费者在消费消息时,在收到消息去做业务处理时,抛出error异常时,

注意:error异常默认是没有捕获的,我们一般写的try...cath块,都是捕获的Exception异常;

这里可以了解一下exception和error的区别;(自行百度)

会走这里:org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.class ,1199行,

这里捕获到的error异常,logger.error("Consumer thread error, thread abort.", e);

到此,ack不会给服务器返回信息,服务器默认就是unacked,在此之后;

上面给aborted赋值为true;会走下面这个if;

然后stop();

这时候其实服务还在运行,但是消费者停止掉了(这里不是很确定,是当前消费者线程停止,还是整个消费者停止(即假死状态)),再有消息进入队列中,消费者是不会去正常消费消息的;

到这里我之前用的方法是重启消费者;

--------------------------------------------------------------------

还有一种情况,就是当我们设置手动ack的时候,消费消息,如果消费失败,没有重发和拒绝机制,也没有走到ack,这时候,消息其实还是存在于队列中的,只是我们在观礼台获取不到这条消息,这条消息也没有重新进入到队列。

这种情况,同样是需要重启消费者才能去重新消费这条消息。

所以我们在手动确认消息的情况下,在catch中需要处理一下消费失败的消息,拒绝或者是发送到死信队列中,然后手动处理。

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/139183?site
推荐阅读
相关标签
  

闽ICP备14008679号