当前位置:   article > 正文

SpringBoot集成websocket(2)|(websocket服务端实现以及websocket中转实现)

websocket中转

SpringBoot集成websocket(2)|(websocket服务端实现以及websocket转发实现)


章节
第一章链接: SpringBoot集成websocket(1)|(websocket客户端实现)
第二章链接: SpringBoot集成websocket(2)|(websocket服务端实现以及websocket中转实现)

前言

本节主要介绍的是springboot实现websocket的客户端服务端,以及作为中转服务实现客户端长连接服务端,服务端长连接第三方websocket服务的数据传输。

一、websocket服务端依赖引入

   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-websocket</artifactId>
   </dependency>
  • 1
  • 2
  • 3
  • 4

二、websocket服务代码实现

1.WebSocketConfig配置

springboot接入websocket需要启用对应的配置

@Configuration
@EnableWebSocket
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
        return new ServerEndpointExporter();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

2.WebSocketServer服务实现

springboot提供对外的websocket接口实现


/**
 * @author Oak
 * Created on 2023/5/15.
 * Description:
 */
@Component
@Data
@Slf4j
@ServerEndpoint(value = "/ws/{name}")
public class WebSocketServer {
	// 调用第三方接口插件
    @Autowired
    private GptRestApi gptRestApi;

    @Autowired
    public MilvusLargeService largeService;

    @Autowired
    public DocumentParagraphRepository paragraphRepository;


    /**
     * 与某个客户端的连接对话,需要通过它来给客户端发送消息
     */
    private Session session;

    /**
     * 标识当前连接客户端的用户名
     */
    private String name;

    /**
     * 用于存所有的连接服务的客户端,这个对象存储是安全的
     */
    private static ConcurrentHashMap<String, WebSocketServer> webSocketSet = new ConcurrentHashMap<>();

    /**
     * 用于存所有的连接第三方ws服务的客户端
     */
    private static ConcurrentHashMap<String, WebSocketClient> wsClientSet = new ConcurrentHashMap<>();


    @OnOpen
    public void OnOpen(Session session, @PathParam(value = "name") String name) {
        this.session = session;
        this.name = name;
        webSocketSet.put(name, this);
        WebSocketClient webSocketClient = getWebSocketClient(session);
        wsClientSet.put(name, webSocketClient);
        log.info("[web -> server] 连接成功,当前连接人数为:={}", webSocketSet.size());
    }


    @OnClose
    public void OnClose() {
        webSocketSet.remove(this.name);
        wsClientSet.remove(this.name);
        log.info("[web -> server] 退出成功,当前连接人数为:={}", webSocketSet.size());
    }

    @OnMessage
    public void OnMessage(String message) {
        log.info("[web -> server] 收到消息:{}", message);
        SparkWSClient sparkWSClient = (SparkWSClient) wsClientSet.get(name);
        // ping 消息响应 pong
        if ("ping".equals(message)) {
            try {
                session.getBasicRemote().sendText("pong");
                return;
            } catch (IOException e) {
                log.error("消息发送失败");
            }
        } else {
        // 业务消息
            if (sparkWSClient != null && sparkWSClient.isOpen()) {
                    sparkWSClient.send(message);
                } catch (Exception e) {
                    log.warn("异常:{}", e.getMessage());
                }
                return;
            } else { // 重新连接spark websocket
                WebSocketClient webSocketClient = getWebSocketClient(session);
                wsClientSet.put(name, webSocketClient);
            }
        }
    }

    /**
     * 群发
     *
     * @param message
     */
    public void GroupSending(String message) {
        for (String name : webSocketSet.keySet()) {
            try {
                webSocketSet.get(name).session.getBasicRemote().sendText(message);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

// 第三方websocket接口,自己可以模拟写一个
    public WebSocketClient getWebSocketClient(Session session) {
        String chaturl = "ws://xxx/chat";
        UserSession build1 = UserSession.builder()
                .session(session)
                .name(name)
                .build();
        WebSocketClient client = new SparkWSClient(URI.create(chaturl ), build1);
        client.connect();
        return client;
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117

userSession代码如下

@Data
@Builder
public class UserSession {
    public  Session session;
    public String name;

    public UserSession(Session session,String name) {
        this.session = session;
        this.name = name;
    }

    public void sendUserMsg(String msg) {
        sendUserMsg(msg, false);
    }

    public void sendUserMsg(String msg, boolean close) {
        synchronized (session) {
            if (!session.isOpen()) {
                return;
            }
            try {
                session.getBasicRemote().sendText(msg);
                if (close) {
                    session.close();
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

3.WebSocketClient连接第三方客户端实现

springboot提供对第三方websocket连接的客户端
实现代码如下

思路,两个websocket各自监听自己的数据,想要串起来可以通过消息中间件来实现,但这样无疑会增加系统的复杂性,还可以通过注入Session的方式,与客户端建立连接后,可以通过session进行数据推送,这个时候只要在 连接第三方的websocket的时候注入这个session兑现。那调用第三方的websocketClient就能通过session 将监听到的数据直接返回给客户端。

@Slf4j
public class SparkWSClient extends WebSocketClient {

    UserSession userSession;

    public SparkWSClient(URI serverUri, UserSession session) {
        super(serverUri);
        this.userSession = session;
    }

    @Override
    public void onOpen(ServerHandshake serverHandshake) {
        log.info("[server-> spark] 连接成功");
    }

    @Override
    public void onMessage(String message) {
        log.debug("[server-> spark] 收到消息={}...", message);
        //收到远程的第三方返回数据,这里通过注入的session直接返回给客户端
        userSession.sendUserMsg(message);
    }

    @Override
    public void onClose(int i, String s, boolean b) {
        log.debug("[server-> spark] 退出连接");
        String errMsg = String.format(MVS_ERR_MSG_FORMAT, userSession.getName(), "spark connection closed!");
        userSession.sendUserMsg(errMsg, true);
    }

    @Override
    public void onError(Exception e) {
        //转发给用户端
        String errMsg = String.format(MVS_ERR_MSG_FORMAT, userSession.getName(), "spark connection refused!");
        userSession.sendUserMsg(errMsg, true);
        log.error("[server-> spark] 连接错误={}", e.getMessage());
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

总结

本文主要介绍websocket客户端、服务端的实现,以及客户端通过websocket连接服务端,服务端连接第三方的websocket接口,并且将第三个方数据直接返回给客户端,起一个中间websockey代理服务作用,实现数据的中转。

第一章链接: SpringBoot集成websocket(1)|(websocket客户端实现)
第二章链接: SpringBoot集成websocket(2)|(websocket服务端实现以及websocket中转实现)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/356679
推荐阅读
相关标签
  

闽ICP备14008679号