赞
踩
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
该类可以实现org.springframework.web.socket.WebSocketHandler
,也可以继承org.springframework.web.socket.handler.TextWebSocketHandler
或org.springframework.web.socket.handler.BinaryWebSocketHandler
它们的继承关系如下图所示:
WebSocketHandler
public interface WebSocketHandler { // 建立连接之后调用 void afterConnectionEstablished(WebSocketSession var1) throws Exception; // 当新的WebSocket消息到达时调用 void handleMessage(WebSocketSession var1, WebSocketMessage<?> var2) throws Exception; // 处理消息传输异常 void handleTransportError(WebSocketSession var1, Throwable var2) throws Exception; // 在WebSocket连接被任意一方关闭后,或者发生传输错误之后调用 void afterConnectionClosed(WebSocketSession var1, CloseStatus var2) throws Exception; /* * WebSocket 是否支持处理部分消息, * 若此标志设置为true,并且底层WebSocket服务器支持部分消息, * 则大的 WebSocket 消息或未知大小的消息可能会被拆分并可能通过对handleMessage(WebSocketSession, WebSocketMessage)多次调用接收。 * 标志WebSocketMessage.isLast()指示消息是否部分以及它是否是最后一部分 */ boolean supportsPartialMessages(); }
AbstractWebSocketHandler
public abstract class AbstractWebSocketHandler implements WebSocketHandler { @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { } @Override public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception { if (message instanceof TextMessage) { handleTextMessage(session, (TextMessage) message); } else if (message instanceof BinaryMessage) { handleBinaryMessage(session, (BinaryMessage) message); } else if (message instanceof PongMessage) { handlePongMessage(session, (PongMessage) message); } else { throw new IllegalStateException("Unexpected WebSocket message type: " + message); } } protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { } protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception { } protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception { } @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { } @Override public boolean supportsPartialMessages() { return false; } }
可以看到除了handleMessage
,其它方法都空方法,这些方法需我们在具体的业务中去实现。
该类还定义了三个方法: handleTextMessage
,handleBinaryMessage
,handlePongMessage
,分别用来处理TextMessage
,BinaryMessage
和PongMessage
,下图是WebSocketMessage
的类图:
TextMessage
的范型参数是String
,BinaryMessage
和PonMessage
的范型参数都是ByteBuffer
。
TextWebSocketHandler
public class TextWebSocketHandler extends AbstractWebSocketHandler {
@Override
protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) {
try {
session.close(CloseStatus.NOT_ACCEPTABLE.withReason("Binary messages not supported"));
}
catch (IOException ex) {
// ignore
}
}
}
可以看到,该处理类重写了handlerBinaryMessage
方法,如果是使用该处理类,那么就只能处理字符信息,若是接收的是字节信息,将会关闭该连接。
BinaryWebSocketHandler
public class BinaryWebSocketHandler extends AbstractWebSocketHandler {
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
try {
session.close(CloseStatus.NOT_ACCEPTABLE.withReason("Text messages not supported"));
}
catch (IOException ex) {
// ignore
}
}
}
与TextWebSocketHandler
类同理,该处理类只能处理字节信息,若是接收到了字符信息,将会断开该连接。
根据业务的不同,若是只涉及到文本信息,那么我们可以继承TextWebSocketHandler
;若是只需要传递二进制信息,那么可以继承BinaryWebSocketHandler
;如果两种信息都有的话,可以继承AbstractWebSocketHandler
或实现WebSocketHandler
接口。
下面以继承TextWebSocketHandler
为例:
public class MyHandler extends TextWebSocketHandler { private List<WebSocketSession> sessions = new ArrayList<>(); @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { /** * 广播消息 */ sessions.forEach(s -> { if (!s.equals(session)){ try { s.sendMessage(message); } catch (IOException e) { e.printStackTrace(); } } }); } @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { /** * 保存会话 */ sessions.add(session); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { /** * 删除会话 */ sessions.remove(session); } }
我们需要实现上面的3个方法,分别在建立连接时保存会话,收到消息时,将消息广播,然后再断开连接时删除相应的会话。
@Configuration
@EnableWebSocket
public class WebsocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {
webSocketHandlerRegistry.addHandler(myWebsocketHandler(),"myWS");
}
public WebSocketHandler myWebsocketHandler(){
return new MyHandler();
}
}
使用@EnableWebSocket
注解
实现WebSocketConfigurer
该接口只定义了一个接口:
/**
* Defines callback methods to configure the WebSocket request handling
* via {@link org.springframework.web.socket.config.annotation.EnableWebSocket @EnableWebSocket}.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface WebSocketConfigurer {
/**
* Register {@link WebSocketHandler WebSocketHandlers} including SockJS fallback options if desired.
*/
void registerWebSocketHandlers(WebSocketHandlerRegistry registry);
}
正如注释所说,通过@EnableWebSocket
,定义一个回调方法来配置WebSocket的请求处理类。
在demo中,我们注册了一个自定义处理类,来处理来自/myWS的WebSocket消息。
至此,我们就可以使用Websocket进行消息广播了。
如果我们想要做到点对点的通信,那么我们需要将会话与客户端的关联关系保存下来。
我们可以通过以下方式获取在建立WebSocket连接的时候获取当前会话的登陆信息:
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
Principal user = session.getPrincipal(); // 获取
log.info("User {} has connected",user.getName());
sessions.add(session);
}
用户信息都能获取到了,那点对点通信自然就不再话下了。
为什么WebSocketSession
中会有用户信息呢?
我们在afterConnectionEstablished
方法中打个断点,栈帧如图所示:
我在WsHttpUpgradeHandler.init
中发现了下面这行代码:
try {
this.wsRemoteEndpointServer = new WsRemoteEndpointImplServer(this.socketWrapper, this.upgradeInfo, this.webSocketContainer);
//实例化WsSession
this.wsSession = new WsSession(this.ep, this.wsRemoteEndpointServer, this.webSocketContainer, this.handshakeRequest.getRequestURI(), this.handshakeRequest.getParameterMap(), this.handshakeRequest.getQueryString(), this.handshakeRequest.getUserPrincipal(), httpSessionId, this.negotiatedExtensions, this.subProtocol, this.pathParameters, this.secure, this.serverEndpointConfig);
this.wsFrame = new WsFrameServer(this.socketWrapper, this.upgradeInfo, this.wsSession, this.transformation, this.applicationClassLoader);
this.wsRemoteEndpointServer.setTransformation(this.wsFrame.getTransformation());
// 调用handler中的afterConnectionEstablished方法,
this.ep.onOpen(this.wsSession, this.serverEndpointConfig);
this.webSocketContainer.registerSession(this.serverEndpointConfig.getPath(), this.wsSession);
} catch (DeploymentException var10) {
throw new IllegalArgumentException(var10);
} finally {
t.setContextClassLoader(cl);
}
上面第8行onOpen
方法最终会调用afterConnectionEstablished
方法,而onOpen
方法的第一行为:
public void onOpen(final javax.websocket.Session session, EndpointConfig config) {
/**
* 要注意区分这里的wsSession 和 init方法中的wsSession 是不同的对象
* init方法中的wsSession === session
* this.wsSession是StandardWebSocketSession的实例对象
**/
this.wsSession.initializeNativeSession(session);
...
this.handler.afterConnectionEstablished(this.wsSession);
}
public void initializeNativeSession(Session session) {
...
if (this.user == null) {
this.user = session.getUserPrincipal();
}
}
可以看到,handler中Principal user = session.getPrincipal();
返回的user就是WsHttpUpgradeHandler.init
中实例化的WsSession
中的user。
我们再返回到上面WsHttpUpgradeHandler.init
方法中的第4行,实例化WsSession
时,向构造函数传了this.handshakeRequest.getUserPrincipal()
,这就是用户登陆信息。
handshakeRequest
是在preInit
方法中赋值的
public void preInit(Endpoint ep, ServerEndpointConfig serverEndpointConfig, WsServerContainer wsc, WsHandshakeRequest handshakeRequest, List<Extension> negotiatedExtensionsPhase2, String subProtocol, Transformation transformation, Map<String, String> pathParameters, boolean secure) {
this.ep = ep;
this.serverEndpointConfig = serverEndpointConfig;
this.webSocketContainer = wsc;
// 得看看handshakeRequest中的Pringcipal那儿来的
this.handshakeRequest = handshakeRequest;
this.negotiatedExtensions = negotiatedExtensionsPhase2;
this.subProtocol = subProtocol;
this.transformation = transformation;
this.pathParameters = pathParameters;
this.secure = secure;
}
那我们在preInit
方法中打个断点再康康:
UpgradeUtil.doUpgrade
:
// HttpServletRequest req
WsHandshakeRequest wsRequest = new WsHandshakeRequest(req, pathParams);
......
wsHandler.preInit((Endpoint)ep, perSessionServerEndpointConfig, sc, wsRequest, (List)negotiatedExtensionsPhase2, subProtocol, transformation, pathParams, req.isSecure());
public WsHandshakeRequest(HttpServletRequest request, Map<String, String> pathParams) {
this.request = request;
this.queryString = request.getQueryString();
this.userPrincipal = request.getUserPrincipal(); // 破案了
this.httpSession = request.getSession(false);
this.requestUri = buildRequestUri(request);
......
}
至于为什么HttpServletRequest
中有用户信息,那又是另一个故事了。
STOMP协议是一个简单的基于文本的通信协议,用于客户端之间通过消息中间件传递信息。
虽然STOMP协议是基于文本的协议,但它仍然可以用来传递二进制消息。
STOMP是一个基于帧的协议,该协议依赖一个可靠的双向流网络协议,如TCP,WebSocket等。客户端与服务端在网络流中传递STOMP帧,以达到通信的效果。
下面是一个场景的STOMP帧
SEND
destination:/queue/a
content-type:text/plain
hello queue a
^@
第一行:命令,命令一共有以下10种:
ACK
相反receipt-id
头信息,如果服务端已经收到了客户端发送的最后一条消息,那么将返回一个RECEIPT帧,然后客户端再断开连接第二行至倒数第三行:头信息,键值对
倒数第二行:空行,用于将头信息和消息体隔开
最后是消息体,可以是文本也可能是二进制blob
这里只是简单的介绍,详细的介绍可以看官方文档,以后我可能会专门写一篇文章来介绍STOMP协议
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。