赞
踩
- rabbitmq:
- username: admin
- password: admin
- #单机配置
- host: IP地址
- port: 端口
- # 集群配置 addresses: ip:端口,ip:端口,ip:端口
- publisher-confirms: true
- publisher-returns: true # 消息发送到交换机确认机制,是否返回回调
- virtual-host: /
- listener:
- simple:
- retry:
- enabled: true
- initial-interval: 6000
- acknowledge-mode: manual #采用手动应答
- @SpringBootConfiguration
- @ComponentScan(basePackages = {"扫描包路径"})
- public class RabbitMQConfig {
- @Bean
- public Queue quorumQueue() {
- return QueueBuilder
- .durable("quorumqueue") // 持久化
- .quorum() // 仲裁队列
- .build();
- }
- }
- @Autowired
- private AmqpTemplate amqpTemplate;
- public R saveCalculatemq(@RequestBody JSONObject data){
- Map<String,Object> send = new HashMap<>(16);
- send.put("data",data);
- String message = JSON.toJSONString(send);
- //发送到消息队列
- amqpTemplate.convertAndSend("quorumqueue",message);
- return R.ok();
- }
- @Component
- @Async
- public class QueueListner {
- @RabbitListener(queues={"quorumqueue"})
- public synchronized void receiveMessage(Message message, Channel channel){
- log.debug("消息接收到了:"+new String(message.getBody()));
- }
- }
- package com.bonc.calculate.config;
-
- import org.springframework.amqp.core.*;
- import org.springframework.boot.SpringBootConfiguration;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.ComponentScan;
-
- @SpringBootConfiguration
- @ComponentScan(basePackages = {"需要扫描的包"})
- public class FanoutConfig {
-
- //声明FanoutExchange交换机
- @Bean
- public FanoutExchange fanoutExchange() {
- return new FanoutExchange("fanout.exchange");//交换机名称
- }
- //声明第1个队列
- @Bean
- public Queue fanoutQueue1(){
- return QueueBuilder
- .durable("fanout.queue1") // 持久化
- .quorum() // 仲裁队列
- .build();
- }
- //绑定队列1和交换机
- @Bean
- public Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
- return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
- }
- //声明第2个队列
- @Bean
- public Queue fanoutQueue2(){
- return QueueBuilder
- .durable("fanout.queue2") // 持久化
- .quorum() // 仲裁队列
- .build();
- }
- //绑定队列2和交换机
- @Bean
- public Binding bindingQueue2(Queue fanoutQueue2,FanoutExchange fanoutExchange){
- return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
- }
- }
amqpTemplate.convertAndSend(交换机名称,"",发送的消息);
- package com.bonc.calculate.listener;
-
- import com.rabbitmq.client.Channel;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.scheduling.annotation.Async;
- import org.springframework.stereotype.Component;
- import java.util.Map;
-
-
- @Component
- @Async
- public class QueueListner {
-
- protected static final Logger log = LoggerFactory.getLogger(QueueBoncListner.class);
-
- @RabbitListener(queues={"fanout.queue1"})
- public synchronized void receiveMessage(Message message, Channel channel){
- log.debug("fanout.queue1消息接收到了:"+new String(message.getBody()));
- }
-
-
- @RabbitListener(queues={"fanout.queue2"})
- public synchronized void receiveMessage2(Message message, Channel channel){
- log.debug("fanout.queue2消息接收到了:"+new String(message.getBody()));
- }
-
- //页面配置好 也可以直接接收消息
- @RabbitListener(queues={"fanout_queue3"})
- public synchronized void receiveMessage3(Message message, Channel channel){
- log.debug("fanout.queue3消息接收到了:"+new String(message.getBody()));
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。