当前位置:   article > 正文

SSE 服务端推送实现和实际问题解决_sseemitter服务端关闭造成重连

sseemitter服务端关闭造成重连

实现框架springboot

很多文章都拿它和 WebSocket 做对比,这里就不细讲了。简单来说,SSE 是单向的:只能由 server 主动向 client 推送。

另一个特点是客户端具备断线自动重连功能。我认为这是它和 WebSocket 比较重要的区别,因为这一点会带来实际的 bug:SSE 客户端每次重连都会建立新通道,导致服务端找不到之前的通道。虽然消息仍然能推送成功,但服务端会不断报错,很烦人,而且这是个性能问题(如果服务器配置很高或并发很低,可以不考虑)。

  1. import lombok.AllArgsConstructor;
  2. import lombok.Data;
  3. import lombok.NoArgsConstructor;
  4. /**
  5. * 消息体
  6. */
  7. @Data
  8. @AllArgsConstructor
  9. @NoArgsConstructor
  10. public class MessageVo {
  11. /**
  12. * 客户端id
  13. */
  14. private String clientId;
  15. /**
  16. * 传输数据体(json)
  17. */
  18. private String data;
  19. }

数据载体类

  1. import com.ruoyi.business.sse.main.MessageVo;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.java_websocket.client.WebSocketClient;
  4. import org.springframework.beans.factory.annotation.Value;
  5. import org.springframework.http.HttpStatus;
  6. import org.springframework.http.MediaType;
  7. import org.springframework.stereotype.Service;
  8. import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
  9. import java.io.IOException;
  10. import java.net.URI;
  11. import java.net.URISyntaxException;
  12. import java.util.HashMap;
  13. import java.util.Map;
  14. import java.util.concurrent.ConcurrentHashMap;
  15. import java.util.concurrent.atomic.AtomicBoolean;
  16. import java.util.function.Consumer;
  17. /**
  18. * 实现
  19. */
  20. @Slf4j
  21. @Service
  22. public class SseEmitterServiceImpl implements SseEmitterService {
  23. @Value("${wss.url}")
  24. private String wssUrl;
  25. /**
  26. * 容器,保存连接,用于输出返回 ;可使用其他方法实现
  27. */
  28. private static final Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();
  29. /**
  30. * 创建连接
  31. *
  32. * @param clientId 客户端ID
  33. */
  34. @Override
  35. public SseEmitter createConnect(String clientId) throws URISyntaxException {
  36. if (sseCache.containsKey(clientId)) {
  37. removeUser(clientId);
  38. }
  39. // 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
  40. SseEmitter sseEmitter = new SseEmitter(50 * 1000L);
  41. // 添加自己的业务
  42. // 注册回调
  43. sseEmitter.onCompletion(completionCallBack(clientId,client)); // 长链接完成后回调接口(即关闭连接时调用)
  44. sseEmitter.onTimeout(timeoutCallBack(clientId,client)); // 连接超时回调
  45. sseEmitter.onError(errorCallBack(clientId,client)); // 推送消息异常时,回调方法
  46. sseCache.put(clientId, sseEmitter);
  47. System.out.println("创建新的sse连接,当前用户:{" + clientId + "} 累计用户:{" + sseCache.size() + "}");
  48. try {
  49. // 注册成功返回用户信息
  50. sseEmitter.send(SseEmitter.event().id(String.valueOf(HttpStatus.OK)).data(clientId, MediaType.APPLICATION_JSON));
  51. } catch (IOException e) {
  52. sseEmitter.complete();
  53. System.out.println("创建长链接异常,客户端ID:{} 异常信息:{" + clientId + "} 异常信息:{}{" + e.getMessage() + "}");
  54. }
  55. return sseEmitter;
  56. }
  57. /**
  58. * 关闭连接
  59. *
  60. * @param clientId 客户端ID
  61. */
  62. @Override
  63. public void closeConnect(String clientId) {
  64. SseEmitter sseEmitter = sseCache.get(clientId);
  65. if (sseEmitter != null) {
  66. sseEmitter.complete();
  67. removeUser(clientId);
  68. }
  69. }
  70. /**
  71. * 长链接完成后回调接口(即关闭连接时调用)
  72. *
  73. * @param clientId 客户端ID
  74. **/
  75. private Runnable completionCallBack(String clientId,SSEWebSocketClient client) {
  76. return () -> {
  77. System.out.println("连接关闭!!!");
  78. closeConnect(clientId);
  79. client.close();
  80. };
  81. }
  82. /**
  83. * 连接超时时调用
  84. *
  85. * @param clientId 客户端ID
  86. **/
  87. private Runnable timeoutCallBack(String clientId,SSEWebSocketClient client) {
  88. return () -> {
  89. System.out.println("连接超时!!!");
  90. closeConnect(clientId);
  91. client.close();
  92. };
  93. }
  94. /**
  95. * 推送消息异常时,回调方法
  96. *
  97. * @param clientId 客户端ID
  98. **/
  99. private Consumer<Throwable> errorCallBack(String clientId,SSEWebSocketClient client) {
  100. return throwable -> {
  101. System.out.println("消息异常!!!");
  102. closeConnect(clientId);
  103. client.close();
  104. };
  105. }
  106. /**
  107. * 移除用户连接
  108. *
  109. * @param clientId 客户端ID
  110. **/
  111. private void removeUser(String clientId) {
  112. sseCache.remove(clientId);
  113. log.info("SseEmitterServiceImpl[removeUser]:移除用户:{}", clientId);
  114. }
  115. }

