赞
踩
通过前面的学习可以发现，消息都是通过交换器发送至队列的，并且一条消息只会被一个消费者处理。实际开发中还会遇到另一种情况：一条消息需要被多个消费者处理，也就是广播的形式。广播模式需要使用 FanoutExchange（扇出交换器，也常被称为广播交换器），FanoutExchange 会将消息发送至每一个与之绑定的队列中。
代码主体没有太大的改动,增加了 FanoutExchange,并且将队列绑定至 FanoutExchange
截图中被标记的部分如下
@Configuration public class MQTopicConfig { @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}") private int port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.publisher-returns}") private boolean publisherReturns; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherReturns(publisherReturns); connectionFactory.setPublisherConfirms(true); return connectionFactory; } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) //必须是prototype类型 public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; } @Bean public Queue coreQueue() { return new Queue(Constant.HOSH_TOPIC); } @Bean public Queue subCoreQueue() { return new Queue(Constant.HOSH_TOPIC_NEW); } @Bean public TopicExchange coreTopicExchange() { return new TopicExchange(Constant.HOSH_TOPIC_EXC); } @Bean FanoutExchange fanoutExchange() { // 广播交换器 return new FanoutExchange(Constant.HOSH_BROADCAST_EXC); } @Bean public Binding bindingFanoutExchange() { // coreQueue 队列绑定至广播交换器 return BindingBuilder.bind(coreQueue()).to(fanoutExchange()); } @Bean public Binding bindingSubFanoutExchange() { // subCoreQueue 队列绑定至广播交换器 return BindingBuilder.bind(subCoreQueue()).to(fanoutExchange()); } @Bean public Binding bindingCoreExchange() { return BindingBuilder.bind(coreQueue()).to(coreTopicExchange()).with(Constant.HOSH_TOPIC); } @Bean public Binding bindingSubCoreExchange() { return BindingBuilder.bind(subCoreQueue()).to(coreTopicExchange()).with(Constant.HOSH_TOPIC_NEW); } }
把消息发送至扇出交换器 FanoutExchange（FanoutExchange 会忽略路由键，因此发送时路由键传空字符串即可）
@Component public class HoshMQSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{ private RabbitTemplate rabbitTemplate; @Autowired public HoshMQSender(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; this.rabbitTemplate.setConfirmCallback(this); this.rabbitTemplate.setReturnCallback(this); } public void send(String str) { sendMessageWithAck(str); } private void sendMessageWithAck(String str) { // 消息内容 byte[] toSendBytes = str.getBytes(); MessageProperties messageProperties = new MessageProperties(); messageProperties.setMessageId(String.valueOf(System.currentTimeMillis())); Message msg = new Message(toSendBytes, messageProperties); CorrelationData correlationData = new CorrelationData(); correlationData.setId(UUID.randomUUID().toString()); // rabbitTemplate.convertAndSend(Constant.HOSH_TOPIC_EXC, Constant.HOSH_TOPIC, msg, correlationData); rabbitTemplate.convertAndSend(Constant.HOSH_BROADCAST_EXC, "", msg, correlationData); } // 队列内容发送到 MQ 的确认 @Override public void confirm(CorrelationData correlationData, boolean b, String s) { // if (b) { // System.out.println("recv HoshMQSender confirm id=" + correlationData.getId()); // } else { // System.out.println("not recv " + s); // } } /** * exchange 到达 queue, 则 returnedMessage 不回调 * exchange 到达 queue 失败, 则 returnedMessage 回调 * 需要设置spring.rabbitmq.publisher-returns=true * @param message * @param replyCode * @param replyText * @param exchange * @param routingKey */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("sender return success" + message.toString() +"\n replyCode "+replyCode+"\n replyText "+replyText +"\n exchange "+exchange + "\n routingKey "+routingKey); } }
为了实现广播的效果,需要两个消费者
1、消费者1
/**
 * Consumer 1: listens on the queue named by {@code Constant.HOSH_TOPIC_NEW}.
 */
@Component
public class HoshMqReceiver {

    /**
     * Prints the received message and its payload, then acknowledges the
     * single delivery on the channel.
     *
     * @param info    message payload as text
     * @param channel channel the delivery arrived on; used to send the ack
     * @param msg     the raw message, source of the delivery tag
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitHandler
    @RabbitListener(queues = Constant.HOSH_TOPIC_NEW)
    public void onReceiver(String info, Channel channel, Message msg) throws IOException {
        System.out.println("HoshMqReceiver msg " + msg);
        System.out.println("HoshMqReceiver info " + info);

        final long deliveryTag = msg.getMessageProperties().getDeliveryTag();
        // multiple = false: acknowledge only this delivery.
        channel.basicAck(deliveryTag, false);
    }
}
2、消费者2
@Component public class HoshMqReceiver2 { @RabbitHandler @RabbitListener(queues = Constant.HOSH_TOPIC) public void onReceiver(String info, Channel channel, Message msg) { try { // 开启手动应答 ack 以后,只有当程序明确回复,数据已经被处理, // 对应数据才会被 RabbitMQ server 清除,否则保留在 RabbitMQ server 上 channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false); System.out.println("HoshMqReceiver2 msg " + JSON.toJSONString(msg)); System.out.println("HoshMqReceiver2 info " + info); } catch (IOException e) { e.printStackTrace(); } } }
发送消息测试一下,先看看测试结果
可以发现,在使用 FanoutExchange 后,一条消息会发送至所有与其绑定的队列中,而后,监听了对应队列的消费者就可以获取到同一条消息
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。