赞
踩
使用springboot整合rabbitmq实现,一个生产者生产一条数据,多个消费者消费同一条数据案例,可以解决微服务分布式事务控制。保证最终一致性原则
相对简易得一种消息中间件,功能强大,性能中等,吞吐量一般。
引入库
<!-- rabbitmq消息队列-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
rabbitmq相关配置
@Configuration public class RabbitmqConfig { @Bean public Queue queue1() { return new Queue("queue1",true); } @Bean public FanoutExchange exchange1() { return new FanoutExchange("exchange1",true, false); } @Bean public Binding binding1() { return BindingBuilder.bind(queue1()).to(exchange1()); } }
因为是fanout广播模式,所以不用配置路由键
生产者代码
@Resource
private RabbitTemplate rabbitTemplate;
@Test
public void pushMessage() {
HashMap<Object, Object> hashMap = new HashMap<>();
hashMap.put("name","zhangsan");
hashMap.put("age","18");
rabbitTemplate.convertAndSend("exchange1","", hashMap);
}
多个消费者,消费同一条数据
消费者1号
@Component public class RabbitMqConsumer { // @RabbitListener(queues = "queue1") @RabbitListener(bindings = @QueueBinding( // value = @Queue(value = "queue1"), value = @Queue(), //切记: 此处无需设置队列名称,否在得话,多个消费者只有一个消费者能消费数据。其它消费者无法消费数据。 exchange = @Exchange(value = "exchange1",type = ExchangeTypes.FANOUT) )) public void getData(Message message) { try { String str = new String(message.getBody(),"utf-8"); System.out.println(str); } catch (Exception e) { e.printStackTrace(); } } }
消费者2号
@Component public class RabbitMqConsumer { // @RabbitListener(queues = "queue1") @RabbitListener(bindings = @QueueBinding( // value = @Queue(value = "queue1"), value = @Queue(), //切记: 此处无需设置队列名称,否在得话,多个消费者只有一个消费者能消费数据。其它消费者无法消费数据。 exchange = @Exchange(value = "exchange1",type = ExchangeTypes.FANOUT) )) public void getData(Message message) { try { String str = new String(message.getBody(),"utf-8"); System.out.println(str); } catch (Exception e) { e.printStackTrace(); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。