业务实现类

  1. import com.alibaba.fastjson2.JSONObject;
  2. import com.ruoyi.business.sse.main.MessageVo;
  3. import lombok.SneakyThrows;
  4. import org.java_websocket.client.WebSocketClient;
  5. import org.java_websocket.handshake.ServerHandshake;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.http.HttpStatus;
  8. import org.springframework.http.MediaType;
  9. import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
  10. import javax.annotation.Resource;
  11. import java.io.IOException;
  12. import java.net.URI;
  13. import java.util.HashMap;
  14. import java.util.Map;
  15. public class SSEWebSocketClient extends WebSocketClient {
  16. private String clientId;
  17. private SseEmitter sseEmitter;
  18. public String getClientId() {
  19. return clientId;
  20. }
  21. public void setClientId(String clientId) {
  22. this.clientId = clientId;
  23. }
  24. public SseEmitter getSseEmitter() {
  25. return sseEmitter;
  26. }
  27. public void setSseEmitter(SseEmitter sseEmitter) {
  28. this.sseEmitter = sseEmitter;
  29. }
  30. public SSEWebSocketClient(URI serverUri, Map<String, String> httpHeaders) {
  31. super(serverUri, httpHeaders);
  32. }
  33. @Override
  34. public void onOpen(ServerHandshake handshakedata) {
  35. System.out.println("新连接已打开");
  36. super.send("{\"type\":\"connection_init\"}");
  37. try {
  38. Thread.sleep(700L);
  39. } catch (InterruptedException e) {
  40. e.printStackTrace();
  41. }
  42. String param = "private-search-" + clientId;
  43. super.send("{\"id\": \"" + param + "\",\"payload\": {\"data\": \"{\\\"query\\\":\\\"subscription Subscribe($name: String!) {\\\\n subscribe(name: $name) {\\\\n name\\\\n data\\\\n __typename\\\\n }\\\\n}\\\\n\\\",\\\"variables\\\":{\\\"name\\\":\\\"" + param + "\\\"}}\",\"extensions\": {\"authorization\": {\"host\": \"apndpknqrjhgpbhqwfwdmcx2du.appsync-api.cn-northwest-1.amazonaws.com.cn\",\"x-amz-date\": \"20240412T033202Z\",\"x-api-key\": \"da2-rduvi64eizehdotnp3a3jhokuq\",\"x-amz-user-agent\": \"aws-amplify/6.0.27 api/1 framework/2\"}}},\"type\": \"start\"}");
  44. }
  45. @Override
  46. public void onMessage(String message) {
  47. System.out.println("接收到消息: " + message);
  48. if (!message.contains("连接成功")) {
  49. JSONObject result1 = JSONObject.parse(message);
  50. if (result1.getString("type").equals("data")) {
  51. if (result1.getJSONObject("payload").getJSONObject("data").getJSONObject("subscribe").getJSONObject("data").getString("message").equals("streaming")) {
  52. MessageVo messageVo = new MessageVo(clientId, result1.getJSONObject("payload").getJSONObject("data").getJSONObject("subscribe").getString("data"));
  53. SseEmitter.SseEventBuilder sendData = SseEmitter.event().id(String.valueOf(HttpStatus.OK))
  54. .data(messageVo, MediaType.APPLICATION_JSON);
  55. try {
  56. sseEmitter.send(sendData);
  57. } catch (IOException e) {
  58. e.printStackTrace();
  59. sseEmitter.complete();
  60. }
  61. }
  62. if (result1.getJSONObject("payload").getJSONObject("data").getJSONObject("subscribe").getJSONObject("data").getString("message").equals("streaming_end")) {
  63. MessageVo messageVo = new MessageVo(clientId, result1.getJSONObject("payload").getJSONObject("data").getJSONObject("subscribe").getString("data"));
  64. SseEmitter.SseEventBuilder sendData = SseEmitter.event().id(String.valueOf(HttpStatus.OK))
  65. .data(messageVo, MediaType.APPLICATION_JSON);
  66. try {
  67. sseEmitter.send(sendData);
  68. sseEmitter.complete();
  69. } catch (IOException e) {
  70. e.printStackTrace();
  71. sseEmitter.complete();
  72. }
  73. }
  74. } else if (result1.getString("type").equals("start_ack")) {
  75. System.out.println("连接成功!!!");
  76. } else {
  77. System.out.println(result1.toJSONString());
  78. }
  79. }
  80. }
  81. @Override
  82. public void onClose(int code, String reason, boolean remote) {
  83. System.out.println("连接已关闭");
  84. }
  85. @Override
  86. public void onError(Exception ex) {
  87. ex.printStackTrace();
  88. }
  89. }

