赞
踩
WebSocket是一种在Web应用程序中实现实时双向通信的技术。它允许服务器主动向客户端推送消息,而不需要客户端发起请求。在Spring WebFlux中,我们可以使用WebSocketHandler
接口来处理WebSocket连接和消息。
在本篇博客中,我们将介绍如何使用MyWebSocketHandler2
类来构建一个简单的WebSocket处理器,实现实时聊天和文件上传功能。
首先,我们创建一个名为MyWebSocketHandler2
的Java类,并实现WebSocketHandler
接口。它是一个Spring组件,用于处理WebSocket连接和消息。
@Component
@Slf4j
public class MyWebSocketHandler2 implements WebSocketHandler {
// ...
@NotNull
@Override
public Mono<Void> handle(WebSocketSession session) {
// 实现WebSocket连接和消息的处理逻辑
// ...
}
// ...
}
MyWebSocketHandler2
类使用@Component
注解将其声明为Spring组件,以便能够在应用程序中自动进行依赖注入。
在handle
方法中,我们首先处理WebSocket连接的逻辑。当有新的WebSocket连接建立时,会调用handle
方法,并将WebSocketSession
作为参数传递进来。
@NotNull @Override public Mono<Void> handle(WebSocketSession session) { // 生成唯一的会话ID UUID uuid = UUID.randomUUID(); String uuidStr = uuid.toString(); // 获取连接的URI和查询参数 URI uri = session.getHandshakeInfo().getUri(); Map<String, String> queryMap = getQueryMap(uri.getQuery()); String group = queryMap.get("group"); String username = queryMap.get("username"); // 处理连接逻辑 // ... return Mono.empty(); }
在上述代码中,我们首先生成一个唯一的会话ID,并从WebSocket连接的URI中获取查询参数。查询参数可以包含group
和username
,用于标识连接所属的组和用户名。
接下来,我们将处理WebSocket消息的逻辑添加到handle
方法中。我们使用session.receive()
方法来接收来自客户端的消息,并根据消息类型进行不同的处理。
@NotNull @Override public Mono<Void> handle(WebSocketSession session) { // ... return session.receive() .flatMap(message -> { if (message.getType().equals(WebSocketMessage.Type.TEXT)) { // 处理文本消息 String payload = message.getPayloadAsText(); // ... } else if (message.getType().equals(WebSocketMessage.Type.BINARY)) { // 处理二进制消息 // ... } else if (message.getType().equals(WebSocketMessage.Type.PING)) { // 处理PING消息 // ... } else if (message.getType().equals(WebSocketMessage.Type.PONG)) { // 处理PONG消息 // ... } return Mono.empty(); }) .then(); }
在上述代码中,我们使用flatMap
操作符处理接收到的消息。根据消息类型的不同,我们可以执行不同的逻辑,例如处理文本消息、处理二进制消息、处理PING消息或处理PONG消息。
除了接收消息外,我们还可以使用session.send()
方法向客户端发送消息。在处理完接收到的消息后,我们可以使用session.send(Flux)
方法将响应消息发送给客户端。
@NotNull @Override public Mono<Void> handle(WebSocketSession session) { // ... return session.receive() .flatMap(message -> { if (message.getType().equals(WebSocketMessage.Type.TEXT)) { // 处理文本消息 String payload = message.getPayloadAsText(); // ... // 发送响应消息 return session.send(Flux.just(session.textMessage("Response"))); } return Mono.empty(); }) .then(); ## 5. 注册WebSocket处理器 要在Spring WebFlux应用程序中使用WebSocket处理器,我们需要将其注册到`WebSocketHandlerAdapter`中。 ```java @Configuration @EnableWebSocket public class WebSocketConfig implements WebSocketConfigurer { @Autowired private MyWebSocketHandler2 myWebSocketHandler; @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(myWebSocketHandler, "/websocket") .setAllowedOrigins("*"); } }
在上述代码中,我们创建了一个名为WebSocketConfig
的配置类,并实现了WebSocketConfigurer
接口。在registerWebSocketHandlers
方法中,我们使用registry.addHandler()
方法将MyWebSocketHandler2
注册为WebSocket处理器,并指定了处理的URL路径为/websocket
。同时,我们使用setAllowedOrigins("*")
设置允许的来源,以便允许跨域访问。
在客户端,可以使用JavaScript或其他编程语言来连接WebSocket并发送/接收消息。以下是一个简单的JavaScript示例:
// 创建WebSocket对象 const socket = new WebSocket('ws://localhost:8080/websocket?group=mygroup&username=myuser'); // 打开WebSocket连接 socket.onopen = function() { console.log('WebSocket连接已打开'); }; // 接收服务器发送的消息 socket.onmessage = function(event) { const message = event.data; console.log('收到消息:', message); }; // 发送消息给服务器 socket.send('Hello, server!'); // 关闭WebSocket连接 socket.onclose = function() { console.log('WebSocket连接已关闭'); };
在上述代码中,我们首先创建了一个WebSocket对象,指定了服务器的URL和查询参数。然后,我们定义了onopen
、onmessage
和onclose
等事件处理程序,以处理与服务器的连接、消息发送和关闭。
以下是socket全生命周期的代码案例,可以实现自定义类型映射,可以通过不同的json格式映射,解析message的类型实现全生命周期的不同的操作
package com.example.webfluxdemo.handler; import com.example.webfluxdemo.Entity.SocketEntity.PrivateMassage; import com.example.webfluxdemo.protocol.MessageType; import com.example.webfluxdemo.protocol.UriProtocol; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import org.springframework.web.reactive.socket.WebSocketHandler; import org.springframework.web.reactive.socket.WebSocketMessage; import org.springframework.web.reactive.socket.WebSocketSession; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import java.io.FileOutputStream; import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.*; import java.util.concurrent.atomic.AtomicReference; /** * @Author: kingdol * @Description: a test to websockthandler */ @Component @Slf4j public class MyWebSocketHandler2 implements WebSocketHandler { /** * @Description: 第一个Sting是组号, 第二个是session列表 * @Query ws://localhost:8081/ws/test?group=1&username=123 */ private final Map<String, List<WebSocketSession>> allGroup = new HashMap<>(); private final Map<String, WebSocketSession> allUserSessionMap = new HashMap<>(); private final List<WebSocketSession> allSession = new ArrayList<>(); private static final ObjectMapper objectMapper = new ObjectMapper(); @NotNull @Override public Mono<Void> handle(WebSocketSession session) { UUID uuid = UUID.randomUUID(); String uuidStr = uuid.toString(); AtomicReference<String> fileName = new AtomicReference<>(uuidStr + ".txt"); URI uri = session.getHandshakeInfo().getUri(); Map<String, String> queryMap2 = getQueryMap(uri.getQuery()); String group = queryMap2.get("group"); String username = queryMap2.get("username"); UriProtocol uriProtocol = new UriProtocol(username, group); return session.receive().doOnSubscribe(s -> { session.getHandshakeInfo().getHeaders().forEach((k, v) -> { log.info("header:{}", k + ":" + v); }); System.out.println("queryMap2 = " + queryMap2); allGroup.computeIfAbsent(group, k -> new ArrayList<>()); allGroup.get(group).add(session); System.out.println("allGroup = " + allGroup); allUserSessionMap.put(username, session); allSession.add(session); ConnectMessage(uriProtocol); log.info("发起连接:{}", s); }).flatMap(message -> { if (message.getType().equals(WebSocketMessage.Type.BINARY)) { log.info("收到二进制消息"); BinaryMessageHandler(session, message, fileName.get()); } else if (message.getType().equals(WebSocketMessage.Type.TEXT)) { MessageType messageType; String payload = message.getPayloadAsText(); try { messageType = objectMapper.readValue(payload, MessageType.class); } catch (JsonProcessingException e) { e.printStackTrace(); // 发送错误消息给客户端 return session.send(Flux.just(session.textMessage("Error: " + e.getMessage()))); // 抛出异常,或者进行其他错误处理 } String content = messageType.getContent(); PrivateMassage privateMassage = messageType.getPrivateMassage(); switch (messageType.getCode()) { // 广播信息 case "1" -> { BroadToAllSession(session, content, username); log.info("收到文本消息:{}", messageType.getContent()); } // 组聊信息 case "2" -> { log.info("收到组发信息" + messageType.getContent() + "-> 发送到第" + group + "组!"); GroupSendMessage(session, content, group, username); } // 私聊信息 case "3" -> { try { PrivateSendMassage(session, privateMassage, username); log.info("收到私聊信息: " + content); } catch (JsonProcessingException e) { return session.send(Flux.just(session.textMessage("Error: " + e.getMessage()))); } } } } else if (message.getType().equals(WebSocketMessage.Type.PING)) { log.info("收到ping消息"); PingTypeHandler(session, message); } else if (message.getType().equals(WebSocketMessage.Type.PONG)) { log.info("收到pong消息"); PongTypeHandler(session, message); } return session.send(Mono.empty()); }).doOnTerminate(() -> { log.info("doOnTerminate"); }).doOnComplete(() -> { allUserSessionMap.remove(username); log.info("doOnComplete"); }).publishOn(Schedulers.boundedElastic()).doOnCancel(() -> { session.close().subscribe(); log.info("doOnCancel"); }).doOnError(e -> { e.printStackTrace(); log.error("doOnError"); }).doOnRequest(r -> { log.info("doOnRequest"); }).then(); } /* * 示例json * {"code": "3", "privateMassage": {"targetname": "123","content":"111"}} * */ private void PrivateSendMassage(WebSocketSession session, PrivateMassage privateMassage, String username) throws JsonProcessingException { String targetname = privateMassage.getTargetname(); String message = privateMassage.getContent(); if (allUserSessionMap.containsKey(targetname)) { WebSocketSession webSocketSession = allUserSessionMap.get(targetname); webSocketSession.send(Flux.just(session.textMessage(username + "对你说: " + message))).subscribe(); } else { session.send(Flux.just(session.textMessage("该用户未上线!"))).subscribe(); } } /* * 示例json * {"code": "1", "content": "111"} * */ private void BroadToAllSession(WebSocketSession session, String content, String username) { for (var sessions : allSession) { sessions.send(Flux.just(session.textMessage(username + "说: " + content))).subscribe(); } } private void ConnectMessage(UriProtocol uriProtocol) { allSession.forEach(s -> { s.send(Flux.just(s.textMessage(uriProtocol.toString()))).subscribe(); }); } /* * 示例json * {"code": "2", "content": "111"} * */ private void GroupSendMessage(WebSocketSession session, String content, String group, String username) { List<WebSocketSession> webSocketSessions = allGroup.get(group); for (var sessions : webSocketSessions) { sessions.send(Flux.just(session.textMessage(username + "说:" + content))).subscribe(); } } private void BinaryMessageHandler(WebSocketSession session, WebSocketMessage message, String fileName) { DataBuffer dataBuffer = message.getPayload(); // 获取字节数组 ByteBuffer byteBuffer = dataBuffer.toByteBuffer(); byte[] byteArray = new byte[byteBuffer.remaining()]; byteBuffer.get(byteArray); // 将字节数组写入文件 try (FileOutputStream fileOutputStream = new FileOutputStream("files/" + fileName)) { FileChannel fileChannel = fileOutputStream.getChannel(); fileChannel.write(ByteBuffer.wrap(byteArray)); } catch (IOException e) { // 处理文件写入错误 e.printStackTrace(); session.send(Flux.just(session.textMessage("fail to upload file"))).subscribe(); } } private void PongTypeHandler(WebSocketSession session, WebSocketMessage message) { } private void PingTypeHandler(WebSocketSession session, WebSocketMessage message) { } private void TestMessageHandler(WebSocketSession session, WebSocketMessage message) { session.send(Flux.just(session.textMessage(message.getPayloadAsText()))).subscribe(); } private Map<String, String> getQueryMap(String queryStr) { Map<String, String> queryMap = new HashMap<>(); if (StringUtils.hasText(queryStr)) { String[] queryParam = queryStr.split("&"); Arrays.stream(queryParam).forEach(s -> { String[] kv = s.split("=", 2); String value = kv.length == 2 ? kv[1] : ""; queryMap.put(kv[0], value); }); } return queryMap; } }
package com.example.webfluxdemo.protocol;
import lombok.Data;
@Data
public class UriProtocol {
private String username;
private String group;
public UriProtocol(String username, String group) {
this.username = username;
this.group = group;
}
}
package com.example.webfluxdemo.protocol; import com.example.webfluxdemo.Entity.SocketEntity.PrivateMassage; import jakarta.annotation.Nullable; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data @AllArgsConstructor @NoArgsConstructor public class MessageType { private String code; private String content; @Nullable private PrivateMassage privateMassage; }
package com.example.webfluxdemo.Entity.SocketEntity; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; /* * code对应3 * */ @Data @AllArgsConstructor @NoArgsConstructor public class PrivateMassage { private String content; // 目标用户name private String targetname; }
(运行之后生命周期的流程(控制台输出结果)
(附带两个实体映射类)
该文章仅供参考,实际项目开发中的映射类和根据不同的code写不同业务,更加繁琐,但基础就是要理解webflux的输入输出流Flux和Mono的转换以及全生命周期执行的流程
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。