赞
踩
由于公司项目在立项之初需要用到微软的实时语音识别,所以研究了一下微软官方的 SDK 和 API。前端与 Java 后端的交互相对简单:前端页面通过 HZRecorder 采集实时音频流并处理成二进制数据,后端用 netty + websocket 接收消息。难点在于微软的语音翻译。微软官方提供了几种输入方式:一种是传入一段音频(流),另一种是直接从硬件设备(如麦克风)获取音频流进行翻译。我们的服务需要部署在服务器上,因此无法使用第二种方式。对于第一种方式,微软的 SpeechRecognizer 对象可以接收一个特殊的流对象 PullAudioInputStreamCallback 作为数据源;如果传入了这个对象,SpeechRecognizer 会主动从该流对象中读取数据。但是 SpeechRecognizer 在从流中读取到 0 个字节后就会停止识别,而在我们的场景中,默认的流类型无法满足需求:当没有数据可读时,它们不会阻塞(block)。PullAudioInputStreamCallback 期望的效果是:只有在明确流已结束时,读取方法 read() 才返回 0。因此需要定义我们自己的音频流对象:
- package ********;
-
- import com.microsoft.cognitiveservices.speech.audio.PullAudioInputStreamCallback;
- import lombok.extern.slf4j.Slf4j;
- import java.io.IOException;
- import java.io.InputStream;
-
-
- @Slf4j
- public class VoiceAudioStream extends PullAudioInputStreamCallback {
-
- private EchoStream _dataStream = new EchoStream();
- private ManualResetEvent _waitForEmptyDataStream = null;
- private InputStream stream;
-
- @Override
- public int read(byte[] dataBuffer) //S2T服务从PullAudioInputStream中读取数据, 读到0个字节并不会关闭流
- {
- long ret = 0;
- if (_waitForEmptyDataStream != null && !_dataStream.DataAvailable())
- {//用户主动close时可以关闭流
- _waitForEmptyDataStream.Set();
- return 0;
- }
- try {
- if(this.stream != null){
- //log.info("1前:{}",dataBuffer);
- ret = this.stream.read(dataBuffer,0, dataBuffer.length);
- //log.info("1后:{}",dataBuffer);
- if((int)ret < 1){
- // log.info("2");
- this.stream = _dataStream.Read(dataBuffer, 0, dataBuffer.length);
- ret = this.stream.read(dataBuffer,0, dataBuffer.length);
- }
- }else{
- //log.info("3");
- this.stream = _dataStream.Read(dataBuffer, 0, dataBuffer.length);
- ret = this.stream.read(dataBuffer,0, dataBuffer.length);
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- return (int)Math.max(0, ret);
- }
-
- public void write(byte[] buffer, int offset, int count) //Client向PullAudioInputStream写入数据
- {
- _dataStream.write(buffer, offset, count);
- }
-
- @Override
- public void close(){
- if (_dataStream.DataAvailable())
- {
- log.info("进到close里面了");
- _waitForEmptyDataStream = new ManualResetEvent(false); //通过ManualResetEvent强制流的使用者必须调用close来手动关闭流
- try {
- _waitForEmptyDataStream.WaitOne();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- log.info("等待了吗?");
- _waitForEmptyDataStream.close();
- try {
- _dataStream.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- //_dataStream.close();
- try {
- this.stream.close();
- } catch (IOException ex) {
- // ignored
- }
- }
-
-
-
-
-
- }
- package *****;
-
- import lombok.extern.slf4j.Slf4j;
-
- import java.io.ByteArrayInputStream;
- import java.io.IOException;
- import java.io.InputStream;
- import java.util.concurrent.ConcurrentLinkedDeque;
-
- @Slf4j
- public class EchoStream extends InputStream {
-
-
- private ManualResetEvent _DataReady = new ManualResetEvent(false);
- private ConcurrentLinkedDeque<byte[]> _Buffers = new ConcurrentLinkedDeque<>();
-
-
- public Boolean DataAvailable(){
- return !_Buffers.isEmpty();
- }
-
- public void write(byte[] buffer, int offset, int count)
- {
- // log.info("开始write,EchoStream");
- _Buffers.addLast(buffer);
- if(_Buffers.size()>1){
- _DataReady.Set();
- }
- }
-
- @Override
- public int read() throws IOException {
- return 0;
- }
-
- public byte[] getLBuffer(){
- if(_Buffers.size() != 0){
- return _Buffers.pollFirst();
- }
- return new byte[0];
- }
-
- public InputStream Read(byte[] buffer, int offset, int count)
- {
- //log.info("开始read,EchoStream");
- try {
- if(_Buffers.size() == 0){
- _DataReady.WaitOne();
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- //log.info("开始read,lBuffer前:{}",_Buffers.size());
- byte[] lBuffer = _Buffers.pollFirst();
- //log.info("开始read,lBuffer后:{}",lBuffer);
- if (lBuffer == null || lBuffer.length == 0)
- {
- // log.info("读取到的lBuffer为空,进入等待");
- _DataReady.Reset();
- //return -1;
- }
-
- if (!DataAvailable()) {
- // log.info("此时dataready为空");
- _DataReady.Reset();
- }
- //buffer = Arrays.copyOf(lBuffer, lBuffer.length);
- buffer = lBuffer.clone();
- //log.info("buffer:{},lBuffer.length:{}",buffer,lBuffer.length);
- //buffer = lBuffer;
- //return buffer.length;
- return new ByteArrayInputStream(buffer);
- }
-
- @Override
- public void close() throws IOException {
- super.close();
- }
- }
前端建立连接时,创建与微软翻译服务的连接:
- public static Boolean buildConnect(SessionEntity sessionEntity)
- {
- try{
- stopTranslationWithAudioStreamSemaphore = new Semaphore(0);
- //初始化语音翻译配置实例
- //SpeechTranslationConfig speechTranslationConfig = getInstance(speechKey ,speechRegion);
- //初始化语言
- speechTranslationConfig.setSpeechRecognitionLanguage(sessionEntity.getFromLanguage());
- speechTranslationConfig.addTargetLanguage(sessionEntity.getToLanguage());
-
- //初始化语音流
- VoiceAudioStream audioStream = null;
- if(sessionEntity.getAudioStream() != null && !sessionEntity.getAudioStream().equals("")){
- log.info("进来了?");
- audioStream = sessionEntity.getAudioStream();
- }else{
- audioStream = new VoiceAudioStream();
- sessionEntity.setAudioStream(audioStream);
- sessionEntity.setVoiceData(new ArrayList<>());
- }
-
- AudioStreamFormat audioFormat = AudioStreamFormat.getWaveFormatPCM(Settings.getEmbeddedSpeechSamplesPerSecond(), Settings.getEmbeddedSpeechBitsPerSample(), Settings.getEmbeddedSpeechChannels());
- PullAudioInputStream pullStream = PullAudioInputStream.createPullStream(audioStream, audioFormat);
- AudioConfig audioConfig = AudioConfig.fromStreamInput(pullStream);
- TranslationRecognizer translationRecognizer = new TranslationRecognizer(speechTranslationConfig, audioConfig);
-
- sessionEntity.setTranslationRecognizer(translationRecognizer);
-
- //绑定监听方法
- translationRecognizer.recognizing.addEventListener((s, e) -> {
- log.info("tempResult:{}", e.getResult());
- //这里是中间结果,每出现一次就推送一次
- AsrResult asrResult = Constant.asrResultMap.get(sessionEntity.getSessionID());
- Integer asrIndex = 0;
- if(asrResult == null){ //第一次进入
- //log.info("第一次进入");
- //获取本房间最新的文本下标
- asrIndex = Constant.roomIndex.get(sessionEntity.getRoomId());
- if(asrIndex == null){
- asrIndex = 0;
- }
- Constant.roomIndex.put(sessionEntity.getRoomId(),asrIndex+1);
- asrResult = new AsrResult();
- asrResult.setStartTime(DateUtils.now());
- asrResult.setEndTime(DateUtils.now());
- asrResult.setAsrIndex(asrIndex);
- }else{
- //log.info("第二次进入");
- //判断本次结果与上次结果之间时间差是否超过一定值
- Long thisTime = System.currentTimeMillis();
- Long lastTime = DateUtils.gettimeStemp(asrResult.getEndTime(),"yyyy-MM-dd HH:mm:ss");
- if((thisTime - lastTime) > 2000L) {
- //log.info("两次时间过长");
- //index新增
- asrIndex = Constant.roomIndex.get(sessionEntity.getRoomId());
- asrResult.setAsrIndex(asrIndex);
- Constant.roomIndex.put(sessionEntity.getRoomId(),asrIndex+1);
- asrResult.setStartTime(DateUtils.now());
- }
-
- asrResult.setEndTime(DateUtils.now());
- }
-
- asrResult.setAsrText(e.getResult().getText());
- asrResult.setTempText(e.getResult().getText());
- asrResult.setUserId(sessionEntity.getExtNum());
- asrResult.setTranslation(e.getResult().getTranslations());
- asrResult.setTempType(1);
- Constant.asrResultMap.put(sessionEntity.getSessionID(),asrResult);
- WebSocketServerHandler.sendReplyToRoom(sessionEntity.getRoomId(), sessionEntity.getSessionID(), asrResult);
- });
-
- translationRecognizer.recognized.addEventListener((s, e) -> {
- log.info("RECOGNIZEDResult:{}", e.getResult());
- if (e.getResult().getReason() == ResultReason.RecognizedSpeech) {
- //这里是中间结果,每出现一次就推送一次
- if(!e.getResult().getText().equals("")){
- AsrResult asrResult = new AsrResult();
- Integer asrIndex = Constant.asrIndexMap.get(sessionEntity.getSessionID());
- asrResult.setAsrIndex(asrIndex);
- asrResult.setAsrText(e.getResult().getText());
- asrResult.setEndTime(DateUtils.now());
- asrResult.setTempText(e.getResult().getText());
- asrResult.setUserId(sessionEntity.getExtNum());
- asrResult.setTranslation(e.getResult().getTranslations());
- asrResult.setTempType(0);
- Constant.asrIndexMap.remove(sessionEntity.getSessionID());
- WebSocketServerHandler.sendReplyToRoom(sessionEntity.getRoomId(), sessionEntity.getSessionID(), asrResult);
- }
- }
- else if (e.getResult().getReason() == ResultReason.NoMatch) {
- log.info("NOMATCH: Speech could not be recognized.");
- }
- });
-
- translationRecognizer.canceled.addEventListener((s, e) -> {
- if (e.getReason() == CancellationReason.Error) {
- log.info("CANCELED: ErrorCode=" + e.getErrorCode());
- log.info("CANCELED: ErrorDetails=" + e.getErrorDetails());
- }
- Constant.asrResultMap.remove(sessionEntity.getSessionID());
- stopTranslationWithAudioStreamSemaphore.release();
- });
-
- translationRecognizer.sessionStopped.addEventListener((s, e) -> {
- log.info("\n Session stopped event.");
- Constant.asrResultMap.remove(sessionEntity.getSessionID());
- stopTranslationWithAudioStreamSemaphore.release();
- });
-
- try {
- log.info("开始异步识别");
- translationRecognizer.startContinuousRecognitionAsync().get();
- return true;
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ExecutionException e) {
- e.printStackTrace();
- }
- }catch (Exception e){
- log.info("微软buildConnect异常:{}",e.getStackTrace());
- }
- return false;
- }
接收到语音流后,直接写入该会话的音频流:
- /**
- * 接收数据,开始识别
- * */
- public static void ReceiveAudio(SessionEntity sessionEntity,byte[] audioChunk)
- {
- if(audioChunk.length>0){
- sessionEntity.getAudioStream().write(audioChunk,0,audioChunk.length);
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。