当前位置:   article > 正文

Python & FastAPI, 使用websocket时,recive_text()会一直阻塞直到收到从浏览器的数据_python fastapi websocket因为前一个消息后端还在执行,无法发送第二个消息

python fastapi websocket因为前一个消息后端还在执行,无法发送第二个消息

当使用websocket进行实时显示数据时,由于接收数据会阻塞线程,而此时await send_text(msg)

无法发送,如果在新的线程中调用发送,是无法调用await send_text()函数的,而如果在recv_text()后再发送,因为recive_text()会阻塞,致使之后服务器不断推送的数据无法发送,所以,经过多方尝试后,可以使用心跳的方式巧妙地实现不断的推送,在web浏览器端,不断的发送心跳,而在服务器端,当接收到的是心跳的信息后,跳过转发指令,直接读取数据。

如下:

服务端:

  1. @app.websocket("/ws/{client_id}")
  2. async def websocket_endpoint(websocket: WebSocket, client_id: str):
  3. global manager
  4. global isWhile
  5. await manager.connect(client_id,websocket)
  6. print("client %s: connected." % client_id)
  7. try:
  8. while True:
  9. dataFromExplor = await manager.recivemessage(websocket) #从网页socket中获取到的值;
  10. jsonData = json.loads(dataFromExplor) #json字符串解析成json对象;
  11. if jsonData["cmd"] != "Heart.Beat": #若不是心跳,则需要将指令发送出去;通过心跳机制,以使recv不再完全阻塞,疏通了接收数据。
  12. channelPacket = packetChannelData(client_id,jsonData,{}) #以 真实数据长度(4字节) + 真实数据(json字符串) 的格式进行封装;
  13. manager.sendDataToWebServer(channelPacket) #通过管道将数据发送给TcpSocket线程;
  14. await manager.trySendFromWebServer() #尝试接收返回的数据并发送回web浏览器客户端;
  15. except WebSocketDisconnect:
  16. print("client %s exit!" % client_id)
  17. manager.disconnect(client_id,websocket)

客户端:

  1. var ws
  2. var timerId = -1
  3. window.onload = function()
  4. {
  5. if(timerId >= 0){
  6. window.clearInterval(timerId) //先关闭之前的定时器;
  7. }
  8. if ("WebSocket" in window){ //表明此浏览器支持 WebSocket!
  9. // 打开一个 web socket;
  10. var client_id = document.getElementById("username").value + "."+Date.now()
  11. ws = new WebSocket(`ws://192.168.110.37:8001/ws/${client_id}`);
  12. ws.onopen = function(){ //表明WebSocket已连接上;
  13. //alert("连接成功!");
  14. var cmdJson = {"cmd":"Fetch.Clients","params":{}}
  15. ws.send(JSON.stringify(cmdJson))
  16. timerId = self.setInterval("heartbeat()",100)
  17. }
  18. ws.onmessage = function(event){ //当服务器推送消息时,浏览器收到消息;
  19. var msg = event.data
  20. //alert(msg)
  21. var obj = JSON.parse(msg)
  22. var cmd = obj['query']['cmd']
  23. var reply = obj['reply'] //jsonObject;
  24. if(cmd == "Fetch.Clients"){
  25. document.getElementById("client_ids").innerHTML = JSON.stringify(reply)
  26. return;
  27. }
  28. if(cmd == "Session.Error"){
  29. document.getElementById("session_errmsg").innerHTML = JSON.stringify(reply)
  30. return ;
  31. }
  32. if(cmd == "Session.Status"){
  33. document.getElementById("session_status").innerHTML = JSON.stringify(reply)
  34. return ;
  35. }
  36. if(cmd == "Session.System"){
  37. document.getElementById("session_sys").innerHTML = JSON.stringify(reply)
  38. return ;
  39. }
  40. if(cmd == "Session.Zone"){
  41. document.getElementById("session_zone").innerHTML = JSON.stringify(reply)
  42. return ;
  43. }
  44. if(cmd == "Session.Alarm"){
  45. document.getElementById("session_alarm").innerHTML = JSON.stringify(reply)
  46. return ;
  47. }
  48. if(cmd == "Session.Profile"){
  49. document.getElementById("session_profile").innerHTML = JSON.stringify(reply)
  50. return ;
  51. }
  52. };
  53. ws.onclose = function(){ //关闭WebSocket连接时,可以做些处理;
  54. //alert("连接已关闭...");
  55. if(timerId >= 0){
  56. window.clearInterval(timerId) //先关闭之前的定时器;
  57. }
  58. };
  59. }
  60. else { // 浏览器不支持 WebSocket;
  61. alert("您的浏览器不支持 WebSocket!");
  62. }
  63. }
  64. function heartbeat(){
  65. var cmdJson = {"cmd":"Heart.Beat","params":{}}
  66. ws.send(JSON.stringify(cmdJson))
  67. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/一键难忘520/article/detail/892274
推荐阅读
相关标签
  

闽ICP备14008679号