赞
踩
SpringBoot整合 RabbitMQ
建springboot项目时导入springboot整合rabbitmq依赖包
spring:
rabbitmq:
port: 5672
password: root
username: root
virtual-host: /pms
host: localhost
template:
mandatory: true #没有投递到队列,强制将消息退回给生产者
publisher-confirm-type: correlated
publisher-returns: true
@Configuration public class RabbitConfg { //开启确认,返回 RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean b, String s) { System.out.println("消息的Id:" + correlationData.getId()); if(!b){ System.out.println("拒收消息的原因:" + s); } } }; RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { System.out.println("-------------------------------------"); System.out.println("消息的主体:" + new String(message.getBody())); System.out.println("错误码:" + i); System.out.println("错误的消息:" +s ); System.out.println("交换机:" +s1); System.out.println("路由Key: " +s2); System.out.println("-------------------------------------"); } }; //配置交换机 @Bean public TopicExchange topicExchange(){ return new TopicExchange("topic_exchange"); } @Bean public TopicExchange orderExchange(){ return new TopicExchange("order_exchange"); } //对象要用消息转换器(自动注入,需要导入jackson-databind包) @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory factory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(); //设置连接工厂 rabbitTemplate.setConnectionFactory(factory); //设置消息转换器 rabbitTemplate.setMessageConverter(messageConverter()); //找不到投递的队列,将消息强制退回给生产者 rabbitTemplate.setMandatory(true); //设置确认机制 rabbitTemplate.setConfirmCallback(confirmCallback); //设置返回机制 rabbitTemplate.setReturnCallback(returnCallback); return rabbitTemplate; } }
@Component
public class OrderProducer {
@Autowired
RabbitTemplate rabbitTemplate;
public void sendOrder(String exchange, String routingKey, OrderDto orderDto){
CorrelationData data = new CorrelationData(orderDto.getOrderSn());
//参数data使用来发送消息的附加参数,用来自定消息id
rabbitTemplate.convertAndSend(exchange,routingKey,orderDto,data);
}
}
spring:
rabbitmq:
port: 5672
username: root
password: root
virtual-host: /pms
host: localhost
listener:
simple:
acknowledge-mode: manual # 手动签收
消息转换器
@Configuration public class rabbitConfig implements RabbitListenerConfigurer { @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { //注册消息处理器工厂 registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory()); } //消息处理器方法工厂 @Bean public MessageHandlerMethodFactory messageHandlerMethodFactory(){ DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory(); //设置消息转换器 factory.setMessageConverter(messageConverter()); return factory; } //消息反序列化 @Bean public MappingJackson2MessageConverter messageConverter(){ return new MappingJackson2MessageConverter(); } }
@Component public class MsgConsumer { @RabbitListener(bindings = { @QueueBinding( value = @Queue(value = "msg_queue",declare = "true",durable = "true",exclusive = "false",autoDelete = "false"), exchange = @Exchange(value = "topic_exchange",durable = "true",autoDelete = "false",type = "topic",declare = "true"), key = "msg.#" ) }) @RabbitHandler//处理消息的消费者 //@Payload要处理的消息对象 public void processMsg(@Payload String msg, Channel channel, @Headers Map header){ System.out.println("消费者处理消息:" + msg); //签收消息要先获取消息的id Long tag = (Long) header.get(AmqpHeaders.DELIVERY_TAG); try { //签收 channel.basicAck(tag,false); } catch (IOException e) { e.printStackTrace(); } } }
@Component public class OrderConsumer { //队列 @Bean public Queue orderQueue(){ return new Queue("order_queue",true,false,false); } //交换机 @Bean public TopicExchange orderExchange(){ return new TopicExchange("order_exchange"); } //绑定队列到交换机 @Bean public Binding binding(){ return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("order.#"); } @RabbitListener(queues = "order_queue") @RabbitHandler public void proccessOrder(@Payload OrderDto orderDto, Channel channel, @Headers Map header){ System.out.println("消费者处理消息:" + orderDto.getOrderSn()+":" +orderDto.getCreatTime()); //签收消息 Long tag = (Long) header.get(AmqpHeaders.DELIVERY_TAG); try { channel.basicAck(tag,false); } catch (IOException e) { e.printStackTrace(); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。