当前位置:   article > 正文

Java版的WebSocket_javax.websocket

javax.websocket

项目背景介绍:公司新开发的APP,需要使用系统公告功能,实现实时更新系统公告功能。

 公告的修改功能,由PC端的运营管理界面来进行修改更新发布。当修改完后,要保证APP端,在用户不刷新或重新登录APP的情况下,更新公告内容。

项目的实际架构部署:

负载均衡的后面有两台服务器,部署的服务都是一样的。通过nginx来代理转发请求的具体服务。

多实例部署的情况下,会造成websocket的session不能共享。这里采用的是:redis的发布和订阅功能,实现修改后的公告消息在每台服务器上都能收到,然后再获取每台机器上的websocket会话,给APP端发送消息。 

  1. public class Constants
  2. {
  3. /** redis 订阅消息通道标识*/
  4. public final static String REDIS_CHANNEL = "onMessage";
  5. public final static String REDIS_CHANNEL_CLOSE="close";
  6. public final static String REDIS_CHANNEL_SEND="send";
  7. }
  1. @Autowired
  2. private StringRedisTemplate stringRedisTemplate;
  3. private void sendMessage(String message) {
  4. String newMessge= null;
  5. try {
  6. newMessge = new String(message.getBytes(Constants.UTF8), Constants.UTF8);
  7. } catch (UnsupportedEncodingException e) {
  8. e.printStackTrace();
  9. }
  10. // Map<String,String> map = new HashMap<String, String>();
  11. // map.put(Constants.REDIS_MESSAGE_KEY, key);
  12. // map.put(Constants.REDIS_MESSAGE_VALUE, newMessge);
  13. stringRedisTemplate.convertAndSend(Constants.REDIS_CHANNEL, newMessge);//发布消息
  14. System.out.println("将消息: "+newMessge+ "发布出去...");
  15. }
  1. package cn.com.yuanquanyun.config;
  2. import cn.com.yuanquanyun.client.service.ws.WebSocketServer;
  3. import cn.com.yuanquanyun.common.constant.Constants;
  4. import cn.com.yuanquanyun.common.utils.StringUtils;
  5. import com.alibaba.fastjson.JSON;
  6. import com.alibaba.fastjson.JSONObject;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.data.redis.connection.Message;
  11. import org.springframework.data.redis.connection.MessageListener;
  12. import org.springframework.stereotype.Component;
  13. import java.io.IOException;
  14. import java.util.Collection;
  15. import java.util.Iterator;
  16. import java.util.concurrent.ConcurrentHashMap;
  17. /**
  18. * 消息监听对象,接收订阅消息 -- 每台服务器都订阅公告消息
  19. */
  20. @Component
  21. public class RedisReceiver implements MessageListener {
  22. Logger log = LoggerFactory.getLogger(this.getClass());
  23. //@Autowired
  24. //private WebSocketServer webSocketServer;
  25. /**
  26. * 处理接收到的订阅消息
  27. */
  28. @Override
  29. public void onMessage(Message message, byte[] pattern) {
  30. String channel = new String(message.getChannel());// 订阅的频道名称
  31. String msg = "";
  32. try {
  33. msg = new String(message.getBody(), Constants.UTF8);//注意与发布消息编码一致,否则会乱码
  34. System.out.println("打印订阅的消息:" + msg);
  35. if (!StringUtils.isEmpty(msg)) {
  36. if (Constants.REDIS_CHANNEL.endsWith(channel))// 最新消息
  37. {
  38. //JSONObject jsonObject = JSON.parseObject(msg);
  39. //将更新后的公告推送给在线的用户手机端
  40. ConcurrentHashMap<String, WebSocketServer> webSocketMap = WebSocketServer.getWebSocketMap();
  41. Collection<WebSocketServer> values = webSocketMap.values();
  42. System.err.println("values ===" + values.size());
  43. Iterator<WebSocketServer> it = values.iterator();
  44. while (it.hasNext()) {
  45. WebSocketServer ws = it.next();
  46. try {
  47. ws.sendMessage(msg);
  48. } catch (IOException e) {
  49. System.err.println("sendMessage_error=" + e);
  50. }
  51. }
  52. // webSocketServer.sendMessageByWayBillId(
  53. // Long.parseLong(jsonObject.get(Constants.REDIS_MESSAGE_KEY).toString())
  54. // ,jsonObject.get(Constants.REDIS_MESSAGE_VALUE).toString());
  55. } else {
  56. //TODO 其他订阅的消息处理
  57. }
  58. } else {
  59. log.info("消息内容为空,不处理。");
  60. }
  61. } catch (Exception e) {
  62. log.error("处理消息异常:" + e.toString());
  63. e.printStackTrace();
  64. }
  65. }
  66. }
  1. package cn.com.yuanquanyun.config;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.web.socket.config.annotation.EnableWebSocket;
  5. import org.springframework.web.socket.server.standard.ServerEndpointExporter;
  6. /**
  7. * 开启websocket支持
  8. * @author Administrator
  9. */
  10. @Configuration
  11. //@EnableWebSocket
  12. public class WebSocketConfig {
  13. @Bean
  14. public ServerEndpointExporter serverEndpointExporter() {
  15. System.err.println("WebSocketConfig 初始化 ");
  16. return new ServerEndpointExporter();
  17. }
  18. }
