当前位置:   article > 正文

(二十)ATP应用测试平台——websocket实现微服务版在线客服聊天室实战案例_微服务聊天系统

微服务聊天系统

前言

在前面的博客内容中我们介绍了如何使用websocket实现一个网页版的在线客服聊天室,众所周知,由于websocket是一个长连接,要和服务端保持会话连接,所以其本身并不适用于微服务环境,在微服务环境中,有可能A、B俩个客户端连接到不同的服务A、B中,这样就没法保证A、B俩个客户端完成聊天的功能,因为会话不在同一台服务器上,A、B无法感知到对方发送的消息,为了解决websocket单机的这个痛点,我们引入消息中间键RocketMQ的广播机制,实现消息的转发,从而实现微服务版的websocke聊天室功能。其架构如下:

本节内容使用的主要技术包含springboot、redis、rocketmq、vue等,关于中间键的搭建本节内容不在展开,请关注作者的往期博客内容。 

正文

  • 引入websocket、redis和rocketmq的pom依赖

①核心pom依赖

  1. <!-- rocketmq-->
  2. <dependency>
  3. <groupId>org.apache.rocketmq</groupId>
  4. <artifactId>rocketmq-spring-boot-starter</artifactId>
  5. <version>2.2.2</version>
  6. </dependency>
  7. <!-- websocket-->
  8. <dependency>
  9. <groupId>org.springframework.boot</groupId>
  10. <artifactId>spring-boot-starter-websocket</artifactId>
  11. </dependency>
  12. <!-- redis-->
  13. <dependency>
  14. <groupId>org.springframework.session</groupId>
  15. <artifactId>spring-session-data-redis</artifactId>
  16. <version>2.4.3</version>
  17. </dependency>

 PS:可以按需引入自己需要的依赖,作者这里只列出核心的pom依赖

  • 配置application.yml

