赞
踩
第一次写文章,若有不对的地方请多多包涵并指正。
本篇后续:【python+pyqt5】B站直播弹幕姬
认识WebSocket
Websocket是一种在单个TCP连接上进行全双工通信的协议。
WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。
python中常用的WebSocket库有websocket-client,websockets,aiowebsocket三个。
WebSocket工作原理
1.客户端向服务端发起建立请求,服务端响应后便在客户端与服务端之间建立起一个信道,这一步称之为握手。
2.建立好信道后,服务端就将新消息推送到信道,每当信道中有新的消息,客户端便可以拿到。
3.服务端为了不占用过多资源,需要定时检查与客户端是否还在连接,因此需要客户端定时向服务端发送心跳包(HeartBeat),确保连接不断开。
认识AioWebSocket
而 AioWebSocket 是python里一个遵循 WebSocket 规范异步 WebSocket 客户端,相对于其他库它更轻、更快。而且异步的特性使得它可以同时接收消息和发送心跳包,更为方便,因此本文选用AioWebSocket。
跟传统的http/https协议相比,WebSocket是长连接,只需向客户端请求一次便可获得永久性连接;而http/https是短连接,向客户端请求一次后便断开连接。因此在获取直播弹幕这种场景下显然WebSocket更具优势。
import requests baseurl = 'https://api.live.bilibili.com/xlive/web-room/v1/dM/gethistory' # 要获取的弹幕的直播间号 roomid = None # 请求头 headers = { 'Host': 'api.live.bilibili.com', "User-Agent": "Mozilla / 5.0(Windows NT 10.0; Win64; x64) AppleWebKit / 537.36(KHTML, like Gecko) Chrome / 80.0.3987.122 Safari / 537.36" } # 传递的参数 data = { 'roomid': roomid, 'csrf_token': '', 'csrf': '', 'visit_id': '', } def getDANMU(): req = requests.post(url=baseurl, headers=headers, data=data) html = req.json() code = html['code'] if req.status_code == 200 and code == 0: for dic in html['data']['room']: name = dic['nickname'] timeline = dic['timeline'].split(' ')[-1] text = dic['text'] # msg = timeline + '' + name + ':' + text + '\n' msg = timeline + ' ' + name + ':' + text print(msg) if __name__ == '__main__': roomid = input('请输入房间号:') getDANMU()
输出结果:
可以看到,http只能一次性获取历史弹幕,这种方法虽然方便、简短,但若需长时间接收新弹幕,就需要循环请求。而且请求间隔不能太长也不能太短:太短会占用网络资源,甚至被封IP;太长会导致丢失一部分弹幕,因为每次请求只返回最新10条弹幕。
而http/https的这些缺点,正是WebSocket的优点。
实际上,B站直播弹幕也是通过WebSocket协议来实现的。那上文中的gethistory是怎么回事呢?其实进入直播间时会先初始化,我们看到一些历史弹幕,就是由gethistory这个链接返回的数据,后面新的弹幕,都是由WebSocket协议来接收的了。
下面我们来看看如何实现用WebSocket协议来接收弹幕。
我们随便找个直播间,打开F12,点击Network,搜索sub,可以看到,这个就是我们客户端与服务端通信的WebSocket协议了。
知道了b站直播也是靠WebSocket实现的,那么下一步,我们怎么用python去模拟客户端跟服务端建立连接呢?
github上其实已经有b站的api了我们不需要自己研究,只需要移植过来。
Bilibili-Live-API
我们只需要关注几部分:调用地址、数据包格式、消息类型
地址直接选择未加密的
按照操作类型分类
按消息类型分类
想要获得哪些信息只需要根据字段捕获就ok!详细可见下面代码,有DANMU_MSG和SEND_GIFT字段消息的捕获
接下来看看如何在python中写代码
安装
pip install aiowebsocket
导入模块
import asyncio
import zlib
from aiowebsocket.converses import AioWebSocket
import json
创建异步任务
remote = 'ws://broadcastlv.chat.bilibili.com:2244/sub'
roomid = '21733344'
data_raw = '000000{headerLen}0010000100000007000000017b22726f6f6d6964223a{roomid}7d'
data_raw = data_raw.format(headerLen=hex(27 + len(roomid))[2:],
roomid=''.join(map(lambda x: hex(ord(x))[2:], list(roomid))))
async def startup():
async with AioWebSocket(remote) as aws:
converse = aws.manipulator
await converse.send(bytes.fromhex(data_raw))
tasks = [receDM(converse), sendHeartBeat(converse)]
await asyncio.wait(tasks)
发送heartbeat包
hb='00 00 00 10 00 10 00 01 00 00 00 02 00 00 00 01'
async def sendHeartBeat(websocket):
while True:
await asyncio.sleep(30)
await websocket.send(bytes.fromhex(hb))
print('[Notice] Sent HeartBeat.')
接收消息
(这里的 if recv_text == None 整个if语句必须加上,否则会断开连接,不知道是阿b的问题还是代码的问题。)
async def receDM(websocket): while True: recv_text = await websocket.receive() if recv_text == None: recv_text = b'\x00\x00\x00\x1a\x00\x10\x00\x01\x00\x00\x00\x08\x00\x00\x00\x01{"code":0}' printDM(recv_text) # 将数据包传入: def printDM(data): # 获取数据包的长度,版本和操作类型 packetLen = int(data[:4].hex(), 16) ver = int(data[6:8].hex(), 16) op = int(data[8:12].hex(), 16) # 有的时候可能会两个数据包连在一起发过来,所以利用前面的数据包长度判断, if (len(data) > packetLen): printDM(data[packetLen:]) data = data[:packetLen] # 有时会发送过来 zlib 压缩的数据包,这个时候要去解压。 if (ver == 2): data = zlib.decompress(data[16:]) printDM(data) return # ver 为1的时候为进入房间后或心跳包服务器的回应。op 为3的时候为房间的人气值。 if (ver == 1): if (op == 3): print('[RENQI] {}'.format(int(data[16:].hex(), 16))) return # ver 不为2也不为1目前就只能是0了,也就是普通的 json 数据。 # op 为5意味着这是通知消息,cmd 基本就那几个了。 if (op == 5): try: jd = json.loads(data[16:].decode('utf-8', errors='ignore')) if (jd['cmd'] == 'DANMU_MSG'): print('[DANMU] ', jd['info'][2][1], ': ', jd['info'][1]) elif (jd['cmd'] == 'SEND_GIFT'): print('[GITT]', jd['data']['uname'], ' ', jd['data']['action'], ' ', jd['data']['num'], 'x', jd['data']['giftName']) elif (jd['cmd'] == 'LIVE'): print('[Notice] LIVE Start!') elif (jd['cmd'] == 'PREPARING'): print('[Notice] LIVE Ended!') else: print('[OTHER] ', jd['cmd']) except Exception as e: pass
入口
if __name__ == '__main__':
try:
loop = asyncio.get_event_loop()
loop.run_until_complete(startup())
except Exception as e:
print('退出')
输入结果:
[DANMU]开头的就是接收到的弹幕内容了。
开始还报了一个warning,是因为
tasks = [receDM(converse), sendHeartBeat(converse)]
await asyncio.wait(tasks)
这种写法已经过期了,在以后新版本的python解释器中将不在适用。不过问题不大,以后不能用再改罢。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。