赞
踩
<!--spring整合rabbitmq-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
注:maven方式,这一个依赖即可,如果是非maven项目,需要引入5个jar如下:
推荐使用mavne方式,简单,非Maven项目,先用maven把以来下载本地仓库,复制到非maven的项目中即可。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-2.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <!--生产者者配置如下:--> <!-- 定义RabbitMQ的连接工厂 --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.vhost}" connection-timeout="${rabbitmq.conTimeout}" publisher-confirms="${rabbitmq.publisher-confirms}" publisher-returns="${rabbitmq.publisher-returns}"/> <!-- 管理消息队列 --> <rabbit:admin connection-factory="connectionFactory"/> <!--此处为配置文件方式 管控台配置模式需要注释 默认模式管控台 Start--> <!-- 定义一个队列或者多个队列 自动声明--> <rabbit:queue name="Queue-1" auto-declare="true" durable="true"/> <rabbit:topic-exchange name="exchange-1"> <rabbit:bindings> <!-- 可绑定多个队列,发送的时候指定key进行发送 --> <rabbit:binding queue="Queue-1" pattern="ws.tjqb"/> </rabbit:bindings> </rabbit:topic-exchange> <!--此处为配置文件方式 管控台配置模式需要注释 默认模式管控台 End--> <!-- 定义交换机 自动声明--> <rabbit:topic-exchange name="exchange-1" auto-declare="true" durable="true"/> <!-- 5. 配置消息对象json转换类 --> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /> <!-- 定义MQ消息模板 1. id : 定义消息模板ID 2.connection-factory : 把定义的连接工厂放到消息模板中 3.confirm-callback : confirm确认机制 4.return-callback : return确认机制 5.mandatory :#有2种状态 设置为 true 后 消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除; 设置为 false 后 消费者在消息没有被路由到合适队列情况下会自动删除 --> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="exchange-1" confirm-callback="confirmCallBackListener" return-callback="returnCallBackListener" mandatory="true" message-converter="jsonMessageConverter"/> </beans>
package com.gblfy.order.controller; import com.gblfy.order.pojo.FisCallingTrace; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.UUID; @RestController public class Send { public static final String EXCHANGE = "exchange-1"; @Autowired RabbitTemplate rabbitTemplate; @RequestMapping("/test") public String test() { String uuidStr = UUID.randomUUID().toString(); CorrelationData correlationId = new CorrelationData(uuidStr); // 发送消息 Map<String, String> map = new HashMap<>(); map.put("email", "550731230@qq.com"); rabbitTemplate.convertAndSend(EXCHANGE, "ws.tjqb", map, correlationId); return "success"; }
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-2.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <!--消费者配置如下:--> <!-- 定义RabbitMQ的连接工厂 --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.vhost}" connection-timeout="${rabbitmq.conTimeout}" publisher-confirms="${rabbitmq.publisher-confirms}" publisher-returns="${rabbitmq.publisher-returns}"/> <!-- 管理消息队列 --> <rabbit:admin connection-factory="connectionFactory"/> <!-- 声明多个消费者对象 --> <bean id="emailMessageListener" class="com.gblfy.order.mqhandler.EmailMessageListener"/> <!-- 监听队列 1. connectionFactory 连接工厂 2. manual 手动签收 3. ref="" 消费者监听 --> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" concurrency="${rabbitmq.concurrency}" max-concurrency="${rabbitmq.max-concurrency}"> <rabbit:listener ref="emailMessageListener" method="onMessage" queue-names="Queue-1"/> </rabbit:listener-container> </beans>
package com.gblfy.order.mqhandler; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import java.io.IOException; @Component public class EmailMessageListener implements MessageListener { private static final ObjectMapper MAPPER = new ObjectMapper(); @Override public void onMessage(Message message) { try { JsonNode jsonNode = MAPPER.readTree(message.getBody()); String email = jsonNode.get("email").asText(); System.out.println("获取队列中消息:" + email); } catch (IOException e) { e.printStackTrace(); } } }
#RabbitMQ 连接信息 #IP地址 rabbitmq.host=192.168.0.114 #端口 rabbitmq.port=5672 #用户名 rabbitmq.username=fis #密码 rabbitmq.password=ncl@1234 #虚拟主机 rabbitmq.vhost=/app/fisMQ #连接超时时间 rabbitmq.conTimeout=15000 #发送确认 对应RabbitTemplate.ConfirmCallback接口 #消息发送成功 有2个重要参数 # ack 状态为true correlationId 全局唯一ID用于标识每一支队列 rabbitmq.publisher-confirms=true #发送失败回退,对应RabbitTemplate.ReturnCallback接口 rabbitmq.publisher-returns=true #默认消费者数量 rabbitmq.concurrency=10 #最大消费者数量 rabbitmq.max-concurrency=20
目前适配的spring版本4.2.3.RELEASE
声明:配置文件不变
package com.gblfy.order.controller; import com.gblfy.order.pojo.FisCallingTrace; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.UUID; @RestController public class Send { public static final String EXCHANGE = "exchange-1"; @Autowired RabbitTemplate rabbitTemplate; @RequestMapping("/test") public String test() { FisCallingTrace f = getFisCallingTrace(); String uuidStr = UUID.randomUUID().toString(); CorrelationData correlationId = new CorrelationData(uuidStr); // 发送消息 Map<String, Object> map = new HashMap<>(); map.put("mReqXml", "请求报文"); map.put("mResXml", "响应报文"); map.put("mUUID", uuidStr); map.put("serviceName", "NYHC"); map.put("fisCallingTrace", f); rabbitTemplate.convertAndSend(EXCHANGE, "ws.tjqb", map, correlationId); return "success"; } // @RequestMapping("/test") // public String test() { // String uuidStr = UUID.randomUUID().toString(); // CorrelationData correlationId = new CorrelationData(uuidStr); // // 发送消息 // Map<String, String> map = new HashMap<>(); // map.put("email", "550731230@qq.com"); // rabbitTemplate.convertAndSend(EXCHANGE, "ws.tjqb", map, correlationId); // return "success"; // } private FisCallingTrace getFisCallingTrace() { FisCallingTrace f = new FisCallingTrace(); f.setServicename("tjqb"); f.setServicetype("1"); f.setInterfacetype("2"); f.setResstatus("1"); f.setResremark("纽约数据回传接口"); f.setReqdate(new Date()); f.setReqtime("10:00:00"); f.setResdate(new Date()); f.setRestime("10:00:00"); f.setReqxml("请求报文"); f.setResxml("响应报文"); return f; } }
package com.gblfy.order.mqhandler; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.gblfy.order.pojo.FisCallingTrace; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.stereotype.Component; import java.io.IOException; @Slf4j @Component public class EmailMessageListener implements ChannelAwareMessageListener { private static final ObjectMapper MAPPER = new ObjectMapper(); @Override public void onMessage(Message message, Channel channel) throws Exception { try { JsonNode jsonNode = MAPPER.readTree(message.getBody()); String mReqXml = jsonNode.get("mReqXml").asText(); String mResXml = jsonNode.get("mResXml").asText(); String mUUID = jsonNode.get("mUUID").asText(); String serviceName = jsonNode.get("serviceName").asText(); System.out.println("获取队列中消息:" + mReqXml); System.out.println("获取队列中消息:" + mResXml); System.out.println("获取队列中消息:" + mUUID); System.out.println("获取队列中消息:" + serviceName); JsonNode jsonNode1 = jsonNode.get("fisCallingTrace"); String jsonStr = MAPPER.writeValueAsString(jsonNode1); FisCallingTrace f= MAPPER.readValue(jsonStr , FisCallingTrace.class); System.out.println("获取队列中消息:" + f.getReqxml()); System.out.println("获取队列中消息:" + f.getResxml()); // 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { e.printStackTrace(); } log.info("解析操作"); log.info("落库操作"); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。