赞
踩
使用RabbitMQ进行异步通信时,可能不仅仅需要传递消息本身,有时需要包含一些额外的信息,比如登录状态。可以参考前端发送请求时的做法,把登录状态保存在请求头中。Message类中也有请求头,可以使用一下方式添加到请求头:
message.getMessageProperties().getHeaders().put("user-info", 1L);
实际使用中,没有必要每次都创建一个Message类,然后手动添加。通过MessagePostProcessor可以简单实现请求头添加,而最常用的convertAndSend方法就可以直接传入MessagePostProcessor作为参数:
- String exchangeName = "test.direct";
- String key = "test";
- rabbitTemplate.convertAndSend(exchangeName, key, "msg", new MessagePostProcessor() {
- @Override
- public Message postProcessMessage(Message message) throws AmqpException {
- message.getMessageProperties().setHeader("user-info", 1L);
- // 这里也可以做一些其他的统一逻辑处理
- return message;
- }
- });
也可以这样写:
- @Data
- public class MyMessagePostProcessor implements MessagePostProcessor {
-
- private Long id;
-
- public MyMessagePostProcessor(Long id) {
- this.id = id;
- }
-
- @Override
- public Message postProcessMessage(Message message) throws AmqpException {
- message.getMessageProperties().getHeaders().put("user-info", id);
- return message;
- }
- }
- // 后续就不需要重写添加请求头的逻辑了
- rabbitTemplate.convertAndSend(exchangeName, key, "msg", new MyMessagePostProcessor(id);

实际上,RabbitTemplate也可以设置前置/后置处理器:
- @PostConstruct// 初始化完成后立即执行
- public void init(RabbitTemplate rabbitTemplate) {
- // 添加消息后置处理器(消费前执行)
- rabbitTemplate.setAfterReceivePostProcessors(new MessagePostProcessor() {
- @Override
- public Message postProcessMessage(Message message) throws AmqpException {
- Long userId = (Long) message.getMessageProperties().getHeaders().get("user-info");
- UserContext.setUser(userId);
- return message;
- }
- });
- // 添加消息前置处理器
- rabbitTemplate.setBeforePublishPostProcessors(new MessagePostProcessor() {
- @Override
- public Message postProcessMessage(Message message) throws AmqpException {
- message.getMessageProperties().getHeaders().put("user-info", 1L);
- return message;
- }
- });
- }

通过这种方法就不需要再每次发送消息前加一个处理器了,然而这种方式并不十分可行,它所添加的后置处理器针对的是rabbitTemplate,也就是消费端必须通过rabbitTemplate(也必须是同一个rabbitTemplate)消费,RabbitTemplate提供了两个直接消费的方法:
在异步场景下显然这两种方法都不满足要求。
一般情况下,消费更倾向于使用监听机制,也就是使用@RabbitListener注解,这种方式也可以自动的声明交换机、队列以及建立绑定,因此使用更加广泛
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "test.queue", durable = "true"),
- exchange = @Exchange(name = "test.direct"),
- key = "test"
- ))
- public void listenerTest(String message) {
- // todo
- }
然而这种方式无法执行前面设置的后置处理器逻辑,也就是拿不到消息头中的信息。
直接接收Message作为参数:
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "test.queue", durable = "true"),
- exchange = @Exchange(name = "test.direct"),
- key = "test"
- ))
- public void listenerTest(String msg, Message message) {
- // 这里不传消息体作为参数,MessageConvert就不会自动进行反序列化
- Long userId = (Long) message.getMessageProperties().getHeaders().get("user-info");
- System.out.println(userId);
- }
因此也可以通过Spring AOP 把处理请求头的逻辑抽取出来(AOP增强的是方法本身,因此即使方法中用不到Message这个参数,也不可以省略,否则就找不到消息头了):
- // 定义切面类
- @Aspect
- @Component
- public class RabbitListenerAspect {
-
- @Before("@annotation(org.springframework.amqp.rabbit.annotation.RabbitListener)")
- public void beforeMessageProcessing(JoinPoint joinPoint) {
- Object[] args = joinPoint.getArgs();
-
- for (Object arg : args) {
- if (arg instanceof Message) {
- Message message = (Message) arg;
-
- Map<String, Object> headers = message.getMessageProperties().getHeaders();
- Long userId = (Long) headers.get("user-info");
- UserContext.setUser(userId);
- }
- }
- }
- }

使用convertAndSend方法发送消息时,在发送前会通过MessageConvert对消息进行序列化(默认使用jdk序列化),在消费者接收前,也会同样的调用对应转换器中的反序列化工具转化为Java对象,因此可以利用反序列化这个过程,由于反序列化的过程发生在消费者端,因此可以在这个过程中执行一些消费前的逻辑:
- // 这里使用的是Json序列化转换器
- public class CustomMessageConverter extends Jackson2JsonMessageConverter {
- @Override
- public Object fromMessage(Message message) {
- // 获取headers
- Map<String, Object> headers = message.getMessageProperties().getHeaders();
- Long userId = (Long) headers.get("user-info");
- // 添加用户信息到上下文
- UserContext.setUser(userId);
- // 调用父类方法反序列化
- return super.fromMessage(message);
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。