赞
踩
基于fastapi,用websocket流式接收音频,经过语音识别后调用minimax大模型,再将大模型的流式返回进行音频合成后流式返回
第一次写流式接口,真的是坑都踩了一圈
本文仅提供代码思路,文中的代码不完整,无法直接复制
本文仅提供代码思路,文中的代码不完整,无法直接复制
本文仅提供代码思路,文中的代码不完整,无法直接复制
WebSocket 是一种网络通信协议,提供了浏览器和服务器之间的全双工通信能力。与传统的HTTP请求不同,WebSocket 协议允许数据在客户端和服务器之间进行实时双向传输,而无需频繁地建立新的连接。这种协议特别适合需要实时数据交换的应用场景,比如在线游戏、实时交易平台、聊天应用等。
说人话就是socket的web版本,两边建立了websocket连接之后,就可以全双工通信
在fastapi中,可以使用 @app.websocket("/url")
来创建websocket接口,
from fastapi import FastAPI, WebSocket
app = FastAPI()
@app.websocket("/url")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept() #接收websocket连接
......
此时,当客户端程序调用url,ws://ip:port/url
就可以调用了
在接收音频之前需要先和发送方协商好发送格式,这是我定义的发送格式
{
text: Optional[str] = None #文本输入,若输入文本则默认非音频输入
audio: Optional[str] = None # 默认流式输入 base64
meta_info : {
stream: [bool] = True # 是否流式输出
voice_synthesize: [bool] = False #是否在大模型返回后进行音频合成
is_end: [bool] #是否结束,即音频发送是否完成
encoding: [str] = ['raw', ...] #编码格式,raw表示base64,可以在这里定义一些压缩格式
}
这边我们先初始化我们的asr(auto speech recognizer),我用的是开源的一个项目,然后while true开始循环接受消息,若文本输入不为空,则表示该输入为文本输入,我们将文本信息读取出来之后就直接退出循环
如果接受的是音频输入,则通过"is_end"字段判断是否是最后一帧,用asr进行语音识别后添加到current_message中,若是最后一帧则asr识别并添加之后,再记录一下后续返回类型就退出
response_type = RESPONSE_TEXT asr_start_time = time.perf_counter() if config["main"]["asr"] == LOCAL_ASR: #使用本地ASR asr = FunAutoSpeechRecognizer() #替换成你自己的ASR try: while True: data_json = json.loads(await ws.receive_text()) if data_json["text"]: #若文字输入不为空,则表示该输入为文字输入 if data_json["meta_info"]["voice_synthesize"]: response_type = RESPONSE_AUDIO #查看voice_synthesize判断返回类型 current_message = data_json['text'] break if not data_json['meta_info']['is_end']: #还在发 asr_result = asr.streaming_recognize(data_json["audio"]) current_message += ''.join(asr_result['text']) else: #发完了 asr_result = asr.streaming_recognize(data_json["audio"],is_end=True) session_id = data_json["meta_info"]["session_id"] current_message += ''.join(asr_result['text']) if data_json["meta_info"]["voice_synthesize"]: response_type = RESPONSE_AUDIO #查看voice_synthesize判断返回类型 break except Exception as e: error_info = f"接收用户消息错误: {str(e)}" error_message = {"type":"error","code":"500","msg":error_info} logger.error(error_info) await ws.send_text(json.dumps(error_message,ensure_ascii=False)) await ws.close() return
识别后的文本被放在current_messgae中
和本地比起来,调用接口用时不稳定,我调用讯飞的接口用时基本上在3,4秒左右,但是本地的话稳定能控制在1秒钟之内。
采取异步的方式,一边流式接受音频,一边流式将音频传给讯飞
usr_chat_recv()
是异步接受方法
user_chat_send()
是异步发送方法
在user_chat_send()
里面调用了讯飞的接口,解释一下就是先获取鉴权url,然后定义两个回调函数,再与讯飞开启websocket连接并使用这两个回调函数,这四个回调函数分别是on_open,在websocket连接建立的时候调用,在on_open里面发送音频数据,on_message,收到讯飞识别后的流式返回,在这个里面获取识别结果
这一版代码是根据讯飞的demo调出来的, 若是要用于生产环境,还需要调整
current_message = "" #用于存储用户消息 response_type = RESPONSE_TEXT #用于获取返回类型 session_id = "" q_recv = queue.Queue() #一个函数往队列中存,一个函数同时从队列中取 chat_type = CHAT_UNCERTAIN #判断一下是语音还是文本信息 if config["main"]["asr"] == REMOTE_ASR: logger.info("开始调用讯飞接口") async def usr_chat_recv(): #定义函数用于接受流式输入,并存入队列 nonlocal current_message nonlocal chat_type nonlocal session_id nonlocal response_type try: while True: data_json = await ws.receive_json() q_recv.put(data_json) if data_json["text"]: #如果是文本则一轮直接退出 if data_json["meta_info"]["voice_synthesize"]: response_type = RESPONSE_AUDIO chat_type = CHAT_TEXT current_message = data_json['text'] session_id = data_json["meta_info"]["session_id"] break else: chat_type = CHAT_AUDIO if data_json['meta_info']["is_end"]: #收到结束标志,退出 if data_json["meta_info"]["voice_synthesize"]: response_type = RESPONSE_AUDIO session_id = data_json["meta_info"]["session_id"] break except Exception as e: error_message = {"type":"error","code":500,"msg":f"error occur when receiving data from front: {str(e)}"} print(error_message) ws.send_text(json.dumps(error_message)) async def user_chat_send(): url = generate_xf_satt_url() def on_open(xfws): #定义on_open回调函数,在websocket建立时触发 def run(*args): interval = 0.04 status = FIRST_FRAME while True: data_json = q_recv.get() if data_json["meta_info"]["is_end"]: #收到结束标志位 status = LAST_FRAME if status == FIRST_FRAME: #第一帧要带上common和business d = {"common": {"app_id": config['xfapi']['APPID']}, "business": {"domain": config['satt']['domain'], "language": config['satt']['language'],"accent": config['satt']['accent'], "vad_eos": config['satt']['vad_eos']}, "data": {"status": 0, "format": "audio/L16;rate=16000", "audio": data_json["audio"], "encoding": "raw"}} d = json.dumps(d) xfws.send(d) status = CONTINUE_FRAME elif status == CONTINUE_FRAME: d = {"data": {"status": 1, "format": "audio/L16;rate=16000", "audio": data_json["audio"], "encoding": "raw"}} xfws.send(json.dumps(d)) elif status == LAST_FRAME: d = {"data": {"status": 2, "format": "audio/L16;rate=16000", "audio": data_json["audio"], "encoding": "raw"}} xfws.send(json.dumps(d)) time.sleep(0.05) break; time.sleep(interval) xfws.close() thread.start_new_thread(run,()) def on_message(xfws,message): try: nonlocal current_message code = json.loads(message)["code"] sid = json.loads(message)["sid"] if code != 0: errMsg = json.loads(message)["message"] print("sid:%s call error:%s code is:%s" % (sid, errMsg, code)) else: data = json.loads(message)["data"]["result"]["ws"] # print(json.loads(message)) result = "" for i in data: for w in i["cw"]: result += w["w"] current_message += result #将讯飞接口返回保存 except Exception as e: print("receive msg,but parse exception:", e) websocket.enableTrace(False) xfws = websocket.WebSocketApp(url,on_message=on_message) xfws.on_open=on_open xfws.run_forever() await usr_chat_recv() if chat_type==CHAT_AUDIO: await user_chat_send()
这里用的是minimax的ChatCompletion v2
定义pyload和header后创建request,在payload中将stream设置为True,在request中也把stream设置为True,使得可以获取大模型的流式返回
try: http_send_start_time = time.perf_counter() payload = json.dumps({ "model":"abab5.5-chat", "stream":True, "messages": messages, "tool_choice":"auto", "max_tokens":10000, "temperature":0.9, "top_p":1 }) headers={ 'Authorization':f"Bearer {config['llm']['API_KEY']}", 'Content-Type':'application/json' } response = requests.request("POST",config["llm"]["url"],headers=headers,data=payload,stream=True) http_send_end_time = time.perf_counter() except Exception as e: error_info = f"发送信息给大模型时发生错误: {str(e)}" error_message ={"type":"error","code":500,"msg":error_info} logger.error(error_info) await ws.send_text(json.dumps(error_message,ensure_ascii=False)) await ws.close() return
首先定义一个split_string_with_punctuation(text)
函数,用于把大模型的返回根据标点符号拆分,这样可以让语音合成之后的音频更加自然,不然的话想想一下一句话“我们今天出去玩吧。”,大模型分两次返回给你,“我们今天”,“出去玩吧”,分成两次音频合成的语调就没有一次那么自然了
然后定义一个函数用于解析minimax返回后的chunk
利用response的迭代器(for chunk in response.iter_lines():
)来处理流式返回的chunk,对于每一个chunk我们都先对其进行解析后读出文本数据,在根据标点拆分后进行音频合成,最后利用await ws.send_text(json.dumps(text_response,ensure_ascii=False))
返回二进制流数据,await ws.send_bytes(audio)
返回文本数据
此处的tts
是我本地的vits,网上也有不少开源库,需自行配置
def split_string_with_punctuation(text): punctuations = ",!?。" result = [] current_sentence = "" for char in text: current_sentence += char if char in punctuations: result.append(current_sentence) current_sentence = "" # 判断最后一个字符是否为标点符号 if current_sentence and current_sentence[-1] not in punctuations: # 如果最后一段不以标点符号结尾,则加入拆分数组 result.append(current_sentence) return result llm_response = "" response_buf = "" def parseChunkDelta(chunk) : decoded_data = chunk.decode('utf-8') parsed_data = json.loads(decoded_data[6:]) if 'delta' in parsed_data['choices'][0]: delta_content = parsed_data['choices'][0]['delta'] return delta_content['content'] else: return "" try: if config["main"]['tts'] == LOCAL_TTS: if response.status_code == 200: for chunk in response.iter_lines(): if chunk: if response_type == RESPONSE_AUDIO: #若返回类型是音频则需要语音合成 chunk_data = parseChunkDelta(chunk) llm_response += chunk_data response_buf += chunk_data split_buf = split_string_with_punctuation(response_buf) response_buf = "" if len(split_buf) != 0: for sentence in split_buf: sr,audio = tts.synthesize(sentence,0,103,0.1,0.668,1.2,return_bytes=True) text_response = {"type":"text","code":200,"msg":sentence} await ws.send_text(json.dumps(text_response,ensure_ascii=False)) #返回文本数据 await ws.send_bytes(audio) #返回二进制流数据 if response_type == RESPONSE_TEXT: chunk_data = parseChunkDelta(chunk) llm_response += chunk_data text_response = {"type":"text","code":200,"msg":chunk_data} await ws.send_text(json.dumps(text_response,ensure_ascii=False)) elif config["main"]['tts'] == REMOTE_TTS: error_info = f"暂不支持远程音频合成" error_message = {"type":"error","code":500,"msg":error_info} logger.error(error_info) await ws.send_text(json.dumps(error_message,ensure_ascii=False)) await ws.close() return logger.info(f"llm消息: {llm_response}") except Exception as e: error_info = f"音频合成与向前端返回时错误: {str(e)}" error_message = {"type":"error","code":500,"msg":error_info} logger.error(error_info) await ws.send_text(json.dumps(error_message,ensure_ascii=False)) await ws.close() return receive_stt_end_time = time.perf_counter()
本文利用利用fastapi接受websocket请求,接收客户端发送的流式音频数据,在本地进行语音识别或使用讯飞接口进行识别,再将识别出的文字发送给minimax大模型,接收minimax的流式返回并进行音频合成,最终把音频以及文本流式返回给客户端。
附上一个客户端代码
import asyncio import websockets import json import base64 from datetime import datetime #你的音频文件 pcm_file_path = 'example_recording.wav' def read_pcm_file_in_chunks(chunk_size): #将音频文件切割成一个一个chunk with open(pcm_file_path, 'rb') as pcm_file: while True: data = pcm_file.read(chunk_size) if not data: break yield data data = { "text": "", "audio": "", "meta_info": { "session_id":"7d0546dd-36b6-4bc2-8008-fc77c78aaa14", "stream": False, "voice_synthesize": True, "is_end": False, "encoding": "raw" } } async def send_audio_chunk(websocket, chunk): # 将PCM数据进行Base64编码 encoded_data = base64.b64encode(chunk).decode('utf-8') # 更新data字典中的"audio"键的值为Base64编码后的音频数据 data["audio"] = encoded_data # 将JSON数据对象转换为JSON字符串 message = json.dumps(data) # 发送JSON字符串到WebSocket接口 await websocket.send(message) async def send_json(): async with websockets.connect('ws://ip:port/url') as websocket: chunks = read_pcm_file_in_chunks(2048) # 读取PCM文件并生成数据块 for chunk in chunks: await send_audio_chunk(websocket, chunk) await asyncio.sleep(0.01) # 等待0.04秒 # 设置data字典中的"is_end"键为True,表示音频流结束 data["meta_info"]["is_end"] = True # 发送最后一个数据块和流结束信号 await send_audio_chunk(websocket, b'') # 发送空数据块表示结束 # 等待并打印接收到的数据 print("等待接收:",datetime.now()) audio_bytes = b'' while True: data_ws = await websocket.recv() try: message_json = json.loads(data_ws) print(message_json) # 打印接收到的消息 if message_json["type"] == "close": break # 如果没有接收到消息,则退出循环 except Exception as e: audio_bytes += data_ws print(e) print("接收完毕:", datetime.now()) # 在此处播放二进制流数据,这里的注释的代码只是示意,用不了 # player = AudioPlayer(RATE=22050) # player.play(audio_bytes) await asyncio.sleep(0.04) # 等待0.04秒后断开连接 await websocket.close() # 启动事件循环 try: asyncio.run(send_json()) except websockets.exceptions.ConnectionClosedOK: print("成功")
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。