赞
踩
import json
import redis
from flask import Flask, current_app
from flask_cors import CORS
from flask_sse import sse
from redis import Redis

app = Flask(__name__)
CORS(app, supports_credentials=True)
# Redis connection URL used by the flask_sse blueprint.
app.config["REDIS_URL"] = "redis://localhost"
# Mount the SSE blueprint; clients subscribe by opening an EventSource on /stream.
app.register_blueprint(sse, url_prefix='/stream')


@app.route('/send')
def send_message():
    """Publish a ``greeting`` event on the SSE stream.

    Because the event carries an explicit type, browser clients must listen
    with ``addEventListener('greeting', ...)``; a plain ``message`` listener
    does not receive typed events.

    Returns:
        A short confirmation string used as the HTTP response body.
    """
    sse.publish({"message": "Hello!"}, type='greeting')
    return "Message sent!"
页面demo
<!DOCTYPE html>
<html>
<head>
<title>SSE Demo</title>
</head>
<body>
<div id="message-div"></div>
<script>
// Open an EventSource connection to the SSE stream.
var source = new EventSource("http://127.0.0.1:5000/stream");
// The server publishes its events with type 'greeting'. Typed events are only
// delivered to listeners registered under that exact name, so listening for
// "message" here would never fire.
source.addEventListener("greeting", function (event) {
  console.log(event.data);
});
</script>
</body>
</html>
import json

import loguru
import redis
from flask import Flask, current_app, request
from flask_cors import CORS


class PubSub(object):
    """Redis publish/subscribe helper that renders messages as SSE frames."""

    def __init__(self):
        # decode_responses=True makes redis-py hand back str instead of bytes,
        # so payloads can be passed straight to json.loads.
        self._conn = redis.Redis(connection_pool=redis.ConnectionPool(decode_responses=True))

    def pub(self, message, channel_name, type):
        """Publish a message on a channel.

        Args:
            message: Message content (a string).
            channel_name: Redis channel to publish on.
            type: Caller-defined event type; becomes the SSE ``event:`` field.
        """
        self._conn.publish(channel_name, json.dumps({'data': {'message': message}, 'type': type}))

    def sub(self, channel_name):
        """Subscribe to a channel and yield SSE-formatted frames.

        Each yielded frame looks like::

            event:greeting
            data:{"message": "Hello!"}

        Args:
            channel_name: Redis channel to subscribe to.

        Yields:
            One SSE frame (str) per handled pub/sub message.
        """
        pubsub = self._conn.pubsub()  # create the subscription object
        pubsub.subscribe(channel_name)
        # Explicit dispatch table: pub/sub message types without a handler
        # (e.g. 'unsubscribe') are skipped instead of raising AttributeError.
        handlers = {'message': self.message, 'subscribe': self.subscribe}
        try:
            for pubsub_message in pubsub.listen():
                loguru.logger.debug(f'消息中间件:{pubsub_message}')
                handler = handlers.get(pubsub_message['type'])
                if handler is None:
                    continue
                yield handler(pubsub_message)
        finally:
            # Runs when the client disconnects and the generator is closed.
            try:
                pubsub.unsubscribe(channel_name)
            except (ConnectionError, redis.exceptions.ConnectionError):
                # redis-py raises its own ConnectionError, which does not
                # derive from the builtin; catch both. Best-effort cleanup.
                pass
            finally:
                pubsub.close()

    def message(self, pubsub_message):
        """Render a regular published message as an SSE frame.

        The data field is serialized with json.dumps so the browser can
        JSON.parse it (str() of a dict would emit a Python repr instead).
        """
        msg_dict = json.loads(pubsub_message['data'])
        return f'event:{msg_dict["type"]}\ndata:{json.dumps(msg_dict["data"])}\n\n'

    def subscribe(self, *args):
        """Render the subscription confirmation as a ``connect`` SSE frame."""
        return 'event:connect\ndata:ok\n\n'


app = Flask(__name__)
CORS(app, supports_credentials=True)
p = PubSub()


