赞
踩
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>4.0.2</version>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.amqp</groupId>
- <artifactId>spring-rabbit</artifactId>
- <version>1.7.9.RELEASE</version>
- </dependency>
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
- xsi:schemaLocation="http://www.springframework.org/schema/beans
- http://www.springframework.org/schema/beans/spring-beans.xsd
- http://www.springframework.org/schema/rabbit
- http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
-
- </beans>
-
1、配置连接
- <!--连接工厂-->
- <rabbit:connection-factory id="rabbitConnectionFactory" host="127.0.0.1" port="5672"
- username="test" password="test" virtual-host="/test"
- channel-cache-size="50" />
注:该配置还有publisher-confirms、publisher-returns等参数,用于消息确认。
2、配置admin:producer中的exchange,queue会自动的利用该admin自动在spring中生成
- <!-- 定义admin,producer中的exchange,queue会自动的利用该admin自动在spring中生成 -->
- <rabbit:admin connection-factory="rabbitConnectionFactory"/>
3、定义rabbitmq模板(消息生产者通过模板类进行推送数据)
- <!-- 定义rabbitmq模板,指定连接工厂、exchange、queue等 -->
- <rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory" exchange="XXX" queue="XXX"/>
注意:
4、配置队列
- <!-- 队列声明 :
- durable:true、false true:在服务器重启时,能够存活
- exclusive :当连接关闭后是否自动删除队列;是否私有队列,如果私有,其他通道不能访问当前队列
- autodelete:当没有任何消费者使用时,自动删除该队列 -->
- <!-- 用于发布/订阅模式的队列 -->
- <rabbit:queue name="myFanoutQueue" durable="true" exclusive="false" auto-delete="false"/>
- <!-- 用于路由模式的队列 -->
- <rabbit:queue name="myDirectQueue" durable="true" exclusive="false" auto-delete="false"/>
- <!-- 用于主题模式的队列 -->
- <rabbit:queue name="myTopicQueue_error" durable="true" exclusive="false" auto-delete="false"/>
- <rabbit:queue name="myTopicQueue_warn" durable="true" exclusive="false" auto-delete="false"/>
5、设置exchange,并且配置与队列queue的关系(durable、auto-delete与队列参数同一个意思)
- <!-- 定义交互机 发布/订阅模式 -->
- <rabbit:fanout-exchange name="myFanoutExchange" durable="true" auto-delete="false">
- <rabbit:bindings>
- <rabbit:binding queue="myFanoutQueue"></rabbit:binding>
- </rabbit:bindings>
- </rabbit:fanout-exchange>
-
- <!-- 定义交互机 路由模式(需要routing-key) -->
- <rabbit:direct-exchange name="myDirectExchange" durable="true" auto-delete="false">
- <rabbit:bindings>
- <rabbit:binding queue="myDirectQueue" key="error"></rabbit:binding>
- </rabbit:bindings>
- </rabbit:direct-exchange>
-
-
- <!--定义交互机 主题模式 -->
- <rabbit:topic-exchange name="myTopicExchange" durable="true" auto-delete="false">
- <rabbit:bindings>
- <rabbit:binding queue="myTopicQueue_error" pattern="error.#" ></rabbit:binding>
- <rabbit:binding queue="myTopicQueue_error" pattern="warn.#" ></rabbit:binding>
- <rabbit:binding queue="myTopicQueue_warn" pattern="warn.*"></rabbit:binding>
- </rabbit:bindings>
- </rabbit:topic-exchange>
注意:
6、定义消息发布类
- public class SpringSender {
- public static void sendMessage(String exchange,String routingKey,Object message){
- //加载配置文件
- AbstractApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring-rabbitmq.xml");
- //获取rabbitmqTemplate模板
- RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
- //发送消息
- rabbitTemplate.convertAndSend(exchange,routingKey,message);
- }
-
- public static void main(String[] args) throws Exception{
- SpringSender.sendMessage("myTopicExchange","warn.item","主题模式,发布警告信息");
- }
- }
注意:
- /**
- * 发送消息到默认的交换机和队列(不带有自定义参数)
- * @param messageObject 消息对象
- * @return boolean 发送标记
- */
- RabbitTemplate.convertAndSend(messageObject);
-
- /**
- * 发送消息到默认的交换机和队列
- * @param messageObject 消息对象
- * @param messageObject 自定义参数,在监听器ConfirmCallback中可以取到。
- * @return boolean 发送标记
- */
- RabbitTemplate.correlationConvertAndSend(messageObject,correlationdata);
-
- /**
- * 发送消息到指定的队列
- * @param queue 队列名称
- * @param messageObject 消息对象
- * @param messageObject 自定义参数,在监听器ConfirmCallback中可以取到。
- * @return boolean 发送标记
- */
- RabbitTemplate.convertAndSend(queue, messageObject,correlationdata);
-
- /**
- * 发送消息到指定的交换机和队列
- * @param exchange 交换机名称
- * @param queue 队列名称
- * @param messageObject 自定义参数,在监听器ConfirmCallback中可以取到。
- * @return boolean 发送标记
- */
- RabbitTemplate.convertAndSend(exchange,queue,messageObject,correlationdata);
-
- /**
- * 发送消息到默认的交换机和队列(不带有自定义参数)
- Send方法还有很多,此处只列举一种
- * @param Message AMQP封装的消息对象
- * @return void
- */
- RabbitTemplate.send(Message message);
-
1、定义主题模式的两个实现类
- /**
- * 用于接收routing-key为warn或error的消息
- */
- public class TopicErrorReceiver implements ChannelAwareMessageListener{
- @Override
- public void onMessage(Message message, Channel channel) throws Exception {
- try{
- System.out.println("************************ddd********************************");
- System.out.println("主题模式 warn/error 接收信息:"+new String(message.getBody()));
- System.out.println("********************************************************");
- //设置手工应答
- channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
- }catch (Exception e){
- channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
- }
- }
- }
-
- /**
- * 用于接收routing-key为warn的消息
- */
- public class TopicWarnReceiver implements ChannelAwareMessageListener{
-
- @Override
- public void onMessage(Message message, Channel channel) throws Exception {
- try{
- System.out.println("************************ddd********************************");
- System.out.println("主题模式 warm 接收信息:"+new String(message.getBody()));
- System.out.println("********************************************************");
- //设置手工应答
- channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
- }catch (Exception e){
- channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
- }
- }
- }
注:接收类都是实现了ChannelAwareMessageListener接口,并重写了onMessage方法
2、定义监听,并绑定接收者与队列的关系(即接收者监听哪些队列)
- <!-- 定义监听 -->
- <rabbit:listener-container connection-factory="rabbitConnectionFactory" acknowledge="manual" >
- <!-- 发布订阅模式监听-->
- <rabbit:listener ref="directReceiver" queue-names="myFanoutQueue"/>
- <!-- 路由模式监听 -->
- <rabbit:listener ref="directReceiver" queue-names="myDirectQueue"/>
- <!-- 主题模式监听-->
- <rabbit:listener ref="topicErrorReceiver" queue-names="myTopicQueue_error"/>
- <rabbit:listener ref="topicWarnReceiver" queue-names="myTopicQueue_warn"/>
- </rabbit:listener-container>
-
- <bean id="directReceiver" class="com.chensr.until.rabbitmq.springRabbitmq.DirectReceiver"/>
- <bean id="fanoutReceiver" class="com.chensr.until.rabbitmq.springRabbitmq.FanoutReceiver"/>
- <bean id="topicErrorReceiver" class="com.chensr.until.rabbitmq.springRabbitmq.TopicErrorReceiver"/>
- <bean id="topicWarnReceiver" class="com.chensr.until.rabbitmq.springRabbitmq.TopicWarnReceiver"/>
注意:
3、测试
1、在配置工厂连接的时候,设置publisher-confirms="true"
- <!--连接工厂-->
- <rabbit:connection-factory id="rabbitConnectionFactory" host="127.0.0.1" port="5672"
- username="test" password="test" virtual-host="/test"
- channel-cache-size="50" publisher-confirms="true"/>
2、在定义rabbitmq模板时,指定confirm-callback的实现类
- <!-- 定义rabbitmq模板,指定连接工厂、exchange、queue等 -->
- <rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory" exchange="myDirectExchange"
- confirm-callback="confirmCallback" />
3、创建实现类ConfirmCallback,实现RabbitTemplate.ConfirmCallback接口
- public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {
- /**
- * CorrelationData 是在发送消息时传入回调方法的参数,可以用于区分消息对象。 CorrelationData对象中只有一个属性 String id。
- * 通过这个参数,我们可以区分当前是发送哪一条消息时的回调,并通过ack参数来进行失败重发功能
- *
- * @param correlationData 回调的相关数据.
- * @param ack true for ack, false for nack
- * @param cause 专门给NACK准备的一个可选的原因,其他情况为null。
- */
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- System.out.println("********************************************************");
- System.out.println("exChange确认"+ ack +" "+cause);
- System.out.println("********************************************************");
- }
- }
4、在配置文件中定义confirmCallback
<bean id="confirmCallback" class="com.chensr.until.rabbitmq.springRabbitmq.ConfirmCallback"/>
5、测试:执行SpringSender.sendMessage("myTopicExchange","error","queue message fanout" );,控制台显示
********************************************************
exChange确认true null
********************************************************
注意:
1、在定义rabbitmq模板时,指定return-callback的实现类,并且设置mandatory="true"
- <!-- 定义rabbitmq模板,指定连接工厂、exchange、queue等 -->
- <rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory"
- confirm-callback="confirmCallback" return-callback="returnCallback" mandatory="true"/>
注:mandatory为true表示推送消息到queue失败时调用return-callback
2、创建实现类ReturnCallback,实现RabbitTemplate.ReturnCallback接口
- public class ReturnCallback implements RabbitTemplate.ReturnCallback {
- @Override
- public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
- System.out.println("********************************************************");
- System.out.println("失败确认:"+message+" | "+replyCode+" | "+replyText+" | "+exchange+" | "+routingKey);
- System.out.println("********************************************************");
- }
- }
3、在配置文件中定义returnedMessage
<bean id="returnCallback" class="com.chensr.until.rabbitmq.springRabbitmq.ReturnCallback"/>
4、测试:执行SpringSender.sendMessage("myTopicExchange","error123","queue message" );,由于routing-key没匹配到对应的队列,所以控制台打印报错信息
********************************************************
失败确认:(Body:'queue message' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=null, receivedExchange=null, receivedRoutingKey=null, receivedDelay=null, deliveryTag=0, messageCount=null, consumerTag=null, consumerQueue=null]) | 312 | NO_ROUTE | myTopicExchange | error123
********************************************************
注意:
1、添加maven依赖
- <dependency>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-mapper-asl</artifactId>
- <version>1.9.13</version>
- </dependency>
2、在定义rabbitmq模板时,指定转换器message-converter="jsonMessageConverter"
- <!-- 定义rabbitmq模板,指定连接工厂、exchange、queue等 -->
- <rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory" message-converter="jsonMessageConverter"
- confirm-callback="confirmCallback" return-callback="returnCallback" mandatory="true"/>
3、配置bean(也可以重写)
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter"></bean>
4、测试:(将map转成json格式)
- public class SpringSender {
- public static void sendMessage(String exchange,String routingKey,Object message){
- //加载配置文件
- AbstractApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring-rabbitmq.xml");
- //获取rabbitmqTemplate模板
- RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
- //发送消息
- rabbitTemplate.convertAndSend(exchange,routingKey,message);
- }
-
- public static void main(String[] args) throws Exception{
- HashMap<String ,Object> map = new HashMap<String, Object>();
- map.put("message","queue message fanout");
- SpringSender.sendMessage("myFanoutExchange","myFanoutQueue",map);
- }
- }
输出
********************************************************
发布/订阅 接收信息:{"message":"queue message fanout"}
********************************************************
注:如果发生的消息是字符串,接收到的信息为字符串
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
- xsi:schemaLocation="http://www.springframework.org/schema/beans
- http://www.springframework.org/schema/beans/spring-beans.xsd
- http://www.springframework.org/schema/rabbit
- http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
-
- <!-- 引入rabbitmq配置文件 -->
- <bean id="configProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
- <property name="locations">
- <list>
- <value>classpath:conf/rabbitmq.properties</value>
- </list>
- </property>
- </bean>
-
- <!--连接工厂-->
- <rabbit:connection-factory id="rabbitConnectionFactory" host="127.0.0.1" port="5672"
- username="test" password="test" virtual-host="/test"
- channel-cache-size="50" publisher-confirms="true"/>
-
- <!-- 定义admin,producer中的exchange,queue会自动的利用该admin自动在spring中生成 -->
- <rabbit:admin connection-factory="rabbitConnectionFactory"/>
-
- <!-- 定义rabbitmq模板,指定连接工厂、exchange、queue等 -->
- <rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory" message-converter="jsonMessageConverter"
- confirm-callback="confirmCallback" return-callback="returnCallback" mandatory="true"/>
-
-
- <!-- 队列声明 :
- durable:true、false true:在服务器重启时,能够存活
- exclusive :当连接关闭后是否自动删除队列;是否私有队列,如果私有,其他通道不能访问当前队列
- autodelete:当没有任何消费者使用时,自动删除该队列 -->
- <!-- 用于发布/订阅模式的队列 -->
- <rabbit:queue name="myFanoutQueue" durable="true" exclusive="false" auto-delete="false"/>
- <!-- 用于路由模式的队列 -->
- <rabbit:queue name="myDirectQueue" durable="true" exclusive="false" auto-delete="false"/>
- <!-- 用于主题模式的队列 -->
- <rabbit:queue name="myTopicQueue_error" durable="true" exclusive="false" auto-delete="false"/>
- <rabbit:queue name="myTopicQueue_warn" durable="true" exclusive="false" auto-delete="false"/>
-
-
- <!-- 定义交互机 发布/订阅模式 -->
- <rabbit:fanout-exchange name="myFanoutExchange" durable="true" auto-delete="false">
- <rabbit:bindings>
- <rabbit:binding queue="myFanoutQueue"></rabbit:binding>
- </rabbit:bindings>
- </rabbit:fanout-exchange>
-
- <!-- 定义交互机 路由模式(需要routing-key) -->
- <rabbit:direct-exchange name="myDirectExchange" durable="true" auto-delete="false">
- <rabbit:bindings>
- <rabbit:binding queue="myDirectQueue" key="error"></rabbit:binding>
- </rabbit:bindings>
- </rabbit:direct-exchange>
-
-
- <!-- 定义交互机 主题模式 ;-->
- <rabbit:topic-exchange name="myTopicExchange" durable="true" auto-delete="false">
- <rabbit:bindings>
- <rabbit:binding queue="myTopicQueue_error" pattern="error.#" ></rabbit:binding>
- <rabbit:binding queue="myTopicQueue_error" pattern="warn.#" ></rabbit:binding>
- <rabbit:binding queue="myTopicQueue_warn" pattern="warn.*"></rabbit:binding>
- </rabbit:bindings>
- </rabbit:topic-exchange>
-
-
-
- <!-- 定义监听 -->
- <rabbit:listener-container connection-factory="rabbitConnectionFactory" acknowledge="manual" >
- <!-- 发布订阅模式监听-->
- <rabbit:listener ref="fanoutReceiver" queue-names="myFanoutQueue" />
- <!-- 路由模式监听 -->
- <rabbit:listener ref="directReceiver" queue-names="myDirectQueue"/>
- <!-- 主题模式监听-->
- <rabbit:listener ref="topicErrorReceiver" queue-names="myTopicQueue_error"/>
- <rabbit:listener ref="topicWarnReceiver" queue-names="myTopicQueue_warn"/>
- </rabbit:listener-container>
-
- <!--接收者实现类-->
- <bean id="directReceiver" class="com.chensr.until.rabbitmq.springRabbitmq.DirectReceiver"/>
- <bean id="fanoutReceiver" class="com.chensr.until.rabbitmq.springRabbitmq.FanoutReceiver"/>
- <bean id="topicErrorReceiver" class="com.chensr.until.rabbitmq.springRabbitmq.TopicErrorReceiver"/>
- <bean id="topicWarnReceiver" class="com.chensr.until.rabbitmq.springRabbitmq.TopicWarnReceiver"/>
-
- <!--confirmCallback回调-->
- <bean id="confirmCallback" class="com.chensr.until.rabbitmq.springRabbitmq.ConfirmCallback"/>
- <!--returnCallback回调-->
- <bean id="returnCallback" class="com.chensr.until.rabbitmq.springRabbitmq.ReturnCallback"/>
- <!--消息转换器,转成json格式-->
- <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter"></bean>
- </beans>
注:由于接收者都是继承ChannelAwareMessageListener,实现onMessage方法,所以这里不提供相应的代码
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。