当前位置:   article > 正文

springboot+RabbitMQ实现一条消息被所有consumer消费_rabbitmq一条消息让每一个消费者都消费一次

rabbitmq一条消息让每一个消费者都消费一次

需求背景:

        用户认证中心(Authorization center简称ac)使用jwt实现用户请求身份认证,需要支持多副本部署。系统架构如下:

        用户登录后生成jwt,纵向需要通过socket长连接把jwt下发到应用集成层ws,ws再把jwt下发到应用。前端请求各应用时可以在应用的filter中校验jwt是否有效,无效则向上询问ws jwt是否有效,无效再请求ac jwt是否有效。

        所以,用户登录请求通过负载均衡落到ac副本1(简称ac1)后,ac1生成jwt,除了纵向下发之外,还需要横向同步到ac2 ac3,ac2和ac3再纵向同步jwt,实现全平台的单点登录

具体需求:

        ac1发送消息到rabbit mq,其他的所有副本ac2和ac3消费消息。

实现方案:

        1.添加依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

        2.添加rabbitMQ配置

  1. spring:
  2. application:
  3. name: xxx
  4. rabbitmq:
  5. addresses: x.x.x.x:5672
  6. username: admin
  7. password: admin
  8. virtual-host: /
  9. # 启用消息发布ack
  10. publisher-confirm-type: correlated
  11. # 启用发布返回
  12. publisher-returns: true
  13. template:
  14. #启用强制信息;默认false
  15. mandatory: true
  16. retry:
  17. # 发送重试是否可用
  18. enabled: true
  19. #最大重试次数
  20. max-attempts: 3
  21. #消费端配置
  22. listener:
  23. simple:
  24. missing-queues-fatal: false
  25. #最大/最小的消费者数量
  26. concurrency: 1
  27. max-concurrency: 8
  28. #表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
  29. acknowledge-mode: manual
  30. #监听器抛出异常而拒绝的消息是否被重新放回队列,默认值为true
  31. default-requeue-rejected: true
  32. #每次从mq取消息的条数
  33. prefetch: 1
  34. retry:
  35. enabled: true
  36. #最大重试次数
  37. max-attempts: 5
  38. #最大重试时间间隔
  39. max-interval: 32000
  40. #第一次和第二次尝试发布或传递消息之间的间隔
  41. initial-interval: 2000
  42. #应用于上一重试间隔的乘数
  43. multiplier: 2

         3.登录service中发送mq(jwt),即mq provider

  1. @Autowired
  2. private RabbitTemplate rabbitTemplate;
  3. public void login(Token token){
  4. //如果是自己发的mq则不需要消费,直接return
  5. if(cache.contains(token.getJwt())return;
  6. //下发token到ws层
  7. server.getNamespace("/ws1").getBroadcastOperations().sendEvent("LOGIN_EVENT",token,
  8. broadcastAckCallback);
  9. //获取redis中注册的ac节点信息,如果节点数大于1则需要发送mq同步token
  10. int acNodeCount= redisTemplate.boundValueOps(nodeKey).get();
  11. if(acNodeCount>1){
  12. rabbitTemplate.convertAndSend(""ac-topic-exchange"",
  13. AcConstant.LOGIN_ROUTING_KEY,
  14. token);
  15. }

4.MQ consumer

通过@Queue注解中的value属性,用spring spel表达式,在每次启动ac的时候生成一个带有随机字符串的名字,绑定到ac的topic exchange。这样,启动3个ac副本,就有3个queue绑定到了ac的exchange,mq message发送到topic exchange, 通过routing key分发到所有符合规则的queue,就能实现所有副本消费同一条消息。

  1. @RabbitListener(
  2. bindings = {
  3. @QueueBinding(
  4. value = @Queue(value = "ac-login-queue- #{T(System).currentTimeMillis()}"),
  5. exchange = @Exchange(value = "ac-topic-exchange",
  6. type = ExchangeTypes.TOPIC),
  7. key = "ac.login.#")
  8. },
  9. ackMode = "MANUAL")
  10. @RabbitHandler
  11. public void loginConsumer(Token token, Channel channel, Message message) throws IOException {
  12. log.info("【loginConsumer】 监听到其他ac节点的登录事件,jwt:{}",token.getJwt());
  13. long messageId = message.getMessageProperties().getDeliveryTag();
  14. try{
  15. listenLogin();
  16. //手动ack
  17. channel.basicAck(messageId,false);
  18. }catch (Exception e){
  19. //失败ack,消息重新入队
  20. log.error("loginConsumer消费失败:{}",e.getCause().getMessage());
  21. channel.basicNack(messageId,false,true);
  22. }
  23. }

登出时的实现逻辑相同。 

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号