当前位置:   article > 正文

python-实现智能语音交互

python-实现智能语音交互

需求功能

利用mac 笔记本自带的mic和喇叭,实现智能语音问答功能,具体的实现链路如下:

  1. python调用mac笔记本mic硬件进行收音
  2. 百度asr模型进行语音识别
  3. 讯飞星火大模型进行语义理解并智能回复
  4. 百度tts模型文本转音频
  5. python调用笔记本喇叭播放音频文件

使用的技术栈

百度语音识别&tts接入

参考百度智能云平台,可以免费领取试用包。

短语音识别标准版API - 语音技术

讯飞星火模型接入

可以参考官网,按照步骤申请个人免费版,有200万tokens的免费使用量,练手足够。

讯飞星火认知大模型-AI大语言模型-星火大模型-科大讯飞

websocket基础知识

和讯飞星火模型是通过websocket进行通信的,因此需要websocket的一些基础知识,会用即可。

python-socket、websocket协议相关知识_后山蓬蒿人的博客-CSDN博客

线程基础知识

python线程比较简单,参考如下文章,快速看下即可。

python-线程(threading)实用总结_后山蓬蒿人的博客-CSDN博客

代码实现

主函数类

1. 主函数主要实现了逻辑框架,先进行语音录制->asr识别->星火模型->tts播放

2. 实现语音交互和文本交互的开关。

3. 做了部分异常的兼容性处理。

  1. import json
  2. from os import path
  3. from speech_recognition import WaitTimeoutError
  4. from com.wsm.xunfei.baidu.baidu_asr import BaiduASR
  5. from com.wsm.xunfei.baidu.baidu_tts import BaiDuTTS
  6. from com.wsm.xunfei.large_model.xunfei import HuoXingModel
  7. class ReadCfg:
  8. def __init__(self, cfg_path):
  9. self.cfg_path = cfg_path
  10. self.hx_app_id = ""
  11. self.hx_app_id = ""
  12. self.hx_api_secret = ""
  13. self.hx_api_key = ""
  14. self.hx_ws_url = ""
  15. self.hx_domain = ""
  16. self.bd_secret_key = ""
  17. self.bd_app_id = ""
  18. self.bd_app_key = ""
  19. def get_cfg_info(self):
  20. with open(self.cfg_path, encoding='utf-8') as f:
  21. cfg = json.load(f)
  22. self.hx_app_id = cfg["hx_model"]["app_id"]
  23. self.hx_api_secret = cfg["hx_model"]["api_secret"]
  24. self.hx_api_key = cfg["hx_model"]["api_key"]
  25. self.hx_ws_url = cfg["hx_model"]["ws_url"]
  26. self.hx_domain = cfg["hx_model"]["domain"]
  27. self.bd_app_id = cfg["baidu_asr"]["app_id"]
  28. self.bd_app_key = cfg["baidu_asr"]["app_key"]
  29. self.bd_secret_key = cfg["baidu_asr"]["secret_key"]
  30. question_text = []
  31. # 设置交互方式,2: 键盘文本输入,1:语音交互(默认)
  32. switch_input = 1
  33. if __name__ == "__main__":
  34. cfg_file = "config.json"
  35. cfg = ReadCfg(cfg_file)
  36. cfg.get_cfg_info()
  37. print("智能对话准备开启,如需停止,请输入'关闭对话'即可")
  38. while True:
  39. if switch_input == 1:
  40. try:
  41. asr = BaiduASR(cfg.bd_app_id, cfg.bd_app_key, cfg.bd_secret_key)
  42. input_text = asr.speech_to_text()
  43. print("我:%s" % input_text)
  44. if "关闭对话" in input_text:
  45. tts = BaiDuTTS(cfg.bd_app_id, cfg.bd_app_key, cfg.bd_secret_key)
  46. tts.text_to_speech_baidu_and_play("已为您关闭对话")
  47. break
  48. except WaitTimeoutError:
  49. tts = BaiDuTTS(cfg.bd_app_id, cfg.bd_app_key, cfg.bd_secret_key)
  50. tts.text_to_speech_baidu_and_play("1分钟超时,应用已退出")
  51. break
  52. except:
  53. tts = BaiDuTTS(cfg.bd_app_id, cfg.bd_app_key, cfg.bd_secret_key)
  54. tts.text_to_speech_baidu_and_play("应用异常退出")
  55. break
  56. else:
  57. input_text = input("我:")
  58. if "关闭对话" in input_text:
  59. break
  60. if len(input_text.strip()) == 0:
  61. print("输入为空,请重新输入")
  62. continue
  63. spark_client = HuoXingModel(cfg.hx_app_id, cfg.hx_api_secret, cfg.hx_api_key, cfg.hx_ws_url, cfg.hx_domain)
  64. question_text += spark_client.checkQuestionLen(spark_client.getText("user", input_text))
  65. # print("full question :%s" % question_text)
  66. # 生成request的请求参数
  67. request_data = spark_client.gen_params(question_text)
  68. spark_client.start(request_data)
  69. if switch_input == 1:
  70. tts = BaiDuTTS(cfg.bd_app_id, cfg.bd_app_key, cfg.bd_secret_key)
  71. tts.text_to_speech_baidu_and_play(spark_client.answer)
  72. question_text = spark_client.checkQuestionLen(spark_client.getText("assistant", spark_client.answer))
  73. # print("full answer:%s" % question_text)

