赞
踩
简 介: 利用计算机声卡录制了电话机在没有连线情况下发送DTMF信号,利用Python编程,截取了所有拨码信号,利用FFT分析了其中的DTMF成分,并进行译码。通过实验证明,控制板在没有连线情况下,总共发送了589次电话号码,所有的号码都是正确。但是在发送过程,可以看到对于某些信号会出现间隔。比如下面就是其中一次拨码,在拨码信号中间存在短暂的简短。
关键词
: DTMF,信号频谱
DTMF 指 Dual-tune Multi-Frequency,用于早期电话拨码信号发送。几乎在人们开始使用电报、电话系统传递信息,人们就需要发现一种方式可以机械重复可靠的与系统交互。现在信号系统可以完成这些任务,包括接线、拨号以及与电话系统进行交互。
最早使用的电话信号系统是脉冲拨号,下面这种转盘式拨号系统利用内部机械系统中断电话连接,产生电信号用于传递电话拨号信息。
▲ 图1 带有拨码转盘的电话机
第一个双音频电话机是在 1963年11月份由贝尔实验室首次引入使用 ,并迅速取代了旋转拨号电话机。现在所使用的标准为 ITU Q.23推荐标准 .
▲ DTMF信号标准
在电话双音频拨号声音中的干扰信号中对于旧式拨码电话改造项目中,出现的电话拨号出现的偶发错误。现在通过对利用计算机声卡采集到电话拨号声音进行分析,来判断出现的这种偶发错误究竟来自于控制器系统内部,还是来自于和外部电话机连接方面的错误。
在今天(2022-01-21)录制了大约3个小时的拨号声音,是在没有连接电话线的情况下录制的。这样可以判断内部电话机系统本身在发送信号过程中是否会出现发送些信号故障问题。
录制软件
:Audacity
录制频谱
:44.1kHz
通道数
:2(两个通道一样的模拟信号)
时间
:大约为3个小时
信号发送的电话号码为: 18612110728
录制的信号上载AI Studio,建立了音频数据库。这个数据集合包括了六段利用计算机声卡录制的电话双音频信号。
▲ 图1.1.1 双音频电话报号声音
包含有信号:
import sys,os,math,time
import matplotlib.pyplot as plt
from numpy import *
from scipy.io import wavfile
filename = '/home/aistudio/data/data126234/wav2.wav'
sample_rate,sig = wavfile.read(filename)
print("sample_rate: {}\n".format(sample_rate), "shape(sig): {}\n".format(shape(sig)))
sample_rate: 44100
shape(sig): (45646080, 2)
sig.shape[0]/sample_rate: 1035.0585034013604
plt.clf()
plt.figure(figsize=(10,6))
plt.plot(sig[:,0])
plt.xlabel("n")
plt.ylabel("value")
plt.grid(True)
plt.tight_layout()
plt.savefig('/home/aistudio/stdout.jpg')
plt.show()
▲ 图1.2.1 所有信号的波形
▲ 图1.2.2 截取的第一个拨码信号
使用Signal Spectrogram函数显示数据的短时傅里叶变换。
▲ 数据的短视傅里叶变换
from scipy import signal f,t,Sxx = signal.spectrogram(sig[startid:endid, 0], fs=sample_rate, nperseg =2048, noverlap=2000, nfft=8192) thread = 1500000 Sxx[where(Sxx>thread)] = thread plt.clf() plt.figure(figsize=(10,7)) plt.pcolormesh(t, f[50:350], Sxx[50:350,:]) plt.xlabel("Time(S)") plt.ylabel("Frequency(Hz)") plt.grid(True) plt.tight_layout() plt.savefig('/home/aistudio/stdout.jpg') plt.show()
为了便于对数据进行判断,需要优先截取下有效数据,然后再判断其中的频谱是否正确。
下图显示了相邻两个拨码信号之间的波形关系。
▲ 相临两个拨码信号
通过上面波形可以看到,由于进行电话的摘挂机,产生的两个高脉冲信号幅度超过30000, 因此根据这个可以将信号进行有效的分割和定位。
如下 是一个尖脉冲波形放大后的图像。
▲ 图1.3.2 一个尖脉冲的波形
可以根据该脉冲波形的前面的上升沿的起始位置来确定该脉冲的位置。
dataseg = list(where(abs(sig[:,0])>0x7f00)) print(dataseg, shape(dataseg)) print(dataseg[:100]) posid = dataseg[0] posdelta = [s1-s2 for s1,s2 in zip(posid[1:], posid[:-1])] print(posdelta[:1000]) plt.clf() plt.figure(figsize=(10,6)) plt.plot(posdelta) plt.xlabel("n") plt.ylabel("deltapos") plt.grid(True) plt.tight_layout() plt.savefig('/home/aistudio/stdout.jpg') plt.show()
[array([ 355749, 355750, 355751, ..., 45120329, 45120330, 45120331])]
(1, 25351)
[array([ 355749, 355750, 355751, ..., 45120329, 45120330, 45120331])]
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 786481, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 4, 1, 1, 1, 1, 1, 785271, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
▲ 图1.3.3 所有上升沿对应的波形
这些上升脉冲它们之间的间隔为:
pd = array(posdelta)
possplit = pd[where(pd > 1000)]
print("possplit: {}\n".format(possplit), "len(possplit): {}\n".format(len(possplit)), "mean(possplit): {}\n".format(mean(possplit)), "mean(possplit)/sample_rate: {}\n".format(mean(possplit)/sample_rate))
possplit: [786481 785271 786005 785485 785631 785996 785302 786970 785405 784943
786674 784894 785761 785753 786155 784973 786987 783855 784476 783570
782555 784458 782916 785667 789454 784251 783545 783764 784366 784989
784486 787431 784040 783745 785528 783743 784939 789109 785727 787110
783177 783532 786536 783833 784088 783743 783043 783922 782797 784285
784523 784822 782861 784441 783300 783629 784274]
len(possplit): 57
mean(possplit): 784898.5263157894
mean(possplit)/sample_rate: 17.798152524167563
从上面结果来看,在wav2file中总共有57个拨码片段,平均它们之间的时间间隔为: T s = 17.8 s T_s = 17.8s Ts=17.8s
截取数据是出指吧拨号11个脉冲的音频数据截取出来。
▲ 图1.3.4 信号波形
在拨码信号前对应的时间间隔大于400000。
wavefilename = '/home/aistudio/data/data126234/wav2.wav' sample_rate,sig = wavfile.read(wavefilename) dataseg = list(where(sig[:,0] > 20000)) posid = dataseg[0] posdelta = [s1-s2 for s1,s2 in zip(posid[1:], posid[:-1])] plt.clf() plt.figure(figsize=(10,6)) plt.plot(posdelta[:9000], label='POSDELTA') plt.xlabel("n") plt.ylabel("deltapos") plt.grid(True) plt.tight_layout() plt.savefig('/home/aistudio/stdout.jpg') plt.show()
▲ 图1.3.5 前两小段数据
下面给出电话信号起始位置。
beginid = array(list(zip(posid[1:],posid[:-1])))[where(posdelta > 400000)]
print("beginid.T: {}\n".format(beginid.T), "shape(beginid): {}\n".format(shape(beginid)))
beginid.T: [[ 836306 1621745 2408903 3193945 3980233 4765672 5551524 6339190 7124987 7912553 8698676 9484556 10269399 11054722 11842377 12629740 13414451 14203456 14986067 15770299 16553373 17339407 18122751 18908758 19698392 20481857 21268481 22049985 22836030 23620779 24406880 25192396 25979819 26763351 27549539 28332773 29118796 29908430 30692265 31480796 32266869 33047810 33835365 34619228 35405796 36189780 36973047 37755785 38542178 39324767 40108260 40895783 41678995 42463568 43246945 44032197 44816043 45602380] [ 356796 1143719 1929410 2715871 3501790 4287880 5074302 5860021 6647411 7433280 8218650 9005777 9791085 10577298 11363487 12150076 12935483 13722878 14507173 15292070 16076090 16859083 17643994 18427354 19213444 20003346 20788030 21572011 22356204 23141016 23926446 24711360 25499256 26283716 27067917 27853868 28638069 29423430 30212969 30999135 31786684 32570306 33354257 34141237 34925519 35710046 36494197 37277688 38062044 38845293 39630016 40414985 41200222 41983539 42768398 43552158 44336209 45120939]] shape(beginid): (58, 2)
wavetime = 2.25
wavelength = int(sample_rate * wavetime)
startid = beginid.T[0][0] - 3000
endid = startid + wavelength
plt.clf()
plt.figure(figsize=(10,6))
plt.plot(sig[startid:endid, 0])
plt.xlabel("n")
plt.ylabel("value")
plt.grid(True)
plt.tight_layout()
plt.savefig('/home/aistudio/stdout.jpg')
plt.show()
▲ 图1.3.6 截取一段数据
前面有58个起始点,但最后一段波形长度不够一个完整的拨码限号。下面将前面57组拨码信号进行截取。
▲ 图 所有截取到的波形共有57个
对应短时傅里叶变换,所得到的视频联合分布。
▲ 图 截取的数据的短时傅里叶变换
gifpath = '/home/aistudio/GIF' if not os.path.isdir(gifpath): os.makedirs(gifpath) gifdim = os.listdir(gifpath) for f in gifdim: fn = os.path.join(gifpath, f) if os.path.isfile(fn): os.remove(fn) count = 0 for id in tqdm(beginid.T[0][:-1]): startid = id - 3000 endid = startid + wavelength f,t,Sxx = signal.spectrogram(sig[startid:endid, 0], fs=sample_rate, nperseg =2048, noverlap=2000, nfft=8192) thread = 1500000 Sxx[where(Sxx>thread)] = thread plt.clf() plt.figure(figsize=(10,6)) plt.pcolormesh(t, f[50:350], Sxx[50:350,:]) plt.xlabel("Time(S)") plt.ylabel("Frequency(Hz)") plt.grid(True) plt.tight_layout() savefile = os.path.join(gifpath, '%03d.jpg'%count) count += 1 plt.savefig(savefile)
wavetime = 2.2 wavelength = int(sample_rate * wavetime) startid = beginid.T[0][0] - 2200 endid = startid + wavelength segpos = [int(s) for s in linspace(startid, endid, 12, endpoint=True)] plt.clf() plt.figure(figsize=(10,16)) for i in range(11): plt.subplot(11,1,i+1) plt.plot(sig[segpos[i]:segpos[i+1], 0]) plt.xlabel('n') plt.ylabel('Value') plt.grid(True) plt.savefig('/home/aistudio/stdout.jpg') plt.show()
▲ 图1.4.1 将电话号信号分割成11等分
通过FFT,计算信号的频谱。下面显示500 ~ 1800Hz之间的频谱。
segnum = sig[segpos[0]:segpos[1], 0] fftabs = abs(fft.fft(segnum))/len(segnum) fstart = 500 fend = 1800 fmid = 1075 fstartid = int(len(segnum)*fstart/sample_rate) fendid = int(len(segnum)*fend/sample_rate) fdim = linspace(0, sample_rate, len(segnum)) plt.clf() plt.figure(figsize=(10,6)) plt.plot(fdim[fstartid:fendid], fftabs[fstartid:fendid]) plt.xlabel("Frequency") plt.ylabel("Amplitude") plt.grid(True) plt.tight_layout() plt.savefig('/home/aistudio/stdout.jpg') plt.show()
▲ 图1.4.2 信号的FFT幅度谱
分别在 (500 ~ 1075),(1075,1800)搜索峰值对应的频率。
lowfft = fftabs[fstartid:fmidid]
highfft = fftabs[fmidid:fendid]
lowid = where(lowfft==max(lowfft))[0][0]+fstartid
highid = where(highfft==max(highfft))[0][0]+fmidid
lowf = lowid * sample_rate/len(segnum)
highf = highid * sample_rate/len(segnum)
print("lowf: {}\n".format(lowf), "highf: {}\n".format(highf))
lowf: 700.0
highf: 1210.0
根据 DTMF 给出的DTMF低频、高频编码方案。根据lowf,highf判断对应的编码。
lowfdim = [697, 770, 852, 941] highfdim = [1209, 1336, 1477, 1633] deltaf = 25 def freq2code(lowf, highf): lowid = 0 highid = 0 for i,f in enumerate(lowfdim): if lowf >= f-deltaf and lowf < f+deltaf: lowid = i break for i,f in enumerate(highfdim): if highf >= f-deltaf and highf < f+deltaf: highid = i break idid = lowid + highid*4 return "147*2580369#ABCD"[idid] ''' code = freq2code(lowf, highf) print("code: {}\n".format(code)) ''' def wave2phone(wavedata, fs): segpos = [int(s) for s in linspace(0, len(wavedata), 12, endpoint=True)] fstart = 500 fend = 1800 fmid = 1075 code = '' for i in range(11): wave = wavedata[segpos[i]:segpos[i+1]] fstartid = int(len(wave)*fstart/fs) fendid = int(len(wave)*fend/fs) fmidid = int(len(wave)*fmid/fs) fftabs = abs(fft.fft(wave))/len(wave) lowfft = fftabs[fstartid:fmidid] highfft = fftabs[fmidid:fendid] lowid = where(lowfft==max(lowfft))[0][0]+fstartid highid = where(highfft==max(highfft))[0][0]+fmidid lowf = lowid * fs /len(wave) highf = highid * fs /len(wave) code = code + freq2code(lowf, highf) return code wavetime = 2.2 wavelength = int(sample_rate * wavetime) startid = beginid.T[0][0] - 2200 endid = startid + wavelength ret = wave2phone(sig[startid:endid, 0], sample_rate) print("ret: {}\n".format(ret))
解码输出如下,可以看到它是符合发送的数据。
ret: 18612110728
wavetime = 2.2
wavelength = int(sample_rate * wavetime)
for ii in beginid.T[0][:-1]:
startid = ii - 2200
endid = startid + wavelength
ret = wave2phone(sig[startid:endid, 0], sample_rate)
print("ret: {}\n".format(ret))
所获得的的57个拨码都是相同的。
ret: 18612110728
ret: 18612110728
ret: 18612110728
ret: 18612110728
.......
ret: 18612110728
ret: 18612110728
ret: 18612110728
ret: 18612110728
ret: 18612110728
根据前面给出的算法,下面对录取的拨码数据进行处理。
from headm import * # = from scipy import signal from scipy.io import wavfile from tqdm import tqdm wavefilename = '/home/aistudio/data/data126234/wav2.wav' sample_rate,sig = wavfile.read(wavefilename) dataseg = list(where(sig[:,0] > 20000)) posid = dataseg[0] beginid = array(list(zip(posid[1:],posid[:-1])))[where(posdelta > 400000)] lowfdim = [697, 770, 852, 941] highfdim = [1209, 1336, 1477, 1633] deltaf = 25 def freq2code(lowf, highf): lowid = 0 highid = 0 for i,f in enumerate(lowfdim): if lowf >= f-deltaf and lowf < f+deltaf: lowid = i break for i,f in enumerate(highfdim): if highf >= f-deltaf and highf < f+deltaf: highid = i break idid = lowid + highid*4 return "147*2580369#ABCD"[idid] def wave2phone(wavedata, fs): segpos = [int(s) for s in linspace(0, len(wavedata), 12, endpoint=True)] fstart = 500 fend = 1800 fmid = 1075 code = '' for i in range(11): wave = wavedata[segpos[i]:segpos[i+1]] fstartid = int(len(wave)*fstart/fs) fendid = int(len(wave)*fend/fs) fmidid = int(len(wave)*fmid/fs) fftabs = abs(fft.fft(wave))/len(wave) lowfft = fftabs[fstartid:fmidid] highfft = fftabs[fmidid:fendid] lowid = where(lowfft==max(lowfft))[0][0]+fstartid highid = where(highfft==max(highfft))[0][0]+fmidid lowf = lowid * fs /len(wave) highf = highid * fs /len(wave) code = code + freq2code(lowf, highf) return code wavetime = 2.2 wavelength = int(sample_rate * wavetime) error = 0 for ii in beginid.T[0][:-1]: startid = ii - 2200 endid = startid + wavelength ret = wave2phone(sig[startid:endid, 0], sample_rate) if ret != '18612110728': error += 1 printt(len(beginid.T[0])-1:, error:)
输出结果:
len(beginid.T[0])-1: 57
error: 0
len(beginid.T[0])-1: 22
error: 0
录制的DTMF-RECORD_2022-1-21.mp3,录制了10511秒的声音。
wavefilename = '/home/aistudio/data/data126234/DTFM-RECORD-2022-1-21.mp3'
sound = AudioSegment.from_file(file=wavefilename)
left = sound.split_to_mono()[0]
sample_rate = sound.frame_rate
sig = frombuffer(left._data, int16)
总共数据为463,552,000采样数据。由于数据很大,需要使用 AI Studio中 32G内存的至尊配置,才能够处理这个数据。在普通版本的配置,系统总是崩溃。
经过56.8秒的运行,处理的结果如下:
sample_rate: 44100
len(beginid.T[0])-1: 589
error: 0
总共589次电话拨码,发送的输出都正确。
利用计算机声卡录制了电话机在没有连线情况下发送DTMF信号,利用Python编程,截取了所有拨码信号,利用FFT分析了其中的DTMF成分,并进行译码。通过实验证明,控制板在没有连线情况下,总共发送了589次电话号码,所有的号码都是正确。
但是在发送过程,可以看到对于某些信号会出现间隔。比如下面就是其中一次拨码,在拨码信号中间存在短暂的简短。
▲ 图3.1 发送信号中间存在间隔
原则上,这中间断如果配判断成两个拨码信号,就会造成电话号码错误。因此需要对于这种情况进行统计判断,其中这中间段是否普遍?间断最长会有多少?
■ 相关文献链接:
● 相关图表链接:
#!/usr/local/bin/python # -*- coding: gbk -*- #============================================================ # TEST2.PY -- by Dr. ZhuoQing 2022-01-21 # # Note: #============================================================ from headm import * # = from scipy import signal from scipy.io import wavfile from tqdm import tqdm from pydub import AudioSegment #------------------------------------------------------------ #wavefilename = '/home/aistudio/data/data126234/sound1.wav' #sample_rate,sig = wavfile.read(wavefilename) wavefilename = '/home/aistudio/data/data126234/DTFM-RECORD-2022-1-21.mp3' #wavefilename = '/home/aistudio/work/sound1.mp3' sound = AudioSegment.from_file(file=wavefilename) left = sound.split_to_mono()[0] sample_rate = sound.frame_rate sig = frombuffer(left._data, int16) printt(shape(sig), sample_rate:) #------------------------------------------------------------ ''' plt.clf() plt.figure(figsize=(10,6)) plt.plot(sig) plt.xlabel("n") plt.ylabel("value") plt.grid(True) plt.tight_layout() plt.savefig('/home/aistudio/stdout.jpg') plt.show() ''' #------------------------------------------------------------ dataseg = list(where(sig > 20000)) posid = dataseg[0] posdelta = array([s1-s2 for s1,s2 in zip(posid[1:], posid[:-1])]) beginid = array(list(zip(posid[1:],posid[:-1])))[where(posdelta > 400000)] printt(len(beginid.T[0])-1|) #------------------------------------------------------------ lowfdim = [697, 770, 852, 941] highfdim = [1209, 1336, 1477, 1633] deltaf = 25 def freq2code(lowf, highf): lowid = 0 highid = 0 for i,f in enumerate(lowfdim): if lowf >= f-deltaf and lowf < f+deltaf: lowid = i break for i,f in enumerate(highfdim): if highf >= f-deltaf and highf < f+deltaf: highid = i break idid = lowid + highid*4 return "147*2580369#ABCD"[idid] #------------------------------------------------------------ def wave2phone(wavedata, fs): segpos = [int(s) for s in linspace(0, len(wavedata), 12, endpoint=True)] fstart = 500 fend = 1800 fmid = 1075 code = '' for i in range(11): wave = wavedata[segpos[i]:segpos[i+1]] fstartid = int(len(wave)*fstart/fs) fendid = int(len(wave)*fend/fs) fmidid = int(len(wave)*fmid/fs) fftabs = abs(fft.fft(wave))/len(wave) lowfft = fftabs[fstartid:fmidid] highfft = fftabs[fmidid:fendid] lowid = where(lowfft==max(lowfft))[0][0]+fstartid highid = where(highfft==max(highfft))[0][0]+fmidid lowf = lowid * fs /len(wave) highf = highid * fs /len(wave) code = code + freq2code(lowf, highf) return code #------------------------------------------------------------ wavetime = 2.2 wavelength = int(sample_rate * wavetime) error = 0 for ii in beginid.T[0][:-1]: startid = ii - 2200 endid = startid + wavelength ret = wave2phone(sig[startid:endid], sample_rate) if ret != '18612110728': error += 1 #------------------------------------------------------------ printt(error|) #------------------------------------------------------------ # END OF FILE : TEST2.PY #============================================================
#!/usr/local/bin/python # -*- coding: gbk -*- #============================================================ # TEST1.PY -- by Dr. ZhuoQing 2022-01-21 # # Note: #============================================================ from headm import * # = from scipy import signal from scipy.io import wavfile from tqdm import tqdm #------------------------------------------------------------ wavefilename = '/home/aistudio/data/data126234/wav2.wav' sample_rate,sig = wavfile.read(wavefilename) #------------------------------------------------------------ dataseg = list(where(sig[:,0] > 20000)) posid = dataseg[0] posdelta = array([s1-s2 for s1,s2 in zip(posid[1:], posid[:-1])]) #------------------------------------------------------------ plt.clf() plt.figure(figsize=(10,6)) plt.plot(posdelta[:9000], label='POSDELTA') plt.xlabel("n") plt.ylabel("deltapos") plt.grid(True) plt.tight_layout() plt.savefig('/home/aistudio/stdout.jpg') plt.show() #------------------------------------------------------------ beginid = array(list(zip(posid[1:],posid[:-1])))[where(posdelta > 400000)] printt(beginid.T:, shape(beginid):) #------------------------------------------------------------ wavetime = 2.2 wavelength = int(sample_rate * wavetime) gifpath = '/home/aistudio/GIF' if not os.path.isdir(gifpath): os.makedirs(gifpath) gifdim = os.listdir(gifpath) for f in gifdim: fn = os.path.join(gifpath, f) if os.path.isfile(fn): os.remove(fn) count = 0 for id in beginid.T[0][:-1]: startid = id - 2200 endid = startid + wavelength plt.clf() plt.figure(figsize=(10,6)) plt.plot(sig[startid:endid, 0]) plt.xlabel("n") plt.ylabel("value") plt.grid(True) plt.tight_layout() # plt.savefig('/home/aistudio/stdout.jpg') # plt.show() savefile = os.path.join(gifpath, '%03d.jpg'%count) count += 1 plt.savefig(savefile) #------------------------------------------------------------ gifpath = '/home/aistudio/GIF' if not os.path.isdir(gifpath): os.makedirs(gifpath) gifdim = os.listdir(gifpath) for f in gifdim: fn = os.path.join(gifpath, f) if os.path.isfile(fn): os.remove(fn) count = 0 for id in tqdm(beginid.T[0][:-1]): startid = id - 2200 endid = startid + wavelength f,t,Sxx = signal.spectrogram(sig[startid:endid, 0], fs=sample_rate, nperseg =2048, noverlap=2000, nfft=8192) thread = 1500000 Sxx[where(Sxx>thread)] = thread plt.clf() plt.figure(figsize=(10,6)) plt.pcolormesh(t, f[50:350], Sxx[50:350,:]) plt.xlabel("Time(S)") plt.ylabel("Frequency(Hz)") plt.grid(True) plt.tight_layout() savefile = os.path.join(gifpath, '%03d.jpg'%count) count += 1 plt.savefig(savefile) #------------------------------------------------------------ wavetime = 2.2 wavelength = int(sample_rate * wavetime) startid = beginid.T[0][0] - 2200 endid = startid + wavelength segpos = [int(s) for s in linspace(startid, endid, 12, endpoint=True)] plt.clf() plt.figure(figsize=(10,16)) for i in range(11): plt.subplot(11,1,i+1) plt.plot(sig[segpos[i]:segpos[i+1], 0]) plt.xlabel('n') plt.ylabel('Value') plt.grid(True) plt.savefig('/home/aistudio/stdout.jpg') plt.show() #------------------------------------------------------------ segnum = sig[segpos[0]:segpos[1], 0] fftabs = abs(fft.fft(segnum))/len(segnum) fstart = 500 fend = 1800 fmid = 1075 fstartid = int(len(segnum)*fstart/sample_rate) fendid = int(len(segnum)*fend/sample_rate) fmidid = int(len(segnum)*fmid/sample_rate) fdim = linspace(0, sample_rate, len(segnum)) plt.clf() plt.figure(figsize=(10,6)) plt.plot(fdim[fstartid:fendid], fftabs[fstartid:fendid]) plt.xlabel("Frequency") plt.ylabel("Amplitude") plt.grid(True) plt.tight_layout() plt.savefig('/home/aistudio/stdout.jpg') plt.show() #------------------------------------------------------------ lowfft = fftabs[fstartid:fmidid] highfft = fftabs[fmidid:fendid] lowid = where(lowfft==max(lowfft))[0][0]+fstartid highid = where(highfft==max(highfft))[0][0]+fmidid lowf = lowid * sample_rate/len(segnum) highf = highid * sample_rate/len(segnum) printt(lowf:, highf:) #------------------------------------------------------------ lowfdim = [697, 770, 852, 941] highfdim = [1209, 1336, 1477, 1633] deltaf = 25 def freq2code(lowf, highf): lowid = 0 highid = 0 for i,f in enumerate(lowfdim): if lowf >= f-deltaf and lowf < f+deltaf: lowid = i break for i,f in enumerate(highfdim): if highf >= f-deltaf and highf < f+deltaf: highid = i break idid = lowid + highid*4 return "147*2580369#ABCD"[idid] #------------------------------------------------------------ ''' code = freq2code(lowf, highf) printt(code:) ''' #------------------------------------------------------------ def wave2phone(wavedata, fs): segpos = [int(s) for s in linspace(0, len(wavedata), 12, endpoint=True)] fstart = 500 fend = 1800 fmid = 1075 code = '' for i in range(11): wave = wavedata[segpos[i]:segpos[i+1]] fstartid = int(len(wave)*fstart/fs) fendid = int(len(wave)*fend/fs) fmidid = int(len(wave)*fmid/fs) fftabs = abs(fft.fft(wave))/len(wave) lowfft = fftabs[fstartid:fmidid] highfft = fftabs[fmidid:fendid] lowid = where(lowfft==max(lowfft))[0][0]+fstartid highid = where(highfft==max(highfft))[0][0]+fmidid lowf = lowid * fs /len(wave) highf = highid * fs /len(wave) code = code + freq2code(lowf, highf) return code #------------------------------------------------------------ wavetime = 2.2 wavelength = int(sample_rate * wavetime) for ii in beginid.T[0][:-1]: startid = ii - 2200 endid = startid + wavelength ret = wave2phone(sig[startid:endid, 0], sample_rate) printt(ret:) #------------------------------------------------------------ # END OF FILE : TEST1.PY #============================================================
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。