当前位置:   article > 正文

Flask框架里面sse的使用_flask sse

flask sse

可以在一个 Flask 应用中注册多个 Blueprint,每个 Blueprint 可以对应一个 SSE 接口。例如:

  1. from flask import Flask, Response, Blueprint
  2. import time
  3. app = Flask(__name__)
  4. bp1 = Blueprint('sse1', __name__)
  5. bp2 = Blueprint('sse2', __name__)
  6. @bp1.route('/sse1')
  7. def sse1():
  8. def generate():
  9. while True:
  10. yield 'data: {}\n\n'.format(time.time())
  11. time.sleep(1)
  12. return Response(generate(), mimetype='text/event-stream')
  13. @bp2.route('/sse2')
  14. def sse2():
  15. def generate():
  16. while True:
  17. yield 'data: {}\n\n'.format(time.strftime('%Y-%m-%d %H:%M:%S'))
  18. time.sleep(1)
  19. return Response(generate(), mimetype='text/event-stream')
  20. app.register_blueprint(bp1)
  21. app.register_blueprint(bp2)
  22. if __name__ == '__main__':
  23. app.run(debug=True)

在这个例子中,我们创建了两个 Blueprint,分别对应 `/sse1` 和 `/sse2` 接口。每个 Blueprint 中的函数都返回一个 SSE 数据流,使用 `Response` 类型的响应对象封装。最后在 Flask 应用中注册这两个 Blueprint,即可同时开启两个 SSE 接口。

  1. python
  2. from flask import Flask, Response, request
  3. app = Flask(__name__)
  4. # 存储所有的订阅者
  5. subscribers = {}
  6. # 订阅接口,每个浏览器对应一个topic
  7. @app.route('/subscribe/<topic>')
  8. def subscribe(topic):
  9. def stream():
  10. # 将当前请求的客户端添加到订阅者列表中
  11. subscribers.setdefault(topic, []).append(stream)
  12. # 无限循环,等待新的消息
  13. while True:
  14. # 等待新的消息
  15. message = request.args.get('message')
  16. if message:
  17. # 向所有订阅者推送新的消息
  18. for subscriber in subscribers.get(topic, []):
  19. subscriber.put(message)
  20. # 休眠一段时间,减少服务器压力
  21. time.sleep(1)
  22. # 设置响应头,告诉客户端这是一个SSE流
  23. response = Response(stream(), mimetype='text/event-stream')
  24. response.headers['Cache-Control'] = 'no-cache'
  25. response.headers['Connection'] = 'keep-alive'
  26. return response
  27. # 推送接口,向指定topic的所有订阅者推送消息
  28. @app.route('/publish/<topic>')
  29. def publish(topic):
  30. # 获取要推送的消息
  31. message = request.args.get('message')
  32. if message:
  33. # 向所有订阅者推送新的消息
  34. for subscriber in subscribers.get(topic, []):
  35. subscriber.put(message)
  36. return 'OK'

在上面的代码中,我们定义了两个接口,`/subscribe/<topic>`用于订阅指定的topic,`/publish/<topic>`用于向指定topic的所有订阅者推送消息。
在订阅接口中,我们将当前请求的客户端添加到订阅者列表中,并通过一个无限循环等待新的消息。当有新的消息到达时,我们会向所有订阅者推送这条消息。
在推送接口中,我们获取要推送的消息,并向指定topic的所有订阅者推送这条消息。
使用这个代码,你可以轻松地实现一个简单的SSE推送服务,每个浏览器对应一个topic。

  1. import uuid
  2. from flask import Flask, jsonify, request, Response, g
  3. from flask_cors import CORS
  4. from flask_sse import sse
  5. import time
  6. import json
  7. app = Flask(__name__)
  8. cros = CORS(app)
  9. app.config['REDIS_URL'] = 'redis://localhost'
  10. app.register_blueprint(sse, url_prefix='/stream')
  11. # SSE 推送函数
  12. # SSE 推送路由
  13. @app.route('/register', methods=["GET"])
  14. def register():
  15. # 获取客户端标识符
  16. client_id = str(uuid.uuid4())
  17. # 返回 SSE 响应
  18. return jsonify({"client_id": client_id})
  19. # SSE 推送路由
  20. @app.route('/sse', methods=['POST'])
  21. def stream():
  22. # 获取客户端标识符
  23. data = request.get_json()
  24. client_id = data['clientId']
  25. print("client_id", client_id)
  26. def aa():
  27. # 循环发送 SSE 数据
  28. for i in range(10):
  29. data = 'Hello, %s!' % client_id+str(i)
  30. sse.publish(data, channel=client_id, type='message')
  31. time.sleep(1)
  32. sse.publish("end", channel=client_id, type='message')
  33. # 返回 SSE 响应
  34. response = Response(aa(), mimetype='text/event-stream')
  35. response.headers.add('Cache-Control', 'no-cache')
  36. response.headers.add('Connection', 'keep-alive')
  37. response.headers.add('X-Accel-Buffering', 'no')
  38. return response
  39. if __name__ == '__main__':
  40. app.run(debug=True, port=5000)

在上面的代码中,我们使用了 Flask-SSE 扩展来管理 SSE 通道。我们在 `/register` 路由中为每个客户端创建了唯一的标识符,并将其存储在请求环境中。在 `/sse` 路由中,我们使用 `sse.publish` 方法发送 SSE 数据。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/880300
推荐阅读
相关标签
  

闽ICP备14008679号