当前位置:   article > 正文

springboot -sse -flux 服务器推送消息_async support must be enabled on a servlet and for

async support must be enabled on a servlet and for all filters involved in a

先说BUG处理,遇到提示异步问题 Async support must be enabled on a servlet and for all filters involved in async request processing. This is done in Java code using the Servlet API or by adding "<async-supported>true</async-supported>" to servlet and filter declarations in web.xml.

springboot在@WebFilter注解处,加入urlPatterns = { "/*" },asyncSupported = true

springmvc在web.xml处理

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee"
  3.          xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
  4.          version="3.0">
  5. <filter-mapping>
  6.   <filter-name>shiroFilter</filter-name>
  7.   <url-pattern>/*</url-pattern>
  8.   <dispatcher>REQUEST</dispatcher>
  9.   <dispatcher>ASYNC</dispatcher>
  10. </filter-mapping>
  • demo1,服务器间隔一定时间推送内容
  1.     接口方法
  1. @GetMapping(path = "/sse/{userId}",produces = MediaType.TEXT_EVENT_STREAM_VALUE )
  2. public Flux<ServerSentEvent<String>> sse(@PathVariable String userId) {
  3. // 每两秒推送一次
  4. return Flux.interval(Duration.ofSeconds(2)).map(seq->
  5. Tuples.of(seq, LocalDateTime.now())).log()//序号和时间
  6. .map(data-> ServerSentEvent.<String>builder().id(userId).data(data.getT1().toString()).build());//推送内容
  7. }

2.前端代码

  1. <!DOCTYPE html>
  2. <html xmlns:th="http://www.thymeleaf.org">
  3. <head>
  4. <meta charset="UTF-8"/>
  5. <title>服务器推送事件</title>
  6. </head>
  7. <body>
  8. <div>
  9. <div id="data"></div>
  10. <div id="result"></div><br/>
  11. </div>
  12. <script th:inline="javascript" >
  13. //服务器推送事件
  14. if (typeof (EventSource) !== "undefined") {
  15. var source1 = new EventSource("http://localhost:9000/api/admin/test/sse/1");
  16. //当抓取到消息时
  17. source1.onmessage = function (evt) {
  18. document.getElementById("data").innerHTML = document.getElementById("data").innerHTML+"股票行情:" + evt.data;
  19. };
  20. } else {
  21. //注意:ie浏览器不支持
  22. document.getElementById("result").innerHTML = "抱歉,你的浏览器不支持 server-sent 事件...";
  23. var xhr;
  24. var xhr2;
  25. if (window.XMLHttpRequest){
  26. //IE7+, Firefox, Chrome, Opera, Safari浏览器支持该方法
  27. xhr=new XMLHttpRequest();
  28. xhr2=new XMLHttpRequest();
  29. }else{
  30. //IE6, IE5 浏览器不支持,使用ActiveXObject方法代替
  31. xhr=new ActiveXObject("Microsoft.XMLHTTP");
  32. xhr2=new ActiveXObject("Microsoft.XMLHTTP");
  33. }
  34. console.log(xhr);
  35. console.log(xhr2);
  36. xhr.open('GET', '/sse/countDown');
  37. xhr.send(null);//发送请求
  38. xhr.onreadystatechange = function() {
  39. console.log("s响应状态:" + xhr.readyState);
  40. //2是空响应,3是响应一部分,4是响应完成
  41. if (xhr.readyState > 2) {
  42. //这儿可以使用response(对应json)与responseText(对应text)
  43. var newData = xhr.response.substr(xhr.seenBytes);
  44. newData = newData.replace(/\n/g, "#");
  45. newData = newData.substring(0, newData.length - 1);
  46. var data = newData.split("#");
  47. console.log("获取到的数据:" + data);
  48. document.getElementById("result").innerHTML = data;
  49. //长度重新赋值,下次截取时需要使用
  50. xhr.seenBytes = xhr.response.length;
  51. }
  52. }
  53. xhr2.open('GET', '/sse/retrieve');
  54. xhr2.send(null);//发送请求
  55. xhr2.onreadystatechange = function() {
  56. console.log("s响应状态:" + xhr2.readyState);
  57. //0: 请求未初始化,2 请求已接收,3 请求处理中,4 请求已完成,且响应已就绪
  58. if (xhr2.readyState > 2) {
  59. //这儿可以使用response(对应json)与responseText(对应text)
  60. var newData1 = xhr2.response.substr(xhr2.seenBytes);
  61. newData1 = newData1.replace(/\n/g, "#");
  62. newData1 = newData1.substring(0, newData1.length - 1);
  63. var data1 = newData1.split("#");
  64. console.log("获取到的数据:" + data1);
  65. document.getElementById("data").innerHTML = data1;
  66. //长度重新赋值,下次截取时需要使用
  67. xhr2.seenBytes = xhr2.response.length;
  68. }
  69. }
  70. }
  71. </script>
  72. </body>
  73. </html>
  • demo2 订阅服务器消息,服务器send推送消息完成后,关闭sse.close

1.接口方法以及工具类

  1. @GetMapping(path = "/sse/sub",produces = MediaType.TEXT_EVENT_STREAM_VALUE )
  2. public SseEmitter subscribe(@RequestParam String questionId,HttpServletResponse response) {
  3. // 简单异步发消息 ====
  4. //questionId 订阅id,id对应了sse对象
  5. new Thread(() -> {
  6. try {
  7. Thread.sleep(1000);
  8. for (int i = 0; i < 10; i++) {
  9. Thread.sleep(500);
  10. SSEUtils.pubMsg(questionId, questionId + " - kingtao come " + i);
  11. }
  12. } catch (Exception e) {
  13. e.printStackTrace();
  14. } finally {
  15. // 消息发送完关闭订阅
  16. SSEUtils.closeSub(questionId);
  17. }
  18. }).start();
  19. // =================
  20. return SSEUtils.addSub(questionId);
  21. }

工具类

  1. import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
  2. import java.util.Map;
  3. import java.util.concurrent.ConcurrentHashMap;
  4. public class SSEUtils {
  5. // timeout
  6. private static Long DEFAULT_TIME_OUT = 2*60*1000L;
  7. // 订阅表
  8. private static Map<String, SseEmitter> subscribeMap = new ConcurrentHashMap<>();
  9. /** 添加订阅 */
  10. public static SseEmitter addSub(String questionId) {
  11. if (null == questionId || "".equals(questionId)) {
  12. return null;
  13. }
  14. SseEmitter emitter = subscribeMap.get(questionId);
  15. if (null == emitter) {
  16. emitter = new SseEmitter(DEFAULT_TIME_OUT);
  17. subscribeMap.put(questionId, emitter);
  18. }
  19. return emitter;
  20. }
  21. /** 发消息 */
  22. public static void pubMsg(String questionId, String msg) {
  23. SseEmitter emitter = subscribeMap.get(questionId);
  24. if (null != emitter) {
  25. try {
  26. // 更规范的消息结构看源码
  27. emitter.send(SseEmitter.event().data(msg));
  28. } catch (Exception e) {
  29. // e.printStackTrace();
  30. }
  31. }
  32. }
  33. /**
  34. * 关闭订阅
  35. * @param questionId
  36. */
  37. public static void closeSub(String questionId) {
  38. SseEmitter emitter = subscribeMap.get(questionId);
  39. if (null != emitter) {
  40. try {
  41. emitter.complete();
  42. subscribeMap.remove(questionId);
  43. } catch (Exception e) {
  44. e.printStackTrace();
  45. }
  46. }
  47. }
  48. }

2.前端代码

  1. <!DOCTYPE html>
  2. <html lang="en">
  3. <head>
  4. <meta charset="UTF-8">
  5. <title>sse</title>
  6. </head>
  7. <body>
  8. <div>
  9. <label>问题id</label>
  10. <input type="text" id="questionId">
  11. <button onclick="subscribe()">订阅</button>
  12. <hr>
  13. <label>F12-console控制台查看消息</label>
  14. </div>
  15. <script>
  16. function subscribe() {
  17. let questionId = document.getElementById('questionId').value;
  18. let url = 'http://localhost:9000/api/admin/test/sse/sub?questionId=' + questionId;
  19. let eventSource = new EventSource(url);
  20. eventSource.onmessage = function (e) {
  21. console.log(e.data);
  22. };
  23. eventSource.onopen = function (e) {
  24. console.log(e,1);
  25. // todo
  26. };
  27. eventSource.onerror = function (e) {
  28. // todo
  29. console.log(e,2);
  30. eventSource.close()
  31. };
  32. }
  33. </script>
  34. </body>
  35. </html>

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/915751
推荐阅读
相关标签
  

闽ICP备14008679号