赞
踩
WebSocket协议通过在客户端和服务端之间提供全双工通信来进行Web和服务器的交互功能。在WebSocket应用程序中,服务器发布WebSocket端点,客户端使用url连接到服务器。建立连接后,服务器和客户端就可以互相发送消息。客户端通常连接到一台服务器,服务器接受多个客户端的连接。
HTPP协议是基于请求响应模式,并且无状态的。HTTP通信只能由客户端发起,HTTP 协议做不到服务器主动向客户端推送信息。
如果我们想要查询当前的排队情况,只能是页面轮询向服务器发出请求,服务器返回查询结果。轮询的效率低,非常浪费资源(因为必须不停连接,或者 HTTP 连接始终打开)
@ServerEndpoint(“/websocket/{uid}”)
申明这是一个websocket服务;
需要指定访问该服务的地址,在地址中可以指定参数,需要通过{}进行占位;
@OnOpen
用法:public void onOpen(Session session, @PathParam(“uid”) String uid) throws IOException{}
该方法将在建立连接后执行,会传入session对象,就是客户端与服务端建立的长连接通道,通过@PathParam获取url中声明的参数;
@OnClose
用法:public void onClose() {}
该方法是在连接关闭后执行;
@OnMessage
用法:public void onMessage(String message, Session session) throws IOException {}
该方法用于接收客户端发送的消息;
message:发来的消息数据;
session:会话对象(也是长连接通道);
发送消息到客户端;
用法:session.getBasicRemote().sendText(“hello,websocket.”);
通过session进行消息发送;
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @Component @Slf4j @ServerEndpoint(value = "/api/pushMessageMulti/{roomId}/{userId}",encoders = { ServerEncoder.class }) public class WebSocketServerMulti { /**静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。*/ private static int onlineCount = 0; /**concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。*/ private static Map<String, WebSocketServerMulti> userMap = new ConcurrentHashMap<>(); //存放房间对象 private static Map<String, Set<WebSocketServerMulti>> roomMap = new ConcurrentHashMap<>(); /**与某个客户端的连接会话,需要通过它来给客户端发送数据*/ private Session session; /**接收userId*/ private String userId = ""; //存出当前群聊在线的人数(使用原因:roomMap中无法获取到当前用户id) private static Map<String, List<String>> multiUser = new ConcurrentHashMap<>(); /** * 连接建立成 * 功调用的方法 */ @OnOpen public void onOpen(Session session,@PathParam("roomId") String roomId , @PathParam("userId") String userId) throws IOException, EncodeException { synchronized (session){ try { this.session = session; this.userId=userId; userMap.put(userId,this); if (!roomMap.containsKey(roomId)) { Set<WebSocketServerMulti> set = new HashSet<>(); set.add(userMap.get(userId)); roomMap.put(roomId,set); List<String> dd = new LinkedList<>(); dd.add(userId); multiUser.put(roomId,dd); } else { if(multiUser.get(roomId).contains(userId)){ log.info("存在:房间号:"+roomId+"用户连接:"+userId+",当前在线人数为:" + multiUser.get(roomId).size()); }else{ multiUser.get(roomId).add(userId); roomMap.get(roomId).add(this); } } System.out.println(multiUser.get(roomId).size()); log.info("房间号:"+roomId+"用户连接:"+userId+",当前在线人数为:" + multiUser.get(roomId).size()); Map<String,Object> map = new HashMap<>(); map.put("online_num",multiUser.get(roomId).size());//在线人数 map.put("online_list",roomMap.get(roomId));//人员列表 map.put("roomId",roomId);//群id map.put("message","用户"+***+"加入群聊");//消息 //自定义发送消息 sendMessageObject(map,roomId); }catch (Exception e){ e.printStackTrace(); } } } /** * 连接关闭@PathParam("userId") String userId * 调用的方法 */ @OnClose public void onClose( @PathParam("roomId") String roomId,@PathParam("userId") String userId) { try { if (roomMap.containsKey(roomId)) { Set<WebSocketServerMulti> set = roomMap.get(roomId); Iterator<WebSocketServerMulti> it = set.iterator(); while (it.hasNext()) { if (it.next().userId.equals(userId)) { it.remove(); } } multiUser.get(roomId).remove(userId); log.info("房间号:"+roomId+"用户退出:"+userId+",当前在线人数为:" + multiUser.get(roomId).size()); Map<String,Object> map = new HashMap<>(); map.put("online_num",multiUser.get(roomId).size());//在线人数 map.put("online_list",roomMap.get(roomId));//人员列表 map.put("roomId",roomId);//群id map.put("message","用户"+***+"加入群聊");//消息 //自定义发送消息 sendMessageObject(map,roomId); } }catch (Exception e){ e.printStackTrace(); } } /** * 收到客户端消 * 息后调用的方法 * @param message * 客户端发送过来的消息 **/ @OnMessage public void onMessage(String message) { //注意,要给session加上同步锁,否则会出现多个线程同时往同一个session写数据,导致报错的情况。 synchronized (session){ //可以群发消息 if(StringUtils.isNotBlank(message)){ try { //解析发送的报文 JSONObject jsonObject = JSONObject.parseObject(message); //追加发送人(防止串改) jsonObject.put("fromUserId",this.userId); int chatType=jsonObject.getInteger("chatType"); String myUserId=jsonObject.getString("myUserId"); String toRoomId=jsonObject.getString("toRoomId"); log.info("房间号:"+toRoomId+"用户消息:"+userId+",报文:"+message); sendMessageTo(message,toRoomId); }catch (Exception e){ e.printStackTrace(); } } } } /** * 群聊 * @param message 消息 * @param roomId 房间号 */ public void sendMessageTo(String message , String roomId) throws IOException { if (roomMap.containsKey(roomId)) { for (WebSocketServerMulti item : roomMap.get(roomId)) { item.session.getAsyncRemote().sendText(message); } } } /** * @param error */ @OnError public SystemResult onError(Throwable error) { log.error("用户错误:"+this.userId+",原因:"+error.getMessage()); ChatError chatError = new ChatError(); chatError.setUserId(Integer.valueOf(this.userId)); chatError.setDetails(error.getMessage()); chatError.setAddTime(new Date()); chatErrorMapper.insert(chatError); SystemResult systemResult = new SystemResult(); return systemResult.error(error.getMessage()); } /** * 实现服务 * 器主动推送 */ public void sendMessage(String message) { try { this.session.getBasicRemote().sendText(message); } catch (IOException e) { e.printStackTrace(); } } /** * 实现服务传送Object类型 * 器主动推送 */ public void sendMessageObject(Object message,String roomId) { try { if (roomMap.containsKey(roomId)) { for (WebSocketServerMulti item : roomMap.get(roomId)) { item.session.getBasicRemote().sendObject(message); } } } catch (Exception e) { e.printStackTrace(); } } /** * 获得此时的 * 在线人数 * @return */ public static synchronized int getOnlineCount() { return onlineCount; } /** * 在线人 * 数加1 */ public static synchronized void addOnlineCount() { WebSocketServerMulti.onlineCount++; } /** * 在线人 * 数减1 */ public static synchronized void subOnlineCount() { WebSocketServerMulti.onlineCount--; } }
import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @Component @Slf4j @ServerEndpoint("/api/pushMessageSolo/{userId}") public class WebSocketServerSolo { /**静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。*/ private static int onlineCount = 0; /**concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。*/ private static ConcurrentHashMap<String, WebSocketServerSolo> webSocketMap = new ConcurrentHashMap<>(); /**与某个客户端的连接会话,需要通过它来给客户端发送数据*/ private Session session; /**接收userId*/ private String userId = ""; /** * 连接建立成 * 功调用的方法 */ @OnOpen public void onOpen(Session session, @PathParam("userId") String userId) { this.session = session; this.userId=userId; if(webSocketMap.containsKey(userId)){ webSocketMap.remove(userId); //加入set中 webSocketMap.put(userId,this); }else{ //加入set中 webSocketMap.put(userId,this); //在线数加1 addOnlineCount(); } log.info("用户连接:"+userId+",当前在线人数为:" + getOnlineCount()); sendMessage("连接成功"); } /** * 连接关闭 * 调用的方法 */ @OnClose public void onClose() { if(webSocketMap.containsKey(userId)){ webSocketMap.remove(userId); //从set中删除 subOnlineCount(); } log.info("用户退出:"+userId+",当前在线人数为:" + getOnlineCount()); } /** * 收到客户端消 * 息后调用的方法 * @param message * 客户端发送过来的消息 **/ @OnMessage public void onMessage(String message, Session session) { log.info("用户消息:"+userId+",报文:"+message); //可以群发消息 //消息保存到数据库、redis if(StringUtils.isNotBlank(message)){ try { //解析发送的报文 JSONObject jsonObject = JSONObject.parseObject(message); //追加发送人(防止串改) jsonObject.put("fromUserId",this.userId); String toUserId=jsonObject.getString("toUserId"); String myUserId=jsonObject.getString("myUserId"); //传送给对应toUserId用户的websocket if(StringUtils.isNotBlank(toUserId)&&webSocketMap.containsKey(toUserId)){ webSocketMap.get(toUserId).sendMessage(message); }else{ //否则不在这个服务器上,发送到mysql或者redis log.error("请求的userId:"+toUserId+"不在该服务器上"); } if(StringUtils.isNotBlank(myUserId)&&webSocketMap.containsKey(myUserId)){ webSocketMap.get(myUserId).sendMessage(message); }else{ //否则不在这个服务器上,发送到mysql或者redis log.error("请求的userId:"+myUserId+"不在该服务器上"); } }catch (Exception e){ e.printStackTrace(); } } } /** * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { log.error("用户错误:"+this.userId+",原因:"+error.getMessage()); error.printStackTrace(); } /** * 实现服务 * 器主动推送 */ public void sendMessage(String message) { try { this.session.getBasicRemote().sendText(message); } catch (IOException e) { e.printStackTrace(); } } /** *发送自定 *义消息 **/ public static void sendInfo(String message, String userId) { log.info("发送消息到:"+userId+",报文:"+message); if(StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)){ webSocketMap.get(userId).sendMessage(message); }else{ log.error("用户"+userId+",不在线!"); } } /** * 获得此时的 * 在线人数 * @return */ public static synchronized int getOnlineCount() { return onlineCount; } /** * 在线人 * 数加1 */ public static synchronized void addOnlineCount() { WebSocketServerSolo.onlineCount++; } /** * 在线人 * 数减1 */ public static synchronized void subOnlineCount() { WebSocketServerSolo.onlineCount--; } }
本地开发的时候都可以正常使用,但是在部署到nginx代理服务器的时候发现报了错误,连不上,报错:Error in connection establishment: net::ERR_NAME_NOT_RESOLVED
发现是nginx服务器默认是不打开webSocket的功能的,这需要我们在nginx服务器上配置:
location /test/ {
proxy_pass http://test.com;
proxy_redirect default;
proxy_set_header Upgrade $http_upgrade; # allow websockets
proxy_set_header Connection "upgrade";
proxy_http_version 1.1;
}
如果nginx没有设置读取超时时间,websocket会一直断线重连,大约一分钟重连一次
可以设置长时间得超时时间,避免一直断线重连,避免消耗内存
location /test{
proxy_pass http://test.com;
proxy_set_header Upgrade $http_upgrade; # allow websockets
proxy_set_header Connection "upgrade";
proxy_http_version 1.1;
proxy_connect_timeout 60s;#l连接超时时间,不能设置太长会浪费连接资源
proxy_read_timeout 500s;#读超时时间
proxy_send_timeout 500s;#写超时时间
index index.html index.htm;
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。