当前位置:   article > 正文

(十二)springboot实战——SSE服务推送事件案例实现_springboot sse

springboot sse

前言

SSE(Server-Sent Events,服务器推送事件)是一种基于HTTP协议的服务器推送技术。它允许服务器向客户端发送异步的、无限长的数据流,而无需客户端不断地轮询或发起请求。这种技术可以用来实现实时通信、在线聊天、即时更新等功能。使用SSE时,客户端通过简单的HTTP请求与服务器建立连接,并在连接保持打开的情况下接收服务器发送的数据。服务器可以随时向客户端发送新的数据(以文本格式),并在数据前面添加特定的标识符,以便客户端能够正确地解析数据。

相较于WebSocket的服务推送,SSE更为轻量级。SSE在客户端与服务器之间建立一个持久的HTTP连接,可以保持连接打开,并通过该连接发送异步的、无限长的数据流。而WebSocket则是一种全双工的协议,支持双向通信,客户端和服务器之间可以直接发送消息。SSE使用文本格式传输数据,并在数据前添加特定的标识符,以便客户端能够正确地解析数据;WebSocket则支持文本和二进制格式的数据传输。由于SSE基于HTTP协议,每次请求和响应都需要经过完整的HTTP协议栈,因此可能存在一定的延迟。而WebSocket则可以更快地建立连接,从而实现更实时的通信。SSE的支持性比WebSocket更广泛,因为它基于标准的HTTP协议,并不需要特殊的服务器支持。而WebSocket则需要WebSocket服务器的支持,因此在某些环境下可能存在兼容性问题。

本节内容主要介绍传统的springboot web项目与响应式springboot webflux项目下如何实现一个简单的SSE服务。

正文

springboot web项目实现SSE案例

①创建一个SSE的接口SseController实现服务数据推送

  1. package com.ht.atp.plat.controller;
  2. import com.ht.atp.plat.common.SseEmitterUTF8;
  3. import org.springframework.http.MediaType;
  4. import org.springframework.scheduling.annotation.Scheduled;
  5. import org.springframework.stereotype.Controller;
  6. import org.springframework.web.bind.annotation.GetMapping;
  7. import org.springframework.web.bind.annotation.RequestMapping;
  8. import org.springframework.web.bind.annotation.RestController;
  9. import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
  10. import java.io.IOException;
  11. import java.nio.charset.StandardCharsets;
  12. import java.util.ArrayList;
  13. import java.util.List;
  14. @Controller
  15. @RequestMapping("/sse")
  16. public class SseController {
  17. private int count_down_sec = 2 * 60 * 60;
  18. private List<SseEmitterUTF8> emitters = new ArrayList<>();
  19. /**
  20. * 每秒执行一次
  21. *
  22. */
  23. @Scheduled(fixedDelay = 1000)
  24. public void sendMessage() {
  25. // 获取要发送的消息
  26. String message = getCountDownSec();
  27. byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
  28. for (SseEmitter emitter : emitters) {
  29. try {
  30. emitter.send(bytes, MediaType.TEXT_PLAIN);
  31. } catch (Exception e) {
  32. emitter.completeWithError(e);
  33. }
  34. }
  35. }
  36. @GetMapping(path = "/countDown", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  37. public SseEmitterUTF8 sse() throws IOException {
  38. SseEmitterUTF8 emitter = new SseEmitterUTF8(0L);
  39. emitter.send("连接成功");
  40. emitter.onCompletion(() -> emitters.remove(emitter));
  41. emitter.onTimeout(() -> emitters.remove(emitter));
  42. emitters.add(emitter);
  43. return emitter;
  44. }
  45. private String getCountDownSec() {
  46. if (count_down_sec > 0) {
  47. int h = count_down_sec / (60 * 60);
  48. int m = (count_down_sec % (60 * 60)) / 60;
  49. int s = (count_down_sec % (60 * 60)) % 60;
  50. count_down_sec--;
  51. return "活动倒计时:" + h + " 小时 " + m + " 分钟 " + s + " 秒";
  52. }
  53. return "活动倒计时:0 小时 0 分钟 0 秒";
  54. }
  55. }

② SSE服务说明,通过创建一个MediaType.TEXT_EVENT_STREAM_VALUE类型的HTTP请求,向客户端推送服务数据

 ③创建一个SseEmitterUTF8类继承SseEmitter,解决推送中文消息的服务乱码问题

  1. package com.ht.atp.plat.common;
  2. import org.springframework.http.HttpHeaders;
  3. import org.springframework.http.MediaType;
  4. import org.springframework.http.server.ServerHttpResponse;
  5. import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
  6. import java.nio.charset.StandardCharsets;
  7. public class SseEmitterUTF8 extends SseEmitter {
  8. public SseEmitterUTF8(Long timeout) {
  9. super(timeout);
  10. }
  11. @Override
  12. protected void extendResponse(ServerHttpResponse outputMessage) {
  13. super.extendResponse(outputMessage);
  14. HttpHeaders headers = outputMessage.getHeaders();
  15. headers.setContentType(new MediaType(MediaType.TEXT_EVENT_STREAM, StandardCharsets.UTF_8));
  16. }
  17. }

④ 启动项目,通过接口访问该sse服务

⑤创建一个vue项目,安装vue-sse组件消费推送的消息

命令:

npm i vue-sse -S

 

⑥在main.js中引入vue-sse并注册为一个全局插件

⑦ 使用vue-sse创建一个客户端demo

  1. <template>
  2. <div id="app">
  3. <div style="background: white;text-align: center;">
  4. <h5>SSE消息</h5>
  5. <p>{{ sseMsg }}</p>
  6. </div>
  7. </div>
  8. </template>
  9. <script>
  10. export default {
  11. name: 'App',
  12. data() {
  13. return {
  14. sseMsg: '',
  15. }
  16. },
  17. created() {
  18. this.getVueSseMsg();
  19. },
  20. methods: {
  21. getVueSseMsg() {
  22. this.$sse.create('http://127.0.0.1:7777/sse/countDown')
  23. .on('message', (msg) => {
  24. console.info('Message:', msg)
  25. // 处理接收到的消息
  26. this.sseMsg = msg;
  27. })
  28. .on('error', (err) => console.error('Failed to parse or lost connection:', err))
  29. .connect()
  30. .catch((err) => console.error('Failed make initial connection:', err));
  31. }
  32. },
  33. }
  34. </script>
  35. <style>
  36. body {
  37. margin: 0px;
  38. padding: 0px;
  39. }
  40. #app {
  41. -webkit-background-size: cover;
  42. -o-background-size: cover;
  43. background-size: cover;
  44. margin: 0 auto;
  45. height: 100%;
  46. }
  47. </style>

 ⑧查看实现效果