接口类

  1. import com.ruoyi.business.sse.service.SseEmitterService;
  2. import com.ruoyi.common.core.domain.AjaxResult;
  3. import com.ruoyi.common.utils.StringUtils;
  4. import org.apache.http.client.methods.CloseableHttpResponse;
  5. import org.apache.http.client.methods.HttpPost;
  6. import org.apache.http.entity.StringEntity;
  7. import org.apache.http.impl.client.CloseableHttpClient;
  8. import org.apache.http.impl.client.HttpClients;
  9. import org.apache.http.util.EntityUtils;
  10. import org.springframework.beans.factory.annotation.Value;
  11. import org.springframework.http.HttpEntity;
  12. import org.springframework.http.HttpHeaders;
  13. import org.springframework.http.MediaType;
  14. import org.springframework.http.ResponseEntity;
  15. import org.springframework.web.bind.annotation.*;
  16. import org.springframework.web.client.RestTemplate;
  17. import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
  18. import javax.annotation.Resource;
  19. import java.io.IOException;
  20. import java.net.URISyntaxException;
  21. import java.util.Map;
  22. /**
  23. * SSE长链接
  24. */
  25. @RestController
  26. @RequestMapping("/sse")
  27. public class SseEmitterController {
  28. @Resource
  29. private SseEmitterService sseEmitterService;
  30. @Value("${appsync.url}")
  31. private String url;
  32. @CrossOrigin
  33. @GetMapping("/createConnect/{clientId}")
  34. public SseEmitter createConnect(@PathVariable("clientId") String clientId) throws URISyntaxException {
  35. return sseEmitterService.createConnect(clientId);
  36. }
  37. @CrossOrigin
  38. @GetMapping("/closeConnect")
  39. public void closeConnect(@RequestParam(required = true) String clientId) {
  40. sseEmitterService.closeConnect(clientId);
  41. }
  42. @CrossOrigin
  43. @PostMapping("/sendAjaxMessage")
  44. public AjaxResult sendMessage(@RequestBody Map map) {
  45. // map参数体校验
  46. if (map == null) {
  47. return AjaxResult.error(601, "请求接口需要传递正确的JSON数据");
  48. }
  49. // query 参数校验
  50. String query = (String) map.get("query");
  51. if (StringUtils.isEmpty((String) map.get("query"))) {
  52. return AjaxResult.error(601, " query 不能为空");
  53. }
  54. // query 参数校验
  55. String messageId = (String) map.get("messageId");
  56. if (StringUtils.isEmpty((String) map.get("messageId"))) {
  57. return AjaxResult.error(601, " messageId 不能为空");
  58. }
  59. // connectionId 参数校验
  60. String connectionId = (String) map.get("connectionId");
  61. if (StringUtils.isEmpty((String) map.get("connectionId"))) {
  62. return AjaxResult.error(601, " connectionId 不能为空");
  63. }
  64. HttpHeaders headers = new HttpHeaders();
  65. headers.setContentType(MediaType.APPLICATION_JSON);
  66. headers.add("connectionId", connectionId);
  67. String indexName = "smart_search_qa_test";
  68. String prompt = "";
  69. String jsonBody = "{\n" +
  70. " \"action\": \"search\",\n" +
  71. " \"configs\": {\n" +
  72. " \"name\": \"demo\",\n" +
  73. " \"searchEngine\": \"opensearch\",\n" +
  74. " \"llmData\": {\n" +
  75. " \"strategyName\": \"unimportant\",\n" +
  76. " \"type\": \"sagemaker_endpoint\",\n" +
  77. " \"embeddingEndpoint\": \"huggingface-inference-eb\",\n" +
  78. " \"modelType\": \"non_llama2\",\n" +
  79. " \"recordId\": \"dji-inference-chatglm3-6b-62277\",\n" +
  80. " \"sagemakerEndpoint\": \"dji-inference-chatglm3-6b\",\n" +
  81. " \"streaming\": true\n" +
  82. " },\n" +
  83. " \"role\": \"\",\n" +
  84. " \"language\": \"chinese\",\n" +
  85. " \"taskDefinition\": \"\",\n" +
  86. " \"outputFormat\": \"stream\",\n" +
  87. " \"isCheckedGenerateReport\": false,\n" +
  88. " \"isCheckedContext\": false,\n" +
  89. " \"isCheckedKnowledgeBase\": true,\n" +
  90. " \"indexName\": \"" + indexName + "\",\n" +
  91. " \"topK\": 2,\n" +
  92. " \"searchMethod\": \"vector\",\n" +
  93. " \"txtDocsNum\": 0,\n" +
  94. " \"vecDocsScoreThresholds\": 0,\n" +
  95. " \"txtDocsScoreThresholds\": 0,\n" +
  96. " \"isCheckedScoreQA\": true,\n" +
  97. " \"isCheckedScoreQD\": true,\n" +
  98. " \"isCheckedScoreAD\": true,\n" +
  99. " \"contextRounds\": 2,\n" +
  100. " \"isCheckedEditPrompt\": false,\n" +
  101. " \"prompt\": \"" + prompt + "\",\n" +
  102. " \"tokenContentCheck\": \"\",\n" +
  103. " \"responseIfNoDocsFound\": \"Cannot find the answer\"\n" +
  104. " },\n" +
  105. " \"query\": \"" + query + "\"," +
  106. " \"messageId\": \"" + messageId + "\"" +
  107. "}";
  108. HttpEntity<String> entity = new HttpEntity<>(jsonBody, headers);
  109. try {
  110. RestTemplate restTemplate = new RestTemplate();
  111. ResponseEntity<String> response = restTemplate.postForEntity(url, entity, String.class);
  112. return AjaxResult.success("操作成功");
  113. } catch (Exception e) {
  114. e.printStackTrace();
  115. return AjaxResult.error("操作失败");
  116. }
  117. }
  118. }