<!-- springboot集成websocket -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
  1. package cn.com.yuanquanyun.client.service.ws;
  2. import cn.com.yuanquanyun.client.dto.CustNotice;
  3. import cn.com.yuanquanyun.client.service.impl.CustNoticeServiceImpl;
  4. import cn.com.yuanquanyun.common.utils.spring.SpringUtils;
  5. import cn.hutool.log.Log;
  6. import cn.hutool.log.LogFactory;
  7. import org.springframework.stereotype.Component;
  8. import javax.websocket.*;
  9. import javax.websocket.server.PathParam;
  10. import javax.websocket.server.ServerEndpoint;
  11. import java.io.IOException;
  12. import java.util.concurrent.ConcurrentHashMap;
  13. /**
  14. * 此方法使用于工程部署是单例的情况下,若是多例的情况下,存在userId不在一台机器上,webSocketMap中查询不到。
  15. * 多机的情况下,要考虑数据共享的问题,可采用redis发布和订阅来解决.
  16. * @author Administrator
  17. *
  18. */
  19. @Component
  20. @ServerEndpoint("/ws/{userId}")
  21. public class WebSocketServer {
  22. static Log log=LogFactory.get(WebSocketServer.class);
  23. /**静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。*/
  24. // private static int onlineCount = 0;
  25. /**concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。*/
  26. private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
  27. /**与某个客户端的连接会话,需要通过它来给客户端发送数据*/
  28. private Session session;
  29. /**接收userId*/
  30. private String userId="";
  31. public WebSocketServer() {
  32. System.out.println("WebSocketServer 初始化... hashCode="+this.hashCode());
  33. }
  34. public static ConcurrentHashMap<String,WebSocketServer> getWebSocketMap(){
  35. return webSocketMap;
  36. }
  37. /**
  38. * 连接建立成功调用的方法 只调一次
  39. *
  40. */
  41. @OnOpen
  42. public void onOpen(Session session,@PathParam("userId") String userId) {
  43. System.out.println("onOpen hashCode="+this.hashCode()+", userId="+userId);//根据打印的hashCode每建立一个连接就会生成一个新的WebSocketServer对象,与对应的userId对应
  44. this.session = session;
  45. this.userId=userId;
  46. if(webSocketMap.containsKey(userId)){
  47. webSocketMap.remove(userId);
  48. webSocketMap.put(userId,this);
  49. //加入set中
  50. }else{
  51. webSocketMap.put(userId,this);
  52. //加入set中
  53. //addOnlineCount();
  54. //在线数加1
  55. }
  56. //log.info("用户连接:"+userId+",当前在线人数为:" + getOnlineCount());
  57. try {
  58. // System.out.println("custNoticeService======"+SpringUtils.getBean(CustNoticeServiceImpl.class));
  59. CustNotice custNotice = SpringUtils.getBean(CustNoticeServiceImpl.class).selectNotice();
  60. // CustNotice custNotice = custNoticeService.selectNotice();
  61. // System.out.println("custNotice======"+custNotice.toString());
  62. sendMessage(custNotice.getNoticeContent());
  63. } catch (IOException e) {
  64. log.error("用户:"+userId+",网络异常!!!!!!");
  65. }
  66. }
  67. /**
  68. * 收到客户端消息后调用的方法
  69. *
  70. * @param message 客户端发送过来的消息*/
  71. @OnMessage
  72. public void onMessage(String message, Session session) {
  73. /*System.out.println("onMessage hashcode=="+this.hashCode());
  74. log.info("onMessage 用户消息:"+userId+",报文:"+message);
  75. //可以群发消息
  76. //消息保存到数据库、redis
  77. if(StringUtils.isNotBlank(message)){
  78. try {
  79. //解析发送的报文
  80. JSONObject jsonObject = JSON.parseObject(message);
  81. //追加发送人(防止串改)
  82. jsonObject.put("fromUserId",this.userId);
  83. String toUserId=jsonObject.getString("toUserId");
  84. //传送给对应toUserId用户的websocket
  85. if(StringUtils.isNotBlank(toUserId)&&webSocketMap.containsKey(toUserId)){
  86. webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString());
  87. }else{
  88. log.error("请求的userId:"+toUserId+"不在该服务器上");
  89. //否则不在这个服务器上,发送到mysql或者redis
  90. }
  91. }catch (Exception e){
  92. e.printStackTrace();
  93. }
  94. }*/
  95. }
  96. /**
  97. * 页面socket连接关闭,自动调用的方法
  98. */
  99. @OnClose
  100. public void onClose() {
  101. if(webSocketMap.containsKey(userId)){
  102. webSocketMap.remove(userId);
  103. //从set中删除
  104. // subOnlineCount();
  105. }
  106. // log.info("用户退出:"+userId+",当前在线人数为:" + getOnlineCount());
  107. }
  108. /**
  109. *
  110. * @param session
  111. * @param error
  112. */
  113. @OnError
  114. public void onError(Session session, Throwable error) {
  115. log.error("用户错误:"+this.userId+",原因:"+error.getMessage());
  116. error.printStackTrace();
  117. }
  118. /**
  119. * 实现服务器主动推送
  120. */
  121. public void sendMessage(String message) throws IOException {
  122. this.session.getBasicRemote().sendText(message);
  123. }
  124. /**
  125. * 发送自定义消息
  126. */
  127. /*public static void sendInfo(String message,@PathParam("userId") String userId) throws IOException {
  128. log.info("发送消息到:"+userId+",报文:"+message);
  129. if(StringUtils.isNotBlank(userId)&&webSocketMap.containsKey(userId)){
  130. webSocketMap.get(userId).sendMessage(message);
  131. }else{
  132. log.error("用户"+userId+",不在线!");
  133. }
  134. }*/
  135. }
  1. <html>
  2. <head>
  3. <meta charset="utf-8">
  4. <title>websocket通讯</title>
  5. </head>
  6. <script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.js"></script>
  7. <script>
  8. var socket;
  9. function openSocket() {
  10. if (typeof (WebSocket) == "undefined") {
  11. console.log("您的浏览器不支持WebSocket");
  12. } else {
  13. console.log("您的浏览器支持WebSocket");
  14. //实现化WebSocket对象,指定要连接的服务器地址与端口 建立连接
  15. //等同于socket = new WebSocket("ws://localhost:8888/xxxx/im/25");
  16. //var socketUrl="${request.contextPath}/im/"+$("#userId").val();
  17. // var socketUrl = "https://localhost:9093/websocket/" + $("#userId").val();
  18. // var socketUrl = "wss://localhost:8096/v3/ws/" + $("#userId").val();
  19. // var socketUrl = "wss://yu_ming:8096/v3/ws/" + $("#userId").val();
  20. var socketUrl = "wss://yu_ming:8093/websk/ws/" + $("#userId").val();
  21. socketUrl = socketUrl.replace("https", "wss").replace("http", "ws");
  22. console.log(socketUrl);
  23. if (socket != null) {
  24. socket.close();
  25. socket = null;
  26. }
  27. socket = new WebSocket(socketUrl);
  28. //打开事件
  29. socket.onopen = function () {
  30. console.log("websocket已打开");
  31. //socket.send("这是来自客户端的消息" + location.href + new Date());
  32. };
  33. //获得消息事件
  34. socket.onmessage = function (msg) {
  35. console.log(msg.data);
  36. //发现消息进入 开始处理前端触发逻辑
  37. };
  38. //关闭事件
  39. socket.onclose = function () {
  40. console.log("websocket已关闭");
  41. };
  42. //发生了错误事件
  43. socket.onerror = function () {
  44. console.log("websocket发生了错误");
  45. }
  46. }
  47. }
  48. function sendMessage() {
  49. if (typeof (WebSocket) == "undefined") {
  50. console.log("您的浏览器不支持WebSocket");
  51. } else {
  52. console.log("您的浏览器支持WebSocket");
  53. console.log($("#contentText").val());
  54. socket.send($("#contentText").val());
  55. }
  56. }
  57. </script>
  58. <body>
  59. <p>【userId】:
  60. <div><input id="userId" name="userId" type="text" value="10"></div>
  61. <p>【toUserId】:
  62. <div><input id="toUserId" name="toUserId" type="text" value="20"></div>
  63. <p>【toUserId】:
  64. <div><input id="contentText" name="contentText" type="text" value="hello websocket"></div>
  65. <p>【操作】:
  66. <div><a onclick="openSocket()">开启socket</a></div>
  67. <p>【操作】:
  68. <div><a onclick="sendMessage()">发送消息</a></div>
  69. </body>
  70. </html>