ASR类

1. 利用mac电脑的mic进行录音(采样率16kHz),并临时保存为wav格式的音频文件,主要使用的模块为speech_recognition。

2. 调用百度AipSpeech模块(百度官网申请,有免费试用项目),语音转文本。

  1. import os
  2. from os import path
  3. from aip import AipSpeech
  4. import speech_recognition as sr
  5. class BaiduASR():
  6. def __init__(self, APP_ID, API_KEY, SECRET_KEY):
  7. self.APP_ID = APP_ID
  8. self.API_KEY = API_KEY
  9. self.SECRET_KEY = SECRET_KEY
  10. self.client = AipSpeech(self.APP_ID, self.API_KEY, self.SECRET_KEY)
  11. self.r = sr.Recognizer()
  12. self.path = path.dirname(__file__)
  13. # 清除上一次保留的音频文件
  14. self.del_asr()
  15. # print(self.path)
  16. # 定义从mic录制音频方法
  17. def _record(self, if_cmu: bool = False, rate=16000):
  18. with sr.Microphone(sample_rate=rate) as source:
  19. self.r.adjust_for_ambient_noise(source, duration=1)
  20. print("可以开始说话了")
  21. audio = self.r.listen(source=source, timeout=20, phrase_time_limit=2)
  22. file_name = self.path + "/audio/speech.wav"
  23. with open(file_name, "wb") as f:
  24. f.write(audio.get_wav_data())
  25. if if_cmu:
  26. return audio
  27. else:
  28. return self._get_file_content(file_name)
  29. # 从本地获取音频文件内容,作为后续asr识别
  30. def _get_file_content(self, file_name):
  31. with open(file_name, "rb") as f:
  32. audio_data = f.read()
  33. return audio_data
  34. def speech_to_text(self, audi_path: str = "test.wav", if_microphone: bool = True):
  35. if if_microphone:
  36. asr_result = self.client.asr(self._record(), 'wav', 16000, {'dev_pid': 1537, })
  37. else:
  38. asr_result = self.client.asr(self._get_file_content(audi_path), 'wav', 16000, {'dev_pid': 1537, })
  39. if asr_result["err_msg"] != "success.":
  40. return "语音识别错误:" + asr_result["err_msg"]
  41. else:
  42. return asr_result["result"][0]
  43. def del_asr(self):
  44. print("del asr file...")
  45. if os.path.exists(self.path + "/audio/speech.wav"):
  46. os.remove(self.path + "/audio/speech.wav")

星火大模型