@app.route('/sse')
def sse():
    """Subscribe to a channel over Server-Sent Events.

    Example::

        <script>
        // Open an EventSource connection to the SSE stream.
        var source = new EventSource("/sse?channel_name=demo")
        // Listen for "message" events.
        source.addEventListener("message", function (event) {
            console.log(event.data)});
        </script>
    """
    channel_name = request.args.get('channel_name')
    if not channel_name:
        # Without this guard a missing parameter surfaced as a 500 error.
        return 'missing channel_name', 400
    return current_app.response_class(
        p.sub(channel_name),
        mimetype='text/event-stream',
        headers={'Cache-Control': 'no-cache'},
    )


@app.route('/send')
def send():
    """Publish a message.

    Example: ``/send?channel_name=demo&mes=hello``
    """
    mes = request.args.get('mes')
    channel_name = request.args.get('channel_name')
    if not channel_name:
        return 'missing channel_name', 400
    p.pub(channel_name=channel_name, message=mes, type='message')
    return f'send:{mes}'


if __name__ == '__main__':
    app.run(debug=True)
# HTTPS reverse proxy in front of the application server on 127.0.0.1:8001.
# This server block belongs inside the http { } context of nginx.conf.
server {
    listen 443 default ssl;
    listen [::]:443 default ssl;
    server_name _;

    ssl_certificate /root/project/code/tree_hole_gpt/ssl/(文件);
    ssl_certificate_key /root/project/code/tree_hole_gpt/ssl/(文件);
    ssl_session_timeout 5m;
    ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDHE:ECDH:AES:HIGH:!NULL:!aNULL:!MD5:!ADH:!RC4;
    # TLS 1.0 and 1.1 are deprecated; only offer 1.2 and newer.
    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_prefer_server_ciphers on;

    gzip on;
    gzip_buffers 4 32k;
    gzip_types "*";
    gzip_vary on;
    gzip_min_length 1k;
    gzip_comp_level 6;
    gzip_http_version 1.1;

    proxy_set_header Host $http_host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_set_header X-Forwarded-Proto $scheme;
    proxy_read_timeout 120s;     # max gap between two reads of the backend response
    proxy_connect_timeout 120s;  # timeout for establishing the backend connection
    proxy_send_timeout 120s;     # max gap between two writes of the request to the backend

    location / {
        proxy_pass http://127.0.0.1:8001;
    }

    # WebSocket endpoints.
    location /ws/ {
        proxy_http_version 1.1;
        proxy_set_header Host $host;
        proxy_set_header X-Real-Ip $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Nginx-Proxy true;
        proxy_redirect off;
        client_max_body_size 10m;
        proxy_pass http://127.0.0.1:8001;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_connect_timeout 300s;
        proxy_read_timeout 300s;
        proxy_send_timeout 300s;
    }

    # Server-Sent Events endpoints.
    location /sse/ {
        proxy_set_header Host $http_host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        # SSE is a plain long-lived HTTP response, not a protocol upgrade:
        # use HTTP/1.1 with an empty Connection header instead of the
        # WebSocket Upgrade headers.
        proxy_http_version 1.1;
        proxy_set_header Connection "";
        proxy_cache off;
        proxy_buffering off;
        # Compression would buffer events before flushing them to the client.
        gzip off;
        # Keep idle event streams open longer than the server-wide 120s.
        proxy_read_timeout 1h;
        proxy_pass http://127.0.0.1:8001;
    }

    location /media/ {
        alias /root/project/code/tree_hole_gpt/media/;
        expires 30d;
    }

    location /static/ {
        alias /root/project/static/;
        expires 30d;
    }
}
# NOTE(review): the Redis password is hard-coded here; consider loading it
# from settings or the environment instead of committing it to source.
REDIS_CACHE = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=1, password='Admin123.', decode_responses=True)
TIXIAN_CHANNEL_NAME = 'TIXIAN'  # pub/sub channel name for withdrawal ("tixian") messages


