赞
踩
最近在学习Python服务器框架,考虑到未来可能会用到推送服务,就在此记录一下学习过程。实现推送目前有两种方案,一种是基于HTTP协议的SSE,另外一种是不同于HTTP协议的WebSocket协议,纯理论的我也不太明白,不做说明,网上资料很多。
SSE是基于HTTP协议之上的,单向发送消息,开销小,因此就得有支持HTTP协议的web框架,这里我对Tornado、Flask和fastapi都做了记录
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>测试服务器推送技术</title> <script type="text/javascript"> var push_data = new EventSource("http://localhost:5000/sse/data") push_data.onopen = function (event) { // open事件 console.log("open event"); }; push_data.addEventListener('message', function (event) { // var data = JSON.parse(event.data); document.getElementById("result").innerText = event.data // alert("The server says " + data.message); }, false); push_data.addEventListener('error', function (event) { // error事件 console.log("error event"); console.log(event); }, false); xmlHttp = null; function myFunction() { try {// Firefox, Opera 8.0+, Safari, IE7 xmlHttp = new XMLHttpRequest(); } catch (e) {// Old IE try { xmlHttp = new ActiveXObject("Microsoft.XMLHTTP"); } catch (e) { alert("Your browser does not support XMLHTTP!"); return; } } url = "http://localhost:5000/"; xmlHttp.open("GET", url, false); xmlHttp.send(null); document.getElementById("response").innerHTML = xmlHttp.responseText; } </script> </head> <body> <h1>SSE动态数据</h1> <div id="result"></div> <h1>返回结果</h1> <div id="response"></div> <button onclick="myFunction()">关闭</button> </body> </html>
import tornado.ioloop import tornado.web times=0 push_flag=True#这个本应该靠redies维护的,因为每个客户端都应该有唯一的关系 # 推送路由,保持长连接,我们需要去触发才会推送 class PushHandler(tornado.web.RequestHandler): def initialize(self): #关闭自动关闭,改成手动关闭,否则SSE服务会不断重连 self._auto_finish = False #定义推送状态控制字 self.push_status = False print("initialize") # 设置请求头 这很重要 def set_default_headers(self): self.set_header('Content-Type', "text/event-stream") # self.set_header('Content-Control', "no-cache") # self.set_header('Connection', "keep-alive") self.set_header('Access-Control-Allow-Origin', "*") # 建立连接 def get(self): # tornado.ioloop.PeriodicCallback(callback, callback_time, io_loop=None) # callback设定定时调用的方法 callback_time设定每次调用之间的间隔,单位毫秒 self._loop=tornado.ioloop.PeriodicCallback(self.push_data, 1000*1) self._loop.start() # 断开连接 def on_finish(self): print("断开连接") return super().on_finish() @tornado.gen.coroutine#异步处理 # 定时执行推送函数,在此函数中你可以增加业务逻辑,判断是否要推送,推送给谁 def push_data(self): global times global push_flag print("alive...") times+=1 try: if push_flag: result_text = "data:" + "你已经请求"+str(times)+"次" + "\n\n" self.write(result_text) yield self.flush() else: self._loop.stop() self.finish()#结束长连接 except tornado.iostream.StreamClosedError as e: # 断开连接的时候 要清除任务 self._loop.stop() self.finish() except RuntimeError as e: self._loop.stop() self.finish() # 定义一个路由来触发 times的变化 本应该是由内部业务触发 class IndexHandler(tornado.web.RequestHandler): # 设置请求头 这很重要 def set_default_headers(self): self.set_header('Access-Control-Allow-Origin', "*") def get(self): global push_flag push_flag=False self.write("Switch:"+str(push_flag)) def make_app(): return tornado.web.Application([ (r"/", IndexHandler), (r"/sse/data", PushHandler), ]) if __name__ == "__main__": app = make_app() app.listen(5000) tornado.ioloop.IOLoop.current().start()
结果:测试例子比较简单,我也在探索中,就是保证长连接,可以看看这篇文章
在我主动断开连接之后但前端页面还在,前端在试图重新连接,但是还是被我给强制断开了,当我不进行干预时,我擦掉前端页面,后端自动断开连接
flask对于sse有更好的处理方式,flask_sse模块,flask_sse可以绑定通道等信息,但是需要redies数据库支持
from flask import Flask, make_response from flask_sse import sse from flask_cors import CORS app = Flask(__name__) times =0 app.config["REDIS_URL"] = "redis://127.0.0.1" app.register_blueprint(sse, url_prefix='/sse/data') cors = CORS(app)#解决跨域问题 # 定义一个路由来触发 通知 本应该是由内部业务触发 @app.route('/') def indexHandler(): global times times=times+1 rst = make_response("Test:"+str(times)) result_text = "data:" + "times"+str(times)+ "\n\n" sse.publish(result_text,type='message') return rst if __name__ == "__main__": app.run(threaded = True,port=5000,debug=True)
结果:
别忘记开启 Redies
利用flask_sse可以实现丰富的功能,集成好的东西很香,上手简单,但是 高级接口屏蔽底层实现。
import uvicorn import asyncio from fastapi import FastAPI,Request from fastapi.middleware.cors import CORSMiddleware from sse_starlette.sse import EventSourceResponse times=0 app = FastAPI() origins = [ "*" ] app.add_middleware( CORSMiddleware, allow_origins=origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.get("/sse/data") async def root(request: Request): event_generator = status_event_generator(request) return EventSourceResponse(event_generator) status_stream_delay = 1 # second status_stream_retry_timeout = 30000 # milisecond # 其实就是绑定函数事件 一直在跑循环 async def status_event_generator(request): global times while True: if not await request.is_disconnected()==True: yield { "event": "message", "retry": status_stream_retry_timeout, "data": "data:" + "times"+str(times)+ "\n\n" } print("alive") times+=1 await asyncio.sleep(status_stream_delay) if __name__ == '__main__': uvicorn.run("apifastMain:app", host="0.0.0.0", port=5000, log_level="info", reload=True, debug=True,forwarded_allow_ips ='*')
值得注意的是,当我前端擦掉时,改事件也就结束了,是根据request.is_disconnected()进行判断
关于SSE我在学习,具体原理不太清楚,用SSE主要是为了实现推送服务,websocket是双向通信,开销大,而SSE是单向推送,适合不密集推送,git上应该有不少push的项目,这里只是学习探索,欢迎大家一起讨论学习。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。