当前位置:   article > 正文

python + fastapi + websocket 流式调用minimax 大模型_python 流式接口 大模型

python 流式接口 大模型

python + fastapi + websocket 流式调用minimax 大模型

基于fastapi,用websocket流式接收音频,经过语音识别后调用minimax大模型,再将大模型的流式返回进行音频合成后流式返回

前言

第一次写流式接口,真的是坑都踩了一圈

本文仅提供代码思路,文中的代码不完整,无法直接复制
本文仅提供代码思路,文中的代码不完整,无法直接复制
本文仅提供代码思路,文中的代码不完整,无法直接复制

websocket介绍

WebSocket 是一种网络通信协议,提供了浏览器和服务器之间的全双工通信能力。与传统的HTTP请求不同,WebSocket 协议允许数据在客户端和服务器之间进行实时双向传输,而无需频繁地建立新的连接。这种协议特别适合需要实时数据交换的应用场景,比如在线游戏、实时交易平台、聊天应用等。

说人话就是socket的web版本,两边建立了websocket连接之后,就可以全双工通信

在fastapi中编写websocket接口

在fastapi中,可以使用 @app.websocket("/url")来创建websocket接口,

from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/url")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept() #接收websocket连接
    ......
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

此时,当客户端程序调用url,ws://ip:port/url就可以调用了

流式接收音频并进行本地语音识别

发送格式

在接收音频之前需要先和发送方协商好发送格式,这是我定义的发送格式

