赞
踩
没注册的话可以在官网注册一下,新人注册的话语音识别这块有赠送的使用次数
cffi==1.12.3
gevent==1.4.0
greenlet==0.4.15
pycparser==2.19
six==1.12.0
websocket==0.2.1
websocket-client==0.56.0
wave
pyaudio
jieba
windows 直接 pip install xxx
或 pip3 isntall xxx
linux 的话 自行下载pip 或 有python 3.6以及之后的配置一下即可.
因为用到了jieba
所以写一个关键词的类方便使用,可把关键词读取或写入json文件。
import os import json from typing import Dict class KeyWord(): """ keyval是一个数组 """ def __init__(self,keyval=[],JSONfile=None,W_JSONfile=None): self.keyval = keyval self.JSONfile = JSONfile self.W_JSONfile = W_JSONfile def write(self): dict_keyval ={"keyval":self.keyval} json_dict = json.dumps(dict_keyval,ensure_ascii=False) filePath = os.getcwd() + "/python/keywords.json" if self.W_JSONfile is not None: filePath = self.W_JSONfile with open(filePath,'w') as fp: fp.write(json_dict) def read(self): readF = os.getcwd() + "/python/keywords.json" if self.JSONfile is not None: readF = self.JSONfile with open(readF,'r') as fp: readstr = json.load(fp) print("readstr = >",readstr,"\ntype =>",type(readstr)) list_str = list(readstr['keyval']) return list_str if __name__ == "__main__": keyvals = ['碘伏','口罩','999感冒灵'] jsonstr = KeyWord(keyval=keyvals) jsonstr.write() keys = jsonstr.read() # print("keys =>",keys) for key in keys: print(key)
因为语音识别吗,得有语音文件哈,所以写了这个类方便录音和格式转化
#!/bin/python3 # 标识引用的python版本 import pyaudio import wave import sys import os import numpy as np """ 首先集成一下录音功能和格式转换功能 """ class Record(): """ 录音的类 CHUNK = 1024 FORMAT = pyaudio.paInt16 CHANNELS = 1 声道 RATE = 16000 频率 RECORD_SECONDS = 5 录音时间 单位=> s WAVE_OUTPUT_FILENAME = os.getcwd() + "/python/output1.wav" 录音文件 """ def __init__(self,WAVE_OUTPUT_FILENAME,CHUNK=1024, FORMAT=pyaudio.paInt16,CHANNELS=1,RECORD_SECONDS=5, Input=True,RATE=16000,PCMName="out.pcm",DataType=np.int16): self.CHUNK = CHUNK self.FORMAT = FORMAT self.CHANNELS = CHANNELS self.RECORD_SECONDS = RECORD_SECONDS self.WAVE_OUTPUT_FILENAME = WAVE_OUTPUT_FILENAME self.Input = Input self.RATE = RATE self.PCMName = PCMName self.DataType = DataType def recording(self): """ 这句代码 会屏蔽一些不必要的报错 os.close(sys.stderr.fileno()) """ #隐藏一些报错,这些不影响程序的运行 os.close(sys.stderr.fileno()) print("开始录音") p = pyaudio.PyAudio() stream = p.open(format=self.FORMAT, channels=self.CHANNELS, rate=self.RATE, input=self.Input,#默认为True frames_per_buffer=self.CHUNK) frames = [] for i in range(0, int(self.RATE / self.CHUNK * self.RECORD_SECONDS)): data = stream.read(self.CHUNK) frames.append(data) print("done") # 关闭流 stream.stop_stream() stream.close() p.terminate() wf = wave.open(self.WAVE_OUTPUT_FILENAME, 'wb') wf.setnchannels(self.CHANNELS) wf.setsampwidth(p.get_sample_size(self.FORMAT)) wf.setframerate(self.RATE) wf.writeframes(b''.join(frames)) wf.close() def wav2pcm(self): """ 音频文件wav格式 转 pcm格式 """ f = open(self.WAVE_OUTPUT_FILENAME, "rb") f.seek(0) f.read(1024) data = np.fromfile(f, dtype=self.DataType) # 获取 分割后的 数组 filePath = str(self.WAVE_OUTPUT_FILENAME).split('/') path = '' # 拼接路径 取出最后一位 [0,-1) for item in filePath[:-1]: path += item +'/' path += self.PCMName # print("PCM Path =>",path) data.tofile(path) print("结束") # 可以返回一个元组; 也可以把它封成数组返回 return (self.WAVE_OUTPUT_FILENAME,path) def run(self): self.recording() wavpath,path = self.wav2pcm() # print("wave =>",wavpath,"\n","path =>",path) # 这个就不写入那个类里了, 这样方便调用 不需要再初始化类了 # 可直接copy到使用的类中或者文件里 def pcm2wav(pcmfile,wavfile,channels=1,rate=16000): with open(pcmfile,'rb') as fp: pcmdata = fp.read() with wave.open(wavfile, 'wb') as wav: wav.setnchannels(channels) wav.setsampwidth(16 // 8) wav.setframerate(rate) # 写入 wav.writeframes(pcmdata) # 测试 if __name__ == "__main__": wavepath = os.getcwd() + "/python/output1.wav" dev = Record(wavepath) # dev.run() pcmfile = os.getcwd() + '/python/demo.pcm' wavfile = os.getcwd() + '/python/demo.wav' pcm2wav(pcmfile,wavfile)
#!/bin/python3 """ linux + Python3.7 """ import datetime import hashlib import base64 import hmac import json from urllib.parse import urlencode import time import ssl from wsgiref.handlers import format_date_time from datetime import datetime from time import mktime import _thread as thread import os import websocket import jieba as JB from KeyWord import KeyWord STATUS_FIRST_FRAME = 0 # 第一帧的标识 STATUS_CONTINUE_FRAME = 1 # 中间帧标识 STATUS_LAST_FRAME = 2 # 最后一帧的标识 """ 为了方便以后复用,单独集成 科大讯飞的 在线语音识别功能 """ class OnlineASR(): """ Automatic Speech Recognition (ASR) 语音识别 对官网实例的再封装; 参数可到官网去获取,这里我写成默认的了,可自行修改 """ def __init__(self, AudioFile, APPID='', APISecret='', APIKey=''): """ AudioFile = r''+os.getcwd()+'/python/output1.wav' """ self.JustTime = datetime.now() self.AudioFile = AudioFile self.APPID = APPID self.APISecret = APISecret self.APIKey = APIKey # 公共参数(common) self.CommonArgs = {"app_id": self.APPID} # 业务参数(business),更多个性化参数可在官网查看 self.BusinessArgs = {"domain": "iat", "language": "zh_cn", "accent": "mandarin", "vinfo": 1, "vad_eos": 10000} #全局参数 self.RESULT = [] # jieba 初始化 词库 # 向分词词典增加新词 KW = KeyWord() keyvals = KW.read() for key in keyvals: JB.add_word(key) # 运行类 def run(self): wsUrl = self.create_url() # print(wsUrl) websocket.enableTrace(False) ws = websocket.WebSocketApp(wsUrl, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close) ws.on_open =self.on_open ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE}) CurrTime = datetime.now() self.HandleEndStr(self.RESULT) # print("lists =>",lists) print("运行时间 => ", CurrTime-self.JustTime, "s") return self.RESULT # 生成url def create_url(self): url = 'wss://ws-api.xfyun.cn/v2/iat' # 生成RFC1123格式的时间戳 now = datetime.now() date = format_date_time(mktime(now.timetuple())) # 拼接字符串 signature_origin = "host: " + "ws-api.xfyun.cn" + "\n" signature_origin += "date: " + date + "\n" signature_origin += "GET " + "/v2/iat " + "HTTP/1.1" # 进行hmac-sha256进行加密 signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'), digestmod=hashlib.sha256).digest() signature_sha = base64.b64encode( signature_sha).decode(encoding='utf-8') authorization_origin = "api_key=\"%s\", algorithm=\"%s\", headers=\"%s\", signature=\"%s\"" % ( self.APIKey, "hmac-sha256", "host date request-line", signature_sha) authorization = base64.b64encode( authorization_origin.encode('utf-8')).decode(encoding='utf-8') # 将请求的鉴权参数组合为字典 v = { "authorization": authorization, "date": date, "host": "ws-api.xfyun.cn" } # 拼接鉴权参数,生成url url = url + '?' + urlencode(v) # print("date: ",date) # print("v: ",v) # 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致 # print('websocket url :', url) return url # 处理返回的字符串,把其加入数组中 def HandleMsg(self,msg): # 编码为json字符串 ensure_ascii 不是Ascii编码 jsonstr = json.dumps(msg,ensure_ascii=False) dictstr = json.loads(jsonstr) res = '' for item in dictstr: # print("item => ",item) parsestr = json.loads(json.dumps(item['cw'],ensure_ascii=False))[0] end = json.loads(json.dumps(parsestr,ensure_ascii=False)) res += end['w'] self.RESULT.append(res) # print("res =>",res) # 处理最终的字符串; 使用jiaba def HandleEndStr(self,msg): # 因为只是短短的一句话,故去掉多余的字符 split_char = ''.join(msg) chars = ['.','。',',',',',';',';',':','[',']','{','}', '*','%','$','¥','@','^','……','-','=','+','!','~','`','<','>','《','》','?'] for item in chars: split_char = str(split_char).replace(item,'') # 处理完毕 # 使用 jieba 采用精确模式 : 把文本精确的切分开,不存在冗余单词 ,返回列表 msglist = JB.lcut(split_char) self.RESULT = msglist return msglist # 收到websocket消息的处理 def on_message(self,ws, message): try: code = json.loads(message)["code"] sid = json.loads(message)["sid"] if code != 0: errMsg = json.loads(message)["message"] print("sid:%s call error:%s code is:%s" % (sid, errMsg, code)) else: data = json.loads(message)["data"]["result"]["ws"] # print(json.loads(message)) result = "" for i in data: for w in i["cw"]: result += w["w"] print("sid:%s call success!,data is:%s" % (sid, json.dumps(data, ensure_ascii=False))) # json.dumps(data, ensure_ascii=False) return self.HandleMsg(data) except Exception as e: print("receive msg,but parse exception:", e) # 收到websocket错误的处理 def on_error(self,ws, error): print("### error:", error) # 收到websocket关闭的处理 def on_close(self,ws): print("### closed ###") # 收到websocket连接建立的处理 def on_open(self,ws): def run(*args): frameSize = 8000 # 每一帧的音频大小 intervel = 0.04 # 发送音频间隔(单位:s) status = STATUS_FIRST_FRAME # 音频的状态信息,标识音频是第一帧,还是中间帧、最后一帧 with open(self.AudioFile, "rb") as fp: while True: buf = fp.read(frameSize) # 文件结束 if not buf: status = STATUS_LAST_FRAME # 第一帧处理 # 发送第一帧音频,带business 参数 # appid 必须带上,只需第一帧发送 if status == STATUS_FIRST_FRAME: d = {"common": self.CommonArgs, "business": self.BusinessArgs, "data": {"status": 0, "format": "audio/L16;rate=16000", "audio": str(base64.b64encode(buf), 'utf-8'), "encoding": "raw"}} d = json.dumps(d) ws.send(d) status = STATUS_CONTINUE_FRAME # 中间帧处理 elif status == STATUS_CONTINUE_FRAME: d = {"data": {"status": 1, "format": "audio/L16;rate=16000", "audio": str(base64.b64encode(buf), 'utf-8'), "encoding": "raw"}} ws.send(json.dumps(d)) # 最后一帧处理 elif status == STATUS_LAST_FRAME: d = {"data": {"status": 2, "format": "audio/L16;rate=16000", "audio": str(base64.b64encode(buf), 'utf-8'), "encoding": "raw"}} ws.send(json.dumps(d)) time.sleep(1) break # 模拟音频采样间隔 time.sleep(intervel) ws.close() thread.start_new_thread(run, ()) #测试 if __name__ == "__main__": AudioFile = r''+os.getcwd()+'/python/output1.wav' asr = OnlineASR(AudioFile=AudioFile, APPID='', APISecret='', APIKey='' ) res = asr.run() print("test =>",res)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。