当前位置:   article > 正文

RabbitMQ统一消息处理_rabbitmq messagepostprocessor 使用

rabbitmq messagepostprocessor 使用

添加消息请求头

使用RabbitMQ进行异步通信时,可能不仅仅需要传递消息本身,有时需要包含一些额外的信息,比如登录状态。可以参考前端发送请求时的做法,把登录状态保存在请求头中。Message类中也有请求头,可以使用一下方式添加到请求头:

message.getMessageProperties().getHeaders().put("user-info", 1L);

实际使用中,没有必要每次都创建一个Message类,然后手动添加。通过MessagePostProcessor可以简单实现请求头添加,而最常用的convertAndSend方法就可以直接传入MessagePostProcessor作为参数:

  1. String exchangeName = "test.direct";
  2. String key = "test";
  3. rabbitTemplate.convertAndSend(exchangeName, key, "msg", new MessagePostProcessor() {
  4. @Override
  5. public Message postProcessMessage(Message message) throws AmqpException {
  6. message.getMessageProperties().setHeader("user-info", 1L);
  7. // 这里也可以做一些其他的统一逻辑处理
  8. return message;
  9. }
  10. });

也可以这样写:

  1. @Data
  2. public class MyMessagePostProcessor implements MessagePostProcessor {
  3. private Long id;
  4. public MyMessagePostProcessor(Long id) {
  5. this.id = id;
  6. }
  7. @Override
  8. public Message postProcessMessage(Message message) throws AmqpException {
  9. message.getMessageProperties().getHeaders().put("user-info", id);
  10. return message;
  11. }
  12. }
  13. // 后续就不需要重写添加请求头的逻辑了
  14. rabbitTemplate.convertAndSend(exchangeName, key, "msg", new MyMessagePostProcessor(id);

消费消息请求头

使用消息后置处理器

实际上,RabbitTemplate也可以设置前置/后置处理器:

  1. @PostConstruct// 初始化完成后立即执行
  2. public void init(RabbitTemplate rabbitTemplate) {
  3. // 添加消息后置处理器(消费前执行)
  4. rabbitTemplate.setAfterReceivePostProcessors(new MessagePostProcessor() {
  5. @Override
  6. public Message postProcessMessage(Message message) throws AmqpException {
  7. Long userId = (Long) message.getMessageProperties().getHeaders().get("user-info");
  8. UserContext.setUser(userId);
  9. return message;
  10. }
  11. });
  12. // 添加消息前置处理器
  13. rabbitTemplate.setBeforePublishPostProcessors(new MessagePostProcessor() {
  14. @Override
  15. public Message postProcessMessage(Message message) throws AmqpException {
  16. message.getMessageProperties().getHeaders().put("user-info", 1L);
  17. return message;
  18. }
  19. });
  20. }

通过这种方法就不需要再每次发送消息前加一个处理器了,然而这种方式并不十分可行,它所添加的后置处理器针对的是rabbitTemplate,也就是消费端必须通过rabbitTemplate(也必须是同一个rabbitTemplate)消费,RabbitTemplate提供了两个直接消费的方法:

  • receive() 阻塞等待一定时间,返回值为Message,超时返回null;
  • receiveNowait() 直接尝试获取对应queue中的消息,返回值为Message,如果没有消息返回null;

在异步场景下显然这两种方法都不满足要求。

一般情况下,消费更倾向于使用监听机制,也就是使用@RabbitListener注解,这种方式也可以自动的声明交换机、队列以及建立绑定,因此使用更加广泛

  1. @RabbitListener(bindings = @QueueBinding(
  2. value = @Queue(name = "test.queue", durable = "true"),
  3. exchange = @Exchange(name = "test.direct"),
  4. key = "test"
  5. ))
  6. public void listenerTest(String message) {
  7. // todo
  8. }

然而这种方式无法执行前面设置的后置处理器逻辑,也就是拿不到消息头中的信息。

手动获取或利用AOP

直接接收Message作为参数:

  1. @RabbitListener(bindings = @QueueBinding(
  2. value = @Queue(name = "test.queue", durable = "true"),
  3. exchange = @Exchange(name = "test.direct"),
  4. key = "test"
  5. ))
  6. public void listenerTest(String msg, Message message) {
  7. // 这里不传消息体作为参数,MessageConvert就不会自动进行反序列化
  8. Long userId = (Long) message.getMessageProperties().getHeaders().get("user-info");
  9. System.out.println(userId);
  10. }

因此也可以通过Spring AOP 把处理请求头的逻辑抽取出来(AOP增强的是方法本身,因此即使方法中用不到Message这个参数,也不可以省略,否则就找不到消息头了):

  1. // 定义切面类
  2. @Aspect
  3. @Component
  4. public class RabbitListenerAspect {
  5. @Before("@annotation(org.springframework.amqp.rabbit.annotation.RabbitListener)")
  6. public void beforeMessageProcessing(JoinPoint joinPoint) {
  7. Object[] args = joinPoint.getArgs();
  8. for (Object arg : args) {
  9. if (arg instanceof Message) {
  10. Message message = (Message) arg;
  11. Map<String, Object> headers = message.getMessageProperties().getHeaders();
  12. Long userId = (Long) headers.get("user-info");
  13. UserContext.setUser(userId);
  14. }
  15. }
  16. }
  17. }

利用消息转换器

使用convertAndSend方法发送消息时,在发送前会通过MessageConvert对消息进行序列化(默认使用jdk序列化),在消费者接收前,也会同样的调用对应转换器中的反序列化工具转化为Java对象,因此可以利用反序列化这个过程,由于反序列化的过程发生在消费者端,因此可以在这个过程中执行一些消费前的逻辑:

  1. // 这里使用的是Json序列化转换器
  2. public class CustomMessageConverter extends Jackson2JsonMessageConverter {
  3. @Override
  4. public Object fromMessage(Message message) {
  5. // 获取headers
  6. Map<String, Object> headers = message.getMessageProperties().getHeaders();
  7. Long userId = (Long) headers.get("user-info");
  8. // 添加用户信息到上下文
  9. UserContext.setUser(userId);
  10. // 调用父类方法反序列化
  11. return super.fromMessage(message);
  12. }
  13. }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/运维做开发/article/detail/763273
推荐阅读
相关标签
  

闽ICP备14008679号