赞
踩
http://host:port/myApp/myEndpoint/{server-id}/{session-id}/{transport}
/**
 * Plain WebSocket configuration: maps a single handler to {@code /myHandler}
 * and turns on SockJS for that mapping.
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    /**
     * Registers the handler bean under {@code /myHandler} with SockJS enabled.
     *
     * @param registry registry the handler mapping is added to
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        WebSocketHandler handler = myHandler();
        registry.addHandler(handler, "/myHandler").withSockJS();
    }

    /**
     * Declares the WebSocket handler as a bean.
     *
     * @return a new {@code MyHandler}
     */
    @Bean
    public WebSocketHandler myHandler() {
        return new MyHandler();
    }
}
// Build the two transports up front: native WebSocket and RestTemplate-based XHR.
WebSocketTransport webSocketTransport = new WebSocketTransport(new StandardWebSocketClient());
RestTemplateXhrTransport xhrTransport = new RestTemplateXhrTransport();

// Register them in that order (WebSocket first, XHR second).
List<Transport> transports = new ArrayList<>(2);
transports.add(webSocketTransport);
transports.add(xhrTransport);

// Create the SockJS client and start the handshake with the SockJS endpoint.
SockJsClient sockJsClient = new SockJsClient(transports);
sockJsClient.doHandshake(new MyWebSocketHandler(), "ws://example.com:8080/sockjs");
WebSocket 协议定义了两种类型的消息:文本和二进制,但它们的内容是未定义的。客户端和服务端需要一种子协议(即更高级别的消息协议)来帮助解释消息。
STOMP是一个简单面向文本的消息传递协议
STOMP是一种基于帧的协议,其帧在HTTP上建模。STOMP帧的框架结构:
COMMAND
header1:value1
header2:value2

Body^@
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

/**
 * Enables STOMP messaging over WebSocket: one SockJS-enabled endpoint plus a
 * simple broker.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    /**
     * Exposes {@code /portfolio} as the STOMP endpoint, with SockJS enabled.
     *
     * @param registry registry the endpoint is added to
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio")
                .withSockJS();
    }

    /**
     * Sets {@code /app} as the application destination prefix and enables the
     * simple broker for the {@code /topic} and {@code /queue} prefixes.
     *
     * @param config broker registry to configure
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.setApplicationDestinationPrefixes("/app");
        config.enableSimpleBroker("/topic", "/queue");
    }
}
/**
 * STOMP message controller with one broadcast mapping and one per-user mapping.
 */
@Slf4j
@Controller
public class RestWebSocketController {

    /**
     * Handles a message sent to {@code /app-test/topic} and publishes the reply
     * to {@code /topic/ping}.
     *
     * @param principal identity of the sender
     * @param message   received message body
     * @return the greeting that is broadcast
     */
    @MessageMapping("/app-test/topic")
    @SendTo("/topic/ping")
    public GreetingResponse topic(Principal principal, String message) {
        String sender = principal.getName();
        log.info("来自{}的消息{}", sender, message);
        return new GreetingResponse("Hello,Spring,这是客户端主动拉取的topic消息!");
    }

    /**
     * Handles a message sent to {@code /app-test/user} and replies on the
     * {@code /ping} user destination with {@code broadcast = false}.
     *
     * @param principal identity of the sender
     * @param message   received message body
     * @return the greeting addressed to the requesting user
     */
    @MessageMapping({"/app-test/user"})
    @SendToUser(value = "/ping", broadcast = false)
    public GreetingResponse queue(Principal principal, String message) {
        log.info("来自{}的消息{}", principal.getName(), message);
        String content = "Hello," + principal.getName() + ",这是客户端主动拉取的topic消息!";
        return new GreetingResponse(content);
    }
}
@MessageMapping 用在@Controller注解的类中,根据目的地路由消息的方法
@SendTo 自定义要将有效内容发送到的目标,一般用于广播消息
@SendToUser 是仅向与消息关联的用户发送消息
/**
 * Helper for pushing messages to WebSocket subscribers through
 * {@link SimpMessagingTemplate}.
 *
 * <p>NOTE(review): {@code log} is used below but not declared in this snippet;
 * presumably a Lombok {@code @Slf4j} (and a Spring stereotype annotation for the
 * {@code @Autowired} field) is intended on this class - confirm.
 */