同样,星火大模型也有个人免费版,可以去官网申请。本类的输入为问题文本,输出为星火的回复文本。

  1. import base64
  2. import hashlib
  3. import hmac
  4. import ssl
  5. from datetime import datetime
  6. from time import mktime
  7. from urllib.parse import urlparse, urlencode
  8. from wsgiref.handlers import format_date_time
  9. from com.wsm.xunfei.ws_client.ws_client import WsClient
  10. class HuoXingModel:
  11. def __init__(self, app_id, api_secret, api_key, url, domain):
  12. self.app_id = app_id
  13. self.api_secret = api_secret
  14. self.api_key = api_key
  15. self.url = url
  16. self.domain = domain
  17. self.text = []
  18. self.answer = ""
  19. # 接口鉴权,返回最终url
  20. def getAuthApi(self):
  21. print("接口鉴权")
  22. # 1. 获取host,从url中截取
  23. host = urlparse(self.url).netloc
  24. path = urlparse(self.url).path
  25. # 2. 获取data,生成RFC1123格式的时间戳
  26. cur_time = datetime.now()
  27. date = format_date_time(mktime(cur_time.timetuple()))
  28. # 3. 拼接字符串
  29. tmp = "host: " + host + "\n"
  30. tmp += "date: " + date + "\n"
  31. tmp += "GET " + path + " HTTP/1.1"
  32. """上方拼接生成的tmp字符串如下
  33. host: spark-api.xf-yun.com
  34. date: Fri, 05 May 2023 10:43:39 GMT
  35. GET /v1.1/chat HTTP/1.1
  36. """
  37. print("拼接后的字符串为%s" % tmp)
  38. # 4.对上述3中的tmp进行签名(hmac-sha256算法+api_secret)
  39. tmp_sha = hmac.new(self.api_secret.encode('utf-8'), tmp.encode('utf-8'), digestmod=hashlib.sha256).digest()
  40. # 5. 将上方的tmp_sha进行base64编码生成signature
  41. signature = base64.b64encode(tmp_sha).decode(encoding='utf-8')
  42. # 6.利用上面生成的signature,拼接下方的字符串生成authorization_origin
  43. authorization_origin = f'api_key="{self.api_key}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature}"'
  44. # 7.最后再将上方的authorization_origin进行base64编码,生成最终的authorization
  45. authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
  46. # 8. 将鉴权参数组合成最终的键值对,并urlencode生成最终的握手url。开发者可先根据上面的步骤一步步进行参数校验,确保生成的参数无误
  47. v = {
  48. "authorization": authorization, # 上方鉴权生成的authorization
  49. "date": date, # 步骤1生成的date
  50. "host": host # 请求的主机名,根据具体接口替换
  51. }
  52. url_auth = self.url + '?' + urlencode(v)
  53. """生成的最终url如下
  54. wss://spark-api.xf-yun.com/v1.1/chat?authorization=YXBpX2tleT0iYWRkZDIyNzJiNmQ4YjdjOGFiZGQ3OTUzMTQyMGNhM2IiLCBhbGdvcml0aG09ImhtYWMtc2hhMjU2IiwgaGVhZGVycz0iaG9zdCBkYXRlIHJlcXVlc3QtbGluZSIsIHNpZ25hdHVyZT0iejVnSGR1M3B4VlY0QURNeWs0Njd3T1dEUTlxNkJRelIzbmZNVGpjL0RhUT0i&date=Fri%2C+05+May+2023+10%3A43%3A39+GMT&host=spark-api.xf-yun.com
  55. """
  56. print("鉴权后的url:%s" % url_auth)
  57. return url_auth
  58. # 定义问题输入的内容长度小于8000字符
  59. def checkQuestionLen(self, text):
  60. while (self.getContentlength(text) > 8000):
  61. del text[0]
  62. return text
  63. # 获取content字符长度
  64. def getContentlength(self, text):
  65. length = 0
  66. for content in text:
  67. temp = content["content"]
  68. leng = len(temp)
  69. length += leng
  70. return length
  71. # 组合请求-payload-message-text中的内容
  72. def getText(self, role, content):
  73. #print("1:%s"%self.text)
  74. jsoncon = {}
  75. jsoncon["role"] = role
  76. jsoncon["content"] = content
  77. self.text.append(jsoncon)
  78. #print("2:%s" % self.text)
  79. return self.text
  80. def gen_params(self, question):
  81. """
  82. 通过appid和用户的提问来生成请参数
  83. """
  84. data = {
  85. "header": {
  86. "app_id": self.app_id,
  87. "uid": "1234"
  88. },
  89. "parameter": {
  90. "chat": {
  91. "domain": self.domain,
  92. "random_threshold": 0.5,
  93. "max_tokens": 2048,
  94. "auditing": "default"
  95. }
  96. },
  97. "payload": {
  98. "message": {
  99. "text": question
  100. }
  101. }
  102. }
  103. return data
  104. def start(self, question_text):
  105. ws_client = WsClient(url=self.getAuthApi(), question=question_text)
  106. ws_client.start()
  107. self.answer = ws_client.answer

