当前位置:   article > 正文

Python异步高性能websockets库简单入门(含重连机制与定时任务)_websockets python

websockets python

前言

websockets 是现在 python 最火的 websocket 依赖库,优于原版 websocket 和 ws4。

项目地址:aaugustin / websockets

文档地址:官方文档

重连机制

我就不搬官网 demo 了,想看自己一看即可。

这里我们研究一下断线重连。

服务端

服务端很简单,只接受消息:

import asyncio
import websockets

async def hello(websocket, path):
    while True:
        name = await websocket.recv()
        print(f"< {name}")


start_server = websockets.serve(hello, "localhost", 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
客户端

我们先要知道的是:

  • 当服务端拒绝连接(没有打开 ws ),客户端会抛出 ConnectionRefusedError 错误。
  • 当服务端将连接上的 ws 关闭时(无论是正常关闭还是异常关闭),会抛出 ConnectionClosed 错误(是 websockets 内的错误类)

那就很简单了,我们捕获这两个错误即可。

import asyncio
import websockets as ws
from websockets import ConnectionClosed

count = 0

async def hello():
    uri = "ws://localhost:8765"

    while True:
        try:
            async with ws.connect(uri) as websocket:
                await websocket.send('start')
                while True:
                    try:
                        await websocket.recv()
                    except ConnectionClosed as e:
                        print(e.code)
                        if e.code == 1006:
                            print('restart')
                            await asyncio.sleep(2)
                            break
        except ConnectionRefusedError as e:
            print(e)
            global count
            if count == 10: 
                return
            count += 1
            await asyncio.sleep(2)


asyncio.get_event_loop().run_until_complete(hello())
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

这段代码的核心是两个 while :

  • 第一层 while 的作用是循环 ws 连接上下文,当服务端拒绝连接时,会抛出 ConnectionRefusedError ,我们每隔 2 秒重试一次,最多重试 10 次。
  • 第二层 while 的作用是保证 ws 连接一直处于接收状态(长连接),当 ws 被服务端关闭时,会抛出 ConnectionClosed ,一般我们会收到 1000 正常关闭码和 1006 服务端内部错误异常关闭码两种,在上文的代码中,我们收到异常 1006 关闭码时,就 break 退出 while 循环,从而自动关闭 ws 连接上下文,进行一次新的 ws 上下文连接。

注:在第二层不可以使用 continue 跳过本次循环,必须要重建 ws 上下文连接,否则 ws 连接总是处于被关闭状态。

定时任务

如果要加入定时任务,如自定义的心跳,我们需要创建一个异步 task 任务。

任务函数如下:

async def ping(ws):
    while True:
        try:
            await ws.send('ping')
            await asyncio.sleep(10)
        except:
            break
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

之后在 ws 上下文流程中加入异步 task 执行任务。

	async with ws.connect(uri) as websocket:
	    await websocket.send('start')
	
	    asyncio.create_task(ping(websocket))
	
	    while True:
	        try:
	            await websocket.recv()
	        except ConnectionClosed as e:
	            print(e.code)
	            if e.code == 1006:
	                print('restart')
	                await asyncio.sleep(2)
	                break
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

函数 ping() 是我们自定义的心跳 ping ,他会每隔 10 秒给服务端发送文本为 ping 的消息。

无论服务端将 ws 关闭还是 ws 连接不成功,此 task 必由于 ws 不可用而报错,我们 break 掉该 while 循环即可,从而此 task 执行完毕,当 ws 又被成功建立时,新的 task 定时任务又会被启动。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/85318
推荐阅读
相关标签
  

闽ICP备14008679号