①配置文件

  1. server:
  2. port: 8888
  3. spring:
  4. #数据源配置
  5. datasource:
  6. dynamic:
  7. primary: master #设置默认的数据源或者数据源组,默认值即为master
  8. strict: false #设置严格模式,默认false不启动. 启动后在未匹配到指定数据源时候会抛出异常,不启动则使用默认数据源.
  9. datasource:
  10. master:
  11. url: jdbc:mysql://192.168.56.10:3306/atp
  12. username: root
  13. password: root
  14. driver-class-name: com.mysql.cj.jdbc.Driver # 3.2.0开始支持SPI可省略此配置
  15. profiles:
  16. active: dev
  17. servlet:
  18. multipart:
  19. max-file-size: 52428800
  20. max-request-size: 52428800
  21. #redis配置
  22. redis:
  23. #redisson配置
  24. redisson:
  25. file: classpath:redisson.yaml
  26. #默认数据分区
  27. database: 0
  28. #redis集群节点配置
  29. cluster:
  30. nodes:
  31. - 192.168.56.10:6379
  32. - 192.168.56.10:6380
  33. - 192.168.56.10.6381
  34. max-redirects: 3
  35. #超时时间
  36. timeout: 10000
  37. #哨兵节点配置
  38. sentinel:
  39. master: mymaster
  40. nodes:
  41. - "192.168.56.10:26379"
  42. - "192.168.56.10:26380"
  43. - "192.168.56.10:26381"
  44. #redis密码
  45. password: root
  46. #redis 客户端工具
  47. lettuce:
  48. pool:
  49. # 连接池最大连接数(使用负值表示没有限制) 默认为8
  50. max-active: 8
  51. # 连接池中的最小空闲连接 默认为 0
  52. min-idle: 1
  53. # 连接池最大阻塞等待时间(使用负值表示没有限制) 默认为-1
  54. max-wait: 1000
  55. # 连接池中的最大空闲连接 默认为8
  56. max-idle: 8
  57. session:
  58. store-type: redis
  59. redis:
  60. flush-mode: on_save
  61. namespace: spring:session:atp
  62. thymeleaf:
  63. cache: false
  64. #mybatisplus配置
  65. mybatis-plus:
  66. mapper-locations: classpath*:/mapper/*/*Mapper.xml
  67. type-aliases-package: com.yundi.atp.platform.module.*.entity
  68. configuration:
  69. map-underscore-to-camel-case: true
  70. global-config:
  71. db-config:
  72. id-type: assign_id
  73. #rocketmq配置
  74. rocketmq:
  75. #注册地址
  76. name-server: 192.168.56.10:9876;192.168.56.10:9877
  77. producer:
  78. #生产者组名称
  79. group: atp-producer
  80. #命名空间
  81. namespace: atp
  82. #异步消息发送失败重试次数,默认是2
  83. retry-times-when-send-async-failed: 2
  84. #发送消息超时时间,默认2000ms
  85. send-message-timeout: 2000
  86. #消息的最大长度:默认1024 * 1024 * 4(默认4M)
  87. max-message-size: 40000000
  88. #压缩消息阈值,超过4k就压缩
  89. compress-message-body-threshold: 4096
  90. #是否发送失败,重试另外的broker
  91. retry-next-server: false
  92. #是否启用消息追踪
  93. enable-msg-trace: false
  94. #默认追踪的主题
  95. customized-trace-topic: RMQ_SYS_TRACE_TOPIC
  96. #消息发送失败重试的次数
  97. retry-times-when-send-failed: 2

  • 创建websocket服务配置WebSocketConfig.java

  1. package com.yundi.atp.platform.websocket;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.web.socket.server.standard.ServerEndpointExporter;
  5. @Configuration
  6. public class WebSocketConfig {
  7. /**
  8. * 注入ServerEndpointExporter,
  9. * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
  10. */
  11. @Bean
  12. public ServerEndpointExporter serverEndpointExporter() {
  13. return new ServerEndpointExporter();
  14. }
  15. }
 
  • 创建微服务版websocket服务

  1. package com.yundi.atp.platform.websocket;
  2. import com.alibaba.fastjson.JSON;
  3. import com.yundi.atp.platform.common.Constant;
  4. import com.yundi.atp.platform.enums.MessageType;
  5. import com.yundi.atp.platform.module.test.entity.ChatMsg;
  6. import com.yundi.atp.platform.module.test.service.ChatMsgService;
  7. import com.yundi.atp.platform.rocketmq.RocketConstant;
  8. import com.yundi.atp.platform.rocketmq.RocketProducer;
  9. import lombok.extern.slf4j.Slf4j;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.data.redis.core.RedisTemplate;
  12. import org.springframework.stereotype.Component;
  13. import javax.websocket.OnClose;
  14. import javax.websocket.OnMessage;
  15. import javax.websocket.OnOpen;
  16. import javax.websocket.Session;
  17. import javax.websocket.server.PathParam;
  18. import javax.websocket.server.ServerEndpoint;
  19. import java.util.Map;
  20. import java.util.Set;
  21. import java.util.UUID;
  22. import java.util.concurrent.ConcurrentHashMap;
  23. import java.util.concurrent.CopyOnWriteArraySet;
  24. @Slf4j
  25. @Component
  26. @ServerEndpoint(Constant.WEBSOCKET_MQ_URL + "{userName}")
  27. public class WebSocketMqServer {
  28. /**
  29. * 会话session
  30. */
  31. private Session session;
  32. /**
  33. * socket连接
  34. */
  35. private static CopyOnWriteArraySet<WebSocketMqServer> webSockets = new CopyOnWriteArraySet<>();
  36. /**
  37. * 会话连接池
  38. */
  39. private static Map<String, Session> sessionPool = new ConcurrentHashMap<>();
  40. /**
  41. * 消息持久化
  42. */
  43. private static ChatMsgService chatMsgService;
  44. /**
  45. * redis
  46. */
  47. private static RedisTemplate redisTemplate;
  48. /**
  49. * RocketMQ消息工具类
  50. */
  51. private static RocketProducer rocketProducer;
  52. @Autowired
  53. public void setWebSocketServer(ChatMsgService chatMsgService,
  54. RedisTemplate redisTemplate,
  55. RocketProducer rocketProducer) {
  56. WebSocketMqServer.chatMsgService = chatMsgService;
  57. WebSocketMqServer.redisTemplate = redisTemplate;
  58. WebSocketMqServer.rocketProducer = rocketProducer;
  59. }
  60. @OnOpen
  61. public void onOpen(Session session, @PathParam(value = "userName") String userName) {
  62. //1.将用户添加到在线用户列表中
  63. if (!Constant.SUPER_ADMIN.equals(userName)) {
  64. redisTemplate.opsForSet().add("online", userName);
  65. }
  66. //2.保存会话连接
  67. this.session = session;
  68. webSockets.add(this);
  69. sessionPool.put(userName, session);
  70. Set online = redisTemplate.opsForSet().members("online");
  71. log.info("【websocket消息】有新的连接,总在线人数为:" + online.size());
  72. //3.创建消息
  73. WebSocketMqMsg webSocketMqMsg = new WebSocketMqMsg();
  74. //消息类型
  75. webSocketMqMsg.setKey(MessageType.MESSAGE_OPEN.getCode());
  76. //在线人数
  77. webSocketMqMsg.setOnlineList(online);
  78. //全部人数
  79. webSocketMqMsg.setUserList(chatMsgService.getUserList());
  80. //4.消息异步发送到RocketMQ
  81. rocketProducer.sendAsyncMsg(RocketConstant.ROCKET_TOPIC, RocketConstant.ROCKET_TAG_CHAT, UUID.randomUUID().toString(), JSON.toJSONString(webSocketMqMsg));
  82. }
  83. @OnClose
  84. public void onClose(@PathParam(value = "userName") String userName) {
  85. //1.更新在线用户列表
  86. redisTemplate.opsForSet().remove("online", userName);
  87. //2.清除会话连接
  88. webSockets.remove(this);
  89. sessionPool.remove(userName);
  90. Set online = redisTemplate.opsForSet().members("online");
  91. log.info("【websocket消息】连接断开,总在线人数为:" + online.size());
  92. //3.创建消息
  93. WebSocketMqMsg webSocketMqMsg = new WebSocketMqMsg();
  94. webSocketMqMsg.setKey(MessageType.MESSAGE_CLOSE.getCode());
  95. webSocketMqMsg.setOnlineList(online);
  96. webSocketMqMsg.setUserList(chatMsgService.getUserList());
  97. //4.消息异步发送到RocketMQ
  98. rocketProducer.sendAsyncMsg(RocketConstant.ROCKET_TOPIC, RocketConstant.ROCKET_TAG_CHAT, UUID.randomUUID().toString(), JSON.toJSONString(webSocketMqMsg));
  99. }
  100. @OnMessage
  101. public void onMessage(String message) {
  102. //1.持久化消息内容
  103. ChatMsg chatMsg = JSON.parseObject(message, ChatMsg.class);
  104. chatMsgService.save(chatMsg);
  105. //2.创建消息
  106. WebSocketMqMsg webSocketMqMsg = new WebSocketMqMsg();
  107. webSocketMqMsg.setKey(MessageType.MESSAGE_SEND.getCode());
  108. webSocketMqMsg.setData(chatMsg);
  109. //3.消息异步发送到RocketMQ
  110. rocketProducer.sendAsyncMsg(RocketConstant.ROCKET_TOPIC, RocketConstant.ROCKET_TAG_CHAT, UUID.randomUUID().toString(), JSON.toJSONString(webSocketMqMsg));
  111. }
  112. /**
  113. * 广播消息
  114. */
  115. public void sendAllMessage(String message) {
  116. for (WebSocketMqServer webSocket : webSockets) {
  117. log.info("【websocket消息】广播消息:" + message);
  118. try {
  119. Session session = webSocket.session;
  120. if (session != null && session.isOpen()) {
  121. webSocket.session.getAsyncRemote().sendText(message);
  122. }
  123. } catch (Exception e) {
  124. e.printStackTrace();
  125. }
  126. }
  127. }
  128. /**
  129. * 单点消息
  130. *
  131. * @param userName
  132. * @param message
  133. */
  134. public void sendOneMessage(String userName, String message) {
  135. log.info("【websocket消息】单点消息:" + message);
  136. Session session = sessionPool.get(userName);
  137. if (session != null && session.isOpen()) {
  138. try {
  139. session.getAsyncRemote().sendText(message);
  140. } catch (Exception e) {
  141. e.printStackTrace();
  142. }
  143. }
  144. }
  145. }