websocket 客户端类

星火模型是通过websocket建立连接的,需要注意的是,星火每次回复后,会断开websocket连接。如需再次问答,需要重新建立ws连接。

  1. import json
  2. import ssl
  3. import threading
  4. import websocket
  5. class WsClient:
  6. def __init__(self, url, question):
  7. self.send_msg = question
  8. self.url = url
  9. self.ws = None
  10. self.answer = ""
  11. def on_open(self, ws):
  12. print("onOpen")
  13. threading.Thread(target=self.run, args=()).start()
  14. def on_message(self, ws, message):
  15. #print("onMessage:%s" % message)
  16. data = json.loads(message)
  17. code = data['header']['code']
  18. if code != 0:
  19. print(f'请求错误: {code}, {data}')
  20. self.ws.close()
  21. else:
  22. choices = data["payload"]["choices"]
  23. status = choices["status"]
  24. content = choices["text"][0]["content"]
  25. self.answer += content
  26. # print(1)
  27. # status 0:首结果,1中间结果,2最后一个结果
  28. if status == 2:
  29. print("星火:%s" % self.answer)
  30. ws.close()
  31. def on_close(self, ws, one, two):
  32. print("onClose...")
  33. def on_error(self, ws, error):
  34. print("onError: %s" % error)
  35. def run(self):
  36. print("thread running...")
  37. data = json.dumps(self.send_msg)
  38. self.ws.send(data)
  39. def start(self):
  40. websocket.enableTrace(False)
  41. self.ws = websocket.WebSocketApp(url=self.url,
  42. on_error=self.on_error,
  43. on_open=self.on_open,
  44. on_close=self.on_close,
  45. on_message=self.on_message)
  46. self.ws.run_forever(ping_interval=60, ping_timeout=5, sslopt={"cert_reqs": ssl.CERT_NONE})
  47. if __name__ == "__main__":
  48. print("test")
'
运行

百度TTS类

星火大模型返回文本后,再调用百度AipSpeech模块生成tts音频。

  1. import os.path
  2. from os import path
  3. from aip import AipSpeech
  4. from playsound import playsound
  5. class BaiDuTTS():
  6. def __init__(self, APP_ID, API_KEY, SECRET_KEY):
  7. self.APP_ID = APP_ID
  8. self.API_KEY = API_KEY
  9. self.SECRET_KEY = SECRET_KEY
  10. self.client = AipSpeech(self.APP_ID, self.API_KEY, self.SECRET_KEY)
  11. # 存放音频目录
  12. self.path = path.dirname(__file__)
  13. # 清空上一次保存的音频文件
  14. self.del_tts()
  15. def text_to_speech_baidu_and_play(self, text):
  16. result = self.client.synthesis(text, 'zh', 1, {
  17. 'vol': 5,
  18. }) # 得到tts的二进制文件
  19. if not isinstance(result, dict):
  20. print("tts合成成功")
  21. with open(self.path + '/audio/audio_tts.wav', 'wb') as f:
  22. f.write(result)
  23. else:
  24. print('tts 合成失败')
  25. # 播放tts音频
  26. playsound(self.path + "/audio/audio_tts.wav")
  27. def del_tts(self):
  28. print("del...")
  29. if os.path.exists(self.path + "/audio/audio_tts.wav"):
  30. os.remove(self.path + "/audio/audio_tts.wav")
  31. if __name__ == "__main__":
  32. d = path.dirname(__file__)
  33. print(d)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/852078
推荐阅读
相关标签
  

闽ICP备14008679号