springboot webflux项目实现SSE案例

①创建一个SSE的接口SseController实现服务数据推送

  1. package com.yundi.atp.controller;
  2. import io.swagger.v3.oas.annotations.Operation;
  3. import io.swagger.v3.oas.annotations.tags.Tag;
  4. import org.springframework.http.MediaType;
  5. import org.springframework.http.codec.ServerSentEvent;
  6. import org.springframework.web.bind.annotation.GetMapping;
  7. import org.springframework.web.bind.annotation.RequestMapping;
  8. import org.springframework.web.bind.annotation.RestController;
  9. import reactor.core.publisher.Flux;
  10. import reactor.util.function.Tuples;
  11. import java.time.Duration;
  12. @Tag(name = "事件流", description = "事件流")
  13. @RestController
  14. @RequestMapping("/sse")
  15. public class SseController {
  16. private int count_down_sec = 3 * 60 * 60;
  17. @Operation(summary = "获取倒计时数据", description = "获取倒计时数据")
  18. @GetMapping(value = "/countDown", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  19. public Flux<ServerSentEvent<Object>> countDown() {
  20. Flux<ServerSentEvent<Object>> countDown = Flux.interval(Duration.ofSeconds(1))
  21. .map(seq -> Tuples.of(seq, getCountDownSec()))
  22. .map(data -> ServerSentEvent.builder()
  23. .event("countDown")
  24. .id(Long.toString(data.getT1()))
  25. .data(data.getT2())
  26. .build());
  27. return countDown;
  28. }
  29. private String getCountDownSec() {
  30. if (count_down_sec > 0) {
  31. int h = count_down_sec / (60 * 60);
  32. int m = (count_down_sec % (60 * 60)) / 60;
  33. int s = (count_down_sec % (60 * 60)) % 60;
  34. count_down_sec--;
  35. return "活动倒计时:" + h + " 小时 " + m + " 分钟 " + s + " 秒";
  36. }
  37. return "活动倒计时:0 小时 0 分钟 0 秒";
  38. }
  39. }

② 启动项目,使用浏览器访问接口

③将vue的客户端消息sse改为webflux项目的地址

④webflux项目的sse消息效果

结语

本节内容到这里就结束了,我们下期见。。。。。。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/酷酷是懒虫/article/detail/849907
推荐阅读
相关标签
  

闽ICP备14008679号