当前位置:   article > 正文

spring中使用rabbitmq

spring中使用rabbitmq

2017年以前通常采用 Spring XML 配置的方式,现在一般采用 YAML 方式。这里先回顾一下 XML 方式的配置:依次是 Maven 依赖、属性文件和 Spring 配置文件。

<!-- RabbitMQ dependency: Spring AMQP's RabbitMQ module (spring-rabbit) -->
<dependency>
	<groupId>org.springframework.amqp</groupId>
	<artifactId>spring-rabbit</artifactId>
	<version>2.0.1.RELEASE</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
# RabbitMQ connection settings
# Comma-separated host:port list of broker addresses
rabbitmq.addresses=192.168.5.130:5672,192.168.5.132:5672
rabbitmq.username=admin
# NOTE(review): plaintext credential checked into a properties file - consider externalizing it
rabbitmq.password=admin2015
# No trailing whitespace here: java.util.Properties keeps trailing blanks as part of the value
rabbitmq.concurrentConsumers=5
rabbitmq.channel.cache.size=50
# Exchange names referenced by the Spring XML configuration
rabbitmq.direct.exchange=mal-ms.direct.exchange
rabbitmq.consumer.direct.exchange=eps.direct.exchange
rabbitmq.ccs.direct.exchange=ccs.direct.exchange
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
<?xml version="1.0" encoding="UTF-8" ?>
<!--
	Spring AMQP (RabbitMQ) context: connection factory, admin, annotation-driven listener
	support, a JSON-converting template with retry, and the queue/exchange/binding declarations.
	The rabbit schema location is versionless so the XSD bundled with the spring-rabbit jar on
	the classpath is used instead of a pinned (1.5) version that may not match the dependency.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
	xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	xsi:schemaLocation="
	http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
	http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
	http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"
	default-lazy-init="false">
	<description>Spring Rabbit 生成者配置</description>

	<!-- Connection factory; all values come from the rabbitmq.* properties -->
	<rabbit:connection-factory id="connectionFactory"
		addresses="${rabbitmq.addresses}" username="${rabbitmq.username}"
		password="${rabbitmq.password}" channel-cache-size="${rabbitmq.channel.cache.size}" />

	<!-- RabbitAdmin: declares the queues, exchanges and bindings defined below on the broker -->
	<rabbit:admin connection-factory="connectionFactory" />

	<!-- Consumer configuration: start -->
	<!-- Enables processing of @RabbitListener annotations -->
	<rabbit:annotation-driven />

	<!-- "rabbitListenerContainerFactory" is the bean name @RabbitListener looks up by default -->
	<!-- NOTE(review): the properties define rabbitmq.concurrentConsumers, but the value is
		hard-coded to 3 here - confirm which one is intended -->
	<bean id="rabbitListenerContainerFactory"
		class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
		<property name="connectionFactory" ref="connectionFactory" />
		<property name="concurrentConsumers" value="3" />
		<property name="maxConcurrentConsumers" value="10" />
	</bean>
	<!-- Consumer configuration: end -->

	<!-- Producer configuration: start -->
	<!-- Message payloads are converted to/from JSON with Jackson -->
	<bean id="messageConverter"
		class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

	<!-- Retry with exponential back-off: starts at 500 ms, multiplied by 10, capped at 10000 ms -->
	<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
		<property name="backOffPolicy">
			<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
				<property name="initialInterval" value="500" />
				<property name="multiplier" value="10.0" />
				<property name="maxInterval" value="10000" />
			</bean>
		</property>
	</bean>

	<!-- Template used by producers; its default exchange is ${rabbitmq.direct.exchange} -->
	<rabbit:template id="template" message-converter="messageConverter"
		connection-factory="connectionFactory" reply-timeout="2000" retry-template="retryTemplate"
		exchange="${rabbitmq.direct.exchange}" />

	<!-- Bind the queues to their routing keys -->
	<rabbit:queue name="ms.queue.mal001" />
	<rabbit:queue name="ms.queue.fab001" />
	<!-- Direct exchange: a message is delivered to every queue whose binding key equals its routing key -->
	<rabbit:direct-exchange name="${rabbitmq.direct.exchange}">
		<rabbit:bindings>
			<rabbit:binding queue="ms.queue.mal001" key="ms.binding.mal001" />
			<rabbit:binding queue="ms.queue.fab001" key="ms.binding.fab001" />
		</rabbit:bindings>
	</rabbit:direct-exchange>

	<!-- Queue declared by the consumer side -->
	<rabbit:queue name="eps.queue.report_status" />
	<rabbit:direct-exchange name="${rabbitmq.consumer.direct.exchange}">
		<rabbit:bindings>
			<rabbit:binding queue="eps.queue.report_status" key="eps.binding.report_status" />
		</rabbit:bindings>
	</rabbit:direct-exchange>
	<!-- Producer configuration: end -->

	<!-- Mall (shop) queues -->
	<rabbit:queue name="ccs.queue.mal" />
	<rabbit:queue name="ccs.queue.mal.reply" />
	<rabbit:direct-exchange name="${rabbitmq.ccs.direct.exchange}">
		<rabbit:bindings><!-- Mall (shop) bindings -->
			<rabbit:binding queue="ccs.queue.mal" key="ccs.binding.mal" />
			<rabbit:binding queue="ccs.queue.mal.reply" key="ccs.binding.mal.reply" />
		</rabbit:bindings>
	</rabbit:direct-exchange>
