当前位置:   article > 正文

*【python flask】给多个webocket客户端推送信息_from geventwebsocket.handler import websockethandl

from geventwebsocket.handler import websockethandler

目录

一、初步实现 

服务端

客户端

测试

二、改进

三、异步发送信息

四、调用一次HTTP接口,就发送一次WS信息


一、初步实现 

服务端

  1. from flask import Flask,request
  2. from geventwebsocket.handler import WebSocketHandler
  3. from gevent.pywsgi import WSGIServer
  4. import time,json
  5. app = Flask(__name__)
  6. websocketClient_list = [] # 用于存放所有的websocket客户端对象
  7. @app.route("/")
  8. def index():
  9. ws = request.environ.get("wsgi.websocket")
  10. if not ws: # 使用的是http协议"
  11. pass
  12. else: # 使用的是websocket协议
  13. websocketClient_list.append(ws)
  14. while True:
  15. # 监听连接的websocket客户端,如果message为none,就移除这个客户端
  16. message = ws.receive()
  17. if not message:
  18. websocketClient_list.remove(ws)
  19. ws.close()
  20. break
  21. else:
  22. # 给所有在线的websocket客户端发信息
  23. for client in websocketClient_list:
  24. client.send(json.dumps({"time:":str(time.time())}, ensure_ascii=False))
  25. time.sleep(1)
  26. print("当前在线人数:",len(websocketClient_list)) # 查看当前在线的websocket对象
  27. return "..."
  28. if __name__ == '__main__':
  29. http_server = WSGIServer(("127.0.0.1", 5000), app, handler_class=WebSocketHandler)
  30. print("server start")
  31. http_server.serve_forever()

客户端

地址:websocket在线测试

使用该在线工具作为websocket客户端

测试

我这里开了两个客户端:

当服务端收到任何一个客户端发送的消息时,就会给所有在线的客户端发送信息,并打印当前在线人数。当有客户端断开后,就清除这个客户端对象,然后再次打印在线人数。

二、改进

但是上述代码的问题是只有接收到客户端的信息时,服务端才会发送下一条信息。于是我将监听客户端的代码作为一个线程分离出去,就解决了这个问题,如下所示:

(该段代码不论有没有接收到客户端的信息,都会主动向所有在线的客户端发送信息,而且当有客户端断开时,服务端会将这个断开的客户端close)

  1. from flask import Flask,request
  2. from geventwebsocket.handler import WebSocketHandler
  3. from gevent.pywsgi import WSGIServer
  4. from gevent import monkey
  5. import time,json,threading
  6. app = Flask(__name__)
  7. monkey.patch_all()
  8. def thread_listenClient(ws):
  9. while True:
  10. # 监听连接的websocket客户端,如果message为none,就移除这个客户端
  11. message = ws.receive()
  12. if not message:
  13. websocketClient_list.remove(ws)
  14. ws.close()
  15. break
  16. websocketClient_list = [] # 用于存放所有的websocket客户端对象
  17. @app.route("/")
  18. def index():
  19. ws = request.environ.get("wsgi.websocket")
  20. thread_obj = threading.Thread(target=thread_listenClient, args=(ws,))
  21. thread_obj.daemon = True
  22. thread_obj.start()
  23. if not ws: # 使用的是http协议"
  24. pass
  25. else: # 使用的是websocket协议
  26. websocketClient_list.append(ws)
  27. while True:
  28. # 给所有在线的websocket客户端发信息
  29. for client in websocketClient_list:
  30. client.send(json.dumps({"time:":str(time.time())}, ensure_ascii=False))
  31. time.sleep(1)
  32. print("当前在线人数:",len(websocketClient_list)) # 查看当前在线的websocket对象
  33. return "..."
  34. if __name__ == '__main__':
  35. http_server = WSGIServer(("127.0.0.1", 5000), app, handler_class=WebSocketHandler)
  36. print("server start")
  37. http_server.serve_forever()

