赞
踩
如果觉得有用的话,麻烦点赞+关注!
进入下面小程序可以体验效果:
websocket是一个全双工通信协议,允许socket客户端和socket服务端双向推送数据进行交互。大部分都是在后端做socket服务端的搭建,前端作为socket客户端进行访问。但是也会有一种情况,需要来自后端内部的消息发送给socket服务端然后再推给其他socket客户端,在springboot的实现方式中,并不需要自身去实现过程,有本身已封装完善的方案。如下:
需要引入maven 依赖
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-websocket</artifactId>
- </dependency>
配置socket 服务端bean对象管理
- import lombok.extern.log4j.Log4j2;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.web.socket.server.standard.ServerEndpointExporter;
-
- import javax.websocket.server.ServerEndpointConfig;
-
- @Log4j2
- @Configuration
- public class WebSocketConfig extends ServerEndpointConfig.Configurator {
-
- @Bean
- public ServerEndpointExporter serverEndpointExporter(){
- return new ServerEndpointExporter();
- }
-
- }
服务端消息处理
ICacheService 使用的是redis发布订阅功能,使用的是redission技术实现。
redis 使用redisson缓存api模板_Garc的博客-CSDN博客
- import com.fusionfintrade.cache.ICacheService;
- import com.fusionfintrade.config.WebSocketConfig;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Component;
- import org.springframework.util.CollectionUtils;
-
- import javax.annotation.PostConstruct;
- import javax.annotation.Resource;
- import javax.websocket.*;
- import javax.websocket.server.PathParam;
- import javax.websocket.server.ServerEndpoint;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.CopyOnWriteArraySet;
-
- /**
- * @author Garcia
- * 集群原理:
- * 使用reids队列发布消息至各个集群节点
- * 各个节点监听redis对应的key消息,并将消息散发至各个节点对应的客户端中。
- */
- @Component
- @ServerEndpoint(value = "/ws/{from}", configurator = WebSocketConfig.class)
- @Slf4j
- public class SocketServer {
-
- @Resource
- private ICacheService cacheService;
-
- private static ConcurrentHashMap<String, CopyOnWriteArraySet<Session>> socketMap = new ConcurrentHashMap<>();
-
- private SocketServer socketServer;
-
- /**
- * 监听redis消息,并将对应的key消息使用socket发散至前端对应的功能
- */
- @PostConstruct
- private void init(){
- socketServer = this;
- cacheService.addListener(SocketClientEnum.RISK_WEB.getKey(), String.class,(channel, msg) ->socketServer.singleSendMessage(SocketClientEnum.RISK_WEB.getKey(),msg));
- }
-
- @OnOpen
- public void onOpen(Session session, @PathParam("from") String from){
- //将session会话保存,根据来源存。
- CopyOnWriteArraySet<Session> sessionSet = socketMap.get(from);
- if (CollectionUtils.isEmpty(sessionSet)){
- sessionSet = new CopyOnWriteArraySet<>();
- sessionSet.add(session);
- socketMap.put(from,sessionSet);
- }else {
- sessionSet.add(session);
- }
- log.info("连接:{},[{}]集当前连接数:{}",session.getRequestURI(),from,sessionSet.size());
- }
-
- @OnMessage
- public void onMessage(String message,Session session){
- //接收其他客户端的socket消息,并发给指定客户端socket,key可以通过再message中携带进来
- // sendMessage(key,message);
- }
-
- @OnClose
- public void onClose(Session session, @PathParam("from") String from){
- CopyOnWriteArraySet<Session> sessions = socketMap.get(from);
- sessions.remove(session);
- }
-
- @OnError
- public void onError(Throwable e, @PathParam("from") String from){
- log.error("{}socket连接异常",from,e);
- }
-
- /**
- * 集群节点发布消息
- * key:对应功能的客户端
- * message:消息内容
- * @param key
- * @param message
- */
- public void clusterSendMessage(String key,String message){
- cacheService.publish(key,message);
- }
-
- /**
- * 单节点发布消息
- * key:对应功能的客户端
- * message:消息内容
- * @param message
- */
- public void singleSendMessage(String key,String message){
- try {
- CopyOnWriteArraySet<Session> session = socketMap.get(key);
- if (CollectionUtils.isEmpty(session)){
- return;
- }
- session.stream().parallel().forEach(s ->{
- try {
- s.getBasicRemote().sendText(message);
- } catch (Exception e) {
- log.error("websocket消息推送异常",e);
- }
- });
- }catch (Exception e){
- log.error("socket发送消息失败",e);
- }
- }
- }
客户端需要创建连接后才能使用,但是只需要创建一次即可,所以可以使用spring管理这个对象,或者跟我这个一样,写成单例模式,随时随地抽取使用。真正使用的方法是sendMessage
- import lombok.extern.log4j.Log4j2;
-
- import javax.websocket.*;
- import java.io.IOException;
- import java.net.URI;
-
- @Log4j2
- @ClientEndpoint
- public class SocketClient {
-
- private static SocketClient instance;
-
- private SocketClient(){}
-
- private Session session;
-
- public static synchronized SocketClient getInstance(){
- if (instance == null){
- instance = new SocketClient();
- }
- instance.create();
- return instance;
- }
-
- private void create(){
- if (session==null||!session.isOpen()){
- try {
- WebSocketContainer container = ContainerProvider.getWebSocketContainer();
- container.connectToServer(instance,new URI("ws://127.0.0.1:18110/g-alarm/ws/system"));
- }catch (Exception e){
- log.error("socket client init error",e);
- }
- }
- }
-
- @OnOpen
- public void onOpen(Session session){
- this.session = session;
- }
-
- @OnMessage
- public void onMessage(String message,Session session){
-
- }
-
- @OnClose
- public void onClose(Session session, CloseReason closeReason){
-
- }
-
- @OnError
- public void onError(Throwable e){
- log.error("socket连接异常",e);
- }
-
- public void sendMessage(String message){
- synchronized (session){
- try {
- session.getBasicRemote().sendText(message);
- } catch (IOException e) {
- log.error("socket客户端发送消息异常");
- }
- }
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。