赞
踩
环境:
jdk:1.7;
系统:win7;
需求:在既有的webService项目中集成rabbitMQ;
首先,pom.xml中添加依赖:
注意:这里的spring-rabbit版本需要对应的jdk版本支持。目前没找到有对应关系表;
因为我们使用的jdk是1.7,一开始我使用2.x的rabbit依赖,项目运行会报错;
- <!-- RabbitMQ -->
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>3.5.1</version>
- </dependency>
- <dependency>
- <groupId>org.springframework.amqp</groupId>
- <artifactId>spring-rabbit</artifactId>
- <version>1.4.5.RELEASE</version>
- </dependency>
接下来,主要是xml配置文件;
在resource文件夹下添加application-mq.xml文件;
里面的“http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd”这个版本最好是跟spring-rabbit版本一致,否则每次启动项目都会去网络上加载这个文件,网络有时会有波动,导致加载这个网络文件失败;
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:rabbit="http://www.springframework.org/schema/rabbit"
- xmlns:context="http://www.springframework.org/schema/context"
- xmlns:tx="http://www.springframework.org/schema/tx"
- xmlns:util="http://www.springframework.org/schema/util"
- xsi:schemaLocation="http://www.springframework.org/schema/rabbit
- http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
- http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
- http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
-
- <context:property-placeholder location="classpath*:/META-INF/env/*.properties" />
-
- <!--<util:properties id="appConfig" location="classpath:/META-INF/env/rabbit.properties"></util:properties>-->
-
- <!--配置connection-factory,指定连接rabbit server参数-->
- <!-- virtual-host="/"是默认的虚拟机路径-->
- <!-- channel-cache-size,channel的缓存数量,默认值为25 -->
- <!-- cache-mode,缓存连接模式,默认值为CHANNEL(单个connection连接,连接之后关闭,自动销毁) -->
- <rabbit:connection-factory id="connectionFactory" username="guest" password="guest" host="localhost" port="5672" virtual-host="/" channel-cache-size="25"/>
-
- <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成-->
- <rabbit:admin connection-factory="connectionFactory"/>
-
- <!-- 参数介绍:
- name:queue的名字。
- durable:是否为持久的。默认是true,RabbitMQ重启后queue依然存在。
- auto-delete:表示消息队列没有在使用时将被自动删除。默认是false。
- exclusive:表示该消息队列是否只在当前connection生效。默认false。
- -->
-
- <rabbit:queue id="METHOD_EVENT_DIRECT_QUEUE" name="METHOD_EVENT_DIRECT_QUEUE" durable="true" auto-delete="false" exclusive="false">
- <!-- 对应的死信队列 -->
- <rabbit:queue-arguments>
- <!-- 超时时间为30分钟 单位是毫秒-->
- <entry key="x-message-ttl" value="1800000" value-type="java.lang.Long" />
- <entry key="x-dead-letter-exchange" value="dlx.exchange" />
- <entry key="x-dead-letter-routing-key" value="routingKey" />
- </rabbit:queue-arguments>
- </rabbit:queue>
-
- <!-- 声明死信队列 -->
- <rabbit:queue id="dlq.queue" name="dlq.queue" durable="true" auto-delete="false" exclusive="false"/>
-
- <!--绑定队列,
- rabbitmq的exchangeType常用的三种模式:direct,fanout,topic三种,
- 我们用direct模式,即rabbit:direct-exchange标签,
- Direct交换器很简单,如果是Direct类型,就会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等,则发送到该Binding对应的Queue中。
- 有一个需要注意的地方:如果找不到指定的exchange,就会报错。
- 但routing key找不到的话,不会报错,这条消息会直接丢失,所以此处要小心,
- auto-delete:自动删除,如果为Yes,则该交换机所有队列queue删除后,自动删除交换机,默认为false -->
- <!-- 参数介绍:
- name:exchange的名字。
- durable:是否为持久的,默认为true,RabbitMQ重启后exhange依然存在。
- auto-delete:表示exchange在未被使用时是否自动删除,默认是false。
- key:queue在该direct-exchange中的key值。当消息发送给该direct-exchange中指定key为设置值时,消息将会转发给queue参数指定的消息队列。(可以理解为就是routingkey)
- -->
- <!-- 定义direct exchange,绑定METHOD_EVENT_DIRECT_EXCHANGE queue -->
- <rabbit:direct-exchange name="METHOD_EVENT_DIRECT_EXCHANGE" durable="true" auto-delete="false">
- <rabbit:bindings>
- <rabbit:binding queue="METHOD_EVENT_DIRECT_QUEUE" key="METHOD_EVENT_DIRECT_ROUTING_KEY"></rabbit:binding>
- </rabbit:bindings>
- </rabbit:direct-exchange>
-
- <!-- 死信队列绑定 -->
- <rabbit:direct-exchange name="dlx.exchange" durable="true" auto-delete="false">
- <rabbit:bindings>
- <rabbit:binding queue="dlq.queue" key="bindingKey"></rabbit:binding>
- </rabbit:bindings>
- </rabbit:direct-exchange>
-
-
- <!-- 定义fanout-exchange,绑定 queue -->
- <!-- <rabbit:fanout-exchange name="">
- <rabbit:bindings>
- <rabbit:binding></rabbit:binding>
- </rabbit:bindings>
- </rabbit:fanout-exchange> -->
-
- <!--定义rabbit template用于数据的接收和发送-->
- <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="METHOD_EVENT_DIRECT_EXCHANGE" />
-
- <!-- 配置线程池 -->
- <bean id ="taskExecutor" class ="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" >
- <!-- 线程池维护线程的最少数量,核心线程数 -->
- <property name ="corePoolSize" value ="5" />
- <!-- 线程池维护线程所允许的空闲时间 单位:秒-->
- <property name ="keepAliveSeconds" value ="60" />
- <!-- 线程池维护线程的最大数量 -->
- <property name ="maxPoolSize" value ="100" />
- <!-- 线程池所使用的缓冲队列 -->
- <property name ="queueCapacity" value ="50" />
- </bean>
-
- <!-- 消息接收者 -->
- <bean id="msgConsumer" class="com.mq.consumer.MsgConsumer"/>
-
-
- <!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象
- acknowledge:auto 自动确认(默认), manual手动确认
- concurrency:并发数量 ,设置的是对每个listener在初始化的时候设置的并发消费者的个数
- prefetch 是每次从一次性从broker里面取的待消费的消息的个数
- -->
-
- <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" task-executor="taskExecutor" concurrency="5" prefetch="5">
- <rabbit:listener queues="METHOD_EVENT_DIRECT_QUEUE" ref="msgConsumer"/>
- </rabbit:listener-container>
-
- </beans>
在web.xml文件中引入application-mq.xml文件
- <context-param>
- <param-name>contextConfigLocation</param-name>
- <param-value>classpath:application-context.xml classpath:application-mq.xml</param-value>
- </context-param>
生产者:
- package com.gtmc.mq.producer;
-
- import java.util.Map;
-
- import javax.annotation.Resource;
-
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.AmqpTemplate;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.context.support.AbstractApplicationContext;
- import org.springframework.context.support.ClassPathXmlApplicationContext;
- import org.springframework.stereotype.Service;
-
-
- /**
- *
- * @author wangxinyang
- * @date 2019年4月18日
- * @version 1.0.0
- *
- */
- @Service
- public class MsgProducer {
-
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
- //两种获取amqpTemplate方法;一种是注入;另一种是通过加载application-mq.xml文件;
- @Resource
- private AmqpTemplate amqpTemplate;
-
- //第二种获取amqpTemplate方法;
- // AbstractApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:application-mq.xml");
- //getBean
- // AmqpTemplate amqpTemplate = ctx.getBean(AmqpTemplate.class);
-
- public void sendMsg(String content) {
-
- logger.info("要发送的消息 :" + content);
- amqpTemplate.convertAndSend("METHOD_EVENT_DIRECT_EXCHANGE", "METHOD_EVENT_DIRECT_ROUTING_KEY", content);
- logger.info("消息发送成功");
-
- }
-
-
-
-
-
- }
这里获取amqpTemplate,也可以使用rabbitTemplate,都可以去实例化。
生产者的具体使用,可以使用接口中去注入生产者,然后调用发送消息的方法;
这里有个问题,就是如果你要使用spring的注入方式,那就这个接口的一整套都要使用注入方式。
如果你使用new的方式,那就只能用new的方式。
不能既用new的方式,又用注入的方式,这种情况,会出现注入失败的问题;
--------------------------------------------------
我们本次项目使用的是加载配置文件的方式去发送消息的,原本是写在基类是去加载配置文件,后来发现每次调用都要去加载,特别耗时;
后来我写了一个applicationUtil类,使用了单例模式,去加载xml配置文件,这样就不用每次调用都去加载配置文件了。
- package com.wxy.util;
-
- import org.springframework.context.support.AbstractApplicationContext;
- import org.springframework.context.support.ClassPathXmlApplicationContext;
- /**
- * application-mq.xml实例化工具
- * @author wangxinyang
- * @date 2019年5月22日
- * @version 1.0.0
- *
- */
- final public class ApplicationContextMqUtil {
-
- // private static AbstractApplicationContext applicationContext = null;
-
-
- private ApplicationContextMqUtil() {}
-
- private static class SingletonInstance {
- private static final AbstractApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/application-mq.xml");
- }
-
- public static AbstractApplicationContext getInstance() {
- return SingletonInstance.applicationContext;
- }
- }
接下来是消费者实现:
- package com.mq.consumer;
-
- import java.io.UnsupportedEncodingException;
-
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
- import org.springframework.stereotype.Service;
-
- import com.rabbitmq.client.Channel;
-
- /**
- * mq消费者
- * @author wangxinyang
- * @date 2019年4月18日
- * @version 1.0.0
- *
- */
- @Service
- public class MsgConsumer implements ChannelAwareMessageListener {
-
- private final Logger logger = LoggerFactory.getLogger(MsgConsumer.class);
-
-
- @Override
- public void onMessage(Message message, Channel channel) throws Exception {
- String context = "";
- try {
- context = new String(message.getBody(), "utf-8");
- } catch (UnsupportedEncodingException e) {
- e.printStackTrace();
- }
- logger.info("message:" + message);
- logger.info("接收处理当前监听队列当中的消息: " + context + "\n 当前线程name:" + Thread.currentThread().getName() + "\n 当前线程id:"
- + Thread.currentThread().getId());
-
-
- // 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
- // nack返回false,并重新回到队列,就是重发机制,通常存在于消费失败后处理中;
- //第三个参数与拒绝消息方法的第二个参数同理。即true重新进入队列,false则丢弃;
- // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
- // 拒绝消息,即丢弃消息,消息不会重新回到队列,后面的参数为true则重入队列;为false则丢弃;
- // channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
-
- }
-
- }
接下来分享一下使用过程中遇到的问题:
由于使用的是spring 的amqpTelplate,还有一个是rabbitTemplate,这两个的区别不大,rabbitTemplate是也是遵循amqp协议的;
当消费者在消费消息时,在收到消息去做业务处理时,抛出error异常时,
注意:error异常默认是没有捕获的,我们一般写的try...cath块,都是捕获的Exception异常;
这里可以了解一下exception和error的区别;(自行百度)
会走这里:org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.class ,1199行,
这里捕获到的error异常,logger.error("Consumer thread error, thread abort.", e);
到此,ack不会给服务器返回信息,服务器默认就是unacked,在此之后;
上面给aborted赋值为true;会走下面这个if;
然后stop();
这时候其实服务还在运行,但是消费者停止掉了(这里不是很确定,是当前消费者线程停止,还是整个消费者停止(即假死状态)),再有消息进入队列中,消费者是不会去正常消费消息的;
到这里我之前用的方法是重启消费者;
--------------------------------------------------------------------
还有一种情况,就是当我们设置手动ack的时候,消费消息,如果消费失败,没有重发和拒绝机制,也没有走到ack,这时候,消息其实还是存在于队列中的,只是我们在观礼台获取不到这条消息,这条消息也没有重新进入到队列。
这种情况,同样是需要重启消费者才能去重新消费这条消息。
所以我们在手动确认消息的情况下,在catch中需要处理一下消费失败的消息,拒绝或者是发送到死信队列中,然后手动处理。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。