class SubMessageView(View):
    """Message subscription endpoint that streams Redis pub/sub as SSE."""

    def get(self, request, *args, **kwargs) -> StreamingHttpResponse:
        """Return a streaming ``text/event-stream`` response fed by ``chat()``."""
        response = StreamingHttpResponse()
        response.streaming_content = self.chat()
        response['Cache-Control'] = 'no-cache'
        response['Content-Type'] = 'text/event-stream'
        return response

    def chat(self):
        """Subscribe to the withdrawal channel and yield SSE frames.

        Yields:
            ``event:connect`` once the subscription is confirmed, then one
            frame per published message with the payload's ``type`` as the
            event name and its ``data`` serialized as JSON.
        """
        pubsub = REDIS_CACHE.pubsub()
        pubsub.subscribe(TIXIAN_CHANNEL_NAME)
        try:
            for pubsub_message in pubsub.listen():
                loguru.logger.debug(f'消息中间件:{pubsub_message}')
                message_type = pubsub_message['type']
                if message_type == 'subscribe':
                    yield 'event:connect\ndata:ok\n\n'
                elif message_type == 'message':
                    msg_dict = json.loads(pubsub_message['data'])
                    yield f'event:{msg_dict["type"]}\ndata:{json.dumps(msg_dict["data"])}\n\n'
                # Other pub/sub types (e.g. 'unsubscribe') carry non-JSON
                # payloads; skip them instead of failing in json.loads.
        finally:
            # Runs when the client disconnects and the generator is closed.
            try:
                pubsub.unsubscribe(TIXIAN_CHANNEL_NAME)
            except (ConnectionError, redis.exceptions.ConnectionError):
                # redis-py raises its own ConnectionError, which does not
                # derive from the builtin; catch both. Best-effort cleanup.
                pass
            finally:
                pubsub.close()
发布
# Publish a withdrawal notification; subscribers receive the payload's
# 'type' as the SSE event name and 'data' as the event body.
REDIS_CACHE.publish(
    TIXIAN_CHANNEL_NAME,
    json.dumps(
        {
            'data': {'message': f'{SERVER_URL}/media/music/music.mp3'},
            'type': 'tixian_message',
        }
    ),
)
在Python中,可以使用redis-py库来操作Redis数据库中的列表数据结构。以下是一些常用的Redis列表操作方法:
- lpush(key, *values): 在列表的左侧(头部)插入一个或多个值
- rpush(key, *values): 在列表的右侧(尾部)插入一个或多个值
- lpop(key): 移除并返回列表左侧的第一个元素
- rpop(key): 移除并返回列表右侧的第一个元素
- lrange(key, start, end): 获取列表指定范围内的元素(包含 start 和 end)
- llen(key): 获取列表的长度
- lindex(key, index): 获取列表指定索引位置的元素
- lset(key, index, value): 设置列表指定索引位置的元素的值
- lrem(key, count, value): 移除列表中与 value 相等的元素,count 控制移除的数量和方向
- ltrim(key, start, end): 只保留列表指定范围内的元素,其余元素被删除
这些方法可以帮助你对Redis中的列表数据进行增删改查操作。你可以根据具体的需求选择合适的方法来操作Redis列表。
# Define the user record.
# NOTE(review): assumes `r` is a redis.Redis client created elsewhere without
# decode_responses=True, since the values read back are decoded from bytes.
user_id = 1
user_info = {
    'username': 'john_doe',
    'email': 'john@example.com',
    'age': 30,
    'city': 'New York',
}

# Store the user record in a Redis hash. hmset() is deprecated in redis-py;
# hset() with mapping= is its replacement.
r.hset(f'user:{user_id}', mapping=user_info)

# Read the record back.
stored_user_info = r.hgetall(f'user:{user_id}')

# Print the stored record.
print("Stored User Info:")
for field, value in stored_user_info.items():
    print(f"{field.decode('utf-8')}: {value.decode('utf-8')}")

# Update a single field.
r.hset(f'user:{user_id}', 'age', 31)

# Read the updated record.
updated_user_info = r.hgetall(f'user:{user_id}')

# Print the updated record.
print("\nUpdated User Info:")
for field, value in updated_user_info.items():
    print(f"{field.decode('utf-8')}: {value.decode('utf-8')}")

# Delete the record.
r.delete(f'user:{user_id}')

# Confirm the deletion: hgetall returns an empty dict for a missing key.
deleted_user_info = r.hgetall(f'user:{user_id}')
if not deleted_user_info:
    print("\nUser info has been deleted.")
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。