赞
踩
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring.application.name=springboot-rabbitmq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#发送方确认
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/
配置失败通知和发送方确认
@Configuration public class RabbitConfig { @Value("${spring.rabbitmq.host}") private String addresses; @Value("${spring.rabbitmq.port}") private String port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.virtual-host}") private String virtualHost; @Value("${spring.rabbitmq.publisher-confirms}") private boolean publisherConfirms; @Autowired private UserReceiver userReceiver; //TODO 连接工厂 @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses(addresses+":"+port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHost); //发送方确认 connectionFactory.setPublisherConfirms(publisherConfirms); return connectionFactory; } //TODO rabbitAdmin类封装对RabbitMQ的管理操作 @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){ return new RabbitAdmin(connectionFactory); } //TODO 使用Template @Bean public RabbitTemplate newRabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); //TODO 失败通知 template.setMandatory(true); //TODO 失败回调 template.setReturnCallback(returnCallback()); //TODO 发送方确认 template.setConfirmCallback(confirmCallback()); return template; } //===============使用了RabbitMQ系统缺省的交换器(direct交换器)========== //TODO 申明队列(最简单的方式) @Bean public Queue directQueue() { return new Queue("direct.queue"); } //===============topic交换器========== @Bean public Queue topicQueue() { return new Queue("topic.queue"); } //TODO 申明交换器 @Bean public TopicExchange topicExchange() { return new TopicExchange("topic.exchange"); } //TODO 绑定关系 @Bean public Binding topicBinding() { return BindingBuilder .bind(topicQueue()) .to(topicExchange() ) //路由键 .with("topic.*"); } //===============fanout交换器========== //TODO 申明队列 @Bean public Queue fanoutQueue() { return new Queue("fanout.queue"); } //TODO 申明交换器 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanout.exchange"); } //TODO 绑定关系 @Bean Binding bindingExchangeA(Queue fanoutQueue,FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue).to(fanoutExchange); } //===============消费者确认========== @Bean public SimpleMessageListenerContainer messageContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory()); //TODO 绑定了这个directQueue队列 container.setQueues(directQueue()); //TODO 手动提交 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //TODO 消费确认方法 container.setMessageListener(userReceiver); return container; } //===============失败通知=============== public RabbitTemplate.ReturnCallback returnCallback(){ return new RabbitTemplate.ReturnCallback(){ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("无法路由的消息,需要考虑另外处理。"); System.out.println("Returned replyText:"+replyText); System.out.println("Returned exchange:"+exchange); System.out.println("Returned routingKey:"+routingKey); String msgJson = new String(message.getBody()); System.out.println("Returned Message:"+msgJson); } }; } //===============发送方确认=============== public RabbitTemplate.ConfirmCallback confirmCallback(){ return new RabbitTemplate.ConfirmCallback(){ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { System.out.println("发送者确认发送给mq成功"); } else { //处理失败的消息 System.out.println("发送者发送给mq失败,考虑重发:"+cause); } } }; } }
@Component public class UserReceiver implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { try { String msg = new String(message.getBody()); System.out.println("UserReceiver>>>>>>>接收到消息:"+msg); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); System.out.println("UserReceiver>>>>>>消息已消费"); } catch (Exception e) { System.out.println(e.getMessage()); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true); System.out.println("UserReceiver>>>>>>拒绝消息,要求Mq重新派发"); throw e; } } catch (Exception e) { System.out.println(e.getMessage()); } } }
@Component
public class DefaultSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String msg) {
this.rabbitTemplate.convertAndSend("directQueue", msg);
}
}
需要手动应答的,在前面配置类已经配置完成了。
不需要的,去掉配置类里消费者确认的代码,
@Component
@RabbitListener(queues = "directQueue")
public class DirectReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println(msg);
}
}
或者
@Component
public class DirectReceiver {
@RabbitListener(queues = "directQueue")
public void process(String msg) {
System.out.println(msg);
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。