public class WebSocketHelperImpl {

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    /**
     * Sends a point-to-point message to a single subscribed user.
     *
     * @param subsAdd subscription address of the user, without the prefix or user id
     * @param msg     content to send (described by the author as a JSON string)
     * @param userId  id of the target user (described by the author as the user's SaasCode)
     */
    public void sendToUser(String subsAdd, Object msg, String userId) {
        messagingTemplate.convertAndSendToUser(userId, subsAdd, msg);
        log.info("Send to user {} through webSocket successful!", userId);
    }

    /**
     * Sends a point-to-point message to several subscribed users, one at a time.
     * Does nothing when {@code userIds} is {@code null} or empty.
     *
     * @param subsAdd subscription address of the users, without the prefix or user id
     * @param msg     content to send (described by the author as a JSON string)
     * @param userIds ids of the target users
     */
    public void sendToUsers(String subsAdd, Object msg, String[] userIds) {
        if (userIds == null || userIds.length == 0) {
            return;
        }
        for (int i = 0; i < userIds.length; i++) {
            sendToUser(subsAdd, msg, userIds[i]);
        }
    }

    /**
     * Broadcasts a message to every subscriber of the given destination.
     *
     * @param subsAdd destination the subscribers listen on
     * @param msg     content to send (described by the author as a JSON string)
     */
    public void broadCast(String subsAdd, Object msg) {
        messagingTemplate.convertAndSend(subsAdd, msg);
        log.info("BroadCast through webSocket successfully!");
    }
}
/** * 功能简介:基于stomp协议的webSocket配置 * 功能详解:wsTopicEndPoint和wsQueueEndPoint是两个端点,一个用于点对点通信, 另一个用于广播通信; 端点用于建立连接 * topic 和 queue是消息代理地址前缀,一个用于点对点,一个用于广播;这个前缀用于区分消息发送的目的地 * * @author alwaysBrother * @date 2019/12/17 */ @Slf4j @Configuration @EnableConfigurationProperties(WebSocketProperties.class) @EnableWebSocketMessageBroker public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer { @Autowired private WebSocketProperties webSocketProperties; @Autowired private TokenService tokenService; @Override public void configureMessageBroker(MessageBrokerRegistry config) { config.enableSimpleBroker("/topic", "/user"); } @Override public void registerStompEndpoints(StompEndpointRegistry registry) { // 用于广播的endPoint registry.addEndpoint(webSocketProperties.getTopicEndPoint()) .addInterceptors(interceptor) .setHandshakeHandler(handshakeHandler) .setAllowedOrigins("*") .withSockJS(); // 用于点对点通信的endPoint registry.addEndpoint(webSocketProperties.getUserEndPoint()) .addInterceptors(interceptor) .setHandshakeHandler(handshakeHandler) .setAllowedOrigins("*") .withSockJS(); } @Override public void configureWebSocketTransport(WebSocketTransportRegistration registry) { // The default value is 10 seconds (i.e. 10 * 10000). registry.setSendTimeLimit(15 * 1000) // the default value is 512K (i.e. 512 * 1024). .setSendBufferSizeLimit(512 * 1024) // The default value is 64K (i.e. 64 * 1024). 
.setMessageSizeLimit(64 * 1024); } private HandshakeInterceptor interceptor = new HandshakeInterceptor() { @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) { ServletServerHttpRequest req = (ServletServerHttpRequest) request; String token = req.getServletRequest().getHeader(HeaderConstants.TOKEN); if (StringUtils.isEmpty(token)) { log.error("token is empty, webSocket connect can`t be establish"); return false; } TokenDTO tokenDTO = tokenService.parseToken(token); if (tokenDTO == null || !tokenDTO.getTokenValid() || StringUtils.isEmpty(tokenDTO.getUserId())) { log.error("token is invalid, webSocket connect can`t be establish"); return false; } attributes.put("userId", tokenDTO.getUserId()); return true; } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { } }; private DefaultHandshakeHandler handshakeHandler = new DefaultHandshakeHandler() { @Override protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) { return new UserPrincipal((String) attributes.get("userId")); } }; }
@Slf4j public abstract class AbsWebClient { protected static ThreadFactory sockJsThreadFactory = new ThreadFactoryBuilder() .setNameFormat("socket-client-pool-%d").build(); protected ExecutorService executor = new ThreadPoolExecutor(3, 100, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(100), sockJsThreadFactory, new ThreadPoolExecutor.AbortPolicy()); @LocalServerPort private int port; protected class Connect implements Runnable { /** * 要建立连接需要有token */ private String token; /** * 连接点 */ private String endPoint; /** * 订阅地址 */ private String subAddress; public Connect(String token, String endPoint, String subAddress) { this.token = token; this.endPoint = endPoint; this.subAddress = subAddress; } @Override public void run() { // 客户端 WebSocketStompClient stompClient = createStompClient(); // 订阅、接收消息 StompSessionHandler handler = new PandaQueueSessionHandler(subAddress); // headers WebSocketHttpHeaders headers = new WebSocketHttpHeaders(); headers.add("token", token); stompClient.connect("ws://localhost:{port}/{endPoint}", headers, handler, port, endPoint); } } protected static WebSocketStompClient createStompClient() { List<Transport> transports = new ArrayList<>(); transports.add(new WebSocketTransport(new StandardWebSocketClient())); SockJsClient sockJsClient = new SockJsClient(transports); WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient); stompClient.setMessageConverter(new MappingJackson2MessageConverter()); return stompClient; } /** * 用来建立连接后订阅一个地址,并扩展其他业务 */ class AppQueueSessionHandler extends StompSessionHandlerAdapter { private String subAddress; public PandaQueueSessionHandler(String subAddress) { this.subAddress = subAddress; } @Override public void afterConnected(final StompSession session, StompHeaders connectedHeaders) { session.subscribe(subAddress, new PandaStompFrameHandler()); afterSubscribe(session); } } /** * 用来处理消息 */ static class AppStompFrameHandler implements StompFrameHandler { @Override public Type 
getPayloadType(StompHeaders headers) { return GreetingResponse.class; } @Override public void handleFrame(StompHeaders headers, Object payload) { GreetingResponse greetingResponse = (GreetingResponse) payload; try { if (greetingResponse != null) { log.info("===============success received===========\n {}", greetingResponse.getContent()); } } catch (Throwable t) { log.info("================failed received~~~~~~~~~~\n", t); } } } protected abstract void afterSubscribe(StompSession session); }
@Slf4j @RunWith(SpringRunner.class) @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) public class QueueTest extends AbsWebClient { @Autowired private TokenService tokenService; private String endPoint = "/app-user-endPoint"; private String token = "***"; /** * 两个用户,使用同一个用户id订阅消息,会各自收到一次发送给订阅id用户的消息 * * @throws InterruptedException */ @Test public void broadCastTest() throws InterruptedException { Connect connect1 = new Connect(tokenService.getToken(), endPoint, "/user/S4NbecfYB1ACT6IPAPE5DQ/ping"); Connect connect2 = new Connect(token, endPoint, "/user/S4NbecfYB1ACT6IPAPE5DQ/ping"); executor.execute(connect1); executor.execute(connect2); Thread.sleep(10 * 1000); } /** * 两个用户,使用各自用户id订阅消息,会各自收到订阅id的消息 * * @throws InterruptedException */ @Test public void banBroadCastTest() throws InterruptedException { Connect connect1 = new Connect(tokenService.getToken(), endPoint, "/user/S4NbecfYB1ACT6IPAPE5DQ/ping"); Connect connect2 = new Connect(token, endPoint, "/user/S4NbecfYB1BUUB3Q0232D9/ping"); executor.execute(connect1); executor.execute(connect2); Thread.sleep(10 * 1000); } protected void afterSubscribe(StompSession session) { try { session.send("/app-test/user", "Spring"); } catch (Throwable t) { log.error("client send to server error", t); } } }
@Slf4j @RunWith(SpringRunner.class) @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = AppTest.class) public class TopicTest { @Autowired private TokenService tokenService; @LocalServerPort private int port; private WebSocketStompClient stompClient; private final WebSocketHttpHeaders headers = new WebSocketHttpHeaders(); @Before public void setup() { List<Transport> transports = new ArrayList<>(); transports.add(new WebSocketTransport(new StandardWebSocketClient())); SockJsClient sockJsClient = new SockJsClient(transports); this.stompClient = new WebSocketStompClient(sockJsClient); this.stompClient.setMessageConverter(new MappingJackson2MessageConverter()); } /** * 测试一个连接 * @throws Exception */ @Test public void topicTest() throws Exception { final CountDownLatch latch = new CountDownLatch(1); final AtomicReference<Throwable> failure = new AtomicReference<>(); StompSessionHandler handler = new PandaTestSessionHandler(failure, latch); headers.add("token", tokenService.getToken()); this.stompClient.connect("ws://localhost:{port}/app-topic-endPoint", this.headers, handler, this.port); if (latch.await(5, TimeUnit.SECONDS)) { if (failure.get() != null) { throw new AssertionError("not receive after 5 seconds", failure.get()); } } else { fail("Greeting not received"); } } private static class PandaTestSessionHandler extends StompSessionHandlerAdapter { private final AtomicReference<Throwable> failure; private final CountDownLatch latch; public PandaTestSessionHandler(AtomicReference<Throwable> failure, CountDownLatch latch) { this.failure = failure; this.latch = latch; } @Override public void afterConnected(StompSession session, StompHeaders connectedHeaders) { session.subscribe("/topic/ping", new StompFrameHandler() { @Override public Type getPayloadType(StompHeaders headers) { return GreetingResponse.class; } @Override public void handleFrame(StompHeaders headers, Object payload) { GreetingResponse greetingResponse = (GreetingResponse) 
payload; try { if (greetingResponse != null) { log.info("收到服务端回复消息:{}", greetingResponse.getContent()); assertEquals("Hello,Spring,这是客户端主动拉取的topic消息!", greetingResponse.getContent()); } } catch (Throwable t) { failure.set(t); } finally { session.disconnect(); latch.countDown(); } } }); try { session.send("/app-test/topic", "Spring"); // 以"/topic"开头,可以路由到带注释的控制器中的@MessageMapping方法,而"/topic"和"/queue"消息可以直接路由到消息 broker。 // session.send("/topic/ping", "Spring"); } catch (Throwable t) { failure.set(t); latch.countDown(); } } } }
使用多台机器作为客户端,每台机器创建线程池,每个线程建立一个和服务器的长连接,经过测试最大可以建立一万个连接。
WebSocket 框架代码中消息大小的限制和消息发送超时时间都是可配置的。理论上 WebSocket 消息的大小几乎是无限的,但实际上 WebSocket 服务器强加了限制——例如,Tomcat 上的 8K 和 Jetty 上的 64K。因此,诸如 stomp.js 的 STOMP 客户端会在 16K 边界处拆分较大的 STOMP 消息,并将它们作为多个 WebSocket 消息发送,因此需要服务器缓冲和重新组装。
消息推送频率和消息体大小关系(与网络状况、硬件环境有关,参考即可)
本文参考
https://docs.spring.io/spring/docs/4.3.3.RELEASE/spring-framework-reference/htmlsingle/#websocket-intro-sub-protocol
https://stomp.github.io/stomp-specification-1.2.html
https://baike.baidu.com/item/WebSocket/1953845?fr=aladdin
https://spring.io/blog/2012/05/08/spring-mvc-3-2-preview-techniques-for-real-time-updates
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。