
A Recurrent-Neural-Network-Based Method for Denoising One-Dimensional Signals (Simple Version, Python)

The code is very simple.

import torch
import torch.nn as nn
import numpy as np
from scipy import signal
import matplotlib.pyplot as plt

# For running on GPU:
# device = torch.device("cuda")  # choose your device
device = torch.device("cpu")

# A tensor can be placed on the device either via the device argument...
a = torch.rand(5, 5, device=device)
# ...or by moving it afterwards with .to()
a = a.to(device)
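If a GPU may or may not be present, a common pattern (not used in the original post) is to select the device automatically:

# Alternative device selection: use CUDA when available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")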

Make data

fs = 512
# fs*10 samples spanning 10 full sine periods (the endpoint 20*pi is excluded)
x = np.linspace(0, 20*np.pi * (1 - 1/(10*fs)), fs*10)
y_sin = 0.5*np.sin(x)
plt.plot(x, y_sin)
plt.xlabel('Angle [rad]')
plt.ylabel('sin(x)')
plt.axis('tight')
plt.show()

y_triangle = 0.5*signal.sawtooth(x, 0.5)  # width=0.5 yields a triangle wave
plt.plot(x, y_triangle)
plt.xlabel('Phase [rad]')
plt.ylabel('triangle(x)')
plt.axis('tight')
plt.show()

y_saw = 0.5*signal.sawtooth(x, 1)  # width=1 yields a rising sawtooth
plt.plot(x, y_saw)
plt.xlabel('Phase [rad]')
plt.ylabel('sawtooth(x)')
plt.axis('tight')
plt.show()

Add Gaussian Noise

# Add Gaussian noise (zero mean, standard deviation 0.1)
y_sin_n = y_sin + 0.1*np.random.normal(size=len(x))
y_triangle_n = y_triangle + 0.1*np.random.normal(size=len(x))
y_saw_n = y_saw + 0.1*np.random.normal(size=len(x))
plt.plot(x, y_sin_n)
plt.xlabel('Angle [rad]')
plt.ylabel('sin(x) + noise')
plt.axis('tight')
plt.show()

plt.plot(x, y_triangle_n)
plt.xlabel('Phase [rad]')
plt.ylabel('triangle(x) + noise')
plt.axis('tight')
plt.show()

plt.plot(x, y_saw_n)
plt.xlabel('Phase [rad]')
plt.ylabel('sawtooth(x) + noise')
plt.axis('tight')
plt.show()
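A quick sanity check on the noise level (an added back-of-the-envelope calculation, not in the original): a 0.5-amplitude sine has mean power 0.5²/2 = 0.125, while the added noise has power 0.1² = 0.01, so the noisy sine should sit near 10·log10(12.5) ≈ 11 dB; the triangle and sawtooth have mean power 0.5²/3 ≈ 0.083, giving ≈ 9.2 dB. These estimates line up with the "Original SNR" figures printed in the experiments below.

# Expected input SNR from the signal and noise powers (sine case)
expected_snr = (0.5**2 / 2) / 0.1**2
print(10 * np.log10(expected_snr))  # ~10.97 dB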

Creating Dataset

def give_part_of_data(x, y, n_samples=10000, sample_size=100):
    # Draw n_samples windows of length sample_size at random offsets
    # from the noisy signal x and the clean target y
    data_inp = np.zeros((n_samples, sample_size))
    data_out = np.zeros((n_samples, sample_size))
    for i in range(n_samples):
        random_offset = np.random.randint(0, len(x) - sample_size)
        data_inp[i, :] = x[random_offset:random_offset + sample_size]
        data_out[i, :] = y[random_offset:random_offset + sample_size]
    return data_inp, data_out

# Train, validation, and test splits (first 70% / next 10% / last 20%)
sin_train_in, sin_train_out = give_part_of_data(y_sin_n[0:int(7/10 * len(x))], y_sin[0:int(7/10 * len(x))], 2000, int(len(x)/6))
tri_train_in, tri_train_out = give_part_of_data(y_triangle_n[0:int(7/10 * len(x))], y_triangle[0:int(7/10 * len(x))], 2000, int(len(x)/6))
saw_train_in, saw_train_out = give_part_of_data(y_saw_n[0:int(7/10 * len(x))], y_saw[0:int(7/10 * len(x))], 2000, int(len(x)/6))
sin_val_in, sin_val_out = y_sin_n[int(7/10 * len(x)):int(8/10 * len(x))], y_sin[int(7/10 * len(x)):int(8/10 * len(x))]
tri_val_in, tri_val_out = y_triangle_n[int(7/10 * len(x)):int(8/10 * len(x))], y_triangle[int(7/10 * len(x)):int(8/10 * len(x))]
saw_val_in, saw_val_out = y_saw_n[int(7/10 * len(x)):int(8/10 * len(x))], y_saw[int(7/10 * len(x)):int(8/10 * len(x))]
sin_test_in, sin_test_out = y_sin_n[int(8/10 * len(x)):], y_sin[int(8/10 * len(x)):]
tri_test_in, tri_test_out = y_triangle_n[int(8/10 * len(x)):], y_triangle[int(8/10 * len(x)):]
saw_test_in, saw_test_out = y_saw_n[int(8/10 * len(x)):], y_saw[int(8/10 * len(x)):]

# One training pair: noisy input window vs. clean target window
plt.plot(range(853), sin_train_in[3])
plt.plot(range(853), sin_train_out[3])
plt.xlabel('Sample index')
plt.ylabel('sin(x) + noise')
plt.axis('tight')
plt.show()
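A quick shape check (added here, not in the original) confirms that each training set holds 2000 windows of int(len(x)/6) = 853 samples, matching the hard-coded range(853) in the plot above:

print(sin_train_in.shape, sin_train_out.shape)  # (2000, 853) (2000, 853)
assert sin_train_in.shape == (2000, int(len(x) / 6))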

RNN + Sin

# RNN model
input_dim = 1
hidden_size_1 = 60
hidden_size_2 = 60
output_size = 1

class CustomRNN(nn.Module):
    def __init__(self, input_size, hidden_size_1, hidden_size_2, output_size):
        super(CustomRNN, self).__init__()
        self.rnn = nn.RNN(input_size=input_size, hidden_size=hidden_size_1, batch_first=True)
        # Two-layer head on the RNN outputs: hidden_size_1 -> hidden_size_2 -> output
        self.linear1 = nn.Linear(hidden_size_1, hidden_size_2)
        self.act1 = nn.Tanh()
        self.linear2 = nn.Linear(hidden_size_2, output_size)
        self.act2 = nn.Tanh()

    def forward(self, x):
        pred, hidden = self.rnn(x, None)
        pred = self.act1(self.linear1(pred))
        pred = self.act2(self.linear2(pred)).view(x.shape[0], -1, 1)
        return pred

model = CustomRNN(input_dim, hidden_size_1, hidden_size_2, output_size)
model = model.to(device)
lr = 1e-2
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
loss_func = nn.MSELoss()

for t in range(1000):
    inp = torch.Tensor(sin_train_in[..., np.newaxis])
    inp = inp.to(device)
    out = torch.Tensor(sin_train_out[..., np.newaxis])
    out = out.to(device)
    pred = model(inp)
    optimizer.zero_grad()
    loss = loss_func(pred, out)
    if t % 20 == 0:
        print(t, loss.data.item())
    # gently decay the learning rate
    lr = lr / 1.0001
    optimizer.param_groups[0]['lr'] = lr
    loss.backward()
    optimizer.step()

# Denoise the held-out sine test segment
test_in = sin_test_in
inp = torch.Tensor(test_in[np.newaxis, ..., np.newaxis])
inp = inp.to(device)
pred = model(inp).cpu().detach().numpy()
plt.plot(range(len(sin_test_in)), test_in)
plt.plot(range(len(sin_test_in)), pred[0, :, 0])
plt.show()

original_SNR = np.sum(np.abs(sin_test_out)**2) / np.sum(np.abs(sin_test_in - sin_test_out)**2)
original_SNR_db = 10*np.log10(original_SNR)
print('Original SNR : ', original_SNR)
print('Original SNR DB : ', original_SNR_db)
network_SNR = np.sum(np.abs(sin_test_out)**2) / np.sum(np.abs(pred[0, :, 0] - sin_test_out)**2)
network_SNR_db = 10*np.log10(network_SNR)
print('Network SNR : ', network_SNR)
print('Network SNR DB : ', network_SNR_db)

Output:

Original SNR :  12.951857235597608
Original SNR DB :  11.123320486750668
Network SNR :  107.29848229242438
Network SNR DB :  20.305935790331755
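The SNR computation above is repeated verbatim for each waveform; as a refactoring sketch (not part of the original code), the formula could live in one helper:

def snr_db(clean, estimate):
    # Ratio of clean-signal power to error power, expressed in decibels
    snr = np.sum(np.abs(clean)**2) / np.sum(np.abs(estimate - clean)**2)
    return 10 * np.log10(snr)

# e.g. snr_db(sin_test_out, sin_test_in)    -> ~11.1 dB (noisy input)
#      snr_db(sin_test_out, pred[0, :, 0])  -> ~20.3 dB (network output)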

RNN + Triangular

# Re-create and retrain the same RNN model on the triangle-wave data
model = CustomRNN(input_dim, hidden_size_1, hidden_size_2, output_size)
model = model.to(device)
lr = 1e-2
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
loss_func = nn.MSELoss()

for t in range(1000):
    inp = torch.Tensor(tri_train_in[..., np.newaxis])
    inp = inp.to(device)
    out = torch.Tensor(tri_train_out[..., np.newaxis])
    out = out.to(device)
    pred = model(inp)
    optimizer.zero_grad()
    loss = loss_func(pred, out)
    if t % 20 == 0:
        print(t, loss.data.item())
    lr = lr / 1.0001
    optimizer.param_groups[0]['lr'] = lr
    loss.backward()
    optimizer.step()

# Denoise the held-out triangle test segment
test_in = tri_test_in
inp = torch.Tensor(test_in[np.newaxis, ..., np.newaxis])
inp = inp.to(device)
pred = model(inp).cpu().detach().numpy()
plt.plot(range(len(tri_test_in)), test_in)
plt.plot(range(len(tri_test_in)), pred[0, :, 0])
plt.show()

original_SNR = np.sum(np.abs(tri_test_out)**2) / np.sum(np.abs(tri_test_in - tri_test_out)**2)
original_SNR_db = 10*np.log10(original_SNR)
print('Original SNR : ', original_SNR)
print('Original SNR DB : ', original_SNR_db)
network_SNR = np.sum(np.abs(tri_test_out)**2) / np.sum(np.abs(pred[0, :, 0] - tri_test_out)**2)
network_SNR_db = 10*np.log10(network_SNR)
print('Network SNR : ', network_SNR)
print('Network SNR DB : ', network_SNR_db)

Output:

Original SNR :  9.06282337035853
Original SNR DB :  9.572635159053185
Network SNR :  46.622532666082044
Network SNR DB :  16.685958619136

RNN + Sawtooth

# Re-create and retrain the same RNN model on the sawtooth data
model = CustomRNN(input_dim, hidden_size_1, hidden_size_2, output_size)
model = model.to(device)
lr = 1e-2
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
loss_func = nn.MSELoss()

for t in range(1000):
    inp = torch.Tensor(saw_train_in[..., np.newaxis])
    inp = inp.to(device)
    out = torch.Tensor(saw_train_out[..., np.newaxis])
    out = out.to(device)
    pred = model(inp)
    optimizer.zero_grad()
    loss = loss_func(pred, out)
    if t % 20 == 0:
        print(t, loss.data.item())
    lr = lr / 1.0001
    optimizer.param_groups[0]['lr'] = lr
    loss.backward()
    optimizer.step()

# Denoise the held-out sawtooth test segment
test_in = saw_test_in
inp = torch.Tensor(test_in[np.newaxis, ..., np.newaxis])
inp = inp.to(device)
pred = model(inp).cpu().detach().numpy()
plt.plot(range(len(saw_test_in)), test_in)
plt.plot(range(len(saw_test_in)), pred[0, :, 0])
plt.show()

original_SNR = np.sum(np.abs(saw_test_out)**2) / np.sum(np.abs(saw_test_in - saw_test_out)**2)
original_SNR_db = 10*np.log10(original_SNR)
print('Original SNR : ', original_SNR)
print('Original SNR DB : ', original_SNR_db)
network_SNR = np.sum(np.abs(saw_test_out)**2) / np.sum(np.abs(pred[0, :, 0] - saw_test_out)**2)
network_SNR_db = 10*np.log10(network_SNR)
print('Network SNR : ', network_SNR)
print('Network SNR DB : ', network_SNR_db)

Output:

Original SNR :  8.918716305325825
Original SNR DB :  9.50302349708762
Network SNR :  26.97065260659425
Network SNR DB :  14.308914551667852
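Collecting the printed figures from the three experiments:

Signal     Input SNR (dB)   Network output SNR (dB)
Sine       11.12            20.31
Triangle    9.57            16.69
Sawtooth    9.50            14.31

In each case the simple RNN recovers roughly 5 to 9 dB of SNR, with the smooth sine benefiting the most and the discontinuous sawtooth the least.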

Academic consulting via Zhihu:
https://www.zhihu.com/consult/people/792359672131756032?isMe=1

The author holds a doctorate in engineering and serves as a reviewer for journals including Mechanical Systems and Signal Processing, Proceedings of the CSEE, and Control and Decision. Areas of expertise: modern signal processing, machine learning, deep learning, digital twins, time-series analysis, equipment defect detection, equipment anomaly detection, and intelligent fault diagnosis and prognostics and health management (PHM).
