赞
踩
利用mac 笔记本自带的mic和喇叭,实现智能语音问答功能,具体的实现链路如下:
参考百度智能云平台,可以免费领取试用包。
可以参考官网,按照步骤申请个人免费版,有200w tokens的免费使用量,练手足够。
和讯飞星火模型是通过websocket进行通信的,因此需要websocket的一些基础知识,会用即可。
python-socket、websocket协议相关知识_后山蓬蒿人的博客-CSDN博客
python线程比较简单,参考如下文章,快速看下即可。
python-线程(threading)实用总结_后山蓬蒿人的博客-CSDN博客
1. 主函数主要实现了逻辑框架,先进行语音录制->asr识别->星火模型->tts播放
2. 实现语音交互和文本交互的开关。
3. 做了部分异常的兼容性处理。
- import json
- from os import path
-
- from speech_recognition import WaitTimeoutError
-
- from com.wsm.xunfei.baidu.baidu_asr import BaiduASR
- from com.wsm.xunfei.baidu.baidu_tts import BaiDuTTS
- from com.wsm.xunfei.large_model.xunfei import HuoXingModel
-
-
class ReadCfg:
    """Credentials for the Spark model and Baidu speech, read from a JSON file.

    All fields start as empty strings and are populated by ``get_cfg_info``.
    """

    def __init__(self, cfg_path):
        """Remember ``cfg_path``; the file is not opened until ``get_cfg_info``."""
        self.cfg_path = cfg_path
        # Filled from the "hx_model" section of the config file.
        self.hx_app_id = ""
        self.hx_api_secret = ""
        self.hx_api_key = ""
        self.hx_ws_url = ""
        self.hx_domain = ""
        # Filled from the "baidu_asr" section of the config file.
        self.bd_secret_key = ""
        self.bd_app_id = ""
        self.bd_app_key = ""

    def get_cfg_info(self):
        """Load ``cfg_path`` (UTF-8 JSON) and copy its values onto this object.

        Raises:
            OSError: the file cannot be opened.
            json.JSONDecodeError: the file is not valid JSON.
            KeyError: a required section or key is missing.
        """
        # Only the parse needs the file handle; close it before touching state.
        with open(self.cfg_path, encoding='utf-8') as f:
            cfg = json.load(f)

        hx_model = cfg["hx_model"]
        self.hx_app_id = hx_model["app_id"]
        self.hx_api_secret = hx_model["api_secret"]
        self.hx_api_key = hx_model["api_key"]
        self.hx_ws_url = hx_model["ws_url"]
        self.hx_domain = hx_model["domain"]

        baidu_asr = cfg["baidu_asr"]
        self.bd_app_id = baidu_asr["app_id"]
        self.bd_app_key = baidu_asr["app_key"]
        self.bd_secret_key = baidu_asr["secret_key"]
-
-
# Running conversation history sent to the model: a list of
# {"role": ..., "content": ...} dicts, oldest first.
question_text = []
# Interaction mode: 2 = keyboard text input, 1 = voice (default).
switch_input = 1

if __name__ == "__main__":
    cfg_file = "config.json"
    cfg = ReadCfg(cfg_file)
    cfg.get_cfg_info()

    def _speak(text):
        """Synthesize ``text`` with a fresh BaiDuTTS client and play it."""
        tts = BaiDuTTS(cfg.bd_app_id, cfg.bd_app_key, cfg.bd_secret_key)
        tts.text_to_speech_baidu_and_play(text)

    print("智能对话准备开启,如需停止,请输入'关闭对话'即可")
    while True:
        if switch_input == 1:
            try:
                asr = BaiduASR(cfg.bd_app_id, cfg.bd_app_key, cfg.bd_secret_key)
                input_text = asr.speech_to_text()
                print("我:%s" % input_text)
                if "关闭对话" in input_text:
                    _speak("已为您关闭对话")
                    break
            except WaitTimeoutError:
                # The old message claimed a one-minute timeout, which did not
                # match the recorder's actual wait; state no duration here.
                _speak("等待语音输入超时,应用已退出")
                break
            except Exception as exc:
                # `except Exception` (not a bare except) lets Ctrl-C and
                # SystemExit propagate; report the cause before exiting.
                print("应用异常退出:%s" % exc)
                _speak("应用异常退出")
                break
        else:
            input_text = input("我:")
            if "关闭对话" in input_text:
                break

        # Never send an empty question to the model, whichever mode produced it.
        if not input_text.strip():
            print("输入为空,请重新输入")
            continue

        # A new client is built per question, so its own message list starts
        # empty; the cross-turn history lives in `question_text`.
        spark_client = HuoXingModel(cfg.hx_app_id, cfg.hx_api_secret, cfg.hx_api_key, cfg.hx_ws_url, cfg.hx_domain)
        question_text += spark_client.getText("user", input_text)
        # Trim the whole history (oldest first), not just the new message.
        question_text = spark_client.checkQuestionLen(question_text)

        # Build the request payload and run the request to completion.
        request_data = spark_client.gen_params(question_text)
        spark_client.start(request_data)

        # Skip playback when the request produced no answer text.
        if switch_input == 1 and spark_client.answer:
            _speak(spark_client.answer)

        # Append only the assistant reply. Assigning the client's list here
        # (as before) discarded every turn older than the current one.
        question_text.append(spark_client.getText("assistant", spark_client.answer)[-1])
        question_text = spark_client.checkQuestionLen(question_text)