控制类

// Fragment: pushes one message to the page and then finishes the SSE stream.
// NOTE(review): clientId and sseEmitter are not declared in this fragment - presumably the
// emitter is the one created for this client by createConnect; confirm in your own code.
MessageVo messageVo = new MessageVo(clientId,"你想推送给页面的消息这段代码写在自己的也业务");
                    // Build an SSE event whose id is String.valueOf(HttpStatus.OK) and whose
                    // data is the MessageVo, declared as application/json.
                    SseEmitter.SseEventBuilder sendData = SseEmitter.event().id(String.valueOf(HttpStatus.OK))
                            .data(messageVo, MediaType.APPLICATION_JSON);
                    try {
                        sseEmitter.send(sendData);
                        // Complete the emitter right after the send succeeds.
                        sseEmitter.complete();

                    } catch (IOException e) {
                        // The send failed: print the stack trace and complete the emitter anyway.
                        e.printStackTrace();
                        sseEmitter.complete();
                    }

有一点很抱歉:接口里封装的发送消息代码是未完成的代码。发送时一定要用上面创建的 SseEmitter 对象,要注意甄别这个类似通道的对象。

还有一点要注意:消息发送完成之后一定要调用 complete(),不然会出现异常。这个问题各种资料都避而不谈,网上一直强调 SSE 使用简单,但是实现并维护一个高并发的 SSE 服务其实很复杂,异常百出,资料也很稀少。

还有一点要注意:sseEmitter.complete() 必须在你设置的超时时间之内调用。

其实可以把超时时间设置为 0L(永不超时),但是用户量大的时候对服务器压力巨大,需要根据实际情况决定(我这边是基于 AWS 服务产品做二次开发,请求或连接不能超过 1 分钟,超过就会被主动断开)。

结语:虽然我实现了 WebSocket 和 SSE 两个版本,但最后还是采用了 SSE。没有别的原因,新技术还是要支持的;即使会有问题,我也要尽量把问题发现并暴露出来,别让后人踩坑。注意:代码要结合自己的业务、按照 Java 语言规范进行整改,我这份代码也是找别人帮忙改的。

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号