赞
踩
时频分析方法使用时-频域联合分布描述时间序列信号的瞬态特征,并通过瞬时频率估计来表征信号的特征频率随时间变化的趋势,在时间序列信号处理中得到了广泛的应用。STFT 和WT等常用的时频分析方法时频分辨率较低,而且对于多分量时变信号的匹配效果不佳;WVD对噪声的鲁棒性不足且对于多分量时变信号存在交叉干扰项;EMD及其改进方法缺乏数学理论支撑,并存在端点效应和模态混叠等问题。以上时频分析方法存在一些共性问题,例如它们在时频平面的变换系数分布比较离散,瞬时频率曲线幅值能量不够集中,因此时频谱会出现模糊的现象。为了实现理想的时频表示,在原始时频谱的基础上进行能量重排是当前的研究热点。基于同步压缩变换SST的时频分析方法实现了对时频系数的压缩和重排,能够对复杂多分量信号实现高分辨率表达。同步压缩变换的前身,即重排算法,具有坚实的理论基础。重排算法作为一种后处理的时频分析方法,主要用于提升时频表达的效果,但是它最大的缺陷是不支持信号重构。在此基础上,小波的创始人之一Daubechies等在2011年提出了同步压缩小波变换SST,SST通过同步压缩算子对时频系数进行重排,将信号在时频平面任一点处的时频分布移到能量的重心位置,增强瞬时频率的能量集中,较好地解决传统时频分析方法存在的时频模糊问题。
前置知识可参考如下文章
调制信号的连续小波变换 - 哥廷根数学学派的文章 - 知乎 调制信号的连续小波变换 - 知乎
再看连续小波变换 - 哥廷根数学学派的文章 - 知乎 再看连续小波变换 - 知乎
同步压缩变换初探 - 哥廷根数学学派的文章 - 知乎 同步压缩变换初探 - 知乎
同步压缩变换在机械振动信号处理领域[1],在微震信号处理领域[2],辐射源个体识别领域[3],雷达辐射源分选识别领域[4],储层识别领域[5],通信信号调制识别领域[6],生理信号处理领域[7]等方面都有广泛应用。
由于很多研究者在工作中使用python语言,因为这篇文章介绍在python环境下如何更好地使用SST方法。首先安装ssqueezepy模块:pip install ssqueezepy
ssqueezepy模块里是这么介绍SST的:Synchrosqueezing is a powerful reassignment method that focuses time-frequency representations, and allows extraction of instantaneous amplitudes and frequencies.
主要包括如下内容
1.首先,看一下基础模块Benchmarks
导入相关模块
- import os
- import numpy as np
- import gc
- import pandas as pd
- import scipy.signal as sig
- #pip install librosa
- import librosa
- from pywt import cwt as pcwt
- from timeit import timeit as _timeit
-
- from ssqueezepy import cwt, stft, ssq_cwt, ssq_stft, Wavelet
- from ssqueezepy.utils import process_scales, padsignal
- from ssqueezepy.ssqueezing import _compute_associated_frequencies
-
- def timeit(fn, number=10):
- return _timeit(fn, number=number) / number
定义一些基础函数,以后都会用到
- #输出名称
- def print_report(header, times):
- print(("{}\n"
- "CWT: {:.3f} sec\n"
- "STFT: {:.3f} sec\n"
- "SSQ_CWT: {:.3f} sec\n"
- "SSQ_STFT: {:.3f} sec\n"
- ).format(header, *list(times.values())[-4:]))
- #CWT同步压缩变换
- def time_ssq_cwt(x, dtype, scales, cache_wavelet, ssq_freqs):
- wavelet = Wavelet(dtype=dtype)
- kw = dict(wavelet=wavelet, scales=scales, ssq_freqs=ssq_freqs)
- if cache_wavelet:
- for _ in range(3): # warmup run
- _ = ssq_cwt(x, cache_wavelet=True, **kw)
- del _; gc.collect()
- return timeit(lambda: ssq_cwt(x, cache_wavelet=cache_wavelet, **kw))
- #STFT同步压缩变换
- def time_ssq_stft(x, dtype, n_fft):
- for _ in range(3):
- _ = ssq_stft(x, dtype=dtype, n_fft=n_fft)
- del _; gc.collect()
- return timeit(lambda: ssq_stft(x, dtype=dtype, n_fft=n_fft))
- #CWT变换
- def time_cwt(x, dtype, scales, cache_wavelet):
- wavelet = Wavelet(dtype=dtype)
- if cache_wavelet:
- for _ in range(3): # warmup run
- _ = cwt(x, wavelet, scales=scales, cache_wavelet=True)
- del _; gc.collect()
- return timeit(lambda: cwt(x, wavelet, scales=scales,
- cache_wavelet=cache_wavelet))
- #STFT变换
- def time_stft(x, dtype, n_fft):
- for _ in range(3):
- _ = stft(x, dtype=dtype, n_fft=n_fft)
- del _; gc.collect()
- return timeit(lambda: stft(x, dtype=dtype, n_fft=n_fft))
-
- def time_all(x, dtype, scales, cache_wavelet, ssq_freqs, n_fft):
- num = str(len(x))[:-3] + 'k'
- return {num: '',
- f'{num}-cwt': time_cwt(x, dtype, scales, cache_wavelet),
- f'{num}-stft': time_stft(x, dtype, n_fft),
- f'{num}-ssq_cwt': time_ssq_cwt(x, dtype, scales, cache_wavelet,
- ssq_freqs),
- f'{num}-ssq_stft': time_ssq_stft(x, dtype, n_fft)
- }
- x = np.random.randn(1000)
- for dtype in ('float32', 'float64'):
- wavelet = Wavelet(dtype=dtype)
- _ = ssq_cwt(x, wavelet, cache_wavelet=False)
- _ = ssq_stft(x, dtype=dtype)
- del _, wavelet
设置STFT 和 CWT的 输出参数
- N0, N1 = 10000, 160000 # selected such that CWT pad length ratios are same
- n_rows = 300
- n_fft = n_rows * 2 - 2
-
- wavelet = Wavelet()
- scales = process_scales('log-piecewise', N1, wavelet=wavelet)[:n_rows]
- ssq_freqs = _compute_associated_frequencies(
- scales, N1, wavelet, 'log-piecewise', maprange='peak',
- was_padded=True, dt=1, transform='cwt')
-
- kw = dict(scales=scales, ssq_freqs=ssq_freqs, n_fft=n_fft)
- t_all = {}
使用一个很简单的数据x = np.random.randn(N)测试用时
- #基础用时
- print("// BASELINE (dtype=float32, cache_wavelet=True)")
- os.environ['SSQ_PARALLEL'] = '0'
- os.environ['SSQ_GPU'] = '0'
- t_all['base'] = {}
- dtype = 'float32'
-
- for N in (N0, N1):
- x = np.random.randn(N)
- t_all['base'].update(time_all(x, dtype=dtype, cache_wavelet=True, **kw))
- print_report(f"/ N={N}", t_all['base'])
-
- #并行+小波Parallel + wavelet cache用时
- print("// PARALLEL + CACHE (dtype=float32, cache_wavelet=True)")
-
- os.environ['SSQ_PARALLEL'] = '1'
- os.environ['SSQ_GPU'] = '0'
- t_all['parallel'] = {}
- for N in (N0, N1):
- x = np.random.randn(N)
- t_all['parallel'].update(time_all(x, dtype='float32', cache_wavelet=True,
- **kw))
- print_report(f"/ N={N}", t_all['parallel'])
-
- #GPU+小波GPU + wavelet cache用时
- print("// GPU + CACHE (dtype=float32, cache_wavelet=True)")
-
- os.environ['SSQ_GPU'] = '1'
- t_all['gpu'] = {}
- for N in (N0, N1):
- x = np.random.randn(N)
- t_all['gpu'].update(time_all(x, dtype='float32', cache_wavelet=True, **kw))
- print_report(f"/ N={N}", t_all['gpu'])
然后测试一下python的其他模块执行小波变换的耗时
- #PyWavelets模块,小波变换常用的一个python模块
- for N in (N0, N1):
- x = np.random.randn(N)
- xp = padsignal(x)
- t = timeit(lambda: pcwt(xp, wavelet='cmor1.5-1.0', scales=scales,
- method='fft'))
- print("pywt_cwt-%s:" % N, t)
-
- #Scipy模块
- for N in (N0, N1):
- x = np.random.randn(N)
- xp = padsignal(x)
- t = timeit(lambda: sig.cwt(xp, wavelet=sig.morlet,
- widths=np.arange(4, 4 + len(scales))))
- print("scipy_cwt-%s:" % N, t)
-
- #%%
- for N in (N0, N1):
- x = np.random.randn(N)
- t = timeit(lambda: sig.stft(x, nperseg=n_fft, nfft=n_fft, noverlap=n_fft-1))
- print("scipy_stft-%s:" % N, t)
-
- #Librosa模块,语音信号分析中常用的模块
- # NOTE: we bench here with float64 since float32 is slower for librosa as of 0.8.0
- for N in (N0, N1):
- x = np.random.randn(N)
- t = timeit(lambda: librosa.stft(x, n_fft=n_fft, hop_length=1, dtype='float64'))
- print("librosa_stft-%s:" % N, t)
最后看一下执行结果
2.讲完了benchmarks,接下来看一下test_transforms相关内容
导入相关模块
- import numpy as np
- import scipy.signal as sig
- from ssqueezepy import Wavelet, TestSignals
- from ssqueezepy.utils import window_resolution
生成测试信号
- tsigs = TestSignals(N=2048)
- # 设置 `dft` 模式
- dft = (None, 'rows', 'cols')[0]
- tsigs.demo(dft=dft)
看一下配备了多少测试信号
还有很多很多,就不一一列举了。
还可以指定测试信号类,例如
- signals = [
- 'am-cosine',
- ('hchirp', dict(fmin=.2)),
- ('sine:am-cosine', (dict(f=32, phi0=1), dict(amin=.3))),
- ]
- tsigs.demo(signals, N=2048)
加上`dft` 参数,就展示了相应的频谱
- tsigs.demo(signals, dft='rows')
- #tsigs.demo(signals, dft='cols')
下面开始进行CWT变换和SST变换,使用广义morse小波,但参数不同,只列出几个图片
- tsigs = TestSignals(N=2048)
- wavelets = [
- Wavelet(('gmw', {'beta': 60})),
- Wavelet(('gmw', {'beta': 5})),
- ]
- tsigs.wavcomp(wavelets, signals='all')
再看一个chirp信号的例子
tsigs.wavcomp(wavelets, signals=[('#echirp', dict(fmin=.1))], N=2048)
再对比一下CWT和STFT以及相应的同步压缩变换的例子
再看一下含噪声信号的例子
再简要看一下时频谱图的脊线提取的例子,以后会慢慢讲
今天就讲到这吧,精华就是这些,其他的关于小波的知识以后再讲。
时间序列信号处理系列-基于Python的同步压缩变换 - 哥廷根数学学派的文章 - 知乎 https://zhuanlan.zhihu.com/p/554189692
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。