赞
踩
心跳机制每隔一段时间进行一次连接关闭,连接重连保证websocket实现长连接不断开。
我这里事件设置1小时,js代码如下
//js代码
var pathTotal="102020"
var url="ws://172.16.28.250:8088/websocket/"+pathTotal
var socket;
var lockReconnect = false; //避免socket重复连接
if (!window.WebSocket) {
window.WebSocket = window.MozWebSocket;
}
if (window.WebSocket) {//判断当前设备是否支持websocket
socket = new WebSocket(url);
socket.onclose = function (event) {
reconnect(url); //尝试重新连接
console.log(event);
console.log('连接关闭')
};
socket.onerror = function () {
reconnect(url); //尝试重新连接
console.log(event);
console.log('连接错误')
};
socket.onopen = function (event) {
heartCheck.reset().start(); //心跳检测重置
console.log(event);
console.log('连接开启')
};
socket.onmessage = function (event) {
heartCheck.reset().start(); //心跳检测重置
console.log(event);
console.log('消息接收到了,只要有接收到消息,连接都是正常的')
if (event.data != 'Msg') {
let data = JSON.parse(event.data);
}
setMessageInnerHTML2(event.data,suffix)
};
} else {
alert.log("你的浏览器不支持WebSocket!");
}
// // 监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
window.onbeforeunload = function () {
socket.close();
}
// 重新连接
function reconnect(url) {
console.log("重新连接")
if (lockReconnect) return;
lockReconnect = true;
setTimeout(function () { //没连接上会一直重连,设置延迟避免请求过多
socket = new WebSocket(url);
lockReconnect = false;
}, 2000);
}
//心跳检测
var heartCheck = {
timeout: 55000*60, //1分钟发一次心跳,时间设置小一点较好(50-60秒之间)
timeoutObj: null,
serverTimeoutObj: null,
reset: function () {
clearInterval(this.timeoutObj);
clearTimeout(this.serverTimeoutObj);
return this;
},
start: function () {
var self = this;
this.timeoutObj = setInterval(function () {
console.log("重新连接")
//这里发送一个心跳,后端收到后,返回一个心跳消息,
//onmessage拿到返回的心跳就说明连接正常
socket.send("Msg");
self.serverTimeoutObj = setTimeout(function () {//如果超过一定时间还没重置,说明后端主动断开了
socket.close(); //如果onclose会执行reconnect,我们执行socket.close()就行了.如果直接执行reconnect 会触发onclose导致重连两次
}, self.timeout)
}, this.timeout)
}
}
/**
* 连接建立成功调用的方法
*
* @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
@OnOpen
public void onOpen(@PathParam(value = "no") String no,Session session) {
this.session = session;
sessions.add(session);
webSocketSet.remove(no);
webSocketSet.put(no,this); //加入set中
addOnlineCount(); //在线数加1
log.info("有新连接加入!当前在线人数为" + getOnlineCount());
String QUEUE_NAME = "GREATHIIT_FACE_TERMINAL";
// String QUEUE_NAME = "closedPlace";
try {
//创建连接连接到MabbitMQ
ConnectionFactory factory = new ConnectionFactory();
//设置MabbitMQ所在主机ip或者主机名
factory.setHost();
factory.setPort();
factory.setUsername();
factory.setPassword();
//创建一个连接
Connection connection = factory.newConnection();
//创建一个频道
Channel channel = connection.createChannel();
//指定一个队列
/*
durable:true、false.true:在服务器重启时,能够存活
exclusive :是否为当前连接的专用队列,在连接断开后,会自动删除该队列,生产环境中应该很少用到吧。
autodelete:当没有任何消费者使用时,自动删除该队列。this means that the queue will be deleted when there are no more processes consuming messages from it.
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//每次从队列获取的数量,保证一次只分发一个
channel.basicQos(1);
//监听队列
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//处理监听得到的消息
String message = null;
try {
message = new String(body, "UTF-8");
// if(){} 业务逻辑判断
//将字符串转换成MsgDa对象
MsgDa msgDa = GsonJsonUtil.stringToObject(message, MsgDa.class);
LaboratoryService laboratoryService = (LaboratoryService) SpringContextUtil.getBean("laboratoryService");
ConcurrentHashMap<String, Object> deviceTerminalType = laboratoryService.getDataLaboratorBymsg(msgDa);
if(deviceTerminalType.size()>0){
String key =(String) deviceTerminalType.get("key");
String value = (String) deviceTerminalType.get("value");
if(webSocketSet.get(key)!=null){
synchronized (key){
int length = value.length();
if(length>80&&session.isOpen()){
sendMessage(key,value);
}
}
}
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
channel.abort();
} finally {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//autoAck是否自动回复,如果为true的话,每次生产者只要发送信息就会从内存中删除,那么如果消费者程序异常退出,
// 那么就无法获取数据,我们当然是不希望出现这样的情况,所以才去手动回复,每当消费者收到并处理信息然后在通知
// 生成者。最后从队列中删除这条信息。如果消费者异常退出,如果还有其他消费者,那么就会把队列中的消息发送给其
// 他消费者,如果没有,等消费者启动时候再次发送。
boolean autoAck = false;
//消息消费完成确认
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
webSocketSet.remove(this); //从set中删除
sessions.remove(session);
subOnlineCount(); //在线数减1
log.info("有一连接关闭!当前在线人数为" + getOnlineCount());
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
* @param session 可选的参数
*/
// @OnMessage
// public void onMessage(String message, Session session) {
// log.info("来自客户端的消息:" + message);
// //群发消息
// for (WebSocketServer item : webSocketSet) {
// try {
// item.sendMessage(message);
// } catch (IOException e) {
// e.printStackTrace();
// continue;
// }
// }
// }
/**
* 发生错误时调用
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误");
error.printStackTrace();
}
/**
* 这个方法与上面几个方法不一样。没有用注解,是根据自己需要添加的方法。
*
* @param message
* @throws IOException
*/
public void sendMessage(String no, String message) throws IOException {
//阻塞式的(同步的)
// if (sessions.size() != 0) {
// for (Session s : sessions) {
// if (s != null) {
// s.getBasicRemote().sendText(message);
// }
// }
// }
webSocketSet.get(no).sendMessage(message);
//非阻塞式的(异步的)
// this.session.getAsyncRemote().sendText(message);
// log.info("[x] 推送消息"+message);
}
private void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
List<Terminal> list = terminalService.findTerminal(new Terminal().setTerminalCategory("face"));
for (Terminal terminal : list) {
map.put(terminal.getTerminalSerialNumber(),terminal);
}
String msgDaJSON = "";
String msgDaJSON2 = "";
Terminal terminal = map.get(msgDa.getDevice_id());
String key = terminal.getTerminalPlace();
if (msgDa != null) {
msgDaJSON = JSON.toJSONString(msgDa);
msgDa.setImageFile("");
msgDaJSON2 = JSON.toJSONString(msgDa);
int length = msgDaJSON2.length();
if(length>80){
t5(key.substring(0, 6)+".data", "\n" + msgDaJSON2);
}
}
ConcurrentHashMap<String, Object> concurrentHashMap = new ConcurrentHashMap<>();
Object str = redisUtil.get(key.substring(0, 6));
String[] split = str.toString().split(",");
// String[] splita = ;
for (String o : split) {
if(o.equals(key)){
concurrentHashMap.put("key", key.substring(0,6));
concurrentHashMap.put("value", msgDaJSON);
}
}
return concurrentHashMap;
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。