赞
踩
2017 年以前采用的是 Spring XML 配置方式,这里先回顾一下(现在已改用 YAML 方式配置)。首先引入依赖:
<!-- RabbitMQ dependency: the spring-rabbit module of Spring AMQP -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
# RabbitMQ connection settings
# Comma-separated host:port list of broker addresses
rabbitmq.addresses=192.168.5.130:5672,192.168.5.132:5672
rabbitmq.username=admin
# NOTE(review): the credential is stored in plain text here - consider externalizing it
rabbitmq.password=admin2015
# NOTE(review): the Spring XML shown with this file hard-codes concurrentConsumers=3
# and does not reference this key - confirm whether it is still used
rabbitmq.concurrentConsumers=5
# Used as channel-cache-size on the connection factory
rabbitmq.channel.cache.size=50
#
# Names of the direct exchanges declared in the Spring XML configuration
# Also the default exchange of the rabbit template
rabbitmq.direct.exchange=mal-ms.direct.exchange
rabbitmq.consumer.direct.exchange=eps.direct.exchange
rabbitmq.ccs.direct.exchange=ccs.direct.exchange
<?xml version="1.0" encoding="UTF-8" ?>
<!--
  Spring AMQP (RabbitMQ) wiring: connection factory, annotation-driven listener
  container factory, a JSON-converting template with retry, and the
  queue / direct-exchange / binding declarations.

  The ${rabbitmq.*} placeholders are expected to be resolved from the rabbitmq
  properties; the property-placeholder configuration itself is not part of this
  file (NOTE(review): confirm it is declared elsewhere in the context).

  The rabbit schema location is version-less so that the XSD shipped inside the
  spring-rabbit jar on the classpath is used, instead of pinning the 1.5 schema
  while depending on a newer spring-rabbit release.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"
       default-lazy-init="false">

    <description>Spring Rabbit 生成者配置</description>

    <!-- Caching connection factory shared by the admin, the listeners and the template -->
    <rabbit:connection-factory id="connectionFactory"
                               addresses="${rabbitmq.addresses}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               channel-cache-size="${rabbitmq.channel.cache.size}" />

    <!-- RabbitAdmin: declares the queues, exchanges and bindings defined below on the broker -->
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- Consumer configuration: start -->
    <!-- Enables @RabbitListener-annotated methods -->
    <rabbit:annotation-driven />

    <bean id="rabbitListenerContainerFactory"
          class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
        <property name="connectionFactory" ref="connectionFactory" />
        <!-- NOTE(review): hard-coded; the properties file also defines
             rabbitmq.concurrentConsumers, which is not referenced here - confirm which is intended -->
        <property name="concurrentConsumers" value="3" />
        <property name="maxConcurrentConsumers" value="10" />
    </bean>
    <!-- Consumer configuration: end -->

    <!-- Producer configuration: start -->
    <!-- Message payloads are converted to/from JSON with Jackson -->
    <bean id="messageConverter"
          class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

    <!-- Retry with exponential back-off: starts at 500, multiplied by 10 per attempt, capped at 10000 -->
    <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
        <property name="backOffPolicy">
            <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
                <property name="initialInterval" value="500" />
                <property name="multiplier" value="10.0" />
                <property name="maxInterval" value="10000" />
            </bean>
        </property>
    </bean>

    <!-- Template used for sending; its default exchange is ${rabbitmq.direct.exchange} -->
    <rabbit:template id="template"
                     message-converter="messageConverter"
                     connection-factory="connectionFactory"
                     reply-timeout="2000"
                     retry-template="retryTemplate"
                     exchange="${rabbitmq.direct.exchange}" />

    <!-- Bind the queues to their routing keys -->
    <rabbit:queue name="ms.queue.mal001" />
    <rabbit:queue name="ms.queue.fab001" />

    <!-- Direct exchange: a message is delivered to every bound queue whose binding key
         equals the message's routing key -->
    <rabbit:direct-exchange name="${rabbitmq.direct.exchange}">
        <rabbit:bindings>
            <rabbit:binding queue="ms.queue.mal001" key="ms.binding.mal001" />
            <rabbit:binding queue="ms.queue.fab001" key="ms.binding.fab001" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Queue declared by the consumer side -->
    <rabbit:queue name="eps.queue.report_status" />
    <rabbit:direct-exchange name="${rabbitmq.consumer.direct.exchange}">
        <rabbit:bindings>
            <rabbit:binding queue="eps.queue.report_status" key="eps.binding.report_status" />
        </rabbit:bindings>
    </rabbit:direct-exchange>
    <!-- Producer configuration: end -->

    <!-- Mall -->
    <rabbit:queue name="ccs.queue.mal" />
    <rabbit:queue name="ccs.queue.mal.reply" />
    <rabbit:direct-exchange name="${rabbitmq.ccs.direct.exchange}">
        <rabbit:bindings>
            <!-- Mall -->
            <rabbit:binding queue="ccs.queue.mal" key="ccs.binding.mal" />
            <rabbit:binding queue="ccs.queue.mal.reply" key="ccs.binding.mal.reply" />
        </rabbit:bindings>
    </rabbit:direct-exchange>
