当前位置:   article > 正文

SpringBoot中引入WebSocket

SpringBoot中引入WebSocket

SpringBoot中引入WebSocket

1、引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

2、创建自己的处理类

该类可以实现org.springframework.web.socket.WebSocketHandler,也可以继承org.springframework.web.socket.handler.TextWebSocketHandlerorg.springframework.web.socket.handler.BinaryWebSocketHandler

它们的继承关系如下图所示:

image-20210817230345922

WebSocketHandler
public interface WebSocketHandler {
    // 建立连接之后调用
    void afterConnectionEstablished(WebSocketSession var1) throws Exception;
	// 当新的WebSocket消息到达时调用
    void handleMessage(WebSocketSession var1, WebSocketMessage<?> var2) throws Exception;
	// 处理消息传输异常
    void handleTransportError(WebSocketSession var1, Throwable var2) throws Exception;
	// 在WebSocket连接被任意一方关闭后,或者发生传输错误之后调用
    void afterConnectionClosed(WebSocketSession var1, CloseStatus var2) throws Exception;
	/* 
	 * WebSocket 是否支持处理部分消息,
	 * 若此标志设置为true,并且底层WebSocket服务器支持部分消息,
	 * 则大的 WebSocket 消息或未知大小的消息可能会被拆分并可能通过对handleMessage(WebSocketSession, WebSocketMessage)多次调用接收。
     * 标志WebSocketMessage.isLast()指示消息是否部分以及它是否是最后一部分
     */
    boolean supportsPartialMessages();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
AbstractWebSocketHandler
public abstract class AbstractWebSocketHandler implements WebSocketHandler {

	@Override
	public void afterConnectionEstablished(WebSocketSession session) throws Exception {
	}

	@Override
	public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
		if (message instanceof TextMessage) {
			handleTextMessage(session, (TextMessage) message);
		}
		else if (message instanceof BinaryMessage) {
			handleBinaryMessage(session, (BinaryMessage) message);
		}
		else if (message instanceof PongMessage) {
			handlePongMessage(session, (PongMessage) message);
		}
		else {
			throw new IllegalStateException("Unexpected WebSocket message type: " + message);
		}
	}

	protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
	}

	protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
	}

	protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {
	}

	@Override
	public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
	}

	@Override
	public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
	}

	@Override
	public boolean supportsPartialMessages() {
		return false;
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

可以看到除了handleMessage,其它方法都空方法,这些方法需我们在具体的业务中去实现。

该类还定义了三个方法: handleTextMessage,handleBinaryMessage,handlePongMessage,分别用来处理TextMessage,BinaryMessagePongMessage,下图是WebSocketMessage的类图:

image-20210817235003132

TextMessage的范型参数是StringBinaryMessagePonMessage的范型参数都是ByteBuffer

TextWebSocketHandler
public class TextWebSocketHandler extends AbstractWebSocketHandler {

	@Override
	protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) {
		try {
			session.close(CloseStatus.NOT_ACCEPTABLE.withReason("Binary messages not supported"));
		}
		catch (IOException ex) {
			// ignore
		}
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

可以看到,该处理类重写了handlerBinaryMessage方法,如果是使用该处理类,那么就只能处理字符信息,若是接收的是字节信息,将会关闭该连接。

BinaryWebSocketHandler
public class BinaryWebSocketHandler extends AbstractWebSocketHandler {

	@Override
	protected void handleTextMessage(WebSocketSession session, TextMessage message) {
		try {
			session.close(CloseStatus.NOT_ACCEPTABLE.withReason("Text messages not supported"));
		}
		catch (IOException ex) {
			// ignore
		}
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

TextWebSocketHandler类同理,该处理类只能处理字节信息,若是接收到了字符信息,将会断开该连接。

根据业务的不同,若是只涉及到文本信息,那么我们可以继承TextWebSocketHandler;若是只需要传递二进制信息,那么可以继承BinaryWebSocketHandler;如果两种信息都有的话,可以继承AbstractWebSocketHandler或实现WebSocketHandler接口。

下面以继承TextWebSocketHandler为例:

public class MyHandler extends TextWebSocketHandler {
    private List<WebSocketSession> sessions = new ArrayList<>();
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        /**
         * 广播消息
         */
        sessions.forEach(s -> {
                if (!s.equals(session)){
                    try {
                        s.sendMessage(message);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
        });
    }

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        /**
         * 保存会话
         */
        sessions.add(session);
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        /**
         *	删除会话
         */
        sessions.remove(session);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

我们需要实现上面的3个方法,分别在建立连接时保存会话,收到消息时,将消息广播,然后再断开连接时删除相应的会话。

3、配置

@Configuration
@EnableWebSocket
public class WebsocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {
        webSocketHandlerRegistry.addHandler(myWebsocketHandler(),"myWS");
    }

    public WebSocketHandler myWebsocketHandler(){
        return new MyHandler();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 使用@EnableWebSocket注解

  • 实现WebSocketConfigurer

    该接口只定义了一个接口:

    /**
     * Defines callback methods to configure the WebSocket request handling
     * via {@link org.springframework.web.socket.config.annotation.EnableWebSocket @EnableWebSocket}.
     *
     * @author Rossen Stoyanchev
     * @since 4.0
     */
    public interface WebSocketConfigurer {
    
    	/**
    	 * Register {@link WebSocketHandler WebSocketHandlers} including SockJS fallback options if desired.
    	 */
    	void registerWebSocketHandlers(WebSocketHandlerRegistry registry);
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    正如注释所说,通过@EnableWebSocket,定义一个回调方法来配置WebSocket的请求处理类。

    在demo中,我们注册了一个自定义处理类,来处理来自/myWS的WebSocket消息。

至此,我们就可以使用Websocket进行消息广播了。

4、点对点通信

如果我们想要做到点对点的通信,那么我们需要将会话与客户端的关联关系保存下来。

我们可以通过以下方式获取在建立WebSocket连接的时候获取当前会话的登陆信息:

@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
    Principal user = session.getPrincipal(); // 获取
    log.info("User {} has connected",user.getName());
    sessions.add(session);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

用户信息都能获取到了,那点对点通信自然就不再话下了。

为什么WebSocketSession中会有用户信息呢?

我们在afterConnectionEstablished方法中打个断点,栈帧如图所示:

image-20210825232443013

我在WsHttpUpgradeHandler.init中发现了下面这行代码:

try {
    this.wsRemoteEndpointServer = new WsRemoteEndpointImplServer(this.socketWrapper, this.upgradeInfo, this.webSocketContainer);
    //实例化WsSession
    this.wsSession = new WsSession(this.ep, this.wsRemoteEndpointServer, this.webSocketContainer, this.handshakeRequest.getRequestURI(), this.handshakeRequest.getParameterMap(), this.handshakeRequest.getQueryString(), this.handshakeRequest.getUserPrincipal(), httpSessionId, this.negotiatedExtensions, this.subProtocol, this.pathParameters, this.secure, this.serverEndpointConfig);
    this.wsFrame = new WsFrameServer(this.socketWrapper, this.upgradeInfo, this.wsSession, this.transformation, this.applicationClassLoader);
    this.wsRemoteEndpointServer.setTransformation(this.wsFrame.getTransformation());
    // 调用handler中的afterConnectionEstablished方法, 
    this.ep.onOpen(this.wsSession, this.serverEndpointConfig);
    this.webSocketContainer.registerSession(this.serverEndpointConfig.getPath(), this.wsSession);
} catch (DeploymentException var10) {
    throw new IllegalArgumentException(var10);
} finally {
    t.setContextClassLoader(cl);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

上面第8行onOpen方法最终会调用afterConnectionEstablished方法,而onOpen方法的第一行为:

public void onOpen(final javax.websocket.Session session, EndpointConfig config) {
    /**
     * 要注意区分这里的wsSession 和 init方法中的wsSession 是不同的对象
     * init方法中的wsSession === session
     * this.wsSession是StandardWebSocketSession的实例对象
     **/
    this.wsSession.initializeNativeSession(session);
    ...
    this.handler.afterConnectionEstablished(this.wsSession);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
public void initializeNativeSession(Session session) {
    ...
    if (this.user == null) {
        this.user = session.getUserPrincipal(); 
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

可以看到,handler中Principal user = session.getPrincipal();返回的user就是WsHttpUpgradeHandler.init中实例化的WsSession中的user。

我们再返回到上面WsHttpUpgradeHandler.init方法中的第4行,实例化WsSession时,向构造函数传了this.handshakeRequest.getUserPrincipal(),这就是用户登陆信息。

handshakeRequest是在preInit方法中赋值的

public void preInit(Endpoint ep, ServerEndpointConfig serverEndpointConfig, WsServerContainer wsc, WsHandshakeRequest handshakeRequest, List<Extension> negotiatedExtensionsPhase2, String subProtocol, Transformation transformation, Map<String, String> pathParameters, boolean secure) {
    this.ep = ep;
    this.serverEndpointConfig = serverEndpointConfig;
    this.webSocketContainer = wsc;
    // 得看看handshakeRequest中的Pringcipal那儿来的
    this.handshakeRequest = handshakeRequest;
    this.negotiatedExtensions = negotiatedExtensionsPhase2;
    this.subProtocol = subProtocol;
    this.transformation = transformation;
    this.pathParameters = pathParameters;
    this.secure = secure;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

那我们在preInit方法中打个断点再康康:

image-20210826001713143

UpgradeUtil.doUpgrade

// HttpServletRequest req
WsHandshakeRequest wsRequest = new WsHandshakeRequest(req, pathParams);
......
wsHandler.preInit((Endpoint)ep, perSessionServerEndpointConfig, sc, wsRequest, (List)negotiatedExtensionsPhase2, subProtocol, transformation, pathParams, req.isSecure());
  • 1
  • 2
  • 3
  • 4
public WsHandshakeRequest(HttpServletRequest request, Map<String, String> pathParams) {
    this.request = request;
    this.queryString = request.getQueryString();
    this.userPrincipal = request.getUserPrincipal(); // 破案了
    this.httpSession = request.getSession(false);
    this.requestUri = buildRequestUri(request);
    ......
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

至于为什么HttpServletRequest中有用户信息,那又是另一个故事了。

SpringBoot + STOMP

STOMP协议介绍

  • STOMP协议是一个简单的基于文本的通信协议,用于客户端之间通过消息中间件传递信息。

  • 虽然STOMP协议是基于文本的协议,但它仍然可以用来传递二进制消息。

  • STOMP是一个基于的协议,该协议依赖一个可靠的双向流网络协议,如TCP,WebSocket等。客户端与服务端在网络流中传递STOMP帧,以达到通信的效果。

  • 下面是一个场景的STOMP帧

    • SEND
      destination:/queue/a
      content-type:text/plain
      
      hello queue a
      ^@
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6

      第一行:命令,命令一共有以下10种:

      • CONNECT:客户端向服务端发送该命令来与客户端建立连接
      • SEND:客户端向目的地(Destination) 发送消息
      • SUBSCRIBE:客户端订阅某个目的地
      • UNSUBSCRIBE:客户端取消订阅某个目的地
      • BEGIN:用于开始一次通信
      • COMMIT:用于提交一次通信
      • ABORT:回滚通信
      • ACK:消息同步,用户向目的地报告收到某个消息
      • NACK:与ACK相反
      • DISCONNECT:客户端关闭与客户端的连接
        • 优雅断开连接的方式:客户端向服务器发送DISCONNECT命令,并带有receipt-id头信息,如果服务端已经收到了客户端发送的最后一条消息,那么将返回一个RECEIPT帧,然后客户端再断开连接

      第二行至倒数第三行:头信息,键值对

      倒数第二行:空行,用于将头信息和消息体隔开

      最后是消息体,可以是文本也可能是二进制blob

    这里只是简单的介绍,详细的介绍可以看官方文档,以后我可能会专门写一篇文章来介绍STOMP协议

    声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/242036
推荐阅读
相关标签