赞
踩
服务端注册的客户端的列表;服务端向客户端发送广播消息;服务端向指定客户端发送消息;服务端向多个客户端发送消息;客户端给服务端发送消息;
效果:
jdk:1.8
SpringBoot:2.4.17
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
@EnableWebSocket
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * Spring configuration for WebSocket support.
 *
 * <p>Its purpose is to make sure WebSocket endpoints annotated with {@code @ServerEndpoint}
 * are picked up by Spring Boot.
 *
 * <p>Created 2024/3/18 16:57.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Provides the {@link ServerEndpointExporter} bean, which registers the WebSocket
     * endpoints declared with {@code @ServerEndpoint}.
     *
     * @return a new exporter instance
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
/** * date created : Created in 2024/3/18 16:31 * description : 服务端实现,方法的封装 * class name : WebSocketServer */ @Component @Slf4j @ServerEndpoint("/websocket/{applicationName}") public class WebSocketServer { //与某个客户端的连接会话,需要通过它来给客户端发送数据 private Session session; // 应用名称 private String applicationName; //虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。 private static final CopyOnWriteArraySet<WebSocketServer> webSockets = new CopyOnWriteArraySet<>(); // 用来存在线连接用户信息 private static final ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<>(); /** * 链接成功调用的方法 */ @OnOpen public void onOpen(Session session, @PathParam(value = "applicationName") String applicationName) { try { this.session = session; this.applicationName = applicationName; webSockets.add(this); sessionPool.put(applicationName, session); log.info("【websocket消息】有新的连接,总数为:" + webSockets.size()); log.info("【当前客户端列表】:"+ sessionPool.keySet()); } catch (Exception e) { } } /** * description : 有连接断开之后的处理方法 * method name : onClose * param : [] * return : void */ @OnClose public void onClose() { try { webSockets.remove(this); sessionPool.remove(this.applicationName); log.info("【websocket消息】连接断开,总数为:" + webSockets.size()); log.info("【当前客户端列表】:"+ sessionPool.keySet()); } catch (Exception e) { } } /** * description : 收到客户端消息的处理方法 * method name : onMessage * param : [message] * return : void */ @OnMessage public void onMessage(String message) { log.info("【websocket消息】收到客户端消息:" + message); } /** * description : 错误处理 * method name : onError * param : [session, error] * return : void */ @OnError public void onError(Session session, Throwable error) { log.error("用户错误,原因:" + error.getMessage()); error.printStackTrace(); } /** * description : 广播消息 给所有注册的客户端发送消息 * method name : sendBroadcastMessage * param : [message] * return : void */ public void sendBroadcastMessage(String message) { log.info("【websocket消息】广播消息:" + message); for (WebSocketServer webSocket : webSockets) { try { 
if (webSocket.session.isOpen()) { webSocket.session.getAsyncRemote().sendText(message); } } catch (Exception e) { e.printStackTrace(); } } } /** * description : 给指定的客户端发送消息 * method name : sendApplicationMessage * param : [applicationName 客户端的应用名称, message 要发送的消息] * return : void */ public void sendApplicationMessage(String applicationName, String message) { Session session = sessionPool.get(applicationName); if (session != null && session.isOpen()) { try { log.info("【websocket消息】 单点消息:" + message); session.getAsyncRemote().sendText(message); } catch (Exception e) { e.printStackTrace(); } } } /** * description : 给多个客户端发送消息 * method name : sendMassApplicationMessage * param : [applicationNames 注册的客户端的应用名称, message 要发送的消息] * return : void */ public void sendMassApplicationMessage(String[] applicationNames, String message) { for (String userId : applicationNames) { Session session = sessionPool.get(userId); if (session != null && session.isOpen()) { try { log.info("【websocket消息】 单点消息:" + message); session.getAsyncRemote().sendText(message); } catch (Exception e) { e.printStackTrace(); } } } } }
yaml文件的末尾添加
# websocket的配置
websocket:
  host: localhost
  port: 19022
  prefix: websocket
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

/**
 * Holds the WebSocket parameters read from the configuration file and builds the
 * corresponding server URL from them.
 *
 * <p>Created 2024/3/19 14:36.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Component
@ConfigurationProperties(prefix = "websocket")
@Configuration
public class WebSocketProperties {

    /** Value of the {@code spring.application.name} property; last segment of the URL. */
    @Value("${spring.application.name}")
    String appName;

    /** Server host, from the {@code websocket} configuration prefix. */
    String host;

    /** Server port, from the {@code websocket} configuration prefix. */
    String port;

    /** Path prefix of the endpoint, from the {@code websocket} configuration prefix. */
    String prefix;

    /**
     * Assembles the endpoint address in the form {@code ws://host:port/prefix/appName}.
     *
     * @return the WebSocket URL built from the current field values
     */
    public String getUrl() {
        final String authority = host + ":" + port;
        final String path = prefix + "/" + appName;
        return "ws://" + authority + "/" + path;
    }
}
import org.springframework.boot.autoconfigure.AutoConfigureBefore; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.stereotype.Component; import javax.websocket.ClientEndpoint; import javax.websocket.*; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; /** * date created : Created in 2024/3/18 16:36 * description : 客户端接收服务端的实时消息、发送消息等方法的封装 * class name : WebSocketClient */ @ClientEndpoint @AutoConfigureBefore(WebSocketProperties.class) @Component @Import(WebSocketProperties.class) @Configuration public class WebSocketClient { private Session session; public WebSocketClient() { try { WebSocketProperties webSocketProperties = SpringUtils.getBean(WebSocketProperties.class); WebSocketContainer container = ContainerProvider.getWebSocketContainer(); container.connectToServer(this, new URI(webSocketProperties.getUrl())); } catch (DeploymentException | URISyntaxException | IOException e) { e.printStackTrace(); } } @OnOpen public void onOpen(Session session) { this.session = session; System.out.println("Connected to server"); } @OnMessage public String onMessage(String message) { System.out.println("来自WebSocket的消息: " + message); return message; } @OnClose public void onClose() { System.out.println("Disconnected from server"); } public void register() { try { session.getBasicRemote().sendText("register"); System.out.println("Registered with server"); } catch (IOException e) { e.printStackTrace(); } } public void unregister() { try { session.getBasicRemote().sendText("unregister"); System.out.println("Unregistered from server"); } catch (IOException e) { e.printStackTrace(); } } }
在 WebSocketClient 的构造方法中无法通过 @Autowired 注入配置类(构造方法执行时字段依赖尚未注入),因此改用工具类从 Spring 容器中获取。工具类如下:
* Copyright (c) 2020 pig4cloud Authors. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; @Component public class SpringUtils implements ApplicationContextAware { private static ApplicationContext context; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { context = applicationContext; } public static Object getBean(String name) { return context.getBean(name); } public static <T> T getBean(Class<T> clazz) { return context.getBean(clazz); } public static <T> T getBean(String name, Class<T> clazz) { return context.getBean(name, clazz); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。