当前位置:   article > 正文

spring boot 集成websocket + netty_netty-websocket-spring-boot-starter 与springboot集成

netty-websocket-spring-boot-starter 与springboot集成

pom 配置

		<!-- WebSocket -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-websocket</artifactId>
		</dependency>

		<!-- 启用Netty https://mvnrepository.com/artifact/org.yeauty/netty-websocket-spring-boot-starter -->
		<dependency>
			<groupId>org.yeauty</groupId>
			<artifactId>netty-websocket-spring-boot-starter</artifactId>
			<version>0.7.6</version>
		</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

config配置 :开启WebSocket支持

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * 开启WebSocket支持
 * Created by yj on 2020/3/30.
 */
@Configuration
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

netty-websocket 发送消息和接受消息

package com.ruoyi.framework.config.webSocket;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ruoyi.common.utils.SecurityUtils;
import com.ruoyi.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.yeauty.annotation.ServerEndpoint;

import javax.annotation.PostConstruct;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;


/**
 * 注意这里的注解@ServerEndpoint是org.yeauty路径下的,别引用错了包。
 * Created by yj on 2020/3/30.
 */
@ServerEndpoint(path = "/imserver",host = "8081")
@Component
public class WebSocketServer {

    private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);
    /**静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。*/
    private static int onlineCount = 0;
    /**concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。*/
    private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
    /**concurrent包的线程安全Set,用来存放每个客户端对应的心跳对象。*/
    private static ConcurrentHashMap<String, Long> webSocketHeartCheck = new ConcurrentHashMap<String, Long>();
    /**与某个客户端的连接会话,需要通过它来给客户端发送数据*/
    private Session session;
    /**接收userId*/
    private Long userId;

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public static ConcurrentHashMap<String, WebSocketServer> getWebSocketMap() {
        return webSocketMap;
    }

    public static void setWebSocketMap(ConcurrentHashMap<String, WebSocketServer> webSocketMap) {
        WebSocketServer.webSocketMap = webSocketMap;
    }



    /**
     * 连接建立成功调用的方法*/
    @OnOpen
    public void onOpen(Session session) {

/*        SysUser user = ShiroUtils.getSysUser();
        String userId= String.valueOf(user.getUserId());*/
        this.session = session;
        Long userId = SecurityUtils.getLoginUser().getUser().getUserId();
        this.userId= userId;
        if(webSocketMap.containsKey(this.userId)){
            webSocketMap.remove(this.userId);
            webSocketMap.put(String.valueOf(this.userId),this);
            webSocketHeartCheck.put(String.valueOf(this.userId),System.currentTimeMillis());
            //加入set中
        }else{
            webSocketMap.put(String.valueOf(this.userId),this);
            webSocketHeartCheck.put(String.valueOf(this.userId),System.currentTimeMillis());
            //加入set中
            addOnlineCount();
            //在线数加1
        }
        log.info("用户连接:"+ this.userId +",当前在线人数为:" + getOnlineCount());

        try {
            sendMessage("连接成功");
        } catch (IOException e) {
            log.error("用户:"+ this.userId +",网络异常!!!!!!");
        }
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        if(webSocketMap.containsKey(userId)){
            webSocketMap.remove(userId);
            //从set中删除
            subOnlineCount();
        }
        log.info("用户退出:"+userId+",当前在线人数为:" + getOnlineCount());
    }

    @PostConstruct
    public void checkClientLiving(){
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.print(123);
                while (true) {
                    // ------- ends here
                    try {
                        for (ConcurrentHashMap.Entry<String,Long> heartCheck : webSocketHeartCheck.entrySet()) {
                            if (System.currentTimeMillis() - heartCheck.getValue() > 600000){
                                webSocketMap.remove(heartCheck.getKey());
                                webSocketHeartCheck.remove(heartCheck.getKey());
                                //从set中删除
                                subOnlineCount();
                            }
                        }
                        Thread.sleep(600000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        Thread thread = new Thread(runnable);
        thread.start();
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息*/
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("用户消息:"+userId+",报文:"+message);
        //可以群发消息
        //消息保存到数据库、redis
        if(StringUtils.isNotBlank(message)){
            try {
                //解析发送的报文
                JSONObject jsonObject = JSON.parseObject(message);
                //追加发送人(防止串改)
                jsonObject.put("fromUserId",this.userId);
                String toUserId=jsonObject.getString("toUserId");
                //传送给对应toUserId用户的websocket
                if(StringUtils.isNotBlank(toUserId)&&webSocketMap.containsKey(toUserId)){
                    webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString());
                    webSocketHeartCheck.put(String.valueOf(userId),System.currentTimeMillis());
                }else{
                    log.error("请求的userId:"+toUserId+"不在该服务器上");
                    //否则不在这个服务器上,发送到mysql或者redis
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    /**
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("用户错误:"+this.userId+",原因:"+error.getMessage());
        error.printStackTrace();
    }
    /**
     * 实现服务器主动推送
     */
    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }


    /**
     * 发送自定义模糊配备消息
     * */
    public static void sendInfoId(String message,@PathParam("userId") String userId) throws IOException {
        log.info("发送消息到:"+userId+",报文:"+message);
        if(StringUtils.isNotBlank(userId)){
            String userKey = userId + "-";
            for (ConcurrentHashMap.Entry<String, WebSocketServer> webMap : webSocketMap.entrySet()) {
                if(webMap.getKey().indexOf(userKey)>-1){
                    webSocketMap.get(webMap.getKey()).sendMessage(message);
                }
            }
        }else{
            log.error("用户"+userId+",不在线!");
        }
    }
    /**
     * 发送自定义消息
     * */
    public static void sendUserId(String message,@PathParam("userId") String userId) throws IOException {
        log.info("发送消息到:"+userId+",报文:"+message);
        if(StringUtils.isNotBlank(userId)&&webSocketMap.containsKey(userId)){
            webSocketMap.get(userId).sendMessage(message);
        }else{
            log.error("用户"+userId+",不在线!");
        }
    }

    /**
     * 群发自定义消息
     */
    public static void sendInfoUser(String message) throws IOException {
        for (ConcurrentHashMap.Entry<String, WebSocketServer> webMap : webSocketMap.entrySet()) {
            webMap.getValue().sendMessage(message);
        }
    }

    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226

前端


var prefix = ctx + "business/announcement/";
function getAnnouncementCnt(){
    var socket;
    var outTradeNo = new Date().getTime() + Math.floor(Math.random()*100);
    var userId = $("#userId").val()+"-"+outTradeNo
    if(window.WebSocket) {
        var wsUrl = "wss://127.0.0.1:443/imserver/"+userId;
        console.log(wsUrl)
        socket = new WebSocket(wsUrl);
        //打开事件
        socket.onopen = function() {

            console.log("websocket已打开");
            AnnouncementCnt();
            //心跳检测重置
            heartCheck.start();
        };
        socket.onmessage = function (event) {
            if (event.data == '有新消息'){
                console.log("123456");
                AnnouncementCnt();
                var contentWindow= $('iframe[data-id="/business/announcement"]')[0].contentWindow;
                contentWindow.indexPublic();
                contentWindow.indexSystem();
            }
            console.log('接收到消息');
            //拿到任何消息都说明当前连接是正常的
            heartCheck.start();
        }
        //关闭事件
        socket.onclose = function() {
            console.log("websocket已关闭");
            reconnect(wsUrl);
        };
        //发生了错误事件
        socket.onerror = function() {
            console.log("websocket发生了错误");
            reconnect(wsUrl);
        }
    } else {
        AnnouncementCnt();
    }
    //避免重复连接
    var lockReconnect = false;//避免重复连接
    function reconnect(wsUrl) {
        if(lockReconnect) {
            return;
        };
        lockReconnect = true;
        //没连接上会一直重连,设置延迟避免请求过多
        tt && clearTimeout(tt);
        tt = setTimeout(function () {
            getAnnouncementCnt(wsUrl);
            lockReconnect = false;
        }, 4000);
    }

    //心跳检测
    var heartCheck = {
        timeout: 300000, //每隔三秒发送心跳
        num: 3,  //3次心跳均未响应重连
        timeoutObj: null,
        serverTimeoutObj: null,
        start: function(){
            var _this = this;
            var _num = this.num;
            this.timeoutObj && clearTimeout(this.timeoutObj);
            this.serverTimeoutObj && clearTimeout(this.serverTimeoutObj);
            this.timeoutObj = setTimeout(function(){
                //这里发送一个心跳,后端收到后,返回一个心跳消息,
                //onmessage拿到返回的心跳就说明连接正常
                socket.send('{"toUserId":"'+userId+'"}'); // 心跳包
                _num--;
                //计算答复的超时次数
                if(_num === 0) {
                    socket.colse();
                }
            }, this.timeout)
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/404202
推荐阅读
相关标签
  

闽ICP备14008679号