当前位置:   article > 正文

AioWebSocket实现python异步接收B站直播弹幕

aiowebsocket

前言

第一次写文章,若有不对的地方请多多包涵并指正。
本篇后续:【python+pyqt5】B站直播弹幕姬

AioWebSocket是什么

  • 认识WebSocket
    Websocket是一种在单个TCP连接上进行全双工通信的协议。

    WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

    python中常用的WebSocket库有websocket-clientwebsocketsaiowebsocket三个。

  • WebSocket工作原理
    1.客户端向服务端发起建立请求,服务端响应后便在客户端与服务端之间建立起一个信道,这一步称之为握手
    2.建立好信道后,服务端就将新消息推送到信道,每当信道中有新的消息,客户端便可以拿到。
    3.服务端为了不占用过多资源,需要定时检查与客户端是否还在连接,因此需要客户端定时向服务端发送心跳包(HeartBeat),确保连接不断开。

  • 认识AioWebSocket
    而 AioWebSocket 是python里一个遵循 WebSocket 规范异步 WebSocket 客户端,相对于其他库它更轻、更快。而且异步的特性使得它可以同时接收消息和发送心跳包,更为方便,因此本文选用AioWebSocket。

AioWebSocket相比于http/https的优势

跟传统的http/https协议相比,WebSocket是长连接,只需向客户端请求一次便可获得永久性连接;而http/https是短连接,向客户端请求一次后便断开连接。因此在获取直播弹幕这种场景下显然WebSocket更具优势。

  • http/https协议获取B站直播弹幕
    我们到直播间,打开F12,Network,搜索gethistory,可以看到历史弹幕,那么请求这个链接就能获取直播的弹幕。
    在这里插入图片描述
    代码如下
import requests

baseurl = 'https://api.live.bilibili.com/xlive/web-room/v1/dM/gethistory'
# 要获取的弹幕的直播间号
roomid = None
# 请求头
headers = {
    'Host': 'api.live.bilibili.com',
    "User-Agent": "Mozilla / 5.0(Windows NT 10.0; Win64; x64) AppleWebKit / 537.36(KHTML, like Gecko) Chrome / 80.0.3987.122  Safari / 537.36"
}
# 传递的参数
data = {
    'roomid': roomid,
    'csrf_token': '',
    'csrf': '',
    'visit_id': '',
}

def getDANMU():
    req = requests.post(url=baseurl, headers=headers, data=data)
    html = req.json()
    code = html['code']

    if req.status_code == 200 and code == 0:
        for dic in html['data']['room']:
            name = dic['nickname']
            timeline = dic['timeline'].split(' ')[-1]
            text = dic['text']
            # msg = timeline + '' + name + ':' + text + '\n'
            msg = timeline + ' ' + name + ':' + text
            print(msg)

if __name__ == '__main__':
    roomid = input('请输入房间号:')
    getDANMU()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

输出结果:
在这里插入图片描述
可以看到,http只能一次性获取历史弹幕,这种方法虽然方便、简短,但若需长时间接收新弹幕,就需要循环请求。而且请求间隔不能太长也不能太短:太短会占用网络资源,甚至被封IP;太长会导致丢失一部分弹幕,因为每次请求只返回最新10条弹幕。
而http/https的这些缺点,正是WebSocket的优点。

AioWebSocket实现接收弹幕功能

实际上,B站直播弹幕也是通过WebSocket协议来实现的。那上文中的gethistory是怎么回事呢?其实进入直播间时会先初始化,我们看到一些历史弹幕,就是由gethistory这个链接返回的数据,后面新的弹幕,都是由WebSocket协议来接收的了。
下面我们来看看如何实现用WebSocket协议来接收弹幕。

我们随便找个直播间,打开F12,点击Network,搜索sub,可以看到,这个就是我们客户端与服务端通信的WebSocket协议了。
在这里插入图片描述知道了b站直播也是靠WebSocket实现的,那么下一步,我们怎么用python去模拟客户端跟服务端建立连接呢?
github上其实已经有b站的api了我们不需要自己研究,只需要移植过来。
Bilibili-Live-API
我们只需要关注几部分:调用地址、数据包格式、消息类型