{
    text: Optional[str] = None	#文本输入,若输入文本则默认非音频输入	
    audio: Optional[str] = None    # 默认流式输入 base64
    meta_info : {
        stream: [bool] = True     # 是否流式输出
        voice_synthesize: [bool] = False #是否在大模型返回后进行音频合成
        is_end: [bool] #是否结束,即音频发送是否完成
        encoding: [str] = ['raw', ...] #编码格式,raw表示base64,可以在这里定义一些压缩格式
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
流式接收并进行本地语音识别

这边我们先初始化我们的asr(auto speech recognizer),我用的是开源的一个项目,然后while true开始循环接受消息,若文本输入不为空,则表示该输入为文本输入,我们将文本信息读取出来之后就直接退出循环

如果接受的是音频输入,则通过"is_end"字段判断是否是最后一帧,用asr进行语音识别后添加到current_message中,若是最后一帧则asr识别并添加之后,再记录一下后续返回类型就退出

response_type = RESPONSE_TEXT
asr_start_time = time.perf_counter()
if config["main"]["asr"] == LOCAL_ASR:    #使用本地ASR
    asr = FunAutoSpeechRecognizer() #替换成你自己的ASR
    try:
        while True:
            data_json = json.loads(await ws.receive_text())
            if data_json["text"]: #若文字输入不为空,则表示该输入为文字输入
                if data_json["meta_info"]["voice_synthesize"]:
                    response_type = RESPONSE_AUDIO #查看voice_synthesize判断返回类型
                current_message = data_json['text']
                break
            if not data_json['meta_info']['is_end']: #还在发
                asr_result = asr.streaming_recognize(data_json["audio"])
                current_message += ''.join(asr_result['text'])
            else: #发完了
                asr_result = asr.streaming_recognize(data_json["audio"],is_end=True)
                session_id = data_json["meta_info"]["session_id"]
                current_message += ''.join(asr_result['text'])
                if data_json["meta_info"]["voice_synthesize"]:
                    response_type = RESPONSE_AUDIO #查看voice_synthesize判断返回类型
                break

    except Exception as e:
        error_info = f"接收用户消息错误: {str(e)}"
        error_message = {"type":"error","code":"500","msg":error_info}
        logger.error(error_info)
        await ws.send_text(json.dumps(error_message,ensure_ascii=False))
        await ws.close()
        return
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

识别后的文本被放在current_messgae中

流式接收并调用讯飞流式接口进行语音识别

讯飞流式语音识别接口

和本地比起来,调用接口用时不稳定,我调用讯飞的接口用时基本上在3,4秒左右,但是本地的话稳定能控制在1秒钟之内。

采取异步的方式,一边流式接受音频,一边流式将音频传给讯飞

usr_chat_recv()是异步接受方法

user_chat_send()是异步发送方法

user_chat_send()里面调用了讯飞的接口,解释一下就是先获取鉴权url,然后定义两个回调函数,再与讯飞开启websocket连接并使用这两个回调函数,这四个回调函数分别是on_open,在websocket连接建立的时候调用,在on_open里面发送音频数据,on_message,收到讯飞识别后的流式返回,在这个里面获取识别结果

这一版代码是根据讯飞的demo调出来的, 若是要用于生产环境,还需要调整

current_message = ""	#用于存储用户消息
response_type = RESPONSE_TEXT	#用于获取返回类型
session_id = ""
q_recv = queue.Queue()	#一个函数往队列中存,一个函数同时从队列中取
chat_type = CHAT_UNCERTAIN #判断一下是语音还是文本信息

if config["main"]["asr"] == REMOTE_ASR:
    logger.info("开始调用讯飞接口")
    async def usr_chat_recv():	#定义函数用于接受流式输入,并存入队列
        nonlocal current_message
        nonlocal chat_type
        nonlocal session_id
        nonlocal response_type
        try:
            while True:
                data_json = await ws.receive_json()
                q_recv.put(data_json)
                if data_json["text"]: #如果是文本则一轮直接退出
                    if data_json["meta_info"]["voice_synthesize"]:
                        response_type = RESPONSE_AUDIO
                    chat_type = CHAT_TEXT
                    current_message = data_json['text']
                    session_id = data_json["meta_info"]["session_id"]
                    break
                else:
                    chat_type = CHAT_AUDIO

                if data_json['meta_info']["is_end"]: #收到结束标志,退出
                    if data_json["meta_info"]["voice_synthesize"]:
                        response_type = RESPONSE_AUDIO
                    session_id = data_json["meta_info"]["session_id"]
                    break
        except Exception as e:
            error_message = {"type":"error","code":500,"msg":f"error occur when receiving data from front: {str(e)}"}
            print(error_message)
            ws.send_text(json.dumps(error_message))

    async def user_chat_send():
        url = generate_xf_satt_url()
        def on_open(xfws):  #定义on_open回调函数,在websocket建立时触发
            def run(*args):
                interval = 0.04
                status = FIRST_FRAME
                while True:
                    data_json = q_recv.get()
                    if data_json["meta_info"]["is_end"]: #收到结束标志位
                        status = LAST_FRAME
                    if status == FIRST_FRAME: #第一帧要带上common和business
                        d = {"common": {"app_id": config['xfapi']['APPID']},
                            "business": {"domain": config['satt']['domain'], "language": config['satt']['language'],"accent": config['satt']['accent'], "vad_eos": config['satt']['vad_eos']},
                            "data": {"status": 0, "format": "audio/L16;rate=16000",
                                    "audio": data_json["audio"],
                                    "encoding": "raw"}}
                        d = json.dumps(d)
                        xfws.send(d)
                        status = CONTINUE_FRAME
                    elif status == CONTINUE_FRAME:
                        d = {"data": {"status": 1, "format": "audio/L16;rate=16000",
                                    "audio": data_json["audio"],
                                    "encoding": "raw"}}
                        xfws.send(json.dumps(d))
                    elif status == LAST_FRAME:
                        d = {"data": {"status": 2, "format": "audio/L16;rate=16000",
                                    "audio": data_json["audio"],
                                    "encoding": "raw"}}
                        xfws.send(json.dumps(d))
                        time.sleep(0.05)
                        break;
                    time.sleep(interval)
                xfws.close()
            thread.start_new_thread(run,())

        def  on_message(xfws,message):       
            try:
                nonlocal current_message
                code = json.loads(message)["code"]
                sid = json.loads(message)["sid"]
                if code != 0:
                    errMsg = json.loads(message)["message"]
                    print("sid:%s call error:%s code is:%s" % (sid, errMsg, code))
                else:
                    data = json.loads(message)["data"]["result"]["ws"]
                    # print(json.loads(message))
                    result = ""
                    for i in data:
                        for w in i["cw"]:
                            result += w["w"]
                    current_message += result #将讯飞接口返回保存
            except Exception as e:
                print("receive msg,but parse exception:", e)

        websocket.enableTrace(False)
        xfws = websocket.WebSocketApp(url,on_message=on_message)
        xfws.on_open=on_open
        xfws.run_forever()

    await usr_chat_recv()
    if chat_type==CHAT_AUDIO:
        await user_chat_send()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99

调用minimax大模型

这里用的是minimax的ChatCompletion v2

minimaxAPI文档

定义pyload和header后创建request,在payload中将stream设置为True,在request中也把stream设置为True,使得可以获取大模型的流式返回

    try:
        http_send_start_time = time.perf_counter()
        payload = json.dumps({
            "model":"abab5.5-chat",
            "stream":True,
            "messages": messages,
            "tool_choice":"auto",
            "max_tokens":10000,
            "temperature":0.9,
            "top_p":1
        })
        headers={
            'Authorization':f"Bearer {config['llm']['API_KEY']}",
            'Content-Type':'application/json'
        }
        response = requests.request("POST",config["llm"]["url"],headers=headers,data=payload,stream=True)
        http_send_end_time = time.perf_counter()
    except Exception as e:
        error_info = f"发送信息给大模型时发生错误: {str(e)}"
        error_message ={"type":"error","code":500,"msg":error_info} 
        logger.error(error_info)
        await ws.send_text(json.dumps(error_message,ensure_ascii=False))
        await ws.close()
        return
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

获取大模型流式返回并进行语音合成

首先定义一个split_string_with_punctuation(text)函数,用于把大模型的返回根据标点符号拆分,这样可以让语音合成之后的音频更加自然,不然的话想想一下一句话“我们今天出去玩吧。”,大模型分两次返回给你,“我们今天”,“出去玩吧”,分成两次音频合成的语调就没有一次那么自然了

然后定义一个函数用于解析minimax返回后的chunk

利用response的迭代器(for chunk in response.iter_lines():)来处理流式返回的chunk,对于每一个chunk我们都先对其进行解析后读出文本数据,在根据标点拆分后进行音频合成,最后利用await ws.send_text(json.dumps(text_response,ensure_ascii=False))返回二进制流数据,await ws.send_bytes(audio)返回文本数据

此处的tts是我本地的vits,网上也有不少开源库,需自行配置

    def split_string_with_punctuation(text):
        punctuations = ",!?。"
        result = []
        current_sentence = ""
        for char in text:
            current_sentence += char
            if char in punctuations:
                result.append(current_sentence)
                current_sentence = ""
        # 判断最后一个字符是否为标点符号
        if current_sentence and current_sentence[-1] not in punctuations:
            # 如果最后一段不以标点符号结尾,则加入拆分数组
            result.append(current_sentence)
        return result
    
    llm_response = ""
    response_buf = ""
    def parseChunkDelta(chunk) :
        decoded_data = chunk.decode('utf-8')
        parsed_data = json.loads(decoded_data[6:])
        if 'delta' in parsed_data['choices'][0]:
            delta_content = parsed_data['choices'][0]['delta']
            return delta_content['content']
        else:
            return ""
    try:
        if config["main"]['tts'] == LOCAL_TTS:
            if response.status_code == 200:
                for chunk in response.iter_lines():
                    if chunk:
                        if response_type == RESPONSE_AUDIO: #若返回类型是音频则需要语音合成
                            chunk_data = parseChunkDelta(chunk)
                            llm_response += chunk_data
                            response_buf += chunk_data
                            split_buf = split_string_with_punctuation(response_buf)
                            response_buf = ""
                            if len(split_buf) != 0:
                                for sentence in split_buf:
                                    sr,audio = tts.synthesize(sentence,0,103,0.1,0.668,1.2,return_bytes=True)
                                    text_response = {"type":"text","code":200,"msg":sentence}
                                    await ws.send_text(json.dumps(text_response,ensure_ascii=False)) #返回文本数据
                                    await ws.send_bytes(audio) #返回二进制流数据
                        if response_type == RESPONSE_TEXT:
                            chunk_data = parseChunkDelta(chunk)
                            llm_response += chunk_data
                            text_response = {"type":"text","code":200,"msg":chunk_data} 
                            await ws.send_text(json.dumps(text_response,ensure_ascii=False))

        elif config["main"]['tts'] == REMOTE_TTS:
            error_info = f"暂不支持远程音频合成"
            error_message = {"type":"error","code":500,"msg":error_info}
            logger.error(error_info)
            await ws.send_text(json.dumps(error_message,ensure_ascii=False))
            await ws.close()
            return

        logger.info(f"llm消息: {llm_response}")
    except Exception as e:
        error_info = f"音频合成与向前端返回时错误: {str(e)}"
        error_message = {"type":"error","code":500,"msg":error_info}
        logger.error(error_info)
        await ws.send_text(json.dumps(error_message,ensure_ascii=False))
        await ws.close()
        return
    receive_stt_end_time = time.perf_counter()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65

总结

本文利用利用fastapi接受websocket请求,接收客户端发送的流式音频数据,在本地进行语音识别或使用讯飞接口进行识别,再将识别出的文字发送给minimax大模型,接收minimax的流式返回并进行音频合成,最终把音频以及文本流式返回给客户端。

附录

附上一个客户端代码

import asyncio
import websockets
import json
import base64
from datetime import datetime

#你的音频文件
pcm_file_path = 'example_recording.wav'

def read_pcm_file_in_chunks(chunk_size): #将音频文件切割成一个一个chunk
    with open(pcm_file_path, 'rb') as pcm_file:
        while True:
            data = pcm_file.read(chunk_size)
            if not data:
                break
            yield data

data = {
    "text": "",
    "audio": "",
    "meta_info": {
        "session_id":"7d0546dd-36b6-4bc2-8008-fc77c78aaa14",
        "stream": False,
        "voice_synthesize": True,
        "is_end": False,
        "encoding": "raw"
    }
}

async def send_audio_chunk(websocket, chunk): 
    # 将PCM数据进行Base64编码
    encoded_data = base64.b64encode(chunk).decode('utf-8')
    # 更新data字典中的"audio"键的值为Base64编码后的音频数据
    data["audio"] = encoded_data
    # 将JSON数据对象转换为JSON字符串
    message = json.dumps(data)
    # 发送JSON字符串到WebSocket接口
    await websocket.send(message)

async def send_json():
    async with websockets.connect('ws://ip:port/url') as websocket:
        chunks = read_pcm_file_in_chunks(2048)  # 读取PCM文件并生成数据块
        for chunk in chunks:
            await send_audio_chunk(websocket, chunk)
            await asyncio.sleep(0.01)  # 等待0.04秒
        # 设置data字典中的"is_end"键为True,表示音频流结束
        data["meta_info"]["is_end"] = True
        # 发送最后一个数据块和流结束信号
        await send_audio_chunk(websocket, b'')  # 发送空数据块表示结束
        # 等待并打印接收到的数据

        print("等待接收:",datetime.now())
        audio_bytes = b''
        while True:
            data_ws = await websocket.recv()
            try:
                message_json = json.loads(data_ws)
                print(message_json)  # 打印接收到的消息
                if message_json["type"] == "close":
                    break  # 如果没有接收到消息,则退出循环
            except Exception as e:
                audio_bytes += data_ws
                print(e)
        print("接收完毕:", datetime.now())

        # 在此处播放二进制流数据,这里的注释的代码只是示意,用不了
        # player = AudioPlayer(RATE=22050)
        # player.play(audio_bytes)
        
        await asyncio.sleep(0.04)  # 等待0.04秒后断开连接
        await websocket.close()

# 启动事件循环
try:
    asyncio.run(send_json())
except websockets.exceptions.ConnectionClosedOK:
    print("成功")

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/天景科技苑/article/detail/875651
推荐阅读
相关标签
  

闽ICP备14008679号