赞
踩
服务器如何将消息主动推送给客户端(浏览器)呢?
轮询:浏览器以指定的时间间隔向后端服务器发出 HTTP 请求,服务器实时返回数据给前端。
缺点:
长轮询:浏览器发出 ajax 请求,服务端收到请求后会阻塞该请求,直到查询到最新数据或者超时才返回。
WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议。
websocket 对象创建:
let ws = new WebSocket(URL);
URL 格式说明:
websocket 对象相关事件:
事件 | 事件处理程序 | 描述 |
---|---|---|
open | ws.onopen | 连接建立时触发 |
message | ws.onmessage | 客户端接收到服务器发送的数据时触发 |
close | ws.onclose | 连接关闭时触发 |
websocket 对象提供的方法:
方法名称 | 描述 |
---|---|
send() | 通过websocket对象调用该方法发送数据给服务端 |
Tomcat 从 7.0.5 版本开始支持 WebSocket,并且实现了 Java WebSocket 规范。
Java WebSocket 应用由一系列 Endpoint 组成。Endpoint 是一个 Java 对象,代表 WebSocket 连接的一端;对于服务端,我们可以将其视为处理具体 WebSocket 消息的接口。
两种方式定义 Endpoint:
Endpoint 实例在 WebSocket 握手时创建,在客户端与服务端的连接过程中有效,连接关闭时结束。
注解 | 描述 |
---|---|
@OnOpen | 开启一个新的会话时调用(客户端与服务端握手成功时调用) |
@OnClose | 会话关闭时调用 |
@OnError | 连接过程中异常时调用 |
服务端如何接收客户端发送的数据呢?
在定义 Endpoint 时,通过 @OnMessage 注解指定接收消息的方法。
服务端如何发送数据给客户端呢?
发送消息由 RemoteEndpoint 完成,其实例由 Session 维护
- session.getBasicRemote() 获取同步消息发送的实例,然后调用其 sendXxx() 方法发送消息;
- session.getAsyncRemote() 获取异步消息发送的实例,然后调用其 sendXxx() 方法发送消息。
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
编写 WebSocketConfig 配置类,扫描添加了 @ServerEndpoint 注解的Bean。
package com.zte.rdcloud.iproject.infra.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * Spring configuration that contributes the bean needed for annotation-declared
 * WebSocket endpoints.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Creates the {@link ServerEndpointExporter} bean. Per the original author's note, this
     * bean automatically registers WebSocket endpoints declared with {@code @ServerEndpoint}.
     *
     * @return a new {@link ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
编写 controller:
编写获取 HttpSession 的配置类:
import javax.servlet.http.HttpSession; import javax.websocket.HandshakeResponse; import javax.websocket.server.HandshakeRequest; import javax.websocket.server.ServerEndpointConfig; public class GetHttpSessionConfig extends ServerEndpointConfig.Configurator { @Override public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) { // 获取 HttpSession 对象 HttpSession httpSession = (HttpSession) request.getHttpSession(); // 将 HttpSession 对象保存起来 sec.getUserProperties().put(HttpSession.class.getName(), httpSession); } }
再将该配置类添加到 @ServerEndpoint 中:
@ServerEndpoint(value = "/chat", configurator = GetHttpSessionConfig.class)
编写 聊天Endpoint 的具体实现类:
package com.zte.rdcloud.iproject.domain.common.websocket; import org.springframework.stereotype.Component; import javax.servlet.http.HttpSession; import javax.websocket.*; import javax.websocket.server.ServerEndpoint; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @ServerEndpoint(value = "/chat", configurator = GetHttpSessionConfig.class) // 声明访问路径 @Component public class ChatEndpoint { private static final Map<String, Session> onlineUsers = new ConcurrentHashMap<>(); private HttpSession httpSession; /** * 建立websocket连接后调用该方法 * * @param session */ @OnOpen public void onOpen(Session session, EndpointConfig endpointConfig){ // 1. 将session保存(endpoint是和浏览器的连接是一一对应的,每个人的聊天有每个人的endpoint) // onlineUsers.put("用户名", session); // key需要一个唯一标识,能够区分不同的用户 this.httpSession = (HttpSession) endpointConfig.getUserProperties().get(HttpSession.class.getName()); String user = (String) this.httpSession.getAttribute("user"); onlineUsers.put(user, session); // 2. 广播消息,需要将登录的所有用户推送给其他用户 broadcastAllUsers("message"); } private void broadcastAllUsers(String message){ try { for (Map.Entry<String, Session> entry : onlineUsers.entrySet()) { // 获取到所有用户对应的 session 对象 Session session = entry.getValue(); // 发送消息 session.getBasicRemote().sendText(message); } } catch (Exception e) { // 异常日志 } } /** * 浏览器发送消息到服务端调用该方法 * * @param session * @param message */ @OnMessage public void onMessage(Session session, String message){ try { // 将消息推送给指定用户 session.getBasicRemote().sendText(message); }catch (Exception e){ } } /** * 断开 websocket 时调用 * * @param session */ @OnClose public void onOClose(Session session){ // 从 onlineUsers 删除当前用户的session对象 String user = (String) this.httpSession.getAttribute("user"); onlineUsers.remove(user); } }
WebSocketController:
package com.zte.rdcloud.iproject.controller.common.websocket; import com.zte.itp.msa.core.model.ServiceData; import com.zte.rdcloud.iproject.domain.common.websocket.AbstractBackendThread; import com.zte.rdcloud.iproject.domain.common.websocket.WebSocketOperatorMap; import lombok.Getter; import lombok.Setter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.CopyOnWriteArraySet; @Slf4j @Component @ServerEndpoint("/webSocket/{tenantId}/{workspaceKey}/{planId}") @Getter @Setter public class WebSocketController implements ApplicationContextAware { private static int onlineCount = 0; private static CopyOnWriteArraySet<WebSocketController> webSocketSet = new CopyOnWriteArraySet<WebSocketController>(); private static ApplicationContext applicationContext; private Session session = null; private String tenantId = null; private String workspaceKey = null; private String planId = null; private AbstractBackendThread thread = null; @Autowired WebSocketOperatorMap webSocketOperatorMap; @Override public void setApplicationContext(ApplicationContext context){ applicationContext = context; } /** * 连接建立成功调用的方法 * */ @OnOpen public void onOpen(Session session, @PathParam("tenantId") String tenantId, @PathParam("workspaceKey") String workspaceKey, @PathParam("planId") String planId) { this.session = session; this.workspaceKey = workspaceKey; this.planId = planId; this.tenantId = tenantId; webSocketSet.add(this); addOnlineCount(); log.info("new user[session: " + this.session + " tenantId: " + this.tenantId + " workspaceKey: " + 
this.workspaceKey + " planId: " + this.planId + " ] access, online count: " + getOnlineCount()); try { sendMessage("connection success."); } catch (IOException e) { log.error("webSocket IOException.", e); } } /** * 连接关闭调用的方法 */ @OnClose public void onClose() { closeThread(); webSocketSet.remove(this); subOnlineCount(); log.info("user[session: " + this.session + " tenantId: " + this.tenantId + " workspaceKey: " + this.workspaceKey + " planId: " + this.planId + " ] offline, online count: " + getOnlineCount()); } /** * 收到客户端消息后调用的方法 * @param message 客户端发送过来的消息 */ @SneakyThrows @OnMessage public void onMessage(Session session, String message) { log.info("reserved user[session " + session + " tenantId: " + this.tenantId + " workspaceKey: " + this.workspaceKey + " planId: " + this.planId + " ] message: " + message); if(webSocketOperatorMap == null) { webSocketOperatorMap = applicationContext.getBean(WebSocketOperatorMap.class); } if(StringUtils.isNotBlank(message)){ webSocketOperatorMap.doOneThing(message, this); } } /** * 发生错误 * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { closeThread(); log.error("发生错误", error); } public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } public void sendMessage(ServiceData message) throws IOException, EncodeException { this.session.getBasicRemote().sendObject(message); } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { WebSocketController.onlineCount++; } public static synchronized void subOnlineCount() { WebSocketController.onlineCount--; } private void closeThread(){ if(null != thread){ thread.closeThread(); while (thread.isInProcess()){ try { Thread.sleep(10); } catch (InterruptedException e) { throw new RuntimeException(e); } } } thread = null; } }
业务处理(查询前端需要的数据):
package com.zte.rdcloud.iproject.domain.common.websocket; import com.alibaba.fastjson.JSONObject; import com.zte.rdcloud.iproject.controller.common.websocket.WebSocketController; import com.zte.rdcloud.iproject.domain.versionplan.scope.dto.baseline.PlanBaselineStateDAO; import com.zte.rdcloud.iproject.domain.versionplan.scope.repository.PlanBaselineStateRepository; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.Objects; import java.util.Optional; import static com.zte.rdcloud.iproject.domain.versionplan.scope.constant.TableNameConstant.PLAN_BASELINE_STATE; @Slf4j @Service public class WebSocketOperatorMap { @Autowired private PlanBaselineStateRepository planBaselineStateRepository; public void doOneThing(String message, WebSocketController ws){ JSONObject obj = JSONObject.parseObject(message); String funcType = Optional.ofNullable(obj.getString("funcType")).orElse("other"); switch (funcType){ case "queryPlanBaselineState": ws.setThread(new BaselineThread(ws, 1)); break; case "queryPlanChangeState": ws.setThread(new BaselineThread(ws, 4)); break; default: } if(null != ws.getThread()){ ws.getThread().start(); } } class BaselineThread extends AbstractBackendThread { private volatile boolean flag = true; private volatile boolean inProcess = true; private final WebSocketController ws; private final int value; public BaselineThread(WebSocketController ws, int value){ this.value = value; this.ws = ws; } @Override public void run() { try { while(flag){ Integer planBaselineState = queryPlanBaselineState(ws.getTenantId(), ws.getWorkspaceKey(), ws.getPlanId()); if(!Objects.equals(value, planBaselineState)){ ws.sendMessage(String.valueOf(planBaselineState)); closeThread(); break; } Thread.sleep(10); } }catch (Exception e){ log.error("Listen baseLine state exception:", e); } inProcess = false; log.info("Thread finish: " + Thread.currentThread().getName()); } 
@Override public void closeThread() { this.flag = false; log.info("Close thread: " + Thread.currentThread().getName() + ", set flag = " + flag); } @Override public Boolean isInProcess() { return inProcess; } } private Integer queryPlanBaselineState(String tenantId, String workspaceKey, String planId) { PlanBaselineStateDAO planBaselineStateDAO = planBaselineStateRepository.queryPlanBaselineState(tenantId + PLAN_BASELINE_STATE, workspaceKey, planId); return null == planBaselineStateDAO ? 0 : planBaselineStateDAO.getBaselineState(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。