地址直接选择未加密的在这里插入图片描述
按照操作类型分类
在这里插入图片描述
按消息类型分类
想要获得哪些信息只需要根据字段捕获就ok!详细可见下面代码,有DANMU_MSG和SEND_GIFT字段消息的捕获

接下来看看如何在python中写代码
安装

pip install aiowebsocket
  • 1

导入模块

import asyncio
import zlib
from aiowebsocket.converses import AioWebSocket
import json
  • 1
  • 2
  • 3
  • 4

创建异步任务

remote = 'ws://broadcastlv.chat.bilibili.com:2244/sub'
roomid = '21733344'

data_raw = '000000{headerLen}0010000100000007000000017b22726f6f6d6964223a{roomid}7d'
data_raw = data_raw.format(headerLen=hex(27 + len(roomid))[2:],
                           roomid=''.join(map(lambda x: hex(ord(x))[2:], list(roomid))))

async def startup():
    async with AioWebSocket(remote) as aws:
        converse = aws.manipulator
        await converse.send(bytes.fromhex(data_raw))
        tasks = [receDM(converse), sendHeartBeat(converse)]
        await asyncio.wait(tasks)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

发送heartbeat包

hb='00 00 00 10 00 10 00 01  00 00 00 02 00 00 00 01'
async def sendHeartBeat(websocket):
    while True:
        await asyncio.sleep(30)
        await websocket.send(bytes.fromhex(hb))
        print('[Notice] Sent HeartBeat.')
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

接收消息
(这里的 if recv_text == None 整个if语句必须加上,否则会断开连接,不知道是阿b的问题还是代码的问题。)

async def receDM(websocket):
    while True:
        recv_text = await websocket.receive()

        if recv_text == None:
            recv_text = b'\x00\x00\x00\x1a\x00\x10\x00\x01\x00\x00\x00\x08\x00\x00\x00\x01{"code":0}'

        printDM(recv_text)


# 将数据包传入:
def printDM(data):
    # 获取数据包的长度,版本和操作类型
    packetLen = int(data[:4].hex(), 16)
    ver = int(data[6:8].hex(), 16)
    op = int(data[8:12].hex(), 16)

    # 有的时候可能会两个数据包连在一起发过来,所以利用前面的数据包长度判断,
    if (len(data) > packetLen):
        printDM(data[packetLen:])
        data = data[:packetLen]

    # 有时会发送过来 zlib 压缩的数据包,这个时候要去解压。
    if (ver == 2):
        data = zlib.decompress(data[16:])
        printDM(data)
        return

    # ver 为1的时候为进入房间后或心跳包服务器的回应。op 为3的时候为房间的人气值。
    if (ver == 1):
        if (op == 3):
            print('[RENQI]  {}'.format(int(data[16:].hex(), 16)))
        return


    # ver 不为2也不为1目前就只能是0了,也就是普通的 json 数据。
    # op 为5意味着这是通知消息,cmd 基本就那几个了。
    if (op == 5):
        try:
            jd = json.loads(data[16:].decode('utf-8', errors='ignore'))
            if (jd['cmd'] == 'DANMU_MSG'):
                print('[DANMU] ', jd['info'][2][1], ': ', jd['info'][1])
            elif (jd['cmd'] == 'SEND_GIFT'):
                print('[GITT]', jd['data']['uname'], ' ', jd['data']['action'], ' ', jd['data']['num'], 'x',
                      jd['data']['giftName'])
            elif (jd['cmd'] == 'LIVE'):
                print('[Notice] LIVE Start!')
            elif (jd['cmd'] == 'PREPARING'):
                print('[Notice] LIVE Ended!')
            else:
                print('[OTHER] ', jd['cmd'])
        except Exception as e:
            pass
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

入口

if __name__ == '__main__':
    try:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(startup())
    except Exception as e:
        print('退出')
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

输入结果:
在这里插入图片描述
[DANMU]开头的就是接收到的弹幕内容了。
开始还报了一个warning,是因为

tasks = [receDM(converse), sendHeartBeat(converse)]
await asyncio.wait(tasks)
  • 1
  • 2

这种写法已经过期了,在以后新版本的python解释器中将不在适用。不过问题不大,以后不能用再改罢。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/85313
推荐阅读
相关标签
  

闽ICP备14008679号