1. 利用mac电脑的mic进行录音(采样率16kHz),并临时保存为wav格式的音频文件,主要使用的模块为speech_recognition。
2. 调用百度AipSpeech模块(百度官网申请,有免费试用项目),语音转文本。
- import os
- from os import path
-
- from aip import AipSpeech
- import speech_recognition as sr
-
-
class BaiduASR():
    """Record speech from the microphone and transcribe it via Baidu AipSpeech.

    The most recent recording is kept at ``<module dir>/audio/speech.wav``.
    """

    def __init__(self, APP_ID, API_KEY, SECRET_KEY):
        """Create the Baidu client and recognizer, then drop any old recording."""
        self.APP_ID = APP_ID
        self.API_KEY = API_KEY
        self.SECRET_KEY = SECRET_KEY
        self.client = AipSpeech(self.APP_ID, self.API_KEY, self.SECRET_KEY)
        self.r = sr.Recognizer()
        self.path = path.dirname(__file__)
        # Remove the audio file left over from the previous run.
        self.del_asr()

    @property
    def _speech_file(self):
        """Path of the temporary recording.

        Built with ``os.path.join`` so an empty ``self.path`` yields the
        relative ``audio/speech.wav`` rather than the root ``/audio/speech.wav``.
        """
        return os.path.join(self.path, "audio", "speech.wav")

    # Record one phrase from the microphone.
    def _record(self, if_cmu: bool = False, rate=16000):
        """Record from the default microphone and save the audio as WAV.

        Args:
            if_cmu: when True return the captured audio object, otherwise
                return the WAV bytes read back from the saved file.
            rate: microphone sample rate in Hz.

        Raises:
            speech_recognition.WaitTimeoutError: raised by ``listen``; see the
                SpeechRecognition docs for the timeout semantics.
        """
        with sr.Microphone(sample_rate=rate) as source:
            self.r.adjust_for_ambient_noise(source, duration=1)
            print("可以开始说话了")
            audio = self.r.listen(source=source, timeout=20, phrase_time_limit=2)

        file_name = self._speech_file
        # The audio directory may not exist on a fresh checkout.
        os.makedirs(os.path.dirname(file_name), exist_ok=True)
        with open(file_name, "wb") as f:
            f.write(audio.get_wav_data())

        if if_cmu:
            return audio
        return self._get_file_content(file_name)

    # Read a local audio file so it can be sent to the recognizer.
    def _get_file_content(self, file_name):
        """Return the raw bytes of ``file_name``."""
        with open(file_name, "rb") as f:
            return f.read()

    def speech_to_text(self, audi_path: str = "test.wav", if_microphone: bool = True):
        """Transcribe microphone input or an existing WAV file.

        Args:
            audi_path: file to transcribe when ``if_microphone`` is False.
            if_microphone: record from the microphone instead of reading a file.

        Returns:
            The first recognition result, or ``"语音识别错误:" + err_msg`` when
            the service does not report ``"success."``. A successful response
            with no results yields an empty string.
        """
        if if_microphone:
            audio_bytes = self._record()
        else:
            audio_bytes = self._get_file_content(audi_path)
        asr_result = self.client.asr(audio_bytes, 'wav', 16000, {'dev_pid': 1537, })

        # Tolerate a response without "err_msg" instead of raising KeyError.
        err_msg = str(asr_result.get("err_msg", ""))
        if err_msg != "success.":
            return "语音识别错误:" + err_msg

        # Tolerate a success response with a missing or empty result list.
        candidates = asr_result.get("result") or [""]
        return candidates[0]

    def del_asr(self):
        """Delete the saved recording if it exists."""
        print("del asr file...")
        try:
            os.remove(self._speech_file)
        except FileNotFoundError:
            pass
