赞
踩
- <!-- spring boot 整合rabbit MQ -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- amqp:
- queue: DIRECT_QUEUE
- exchange: DIRECT_EXCHANGE
- routingkey: DIRECT_QUEUE_KEY
- package com.xx.mq.config;
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.DirectExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class ApiRabbitMqConfig {
-
- @Value("${amqp.queue}")
- private String queue;
-
- @Value("${amqp.exchange}")
- private String exchange;
-
- @Value("${amqp.routingkey}")
- private String routingKey;
-
- /**
- * 创建队列
- *
- * @return
- */
- @Bean
- public Queue getApiQueue() {
- /*
- * name(必填): 创建消息队列 队列名称:CHANNEL_API_QUEUE
- * durable: 是否持久化,默认false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
- * exclusive: 默认false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
- * autoDelete: 是否自动删除,默认false,当没有生产者或者消费者使用此队列,该队列会自动删除。
- */
- return new Queue(queue, true);
- }
-
- /**
- * 创建直流交换器
- *
- * @return
- */
- @Bean
- public DirectExchange getApiDirectExchange() {
- /*
- * name(必填): 交换器名称
- * durable: 是否持久化,默认true
- * autoDelete: 是否自动删除,默认false
- */
- return new DirectExchange(exchange, true, false);
- }
-
- /**
- * 交换器绑定队列
- *
- * @return
- */
- @Bean
- public Binding bindingQueueToExchange() {
- /*
- * 将队列绑定到交换器上, 并设置路由键
- */
- return BindingBuilder.bind(getApiQueue()).to(getApiDirectExchange()).with(routingKey);
- }
- }
- /**
- * 消息生产者
- */
- @Component
- @Slf4j
- public class Producer {
- @Value("${amqp.exchange}")
- private String exchange;
-
- @Value("${amqp.routingkey}")
- private String routingKey;
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- public void sendMessage(Message message) {
- log.info("MQ异步推送exchange:{},routingKey:{},message:{}", exchange, routingKey, message);
- rabbitTemplate.convertAndSend(exchange, routingKey, message);
- }
- }
- @Component
- @Slf4j
- public class APIChannelListener {
- @Autowired
- private MessageConverter messageConverter;
- @Autowired
- private APIChannelService apiChannelService;
-
- /**
- * 监听API服务进行投保单保存并转核保
- * @param message 消息
- */
- @RabbitListener(queues = "${amqp.queue}")
- public void proposalSaveAndSubmit(Message message) {
- try {
- ProposalVo proposalVo = (ProposalVo) messageConverter.fromMessage(message);
- log.info("消费消息: {}", JSONObject.toJSONString(proposalVo));
- // 业务
- ResBean resBean = apiChannelService.addUser(User, true);
- log.info("响应结果: {}", JSONObject.toJSONString(resBean));
- } catch (Exception e) {
- log.info("错误信息:{}", e.getMessage());
- }
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。