三、异步发送信息

为了提高消息发送的效率和实时性,我们可以考虑使用异步的方式来发送消息

  1. from flask import Flask, request
  2. import asyncio
  3. import json
  4. import time
  5. import websockets
  6. from threading import Thread
  7. app = Flask(__name__)
  8. # 存储所有的WS客户端对象
  9. websocketClient_list = []
  10. async def send_message():
  11. while True:
  12. if websocketClient_list:
  13. message = json.dumps({"time": str(time.time())}, ensure_ascii=False)
  14. await asyncio.wait([client.send(message) for client in websocketClient_list])
  15. await asyncio.sleep(1)
  16. async def websocket_handler(websocket, path):
  17. websocketClient_list.append(websocket)
  18. try:
  19. async for message in websocket:
  20. pass # 在这里来处理传入的消息
  21. except websockets.ConnectionClosed:
  22. pass
  23. finally:
  24. websocketClient_list.remove(websocket)
  25. @app.route("/framework/pushMessage/framework_czc")
  26. def index():
  27. return "WebSocket server is running. Connect to ws://<server-address>:8001/websocket"
  28. def run_websocket_server():
  29. asyncio.set_event_loop(asyncio.new_event_loop())
  30. start_server = websockets.serve(websocket_handler, "0.0.0.0", 8001)
  31. asyncio.get_event_loop().run_until_complete(start_server)
  32. asyncio.get_event_loop().run_until_complete(send_message())
  33. asyncio.get_event_loop().run_forever()
  34. if __name__ == '__main__':
  35. websocket_thread = Thread(target=run_websocket_server)
  36. websocket_thread.start()
  37. app.run(host="0.0.0.0", port=8000)

四、调用一次HTTP接口,就发送一次WS信息

  1. from flask import Flask, request, jsonify
  2. import asyncio
  3. import json
  4. import time
  5. import websockets
  6. from threading import Thread
  7. app = Flask(__name__)
  8. # 存放所有的WS客户端对象
  9. websocketClient_list = []
  10. async def send_message(message):
  11. if websocketClient_list:
  12. sendData = json.dumps(message, ensure_ascii=False) # 转成Json格式字符串
  13. await asyncio.wait([client.send(sendData) for client in websocketClient_list]) # 向所有的WS客户端发送信息
  14. async def websocket_handler(websocket, path):
  15. websocketClient_list.append(websocket)
  16. try:
  17. async for message in websocket:
  18. pass # 这里可以处理WS传来的信息message
  19. except websockets.ConnectionClosed:
  20. pass
  21. finally:
  22. websocketClient_list.remove(websocket)
  23. @app.route("/framework/pushMessage/framework_czc") # WS接口
  24. def index():
  25. return "WebSocket server is running. Connect to ws://<server-address>:8001/websocket"
  26. @app.route("/test", methods=['GET','POST']) # HTTP接口
  27. def connected_clients():
  28. data = json.loads(request.data) # 接收Json格式数据
  29. print(data)
  30. # 发送ws数据
  31. loop = asyncio.new_event_loop()
  32. asyncio.set_event_loop(loop)
  33. loop.run_until_complete(send_message(message=data))
  34. loop.close()
  35. return jsonify({"connected_clients": len(websocketClient_list)})
  36. def run_websocket_server():
  37. asyncio.set_event_loop(asyncio.new_event_loop())
  38. start_server = websockets.serve(websocket_handler, "0.0.0.0", 8001)
  39. asyncio.get_event_loop().run_until_complete(start_server)
  40. asyncio.get_event_loop().run_forever()
  41. if __name__ == '__main__':
  42. websocket_thread = Thread(target=run_websocket_server)
  43. websocket_thread.start()
  44. app.run(host="0.0.0.0", port=8000)
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/运维做开发/article/detail/887712
推荐阅读
相关标签
  

闽ICP备14008679号