同样,星火大模型也有个人免费版,可以去官网申请。本类的输入为问题文本,输出为星火的回复文本。
- import base64
- import hashlib
- import hmac
- import ssl
- from datetime import datetime
- from time import mktime
- from urllib.parse import urlparse, urlencode
- from wsgiref.handlers import format_date_time
-
- from com.wsm.xunfei.ws_client.ws_client import WsClient
-
-
class HuoXingModel:
    """Helper for one chat request to the Spark model over WebSocket.

    It builds the signed handshake URL, assembles the message list and the
    request payload, and stores the reply text in ``self.answer``.
    """

    def __init__(self, app_id, api_secret, api_key, url, domain):
        """Store the credentials and endpoint; no network access happens here."""
        self.app_id = app_id
        self.api_secret = api_secret
        self.api_key = api_key
        self.url = url
        self.domain = domain
        self.text = []      # messages accumulated by getText()
        self.answer = ""    # reply text, filled in by start()

    # Authenticate the request and return the final handshake URL.
    def getAuthApi(self):
        """Return ``self.url`` with ``authorization``, ``date`` and ``host`` query params.

        The signature is HMAC-SHA256 (keyed with ``api_secret``) over the
        three-line string ``host: ...``, ``date: ...``, ``GET <path> HTTP/1.1``.
        """
        print("接口鉴权")
        # 1. Host and path both come from a single parse of the endpoint URL.
        endpoint = urlparse(self.url)
        host = endpoint.netloc
        # 2. Current time as an RFC 1123 date string.
        date = format_date_time(mktime(datetime.now().timetuple()))
        # 3. String to sign, for example:
        #      host: spark-api.xf-yun.com
        #      date: Fri, 05 May 2023 10:43:39 GMT
        #      GET /v1.1/chat HTTP/1.1
        tmp = "host: " + host + "\n" + "date: " + date + "\n" + "GET " + endpoint.path + " HTTP/1.1"
        print("拼接后的字符串为%s" % tmp)
        # 4. Sign it with HMAC-SHA256 keyed by api_secret.
        digest = hmac.new(self.api_secret.encode('utf-8'), tmp.encode('utf-8'), digestmod=hashlib.sha256).digest()
        # 5. Base64-encode the digest to get the signature.
        signature = base64.b64encode(digest).decode(encoding='utf-8')
        # 6. Assemble the authorization string around the signature.
        authorization_origin = f'api_key="{self.api_key}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature}"'
        # 7. Base64-encode it to get the final authorization value.
        authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
        # 8. URL-encode the three parameters onto the endpoint, giving
        #    wss://<host><path>?authorization=...&date=...&host=...
        query = {
            "authorization": authorization,
            "date": date,
            "host": host,
        }
        # The signed URL embeds the API key and signature, so it is
        # deliberately not printed.
        return self.url + '?' + urlencode(query)

    # Keep the total question content within the length limit.
    def checkQuestionLen(self, text, max_len=8000):
        """Drop the oldest messages until total content length is <= ``max_len``.

        ``text`` is trimmed in place and the same list is returned. The total
        is computed once and adjusted per dropped message.
        """
        total = self.getContentlength(text)
        drop = 0
        while drop < len(text) and total > max_len:
            total -= len(text[drop]["content"])
            drop += 1
        del text[:drop]
        return text

    # Total number of content characters.
    def getContentlength(self, text):
        """Return the summed length of every message's ``"content"`` string."""
        return sum(len(message["content"]) for message in text)

    # Build one entry of request -> payload -> message -> text.
    def getText(self, role, content):
        """Append a ``{"role", "content"}`` message to ``self.text`` and return that list.

        The returned object is ``self.text`` itself, not a copy.
        """
        self.text.append({"role": role, "content": content})
        return self.text

    def gen_params(self, question):
        """Build the request payload from the app id, domain and message list."""
        return {
            "header": {
                "app_id": self.app_id,
                "uid": "1234"
            },
            "parameter": {
                "chat": {
                    "domain": self.domain,
                    "random_threshold": 0.5,
                    "max_tokens": 2048,
                    "auditing": "default"
                }
            },
            "payload": {
                "message": {
                    "text": question
                }
            }
        }

    def start(self, question_text):
        """Send ``question_text`` through a new WsClient and store its answer."""
        ws_client = WsClient(url=self.getAuthApi(), question=question_text)
        ws_client.start()
        self.answer = ws_client.answer