ps:这里我们将会话的消息先推送给消息中间键RocketMQ,然后将消息通过广播的形式分发给每一台服务器去消费,如何能消费成功 ,就将消息推送给对应的客户端

  • 常量定义
  1. package com.yundi.atp.platform.common;
  2. public class Constant {
  3. /**
  4. * zookeeper分布式锁根路径
  5. */
  6. public final static String LOCK_ROOT_PATH = "/zookeeper/lock/";
  7. /**
  8. * websocket协议
  9. */
  10. public final static String WEBSOCKET_PROTOCOL = "ws://";
  11. /**
  12. * 单机版聊天室
  13. */
  14. public final static String WEBSOCKET_SINGLE_URL = "/websocket/chat/";
  15. /**
  16. * 微服务版聊天室
  17. */
  18. public final static String WEBSOCKET_MQ_URL = "/websocket/mq/chat/";
  19. /**
  20. * 超级管理员
  21. */
  22. public final static String SUPER_ADMIN = "super_admin";
  23. }

  • 自定义消息类型:根据不同消息内容处理不同的消息业务逻辑
  1. package com.yundi.atp.platform.enums;
  2. public enum MessageType {
  3. MESSAGE_OPEN(1, "开启连接"),
  4. MESSAGE_CLOSE(2, "断开连接"),
  5. MESSAGE_SEND(3, "发送消息"),
  6. MESSAGE_RE_OPEN(4, "异地登录下线通知");
  7. private Integer code;
  8. private String msg;
  9. MessageType(Integer code, String msg) {
  10. this.code = code;
  11. this.msg = msg;
  12. }
  13. public Integer getCode() {
  14. return code;
  15. }
  16. public String getMsg() {
  17. return msg;
  18. }
  19. }