</beans>
消费端的代码通过 @RabbitListener 注解来指定要监听的队列:
import java.util.Date; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.stereotype.Component; @Component public class CcsQueueMal { private static final Logger logger = LoggerFactory.getLogger(CcsQueueMal.class); @Autowired private MalmsService malmsService; /** * * @param id * 任务ID * @param type * 任务名称 * @return */ @RabbitListener(queues = "ccs.queue.mal") public void ccsQueueListener(String id, @Header("jobName") String jobName) { logger.info("Received request for id " + id); logger.info("Received request for job name " + jobName); logger.info("定时任务执行时间:" + new Date()); // 返回执行结果(成功,失败)和ID try { malmsService.cancelTimeOutChargeOrder();// 取消超时订单操作 } catch (Exception e) { logger.debug(e.getMessage(), e); } } }
直接接收 AMQP 的 Message 对象,从消息体中读取并解析内容:
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.List;
import java.util.Map;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Listener for the {@code eps.queue.report_status} queue. Reads the raw message body as
 * UTF-8 JSON and extracts the {@code orderNo}, {@code logiNo} and {@code status} entries.
 *
 * <p>NOTE(review): {@code Gson} is used below but no import for it is visible in this
 * snippet - confirm that {@code com.google.gson.Gson} is imported in the real source file.
 */
@Component
public class MsMQQueueListener {

    /**
     * Handles one status-report message.
     *
     * @param message the raw AMQP message; its body is decoded as UTF-8 and parsed as a JSON object
     */
    @RabbitListener(queues = "eps.queue.report_status")
    public void updateStatus(Message message) {
        // Using the Charset constant avoids the checked UnsupportedEncodingException, so the
        // decoded text can no longer be left null by a swallowed exception.
        String info = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println(info);

        Gson gson = new Gson();
        // Gson returns a raw Map for Map.class; the JSON object keys are strings.
        @SuppressWarnings("unchecked")
        Map<String, Object> map = gson.fromJson(info, Map.class);
        if (map == null) {
            // fromJson yields null for an empty payload; there is nothing to extract.
            System.out.println("Ignoring empty message from eps.queue.report_status");
            return;
        }

        // The extracted values are not used further in this snippet.
        String orderNo = map.get("orderNo").toString();
        String logiNo = map.get("logiNo").toString();
        String logistatus = map.get("status").toString();
    }
}
生产者端发送消息的代码:
import org.springframework.amqp.core.AmqpTemplate;
// Template injected by Spring and used to publish the message.
@Autowired
AmqpTemplate template;
// Payload: order number, logistics number and logistics status as string entries.
Map<String, String> map = new HashMap<String, String>();
map.put("orderNo", orderNo);
map.put("logiNo", epsLogiBill.getLogiNo());
map.put("status", LogiStatus.CREATED.status());
// The (String, Object) overload takes the routing key as its first argument and publishes to
// the template's default exchange.
// NOTE(review): confirm that the default exchange of the template in the producing application
// is the exchange on which "eps.binding.report_status" is bound; otherwise pass the exchange
// name explicitly.
template.convertAndSend("eps.binding.report_status", map);
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。