</beans>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76

消费侧的代码通过@RabbitListener来设定

import java.util.Date;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

/**
 * Listener for the {@code ccs.queue.mal} queue. Each received message triggers
 * {@code MalmsService#cancelTimeOutChargeOrder()} (the timed-out order cancellation job).
 */
@Component
public class CcsQueueMal {

	private static final Logger logger = LoggerFactory.getLogger(CcsQueueMal.class);
	@Autowired
	private MalmsService malmsService;

	/**
	 * Handles one job-trigger message from {@code ccs.queue.mal}.
	 *
	 * <p>The method is {@code void}: nothing is sent back to the sender, even though an earlier
	 * comment mentioned returning the execution result and the ID.
	 *
	 * @param id
	 *            message payload, the job ID
	 * @param jobName
	 *            value of the {@code jobName} message header, the job name
	 */
	@RabbitListener(queues = "ccs.queue.mal")
	public void ccsQueueListener(String id, @Header("jobName") String jobName) {
		logger.info("Received request for id {}", id);
		logger.info("Received request for job name {}", jobName);
		logger.info("定时任务执行时间:{}", new Date());
		try {
			malmsService.cancelTimeOutChargeOrder(); // cancel timed-out orders
		} catch (Exception e) {
			// Best-effort, as before: the exception is not rethrown, so the listener returns
			// normally. It is logged at ERROR (was DEBUG) so failures are visible in production.
			logger.error("Failed to cancel timed-out charge orders, id={}, jobName={}", id, jobName, e);
		}
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

也可以让监听方法直接接收原始的 AMQP Message 对象,再自行解析消息体:

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.List;
import java.util.Map;

import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Listener for the {@code eps.queue.report_status} queue. Decodes the raw message body as
 * UTF-8 JSON and extracts the {@code orderNo}, {@code logiNo} and {@code status} fields.
 */
@Component
public class MsMQQueueListener {

	/** Shared parser; Gson instances are thread-safe, so there is no need to build one per message. */
	private static final Gson GSON = new Gson();

	/**
	 * Handles one status report.
	 *
	 * @param message
	 *            raw AMQP message whose body is a UTF-8 encoded JSON object
	 * @throws AmqpRejectAndDontRequeueException
	 *             if the body is not parseable JSON, is empty, or lacks a required field; such a
	 *             message can never succeed, so it is rejected instead of being requeued forever
	 */
	@RabbitListener(queues = "eps.queue.report_status")
	public void updateStatus(Message message) {
		// StandardCharsets.UTF_8 avoids the checked UnsupportedEncodingException of the
		// charset-name overload, which previously left the text null on failure.
		String info = new String(message.getBody(), StandardCharsets.UTF_8);
		System.out.println(info);

		Map<?, ?> payload;
		try {
			payload = GSON.fromJson(info, Map.class);
		} catch (RuntimeException e) {
			throw new AmqpRejectAndDontRequeueException("Malformed status report: " + info, e);
		}
		if (payload == null) {
			// Gson returns null for an empty or literal-null document.
			throw new AmqpRejectAndDontRequeueException("Empty status report");
		}

		String orderNo = requiredField(payload, "orderNo");
		String logiNo = requiredField(payload, "logiNo");
		String logistatus = requiredField(payload, "status");
		// The original excerpt stops here; the extracted values are not used further.
	}

	/** Returns the string form of a mandatory field, rejecting the message if it is absent. */
	private static String requiredField(Map<?, ?> payload, String key) {
		Object value = payload.get(key);
		if (value == null) {
			throw new AmqpRejectAndDontRequeueException("Status report is missing field '" + key + "'");
		}
		return value.toString();
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

生产者

	// Producer excerpt: the enclosing class/method and the remaining imports are not shown.
	import org.springframework.amqp.core.AmqpTemplate;
	// Injected template used to publish messages.
	@Autowired
	AmqpTemplate template;
	// Payload with the same keys the eps.queue.report_status listener reads:
	// orderNo, logiNo and status.
	Map<String, String> map = new HashMap<String, String>();
	map.put("orderNo", orderNo);
	map.put("logiNo", epsLogiBill.getLogiNo());
	map.put("status", LogiStatus.CREATED.status());
	// Two-argument convertAndSend(routingKey, message): no exchange is named, so the
	// template's default exchange is used.
	// NOTE(review): in the XML shown, the template's exchange is ${rabbitmq.direct.exchange},
	// while eps.binding.report_status is bound on ${rabbitmq.consumer.direct.exchange} -
	// confirm the producer's template defaults to the eps exchange, or pass the exchange explicitly.
	template.convertAndSend("eps.binding.report_status", map);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/956413
推荐阅读
相关标签
  

闽ICP备14008679号