赞
踩
用户认证中心(Authorization center简称ac)使用jwt实现用户请求身份认证,需要支持多副本部署。系统架构如下:
用户登录后生成jwt,纵向需要通过socket长连接把jwt下发到应用集成层ws,ws再把jwt下发到应用。前端请求各应用时可以在应用的filter中校验jwt是否有效,无效则向上询问ws jwt是否有效,无效再请求ac jwt是否有效。
所以,用户登录请求通过负载均衡落到ac副本1(简称ac1)后,ac1生成jwt,除了纵向下发之外,还需要横向同步到ac2 ac3,ac2和ac3再纵向同步jwt,实现全平台的单点登录。
ac1发送消息到rabbit mq,其他的所有副本ac2和ac3消费消息。
1.添加依赖
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
2.添加rabbitMQ配置
- spring:
- application:
- name: xxx
- rabbitmq:
- addresses: x.x.x.x:5672
- username: admin
- password: admin
- virtual-host: /
- # 启用消息发布ack
- publisher-confirm-type: correlated
- # 启用发布返回
- publisher-returns: true
- template:
- #启用强制信息;默认false
- mandatory: true
- retry:
- # 发送重试是否可用
- enabled: true
- #最大重试次数
- max-attempts: 3
- #消费端配置
- listener:
- simple:
- missing-queues-fatal: false
- #最大/最小的消费者数量
- concurrency: 1
- max-concurrency: 8
- #表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
- acknowledge-mode: manual
- #监听器抛出异常而拒绝的消息是否被重新放回队列,默认值为true。
- default-requeue-rejected: true
- #每次从mq取消息的条数
- prefetch: 1
- retry:
- enabled: true
- #最大重试次数
- max-attempts: 5
- #最大重试时间间隔
- max-interval: 32000
- #第一次和第二次尝试发布或传递消息之间的间隔
- initial-interval: 2000
- #应用于上一重试间隔的乘数
- multiplier: 2
3.登录service中发送mq(jwt),即mq provider
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- public void login(Token token){
- //如果是自己发的mq则不需要消费,直接return
- if(cache.contains(token.getJwt())return;
-
- //下发token到ws层
- server.getNamespace("/ws1").getBroadcastOperations().sendEvent("LOGIN_EVENT",token,
- broadcastAckCallback);
-
- //获取redis中注册的ac节点信息,如果节点数大于1则需要发送mq同步token
- int acNodeCount= redisTemplate.boundValueOps(nodeKey).get();
- if(acNodeCount>1){
- rabbitTemplate.convertAndSend(""ac-topic-exchange"",
- AcConstant.LOGIN_ROUTING_KEY,
- token);
- }
4.MQ consumer
通过@Queue注解中的value属性,用spring spel表达式,在每次启动ac的时候生成一个带有随机字符串的名字,绑定到ac的topic exchange。这样,启动3个ac副本,就有3个queue绑定到了ac的exchange,mq message发送到topic exchange, 通过routing key分发到所有符合规则的queue,就能实现所有副本消费同一条消息。
- @RabbitListener(
- bindings = {
- @QueueBinding(
- value = @Queue(value = "ac-login-queue- #{T(System).currentTimeMillis()}"),
- exchange = @Exchange(value = "ac-topic-exchange",
- type = ExchangeTypes.TOPIC),
- key = "ac.login.#")
- },
- ackMode = "MANUAL")
- @RabbitHandler
- public void loginConsumer(Token token, Channel channel, Message message) throws IOException {
- log.info("【loginConsumer】 监听到其他ac节点的登录事件,jwt:{}",token.getJwt());
- long messageId = message.getMessageProperties().getDeliveryTag();
- try{
- listenLogin();
- //手动ack
- channel.basicAck(messageId,false);
- }catch (Exception e){
- //失败ack,消息重新入队
- log.error("loginConsumer消费失败:{}",e.getCause().getMessage());
- channel.basicNack(messageId,false,true);
- }
- }
登出时的实现逻辑相同。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。