通过html界面功能可以模拟和后端建立websocket长连接。使用wss时app端要通过域名来访问。

这里有个小插曲:

生产环境app端请求后台是通过nginx代理转发到服务端的。测试环境没有通过nginx转发。

这里的nginx下的nginx.conf文件里需要配置一下支持websocket协议。

  1. #user nobody;
  2. worker_processes 1;
  3. #error_log logs/error.log;
  4. #error_log logs/error.log notice;
  5. #error_log logs/error.log info;
  6. #pid logs/nginx.pid;
  7. events {
  8. worker_connections 1024;
  9. }
  10. http {
  11. include mime.types;
  12. default_type application/octet-stream;
  13. #log_format main '$remote_addr - $remote_user [$time_local] "$request" '
  14. # '$status $body_bytes_sent "$http_referer" '
  15. # '"$http_user_agent" "$http_x_forwarded_for"';
  16. #access_log logs/access.log main;
  17. sendfile on;
  18. #tcp_nopush on;
  19. #keepalive_timeout 0;
  20. keepalive_timeout 65;
  21. #gzip on;
  22. map $http_upgrade $connection_upgrade {
  23. default upgrade;
  24. '' close;
  25. }
  26. server {
  27. listen 8093;
  28. server_name localhost;
  29. #charset koi8-r;
  30. #access_log logs/host.access.log main;
  31. location /v2 {
  32. proxy_pass http://10.0.0.10:8082/v2;
  33. }
  34. location /v3 {
  35. proxy_pass http://10.0.0.10:8083/v3;
  36. }
  37. location /websk {
  38. proxy_pass http://10.0.0.10:8083/v3;
  39. proxy_read_timeout 300s;
  40. proxy_send_timeout 300s;
  41. proxy_set_header Host $host;
  42. proxy_set_header X-real-ip $remote_addr;
  43. proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
  44. proxy_http_version 1.1;
  45. proxy_set_header Upgrade $http_upgrade;
  46. proxy_set_header Connection $connection_upgrade;
  47. }
  48. location /404 {
  49. index index.html index.htm;
  50. root /usr/share/nginx/html;
  51. error_page 404 /index.html;
  52. }
  53. location / {
  54. index index.html index.htm;
  55. root /usr/share/nginx/html;
  56. error_page 404 /index.html;
  57. }
  58. # redirect server error pages to the static page /50x.html
  59. #
  60. #error_page 500 502 503 504 /50x.html;
  61. location = /50x.html {
  62. root html;
  63. }
  64. # proxy the PHP scripts to Apache listening on 127.0.0.1:80
  65. #
  66. #location ~ \.php$ {
  67. # proxy_pass http://127.0.0.1;
  68. #}
  69. # pass the PHP scripts to FastCGI server listening on 127.0.0.1:9000
  70. #
  71. #location ~ \.php$ {
  72. # root html;
  73. # fastcgi_pass 127.0.0.1:9000;
  74. # fastcgi_index index.php;
  75. # fastcgi_param SCRIPT_FILENAME /scripts$fastcgi_script_name;
  76. # include fastcgi_params;
  77. #}
  78. # deny access to .htaccess files, if Apache's document root
  79. # concurs with nginx's one
  80. #
  81. #location ~ /\.ht {
  82. # deny all;
  83. #}
  84. }
  85. # another virtual host using mix of IP-, name-, and port-based configuration
  86. #
  87. #server {
  88. # listen 8000;
  89. # listen somename:8080;
  90. # server_name somename alias another.alias;
  91. # location / {
  92. # root html;
  93. # index index.html index.htm;
  94. # }
  95. #}
  96. # HTTPS server
  97. #
  98. #server {
  99. # listen 443 ssl;
  100. # server_name localhost;
  101. # ssl_certificate cert.pem;
  102. # ssl_certificate_key cert.key;
  103. # ssl_session_cache shared:SSL:1m;
  104. # ssl_session_timeout 5m;
  105. # ssl_ciphers HIGH:!aNULL:!MD5;
  106. # ssl_prefer_server_ciphers on;
  107. # location / {
  108. # root html;
  109. # index index.html index.htm;
  110. # }
  111. #}
  112. }

里面的关键部分是:

  1. map $http_upgrade $connection_upgrade {
  2. default upgrade;
  3. '' close;
  4. }
  5. location /websk {
  6. proxy_pass http://10.0.0.10:8083/v3;
  7. proxy_read_timeout 300s;
  8. proxy_send_timeout 300s;
  9. proxy_set_header Host $host;
  10. proxy_set_header X-real-ip $remote_addr;
  11. proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
  12. proxy_http_version 1.1;
  13. proxy_set_header Upgrade $http_upgrade;
  14. proxy_set_header Connection $connection_upgrade;
  15. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/249154
推荐阅读
相关标签
  

闽ICP备14008679号