星火模型是通过websocket建立连接的,需要注意的是,星火每次回复后,会断开websocket连接。如需再次问答,需要重新建立ws连接。
- import json
- import ssl
- import threading
- import websocket
-
-
class WsClient:
    """One-shot WebSocket client: send one request and collect the streamed reply.

    ``start()`` blocks until the connection closes; the concatenated reply
    text is then available in ``self.answer``.
    """

    def __init__(self, url, question):
        """Store the handshake URL and the JSON-serializable request payload."""
        self.send_msg = question
        self.url = url
        self.ws = None      # set by start()
        self.answer = ""    # grows as message frames arrive

    def on_open(self, ws):
        """Connection opened: send the request from a separate thread."""
        print("onOpen")
        threading.Thread(target=self.run, args=()).start()

    def on_message(self, ws, message):
        """Handle one response frame.

        A non-zero ``header.code`` is reported and the socket closed. Otherwise
        the frame's first text content is appended to ``self.answer`` and the
        socket is closed once ``status`` is 2.
        """
        data = json.loads(message)
        code = data['header']['code']
        if code != 0:
            print(f'请求错误: {code}, {data}')
            # Close the socket passed to the callback, as the success path
            # does, rather than reaching through self.ws.
            ws.close()
            return

        choices = data["payload"]["choices"]
        status = choices["status"]
        self.answer += choices["text"][0]["content"]
        # status 0: first result, 1: intermediate result, 2: last result.
        if status == 2:
            print("星火:%s" % self.answer)
            ws.close()

    def on_close(self, ws, one, two):
        """Connection closed; ``one`` and ``two`` are the callback's extra arguments (unused)."""
        print("onClose...")

    def on_error(self, ws, error):
        """Report a transport-level error."""
        print("onError: %s" % error)

    def run(self):
        """Serialize the stored request as JSON and send it over ``self.ws``."""
        print("thread running...")
        self.ws.send(json.dumps(self.send_msg))

    def start(self):
        """Open the connection and block until it is closed."""
        # Start from an empty answer so a reused client does not prepend a
        # previous reply.
        self.answer = ""
        websocket.enableTrace(False)
        self.ws = websocket.WebSocketApp(url=self.url,
                                         on_error=self.on_error,
                                         on_open=self.on_open,
                                         on_close=self.on_close,
                                         on_message=self.on_message)
        # NOTE(review): cert_reqs=CERT_NONE disables TLS certificate
        # verification for a connection that carries API credentials;
        # kept as-is for compatibility, but should be revisited.
        self.ws.run_forever(ping_interval=60, ping_timeout=5, sslopt={"cert_reqs": ssl.CERT_NONE})
-
-
if __name__ == "__main__":
    # Placeholder entry point: running this module directly only prints a marker.
    print("test")
拿到星火大模型返回的文本后,再次调用百度AipSpeech模块生成tts音频并播放。
- import os.path
- from os import path
-
- from aip import AipSpeech
- from playsound import playsound
-
-
class BaiDuTTS():
    """Turn text into speech with Baidu AipSpeech and play the result.

    The synthesized audio is stored at ``<module dir>/audio/audio_tts.wav``.
    """

    def __init__(self, APP_ID, API_KEY, SECRET_KEY):
        """Create the Baidu client and remove the audio left by the last run."""
        self.APP_ID = APP_ID
        self.API_KEY = API_KEY
        self.SECRET_KEY = SECRET_KEY
        self.client = AipSpeech(self.APP_ID, self.API_KEY, self.SECRET_KEY)
        # Directory that contains the audio folder.
        self.path = path.dirname(__file__)
        # Remove the audio file saved by the previous run.
        self.del_tts()

    @property
    def _tts_file(self):
        """Path of the synthesized audio file.

        Built with ``os.path.join`` so an empty ``self.path`` yields a
        relative path rather than the root ``/audio/audio_tts.wav``.
        """
        return os.path.join(self.path, "audio", "audio_tts.wav")

    def text_to_speech_baidu_and_play(self, text):
        """Synthesize ``text``, save the audio and play it.

        Returns:
            True if audio was synthesized and played, False if synthesis
            failed (the client returned a dict instead of audio bytes).
        """
        result = self.client.synthesis(text, 'zh', 1, {
            'vol': 5,
        })  # audio bytes on success, a dict on failure
        if isinstance(result, dict):
            print('tts 合成失败')
            # Nothing was written, so there is nothing to play; trying to
            # play would fail on the missing file.
            return False

        print("tts合成成功")
        tts_file = self._tts_file
        # The audio directory may not exist on a fresh checkout.
        os.makedirs(os.path.dirname(tts_file), exist_ok=True)
        with open(tts_file, 'wb') as f:
            f.write(result)
        # Play the synthesized audio.
        playsound(tts_file)
        return True

    def del_tts(self):
        """Delete the saved audio file if it exists."""
        print("del...")
        try:
            os.remove(self._tts_file)
        except FileNotFoundError:
            pass
-
-
if __name__ == "__main__":
    # Manual check: print the directory that contains this module.
    print(path.dirname(__file__))
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。