赞
踩
可以在一个 Flask 应用中注册多个 Blueprint,每个 Blueprint 可以对应一个 SSE 接口。例如:
- from flask import Flask, Response, Blueprint
- import time
-
- app = Flask(__name__)
-
- bp1 = Blueprint('sse1', __name__)
- bp2 = Blueprint('sse2', __name__)
-
- @bp1.route('/sse1')
- def sse1():
- def generate():
- while True:
- yield 'data: {}\n\n'.format(time.time())
- time.sleep(1)
- return Response(generate(), mimetype='text/event-stream')
-
- @bp2.route('/sse2')
- def sse2():
- def generate():
- while True:
- yield 'data: {}\n\n'.format(time.strftime('%Y-%m-%d %H:%M:%S'))
- time.sleep(1)
- return Response(generate(), mimetype='text/event-stream')
-
- app.register_blueprint(bp1)
- app.register_blueprint(bp2)
-
- if __name__ == '__main__':
- app.run(debug=True)
在这个例子中,我们创建了两个 Blueprint,分别对应 `/sse1` 和 `/sse2` 接口。每个 Blueprint 中的函数都返回一个 SSE 数据流,使用 `Response` 类型的响应对象封装。最后在 Flask 应用中注册这两个 Blueprint,即可同时开启两个 SSE 接口。
- python
- from flask import Flask, Response, request
-
- app = Flask(__name__)
-
- # 存储所有的订阅者
- subscribers = {}
-
- # 订阅接口,每个浏览器对应一个topic
- @app.route('/subscribe/<topic>')
- def subscribe(topic):
- def stream():
- # 将当前请求的客户端添加到订阅者列表中
- subscribers.setdefault(topic, []).append(stream)
- # 无限循环,等待新的消息
- while True:
- # 等待新的消息
- message = request.args.get('message')
- if message:
- # 向所有订阅者推送新的消息
- for subscriber in subscribers.get(topic, []):
- subscriber.put(message)
- # 休眠一段时间,减少服务器压力
- time.sleep(1)
-
- # 设置响应头,告诉客户端这是一个SSE流
- response = Response(stream(), mimetype='text/event-stream')
- response.headers['Cache-Control'] = 'no-cache'
- response.headers['Connection'] = 'keep-alive'
- return response
-
- # 推送接口,向指定topic的所有订阅者推送消息
- @app.route('/publish/<topic>')
- def publish(topic):
- # 获取要推送的消息
- message = request.args.get('message')
- if message:
- # 向所有订阅者推送新的消息
- for subscriber in subscribers.get(topic, []):
- subscriber.put(message)
- return 'OK'
-
在上面的代码中,我们定义了两个接口,`/subscribe/<topic>`用于订阅指定的topic,`/publish/<topic>`用于向指定topic的所有订阅者推送消息。
在订阅接口中,我们将当前请求的客户端添加到订阅者列表中,并通过一个无限循环等待新的消息。当有新的消息到达时,我们会向所有订阅者推送这条消息。
在推送接口中,我们获取要推送的消息,并向指定topic的所有订阅者推送这条消息。
使用这个代码,你可以轻松地实现一个简单的SSE推送服务,每个浏览器对应一个topic。
- import uuid
- from flask import Flask, jsonify, request, Response, g
- from flask_cors import CORS
- from flask_sse import sse
- import time
- import json
-
- app = Flask(__name__)
- cros = CORS(app)
- app.config['REDIS_URL'] = 'redis://localhost'
- app.register_blueprint(sse, url_prefix='/stream')
- # SSE 推送函数
-
-
- # SSE 推送路由
-
-
- @app.route('/register', methods=["GET"])
- def register():
- # 获取客户端标识符
- client_id = str(uuid.uuid4())
-
- # 返回 SSE 响应
- return jsonify({"client_id": client_id})
-
- # SSE 推送路由
-
-
- @app.route('/sse', methods=['POST'])
- def stream():
- # 获取客户端标识符
- data = request.get_json()
- client_id = data['clientId']
- print("client_id", client_id)
-
- def aa():
- # 循环发送 SSE 数据
- for i in range(10):
- data = 'Hello, %s!' % client_id+str(i)
- sse.publish(data, channel=client_id, type='message')
- time.sleep(1)
- sse.publish("end", channel=client_id, type='message')
- # 返回 SSE 响应
- response = Response(aa(), mimetype='text/event-stream')
- response.headers.add('Cache-Control', 'no-cache')
- response.headers.add('Connection', 'keep-alive')
- response.headers.add('X-Accel-Buffering', 'no')
- return response
-
-
- if __name__ == '__main__':
- app.run(debug=True, port=5000)
在上面的代码中,我们使用了 Flask-SSE 扩展来管理 SSE 通道。我们在 `/register` 路由中为每个客户端创建了唯一的标识符,并将其存储在请求环境中。在 `/sse` 路由中,我们使用 `sse.publish` 方法发送 SSE 数据。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。