​​​​​​​

  • RocketMQ消息发送的工具类 
  1. package com.yundi.atp.platform.rocketmq;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.rocketmq.client.producer.*;
  4. import org.apache.rocketmq.spring.core.RocketMQTemplate;
  5. import org.apache.rocketmq.spring.support.RocketMQHeaders;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.messaging.Message;
  8. import org.springframework.messaging.support.MessageBuilder;
  9. import org.springframework.stereotype.Component;
  10. @Component
  11. @Slf4j
  12. public class RocketProducer {
  13. @Autowired
  14. private RocketMQTemplate rocketMQTemplate;
  15. /**
  16. * 发送同步消息:消息响应后发送下一条消息
  17. *
  18. * @param topic 消息主题
  19. * @param tag 消息tag
  20. * @param key 业务号
  21. * @param data 消息内容
  22. */
  23. public void sendSyncMsg(String topic, String tag, String key, String data) {
  24. //消息
  25. Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
  26. //主题
  27. String destination = topic + ":" + tag;
  28. SendResult sendResult = rocketMQTemplate.syncSend(destination, message);
  29. log.info("【RocketMQ】发送同步消息:{}", sendResult);
  30. }
  31. /**
  32. * 发送异步消息:异步回调通知消息发送的状况
  33. *
  34. * @param topic 消息主题
  35. * @param tag 消息tag
  36. * @param key 业务号
  37. * @param data 消息内容
  38. */
  39. public void sendAsyncMsg(String topic, String tag, String key, String data) {
  40. //消息
  41. Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
  42. //主题
  43. String destination = topic + ":" + tag;
  44. rocketMQTemplate.asyncSend(destination, message, new SendCallback() {
  45. @Override
  46. public void onSuccess(SendResult sendResult) {
  47. log.info("【RocketMQ】发送异步消息:{}", sendResult);
  48. }
  49. @Override
  50. public void onException(Throwable e) {
  51. log.info("【RocketMQ】发送异步消息异常:{}", e.getMessage());
  52. }
  53. });
  54. }
  55. /**
  56. * 发送单向消息:消息发送后无响应,可靠性差,效率高
  57. *
  58. * @param topic 消息主题
  59. * @param tag 消息tag
  60. * @param key 业务号
  61. * @param data 消息内容
  62. */
  63. public void sendOneWayMsg(String topic, String tag, String key, String data) {
  64. //消息
  65. Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
  66. //主题
  67. String destination = topic + ":" + tag;
  68. rocketMQTemplate.sendOneWay(destination, message);
  69. }
  70. /**
  71. * 同步延迟消息
  72. *
  73. * @param topic 主题
  74. * @param tag 标签
  75. * @param key 业务号
  76. * @param data 消息体
  77. * @param timeout 发送消息的过期时间
  78. * @param delayLevel 延迟等级-----固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
  79. */
  80. public void sendSyncDelayMsg(String topic, String tag, String key, String data, long timeout, int delayLevel) {
  81. //消息
  82. Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
  83. //主题
  84. String destination = topic + ":" + tag;
  85. SendResult sendResult = rocketMQTemplate.syncSend(destination, message, timeout, delayLevel);
  86. log.info("【RocketMQ】发送同步延迟消息:{}", sendResult);
  87. }
  88. /**
  89. * 异步延迟消息
  90. *
  91. * @param topic 主题
  92. * @param tag 标签
  93. * @param key 业务号
  94. * @param data 消息体
  95. * @param timeout 发送消息的过期时间
  96. * @param delayLevel 延迟等级-----固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
  97. */
  98. public void sendAsyncDelayMsg(String topic, String tag, String key, String data, long timeout, int delayLevel) {
  99. //消息
  100. Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
  101. //主题
  102. String destination = topic + ":" + tag;
  103. rocketMQTemplate.asyncSend(destination, message, new SendCallback() {
  104. @Override
  105. public void onSuccess(SendResult sendResult) {
  106. log.info("【RocketMQ】发送异步延迟消息:{}", sendResult);
  107. }
  108. @Override
  109. public void onException(Throwable e) {
  110. log.info("【RocketMQ】发送异步延迟消息异常:{}", e.getMessage());
  111. }
  112. }, timeout, delayLevel);
  113. }
  114. /**
  115. * 同步顺序消息
  116. *
  117. * @param topic 主题
  118. * @param tag 标签
  119. * @param key 业务号
  120. * @param data 消息体
  121. */
  122. public void sendSyncOrderlyMsg(String topic, String tag, String key, String data) {
  123. //消息
  124. Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
  125. //主题
  126. String destination = topic + ":" + tag;
  127. SendResult sendResult = rocketMQTemplate.syncSendOrderly(destination, message, key);
  128. log.info("【RocketMQ】发送同步顺序消息:{}", sendResult);
  129. }
  130. /**
  131. * 异步顺序消息
  132. *
  133. * @param topic 主题
  134. * @param tag 标签
  135. * @param key 业务号
  136. * @param data 消息体
  137. */
  138. public void sendAsyncOrderlyMsg(String topic, String tag, String key, String data) {
  139. //消息
  140. Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
  141. //主题
  142. String destination = topic + ":" + tag;
  143. rocketMQTemplate.asyncSendOrderly(destination, message, key, new SendCallback() {
  144. @Override
  145. public void onSuccess(SendResult sendResult) {
  146. log.info("【RocketMQ】发送异步顺序消息:{}", sendResult);
  147. }
  148. @Override
  149. public void onException(Throwable e) {
  150. log.info("【RocketMQ】发送异步顺序消息异常:{}", e.getMessage());
  151. }
  152. });
  153. }
  154. /**
  155. * 分布式事务消息
  156. *
  157. * @param topic 主题
  158. * @param tag 标签
  159. * @param key 业务号
  160. * @param data 消息体
  161. */
  162. public void sendTransactionMsg(String topic, String tag, String key, String data) {
  163. //消息
  164. Message message = MessageBuilder.withPayload(data)
  165. .setHeader(RocketMQHeaders.KEYS, key)
  166. .setHeader(RocketMQHeaders.TRANSACTION_ID, key)
  167. .build();
  168. //主题
  169. String destination = topic + ":" + tag;
  170. TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(destination, message, null);
  171. if (transactionSendResult.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE) &&
  172. transactionSendResult.getSendStatus().equals(SendStatus.SEND_OK)) {
  173. log.info("分布式事物消息发送成功");
  174. }
  175. log.info("分布式事物消息发送结果:{}", transactionSendResult);
  176. }
  177. }

  • websocket服务的连接地址获取及历史消息获取 
  1. package com.yundi.atp.platform.module.test.controller;
  2. import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
  3. import com.yundi.atp.platform.common.Result;
  4. import com.yundi.atp.platform.module.test.entity.ChatMsg;
  5. import com.yundi.atp.platform.module.test.service.ChatMsgService;
  6. import io.swagger.annotations.Api;
  7. import io.swagger.annotations.ApiOperation;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.web.bind.annotation.GetMapping;
  10. import org.springframework.web.bind.annotation.PathVariable;
  11. import org.springframework.web.bind.annotation.RequestMapping;
  12. import org.springframework.web.bind.annotation.RestController;
  13. import javax.servlet.http.HttpServletRequest;
  14. import java.net.InetAddress;
  15. import java.net.UnknownHostException;
  16. import java.util.Comparator;
  17. import java.util.List;
  18. import java.util.stream.Collectors;
  19. @Api(tags = "聊天室接口-mq版")
  20. @RestController
  21. @RequestMapping("/test/mq/chatMsg")
  22. public class ChatMsgMqController {
  23. @Autowired
  24. private ChatMsgService chatMsgService;
  25. @ApiOperation(value = "获取聊天室地址")
  26. @GetMapping(value = "/getWebSocketAddress/{username}")
  27. public Result getWebSocketAddress(HttpServletRequest request, @PathVariable(value = "username") String username) throws UnknownHostException {
  28. String address = "ws://" + InetAddress.getLocalHost().getHostAddress() + ":" + request.getServerPort() + request.getContextPath() + "/websocket/mq/chat/" + username;
  29. return Result.success(address);
  30. }
  31. @ApiOperation(value = "获取历史聊天记录")
  32. @GetMapping(value = "/getHistoryChat/{username}")
  33. public Result getWebSocketAddress(@PathVariable(value = "username") String username) {
  34. List<ChatMsg> list = chatMsgService.list(new QueryWrapper<ChatMsg>()
  35. .and(wrapper -> wrapper.eq("sender", username).or().eq("receiver", username))
  36. .orderByDesc("create_time"));
  37. List<ChatMsg> collect = list.stream().sorted(Comparator.comparing(ChatMsg::getCreateTime)).collect(Collectors.toList());
  38. return Result.success(collect);
  39. }
  40. @ApiOperation(value = "获取用户列表")
  41. @GetMapping(value = "/getUserList")
  42. public Result getUserList() {
  43. List<String> userList = chatMsgService.getUserList();
  44. return Result.success(userList);
  45. }
  46. }

