当前位置:   article > 正文

websocket心跳机制加分发策略_websocket 心跳和房间分配

websocket 心跳和房间分配

webscoket心跳机制

问题websocket长连接断开问题

心跳机制每隔一段时间进行一次连接关闭,连接重连保证websocket实现长连接不断开。

我这里事件设置1小时,js代码如下

//js代码
var pathTotal="102020"
        var url="ws://172.16.28.250:8088/websocket/"+pathTotal
        var socket;
        var lockReconnect = false; //避免socket重复连接

        if (!window.WebSocket) {
            window.WebSocket = window.MozWebSocket;
        }

        if (window.WebSocket) {//判断当前设备是否支持websocket
            socket = new WebSocket(url);
            socket.onclose = function (event) {
                reconnect(url);  //尝试重新连接
                console.log(event);
                console.log('连接关闭')
            };
            socket.onerror = function () {
                reconnect(url);   //尝试重新连接
                console.log(event);
                console.log('连接错误')
            };
            socket.onopen = function (event) {
                heartCheck.reset().start(); //心跳检测重置
                console.log(event);
                console.log('连接开启')
            };
            socket.onmessage = function (event) {
                heartCheck.reset().start(); //心跳检测重置
                console.log(event);
                console.log('消息接收到了,只要有接收到消息,连接都是正常的')
                if (event.data != 'Msg') {
                    let data = JSON.parse(event.data);
                }
                    setMessageInnerHTML2(event.data,suffix)



            };
        } else {
            alert.log("你的浏览器不支持WebSocket!");
        }

        // // 监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
        window.onbeforeunload = function () {
            socket.close();
        }

        // 重新连接
        function reconnect(url) {
            console.log("重新连接")
            if (lockReconnect) return;
            lockReconnect = true;
            setTimeout(function () { //没连接上会一直重连,设置延迟避免请求过多
                socket = new WebSocket(url);
                lockReconnect = false;
            }, 2000);
        }

        //心跳检测
        var heartCheck = {
            timeout: 55000*60, //1分钟发一次心跳,时间设置小一点较好(50-60秒之间)
            timeoutObj: null,
            serverTimeoutObj: null,
            reset: function () {
                clearInterval(this.timeoutObj);
                clearTimeout(this.serverTimeoutObj);
                return this;
            },
            start: function () {

                var self = this;
                this.timeoutObj = setInterval(function () {
                    console.log("重新连接")

                    //这里发送一个心跳,后端收到后,返回一个心跳消息,
                    //onmessage拿到返回的心跳就说明连接正常
                    socket.send("Msg");
                    self.serverTimeoutObj = setTimeout(function () {//如果超过一定时间还没重置,说明后端主动断开了
                        socket.close(); //如果onclose会执行reconnect,我们执行socket.close()就行了.如果直接执行reconnect 会触发onclose导致重连两次
                    }, self.timeout)
                }, this.timeout)
            }
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
分发策略,消息接受使用rabbitmq通过no来判断位置信息,分发到各个大屏

 /**
     * 连接建立成功调用的方法
     *
     * @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    @OnOpen
    public void onOpen(@PathParam(value = "no") String no,Session session) {
        this.session = session;
        sessions.add(session);
        webSocketSet.remove(no);
        webSocketSet.put(no,this);     //加入set中
        addOnlineCount();           //在线数加1
        log.info("有新连接加入!当前在线人数为" + getOnlineCount());
        String QUEUE_NAME = "GREATHIIT_FACE_TERMINAL";
//        String QUEUE_NAME = "closedPlace";
        try {
            //创建连接连接到MabbitMQ
            ConnectionFactory factory = new ConnectionFactory();
            //设置MabbitMQ所在主机ip或者主机名
            factory.setHost();
            factory.setPort();
            factory.setUsername();
            factory.setPassword();
            //创建一个连接
            Connection connection = factory.newConnection();
            //创建一个频道
            Channel channel = connection.createChannel();
            //指定一个队列
            /*
                durable:true、false.true:在服务器重启时,能够存活
                exclusive :是否为当前连接的专用队列,在连接断开后,会自动删除该队列,生产环境中应该很少用到吧。
                autodelete:当没有任何消费者使用时,自动删除该队列。this means that the queue will be deleted when there are no more processes consuming messages from it.
             */
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            //每次从队列获取的数量,保证一次只分发一个
            channel.basicQos(1);
            //监听队列
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //处理监听得到的消息
                    String message = null;
                    try {


                        message = new String(body, "UTF-8");
                        //        if(){} 业务逻辑判断
                        //将字符串转换成MsgDa对象
                        MsgDa msgDa = GsonJsonUtil.stringToObject(message, MsgDa.class);
                        LaboratoryService laboratoryService = (LaboratoryService) SpringContextUtil.getBean("laboratoryService");
                        ConcurrentHashMap<String, Object> deviceTerminalType = laboratoryService.getDataLaboratorBymsg(msgDa);
                        if(deviceTerminalType.size()>0){
                            String key =(String) deviceTerminalType.get("key");
                            String value = (String) deviceTerminalType.get("value");
                            if(webSocketSet.get(key)!=null){
                                synchronized (key){
                                    int length = value.length();
                                    if(length>80&&session.isOpen()){
                                        sendMessage(key,value);
                                    }

                                }
                            }
                        }

                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                        channel.abort();
                    } finally {
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }

                }
            };

            //autoAck是否自动回复,如果为true的话,每次生产者只要发送信息就会从内存中删除,那么如果消费者程序异常退出,
            // 那么就无法获取数据,我们当然是不希望出现这样的情况,所以才去手动回复,每当消费者收到并处理信息然后在通知
            // 生成者。最后从队列中删除这条信息。如果消费者异常退出,如果还有其他消费者,那么就会把队列中的消息发送给其
            // 他消费者,如果没有,等消费者启动时候再次发送。
            boolean autoAck = false;
            //消息消费完成确认
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

    }



    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        webSocketSet.remove(this);  //从set中删除
        sessions.remove(session);
        subOnlineCount();           //在线数减1
        log.info("有一连接关闭!当前在线人数为" + getOnlineCount());
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息
     * @param session 可选的参数
     */