​​​​​​​

  • 消息的广播分发
  1. package com.yundi.atp.platform.websocket;
  2. import com.alibaba.fastjson.JSON;
  3. import com.yundi.atp.platform.common.Constant;
  4. import com.yundi.atp.platform.enums.MessageType;
  5. import com.yundi.atp.platform.module.test.entity.ChatMsg;
  6. import com.yundi.atp.platform.rocketmq.RocketConstant;
  7. import lombok.extern.slf4j.Slf4j;
  8. import org.apache.rocketmq.spring.annotation.MessageModel;
  9. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
  10. import org.apache.rocketmq.spring.core.RocketMQListener;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.stereotype.Component;
  13. @Slf4j
  14. @Component
  15. @RocketMQMessageListener(consumerGroup = RocketConstant.ROCKET_CONSUMER_CHAT_GROUP,
  16. topic = RocketConstant.ROCKET_TOPIC,
  17. selectorExpression = RocketConstant.ROCKET_TAG_CHAT,
  18. namespace = RocketConstant.ROCKET_NAMESPACE,
  19. messageModel = MessageModel.BROADCASTING)
  20. public class WebSocketMqConsumer implements RocketMQListener<String> {
  21. @Autowired
  22. WebSocketMqServer webSocketMqServer;
  23. @Override
  24. public void onMessage(String message) {
  25. log.info("聊天室消息:{}", message);
  26. //1.解析消息
  27. WebSocketMqMsg webSocketMqMsg = JSON.parseObject(message, WebSocketMqMsg.class);
  28. //2.根据消息类型解析消息
  29. // 建立连接消息
  30. if (webSocketMqMsg.getKey().equals(MessageType.MESSAGE_OPEN.getCode())) {
  31. webSocketMqServer.sendOneMessage(Constant.SUPER_ADMIN, message);
  32. }
  33. // 关闭连接消息
  34. if (webSocketMqMsg.getKey().equals(MessageType.MESSAGE_CLOSE.getCode())) {
  35. webSocketMqServer.sendOneMessage(Constant.SUPER_ADMIN, message);
  36. }
  37. // 发送消息
  38. if (webSocketMqMsg.getKey().equals(MessageType.MESSAGE_SEND.getCode())) {
  39. ChatMsg data = webSocketMqMsg.getData();
  40. webSocketMqServer.sendOneMessage(data.getSender(), message);
  41. webSocketMqServer.sendOneMessage(data.getReceiver(), message);
  42. }
  43. }
  44. }

  • 消息主题定义
  1. package com.yundi.atp.platform.rocketmq;
  2. public class RocketConstant {
  3. /**
  4. * 消费者组
  5. */
  6. public final static String ROCKET_CONSUMER_GROUP = "atp-consumer";
  7. /**
  8. * 聊天室消费者组
  9. */
  10. public final static String ROCKET_CONSUMER_CHAT_GROUP = "atp-chat-consumer";
  11. /**
  12. * 主题
  13. */
  14. public final static String ROCKET_TOPIC = "atp";
  15. /**
  16. * tag
  17. */
  18. public final static String ROCKET_TAG = "app";
  19. /**
  20. * 聊天室tag
  21. */
  22. public final static String ROCKET_TAG_CHAT = "chat";
  23. /**
  24. * 名称空间
  25. */
  26. public final static String ROCKET_NAMESPACE = "atp";
  27. }

  • 客户端代码
  1. <template>
  2. <div class="container">
  3. <el-card class="box-card">
  4. <div slot="header">
  5. <el-row type="flex">
  6. <el-col :span="1" style="margin: 15px 10px;">
  7. <img alt="ATP客服" src="@/assets/logo.png" style="width:40px;height:40px;"/>
  8. </el-col>
  9. <el-col :span="3" style="line-height: 74px;margin-left: 10px;">
  10. <span style="display: inline-block;color: white;">ATP客服</span>
  11. </el-col>
  12. <el-col :span="20" v-if="username==='super_admin'">
  13. <h5 style="color: #83ccd2;padding: 0;text-align: right;margin: 50px 20px 0 0;">当前在线人数:{{ online }}</h5>
  14. </el-col>
  15. <el-col :span="20" v-else>
  16. <h5 style="color: #83ccd2;padding: 0 0 2px 0;text-align: right;margin: 50px 20px 0 0;font-size: 18px;">
  17. {{ username }}</h5>
  18. </el-col>
  19. </el-row>
  20. </div>
  21. <div class="content" ref="content">
  22. <el-row type="flex">
  23. <el-col :span="6" style="background: #eee;min-height: 600px;" v-if="username==='super_admin'">
  24. <el-tabs v-model="activeName" @tab-click="handleClick" style="width: 190px;margin: 0 2px;">
  25. <el-tab-pane label="在线用户" name="online">
  26. <div v-for="item in friend" :key="item" @click="switchUser(item)" :class="item===active?'mark':''">
  27. <el-badge :is-dot=msgNotify.includes(item) class="item" type="success">
  28. <li style="list-style-type:none;padding: 5px 8px;cursor: pointer;"
  29. class="active">
  30. {{ item }}
  31. </li>
  32. </el-badge>
  33. <el-divider></el-divider>
  34. </div>
  35. </el-tab-pane>
  36. <el-tab-pane label="全部用户" name="all">
  37. <div v-for="item in userList" :key="item" @click="switchUser(item)" :class="item===active?'mark':''">
  38. <el-badge :is-dot=msgNotify.includes(item) class="item" type="success">
  39. <li style="list-style-type:none;padding: 5px 8px;cursor: pointer;"
  40. :class="friend.includes(item)?'active':''">
  41. {{ item }}
  42. </li>
  43. </el-badge>
  44. <el-divider></el-divider>
  45. </div>
  46. </el-tab-pane>
  47. </el-tabs>
  48. </el-col>
  49. <el-col :span="18" v-if="username==='super_admin'">
  50. <div v-for="item in chatMsgList">
  51. <el-row type="flex" style="margin-bottom: 20px;" v-if="username===item.sender">
  52. <el-col :span="2">
  53. <img alt="ATP客服" src="@/assets/logo.png" style="width:30px;height:30px;margin: 10px 0px 0px 20px;"/>
  54. </el-col>
  55. <el-col :span="22">
  56. <el-row type="flex" style="margin-top: 10px;margin-left: 5px;opacity: 0.2;">
  57. <el-col :span="7"><span style="padding-left: 20px;">{{ item.sender }}</span></el-col>
  58. <el-col :span="7"><span>{{ item.createTime | dataFormat('yyyy-MM-dd HH:mm') }}</span></el-col>
  59. </el-row>
  60. <el-row>
  61. <el-col :span="14" style="margin-left: 8px;margin-top: 5px;">
  62. <el-card style="padding: 8px 5px;">
  63. {{ item.msg }}
  64. </el-card>
  65. </el-col>
  66. </el-row>
  67. </el-col>
  68. </el-row>
  69. <el-row type="flex" style="margin-bottom: 20px;" v-else justify="end">
  70. <el-col :span="22">
  71. <el-row type="flex" style="margin-top: 10px;margin-right: 5px;opacity: 0.2;" justify="end">
  72. <el-col :span="6"><span>{{ item.sender }}</span></el-col>
  73. <el-col :span="7"><span>{{ item.createTime | dataFormat('yyyy-MM-dd HH:mm') }}</span></el-col>
  74. </el-row>
  75. <el-row type="flex" justify="end" style="margin-right: 8px;margin-top: 5px;">
  76. <el-col :span="14" style="margin-right: 8px;">
  77. <el-card style="padding: 8px 5px;">
  78. {{ item.msg }}
  79. </el-card>
  80. </el-col>
  81. </el-row>
  82. </el-col>
  83. <el-col :span="2">
  84. <el-avatar shape="square" size="medium" style="float: right;margin: 10px 20px 0px 0px;">客户</el-avatar>
  85. </el-col>
  86. </el-row>
  87. </div>
  88. </el-col>
  89. <el-col :span="24" v-else>
  90. <div v-for="item in chatMsgList">
  91. <el-row type="flex" style="margin-bottom: 20px;" v-if="username===item.sender">
  92. <el-col :span="2">
  93. <el-avatar shape="square" size="medium" style="float: right;margin: 10px 20px 0px 0px;">客户</el-avatar>
  94. </el-col>
  95. <el-col :span="22">
  96. <el-row type="flex" style="margin-top: 10px;opacity: 0.2;margin-left: 20px;">
  97. <el-col :span="7"><span style="padding-left: 5px;">{{ item.sender }}</span></el-col>
  98. <el-col :span="7"><span>{{ item.createTime | dataFormat('yyyy-MM-dd HH:mm') }}</span></el-col>
  99. </el-row>
  100. <el-row>
  101. <el-col :span="14">
  102. <el-card style="padding: 8px 5px;">
  103. {{ item.msg }}
  104. </el-card>
  105. </el-col>
  106. </el-row>
  107. </el-col>
  108. </el-row>
  109. <el-row type="flex" style="margin-bottom: 20px;" v-else justify="end">
  110. <el-col :span="22">
  111. <el-row type="flex" style="margin-top: 10px;margin-right: 5px;opacity: 0.2;" justify="end">
  112. <el-col :span="6"><span>{{ item.sender }}</span></el-col>
  113. <el-col :span="7"><span>{{ item.createTime | dataFormat('yyyy-MM-dd HH:mm') }}</span></el-col>
  114. </el-row>
  115. <el-row type="flex" justify="end" style="margin-top: 5px;">
  116. <el-col :span="14">
  117. <el-card style="padding: 8px 5px;">
  118. {{ item.msg }}
  119. </el-card>
  120. </el-col>
  121. </el-row>
  122. </el-col>
  123. <el-col :span="2">
  124. <img alt="ATP客服" src="@/assets/logo.png" style="width:30px;height:30px;margin: 10px 0px 0px 20px;"/>
  125. </el-col>
  126. </el-row>
  127. </div>
  128. </el-col>
  129. </el-row>
  130. </div>
  131. <div class="operate" v-if="username==='super_admin'">
  132. <el-input
  133. type="textarea"
  134. :autosize="{ minRows: 3, maxRows: 3}"
  135. placeholder="您好!这里是ATP客服部,我是客服小美,很高兴为您服务!"
  136. v-model="msg">
  137. </el-input>
  138. <el-button type="success" size="mini" style="float: right;margin-top: 5px;" @click="sendMsg"
  139. :disabled="!(msg && active)">
  140. 发送
  141. </el-button>
  142. </div>
  143. <div class="operate" v-else>
  144. <el-input
  145. type="textarea"
  146. :autosize="{ minRows: 3, maxRows: 3}"
  147. placeholder="您好!这里是ATP客服部,我是客服小美,很高兴为您服务!"
  148. v-model="msg">
  149. </el-input>
  150. <el-button type="success" size="mini" style="float: right;margin-top: 5px;" @click="sendMsg" :disabled="!msg">
  151. 发送
  152. </el-button>
  153. </div>
  154. </el-card>
  155. </div>
  156. </template>
  157. <script>
  158. export default {
  159. name: "ClientMqChat",
  160. data() {
  161. return {
  162. msg: '',
  163. chatMsgList: [],
  164. username: sessionStorage.getItem("username"),
  165. friend: [],
  166. online: 0,
  167. active: '',
  168. receiver: 'super_admin',
  169. userList: [],
  170. activeName: 'online',
  171. msgNotify:[],
  172. }
  173. },
  174. created() {
  175. this.getWebSocketAddress();
  176. },
  177. methods: {
  178. //tab切换
  179. handleClick(tab, event) {
  180. const _this = this;
  181. if (tab.name === 'online') {
  182. if (!_this.active) {
  183. if (_this.online > 0) {
  184. _this.active = _this.friend[0];
  185. _this.activeName = 'online';
  186. _this.receiver = _this.active;
  187. _this.getHistoryChat(_this.receiver);
  188. } else {
  189. if (_this.userList.length > 0) {
  190. _this.active = _this.userList[0];
  191. _this.activeName = 'all';
  192. _this.receiver = _this.active;
  193. _this.getHistoryChat(_this.receiver);
  194. }
  195. }
  196. }
  197. }
  198. if (tab.name === 'all') {
  199. if (!_this.active) {
  200. if (_this.online > 0) {
  201. _this.active = _this.friend[0];
  202. _this.activeName = 'online';
  203. _this.receiver = _this.active;
  204. _this.getHistoryChat(_this.receiver);
  205. } else {
  206. if (_this.userList.length > 0) {
  207. _this.active = _this.userList[0];
  208. _this.activeName = 'all';
  209. _this.receiver = _this.active;
  210. _this.getHistoryChat(_this.receiver);
  211. }
  212. }
  213. }
  214. }
  215. },
  216. //切换用户
  217. switchUser(data) {
  218. if (this.active === data) {
  219. return;
  220. }
  221. this.active = data;
  222. this.receiver = data;
  223. //获取历史聊天记录
  224. this.getHistoryChat(this.receiver);
  225. this.msgNotify = this.msgNotify.filter(item => item != this.active);
  226. },
  227. //获取历史聊天记录
  228. getHistoryChat(data) {
  229. this.$http.get('/test/mq/chatMsg/getHistoryChat/' + data).then(res => {
  230. if (res.data.code === 1) {
  231. this.chatMsgList = res.data.data;
  232. this.flushScroll();
  233. } else {
  234. this.$message.warning(res.data.msg);
  235. }
  236. }).catch(error => {
  237. this.$message.error(error);
  238. });
  239. },
  240. //获取websocket地址
  241. getWebSocketAddress() {
  242. this.$http.get('/test/mq/chatMsg/getWebSocketAddress/' + this.username).then(res => {
  243. if (res.data.code === 1) {
  244. if ('WebSocket' in window) {
  245. this.websocket = new WebSocket(res.data.data);
  246. this.initWebSocket();
  247. if (this.username != 'super_admin') {
  248. this.getHistoryChat(this.username);
  249. }
  250. } else {
  251. this.$message.warning('当前浏览器不支持websocket创建!');
  252. }
  253. } else {
  254. this.$message.warning(res.data.msg);
  255. }
  256. }).catch(error => {
  257. this.$message.error(error);
  258. });
  259. },
  260. //初始化websocket
  261. initWebSocket() {
  262. const _this = this;
  263. _this.websocket.onerror = function (event) {
  264. _this.$message.error('服务端连接错误!');
  265. }
  266. _this.websocket.onopen = function (event) {
  267. _this.$message.success("连接成功!");
  268. }
  269. _this.websocket.onmessage = function (event) {
  270. let res = JSON.parse(event.data);
  271. if (res.key === 1) {
  272. _this.userList = res.userList;
  273. _this.friend = res.onlineList;
  274. _this.online = _this.friend.length;
  275. if (!_this.active) {
  276. if (_this.online > 0) {
  277. _this.active = _this.friend[0];
  278. _this.activeName = 'online';
  279. _this.receiver = _this.active;
  280. _this.getHistoryChat(_this.receiver);
  281. } else {
  282. if (_this.userList.length > 0) {
  283. _this.active = _this.userList[0];
  284. _this.activeName = 'all';
  285. _this.receiver = _this.active;
  286. _this.getHistoryChat(_this.receiver);
  287. }
  288. }
  289. }
  290. }
  291. if (res.key === 2) {
  292. _this.userList = res.userList;
  293. _this.friend = res.onlineList;
  294. _this.online = _this.friend.length;
  295. if (!_this.active) {
  296. if (_this.online > 0) {
  297. _this.active = _this.friend[0];
  298. _this.activeName = 'online';
  299. _this.receiver = _this.active;
  300. _this.getHistoryChat(_this.receiver);
  301. } else {
  302. if (_this.userList.length > 0) {
  303. _this.active = _this.userList[0];
  304. _this.activeName = 'all';
  305. _this.receiver = _this.active;
  306. _this.getHistoryChat(_this.receiver);
  307. }
  308. }
  309. }
  310. }
  311. if (res.key === 3) {
  312. if (_this.username === res.data.sender) {
  313. _this.chatMsgList.push(res.data);
  314. _this.flushScroll();
  315. } else {
  316. if (res.data.sender === 'super_admin') {
  317. _this.chatMsgList.push(res.data);
  318. _this.flushScroll();
  319. } else {
  320. if (res.data.sender === _this.active) {
  321. _this.chatMsgList.push(res.data);
  322. _this.flushScroll();
  323. } else {
  324. //发送其它用户处理
  325. _this.msgNotify.push(res.data.sender);
  326. }
  327. }
  328. }
  329. }
  330. }
  331. _this.websocket.onclose = function (event) {
  332. _this.$message.warning('服务端已关闭!');
  333. }
  334. },
  335. //发送消息
  336. sendMsg() {
  337. if (this.msg.trim().length === 0) {
  338. this.$message.warning('不能发送空消息!');
  339. return;
  340. }
  341. let chatMsg = {};
  342. chatMsg.msg = this.msg;
  343. chatMsg.sender = this.username;
  344. chatMsg.createTime = new Date();
  345. chatMsg.receiver = this.receiver;
  346. this.websocket.send(JSON.stringify(chatMsg));
  347. this.msg = '';
  348. this.flushScroll();
  349. },
  350. //刷新滚动条
  351. flushScroll() {
  352. let content = this.$refs.content;
  353. setTimeout(() => {
  354. content.scrollTop = content.scrollHeight;
  355. }, 100);
  356. },
  357. }
  358. }
  359. </script>
  360. <style scoped lang="scss">
  361. .container {
  362. padding-top: 50px;
  363. .box-card {
  364. margin: auto;
  365. width: 800px;
  366. height: 800px;
  367. max-height: 900px;
  368. ::v-deep .el-card__header {
  369. background: #867ba9 !important;
  370. border-bottom: none;
  371. padding: 0;
  372. }
  373. ::v-deep .el-card__body {
  374. padding: 0px !important;
  375. position: relative;
  376. .content {
  377. height: 600px;
  378. background: #ddd;
  379. overflow-y: auto;
  380. .el-divider--horizontal {
  381. margin: 0;
  382. }
  383. .active {
  384. color: #0080ff;
  385. }
  386. .mark {
  387. background: #deb068;
  388. }
  389. .item {
  390. margin-top: 10px;
  391. margin-right: 10px;
  392. }
  393. }
  394. .operate {
  395. padding: 5px 15px;
  396. }
  397. }
  398. }
  399. }
  400. </style>

  • ​​​​​​​ 启动前后端项目,分别使用客服账号和客户账号登录聊天室

  • 聊天消息

 

 

结语

至此,关于实现微服务的websocket聊天室到这里就结束了,下期见。。。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/blog/article/detail/49691
推荐阅读
相关标签
  

闽ICP备14008679号