//    @OnMessage
//    public void onMessage(String message, Session session) {
//        log.info("来自客户端的消息:" + message);
//        //群发消息
//        for (WebSocketServer item : webSocketSet) {
//            try {
//                item.sendMessage(message);
//            } catch (IOException e) {
//                e.printStackTrace();
//                continue;
//            }
//        }
//    }

    /**
     * 发生错误时调用
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误");

        error.printStackTrace();
    }

    /**
     * 这个方法与上面几个方法不一样。没有用注解,是根据自己需要添加的方法。
     *
     * @param message
     * @throws IOException
     */
    public void sendMessage(String no, String message) throws IOException {
        //阻塞式的(同步的)
//        if (sessions.size() != 0) {
//            for (Session s : sessions) {
//                if (s != null) {
//                    s.getBasicRemote().sendText(message);
//                }
//            }
//        }

        webSocketSet.get(no).sendMessage(message);

        //非阻塞式的(异步的)
//        this.session.getAsyncRemote().sendText(message);
//        log.info("[x] 推送消息"+message);
    }

    private void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }

    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
消息处理分发
  • 首先将人脸识别终端发过来的数据进行JSON解析成msgDa对象
  • 通过对象中设备id判断设备所在位置
  • 通过第一次初始化设定的规则发送到不同楼层的大屏
  • 初始化补充,通过key value形式存入redis
 List<Terminal> list = terminalService.findTerminal(new Terminal().setTerminalCategory("face"));
        for (Terminal terminal : list) {
            map.put(terminal.getTerminalSerialNumber(),terminal);
        }
        String msgDaJSON = "";
        String msgDaJSON2 = "";
        Terminal terminal = map.get(msgDa.getDevice_id());
        String key = terminal.getTerminalPlace();

        if (msgDa != null) {

            msgDaJSON = JSON.toJSONString(msgDa);
            msgDa.setImageFile("");
            msgDaJSON2 = JSON.toJSONString(msgDa);

            int length = msgDaJSON2.length();
            if(length>80){
                t5(key.substring(0, 6)+".data", "\n" + msgDaJSON2);

            }
        }
        ConcurrentHashMap<String, Object> concurrentHashMap = new ConcurrentHashMap<>();

        Object str = redisUtil.get(key.substring(0, 6));
        String[] split = str.toString().split(",");
//        String[] splita = ;

        for (String o : split) {
            if(o.equals(key)){
                concurrentHashMap.put("key", key.substring(0,6));
                concurrentHashMap.put("value", msgDaJSON);
            }
        }

        return concurrentHashMap;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/474778
推荐阅读
相关标签
  

闽ICP备14008679号