```python
import numpy as np

w = np.array([5, 3, 4]).transpose()  # true weights (.transpose() is a no-op on a 1-D array)
b = np.array([100])                  # true bias

def compute_error_for_given_points(b, w, points):
    totalError = 0
    for i in range(0, len(points)):
        x = points[i].x
        y = points[i].y
        totalError += (y - (np.dot(w, x) + b)) ** 2
    return totalError / float(len(points))

def step_gradient(b_current, w_current, points, learningRate):
    b_gradient = 0
    w_gradient = np.zeros((len(w_current)))
    N = float(len(points))
    for i in range(0, len(points)):
        x = points[i].x
        y = points[i].y
        residual = np.dot(w_current, x) + b_current - y
        b_gradient += (2 / N) * residual  # was (2): the 1/N factor was missing
        w_gradient += (2 / N) * residual * x
    new_b = b_current - (learningRate * b_gradient)
    new_w = w_current - (learningRate * w_gradient)
    return [new_b, new_w]

def gradient_descent_runner(starting_b, starting_w, points, learning_rate, num_iterations):
    b = starting_b
    w = starting_w
    percent_old = 0
    for i in range(num_iterations):
        percent = i / num_iterations
        if percent - percent_old > 0.01:
            percent_old = percent
            print("\rprogress: {:.2f}%".format(percent * 100), end="")
        b, w = step_gradient(b, w, points, learning_rate)
    print("\n")
    return [b, w]

class Point:
    def __init__(self, x, y):
        self.x = x
        self.y = y

x = np.random.random((1000, 3)) * 10 + 10
y = b + np.matmul(x, w) + np.random.normal(0, 1, 1000)
# points = tf.data.Dataset.from_tensor_slices((x, y))
points = np.ndarray((1000), dtype=Point)
for i in range(1000):
    points[i] = Point(x[i], y[i])

loss = compute_error_for_given_points(0, np.array([0, 0, 0]), points)  # was compute_error_for_line_given_points (undefined)
print(f"loss before: {loss}")
b, w = gradient_descent_runner(0, np.array([0, 0, 0]), points, 0.0001, 5000)
loss = compute_error_for_given_points(b, w, points)
print(f"loss after: {loss}")
print(f"{b} + {w}*x")
```
When the loss function is MSE, $L(\boldsymbol w)= \frac {1}{n}\Sigma_{i=1}^n (\hat f(\boldsymbol x^{(i)})- y^{(i)})^2$. When the model is $\hat f(\boldsymbol x)=\hat y=b+\Sigma_{i=1}^d w_ix_i$, we have $\frac{\partial L(\boldsymbol w)}{\partial b}=\frac {2}{n}\Sigma_{i=1}^n (\hat y^{(i)}- y^{(i)})$ and $\frac{\partial L(\boldsymbol w)}{\partial w_j}=\frac {2}{n}\Sigma_{i=1}^n (\hat y^{(i)}- y^{(i)})x^{(i)}_j$. By the definition of differentiation with respect to a vector, $\frac{\partial L(\boldsymbol w)}{\partial \boldsymbol w}=\frac {2}{n}\Sigma_{i=1}^n (\hat y^{(i)}- y^{(i)})\boldsymbol x^{(i)}$. Treating $b$ as $w_0$ in the augmented form gives an analogous result, with $x_0$ fixed to 1.
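As a quick sanity check of these gradients, here is a minimal NumPy sketch comparing the closed form against a finite-difference estimate (the data and names are purely illustrative):

```python
import numpy as np

np.random.seed(0)
n, d = 50, 3
X = np.random.randn(n, d)                      # rows are the samples x^(i)
y = X @ np.array([5., 3., 4.]) + 100 + 0.1 * np.random.randn(n)
w, b = np.zeros(d), 0.0

def mse(w, b):
    r = X @ w + b - y
    return np.mean(r ** 2)

# closed-form gradients from the derivation above
r = X @ w + b - y
grad_w = (2 / n) * X.T @ r
grad_b = (2 / n) * np.sum(r)

# finite-difference check of dL/dw_0
eps = 1e-6
e0 = np.array([eps, 0, 0])
numeric = (mse(w + e0, b) - mse(w - e0, b)) / (2 * eps)
print(np.isclose(numeric, grad_w[0]))          # expect True
```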
When the model is $\hat f(\boldsymbol x)=\hat y=\Sigma_{i=0}^m w_i \phi_i (\boldsymbol x)$, where each $\phi_i(\boldsymbol x)$ is a real-valued function of $d$ variables that can specialize to Polynomial: $x_i^i$, Gaussian: $e^{-\frac{(x_i-\mu_i)^2}{2\sigma_i^2}}$, or Sigmoidal: $\frac{1}{1+e^{\frac{x_i-\mu_i}{\sigma_i}}}$, while keeping $\phi_0(\boldsymbol x)\equiv 1$, the derivative above becomes

$\frac{\partial L(\boldsymbol w)}{\partial \boldsymbol w}=\frac {2}{n}\Sigma_{i=1}^n (\hat y^{(i)}- y^{(i)})\boldsymbol \phi(\boldsymbol x^{(i)})$
To solve $\frac{\partial L(\boldsymbol w)}{\partial \boldsymbol w}=\boldsymbol 0$ we need to separate $\boldsymbol w$ from the expression above, i.e. the problem of making a sum of $n$ vectors equal the zero vector must be converted into a matrix problem:
$\Sigma_{i=1}^n (\hat y^{(i)}- y^{(i)})\boldsymbol \phi(\boldsymbol x^{(i)})$

$=\Sigma_{i=1}^n \boldsymbol \phi(\boldsymbol x^{(i)})(\hat y^{(i)}- y^{(i)})$

$=(\boldsymbol \phi(\boldsymbol x^{(1)}),\boldsymbol \phi(\boldsymbol x^{(2)}),\cdots,\boldsymbol \phi(\boldsymbol x^{(n)}))\cdot (\hat y^{(1)}- y^{(1)},\hat y^{(2)}- y^{(2)},\cdots,\hat y^{(n)}- y^{(n)})'$

$=(\boldsymbol \phi(\boldsymbol x^{(1)}),\cdots,\boldsymbol \phi(\boldsymbol x^{(n)}))\cdot (\boldsymbol w'\boldsymbol \phi(\boldsymbol x^{(1)})- y^{(1)},\cdots,\boldsymbol w'\boldsymbol \phi(\boldsymbol x^{(n)})- y^{(n)})'$

$=(\boldsymbol \phi(\boldsymbol x^{(1)}),\cdots,\boldsymbol \phi(\boldsymbol x^{(n)}))\cdot (\boldsymbol \phi(\boldsymbol x^{(1)})'\boldsymbol w- y^{(1)},\cdots,\boldsymbol \phi(\boldsymbol x^{(n)})'\boldsymbol w- y^{(n)})'$

$=(\boldsymbol \phi(\boldsymbol x^{(1)}),\cdots,\boldsymbol \phi(\boldsymbol x^{(n)}))\cdot \left(\begin{pmatrix}\boldsymbol \phi(\boldsymbol x^{(1)})'\\ \boldsymbol \phi(\boldsymbol x^{(2)})'\\ \vdots\\ \boldsymbol \phi(\boldsymbol x^{(n)})'\end{pmatrix}\boldsymbol w-\boldsymbol y\right)$
Let $\boldsymbol \Phi=(\boldsymbol \phi(\boldsymbol x^{(1)}),\boldsymbol \phi(\boldsymbol x^{(2)}),\cdots,\boldsymbol \phi(\boldsymbol x^{(n)}))=\begin{pmatrix}\phi_0(\boldsymbol x^{(1)})&\phi_0(\boldsymbol x^{(2)})&\cdots&\phi_0(\boldsymbol x^{(n)})\\ \phi_1(\boldsymbol x^{(1)})&\phi_1(\boldsymbol x^{(2)})&\cdots&\phi_1(\boldsymbol x^{(n)})\\ \vdots&\vdots&&\vdots\\ \phi_m(\boldsymbol x^{(1)})&\phi_m(\boldsymbol x^{(2)})&\cdots&\phi_m(\boldsymbol x^{(n)})\end{pmatrix}$
Then, dropping the constant factor $\frac{2}{n}$ (which does not affect the root):

$\frac{\partial L(\boldsymbol w)}{\partial \boldsymbol w}=\boldsymbol \Phi \cdot (\boldsymbol \Phi' \boldsymbol w - \boldsymbol y)$
In addition, computing the gradient in matrix form may benefit more from GPU acceleration.
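For example, a sketch of the per-sample loop versus the matrix form in NumPy (shapes follow the definition of $\boldsymbol \Phi$ above; the data is illustrative). On a GPU framework the second form maps to a single batched matmul:

```python
import numpy as np

np.random.seed(1)
m, n = 4, 1000
Phi = np.random.randn(m + 1, n)    # column i is phi(x^(i)); row 0 is phi_0 == 1
Phi[0] = 1.0
y = np.random.randn(n)
w = np.random.randn(m + 1)

# per-sample loop, as in the scalar derivation
grad_loop = np.zeros(m + 1)
for i in range(n):
    grad_loop += (2 / n) * (w @ Phi[:, i] - y[i]) * Phi[:, i]

# matrix form: one matrix-vector product chain
grad_mat = (2 / n) * Phi @ (Phi.T @ w - y)
print(np.allclose(grad_loop, grad_mat))        # True
```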
$\frac{\partial L(\boldsymbol w)}{\partial \boldsymbol w}=\boldsymbol \Phi \cdot (\boldsymbol \Phi' \boldsymbol w - \boldsymbol y)=\boldsymbol 0$

$\boldsymbol \Phi \boldsymbol \Phi' \boldsymbol w - \boldsymbol \Phi \boldsymbol y=\boldsymbol 0$

$\boldsymbol \Phi \boldsymbol \Phi' \boldsymbol w =\boldsymbol \Phi \boldsymbol y$

$\boldsymbol w =(\boldsymbol \Phi \boldsymbol \Phi')^{-1}\boldsymbol \Phi \boldsymbol y$
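A minimal NumPy sketch of this closed form with Gaussian basis functions (the centers, width, and toy data are assumptions for illustration):

```python
import numpy as np

np.random.seed(2)
n = 200
x = np.random.uniform(-3, 3, n)                # 1-D inputs for simplicity
y = np.sin(x) + 0.05 * np.random.randn(n)

mus = np.linspace(-3, 3, 9)                    # basis centers (arbitrary choice)
sigma = 1.0

# Phi is (m+1) x n: row 0 is the constant basis phi_0 == 1
Phi = np.vstack([np.ones(n)] +
                [np.exp(-(x - mu) ** 2 / (2 * sigma ** 2)) for mu in mus])

# w = (Phi Phi')^{-1} Phi y, solved without forming an explicit inverse
w = np.linalg.solve(Phi @ Phi.T, Phi @ y)
print(w.shape)                                 # (10,)
```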
The output is a score for each of the $c$ classes, i.e. a $c$-dimensional vector, where each dimension is produced by its own linear function.
$\frac{\partial L(\boldsymbol w_i)}{\partial \boldsymbol w_i}=\frac {2}{n}\Sigma_{p=1}^n (\hat y^{(p)}- y^{(p)})\boldsymbol x^{(p)}=\frac {2}{n}\Sigma_{p=1}^n (\boldsymbol w_i' \boldsymbol x^{(p)}- y^{(p)})\boldsymbol x^{(p)}$
The weight vector $\boldsymbol w_i$ of each output dimension can be obtained by the method above.
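Since every $\boldsymbol w_i$ satisfies its own normal equation with the same $\boldsymbol \Phi$, all $c$ of them can be obtained in a single solve; a sketch (shapes and data are assumptions):

```python
import numpy as np

np.random.seed(3)
m, n, c = 5, 300, 4
Phi = np.random.randn(m + 1, n)   # column i is phi(x^(i)); row 0 is phi_0 == 1
Phi[0] = 1.0
Y = np.random.randn(n, c)         # column i holds the targets of the i-th class score

# one solve for all classes: column i of W is w_i
W = np.linalg.solve(Phi @ Phi.T, Phi @ Y)
print(W.shape)                    # (m + 1, c)
```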
1. If the generated x form a special sequence, stochastic gradient descent may never reach the correct solution, e.g. $(1,1,1),(2,2,2),\cdots,(n,n,n)$: every weight dimension then always receives the same increment (see the sketch after this list).
2. When the bias of the original model is small relative to the teaching signal of the samples, no good solution is ever obtained.
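To make point 1 concrete, a tiny sketch (the true weights $(5,3,4)$ and bias 100 are taken from the code above; everything else is illustrative):

```python
import numpy as np

w, b = np.zeros(3), 0.0
for k in range(1, 11):                   # samples of the form (k, k, k)
    x = np.array([k, k, k], dtype=float)
    y = np.dot([5, 3, 4], x) + 100       # true model
    residual = np.dot(w, x) + b - y
    grad_w = 2 * residual * x            # all three components are identical
    w -= 1e-4 * grad_w
print(w)  # w[0] == w[1] == w[2] forever, so (5, 3, 4) is unreachable
```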
```python
import numpy as np
import time
import tensorflow as tf

cifar10 = tf.keras.datasets.cifar10
(x_train, y_train), (x_valid, y_valid) = cifar10.load_data()
x_train = x_train.reshape((50000, 32 * 32 * 3))
x_valid = x_valid.reshape((10000, 32 * 32 * 3))
x_valid = x_valid[0:1000]
y_valid = y_valid[0:1000]

class NearestNeighbor:
    def __init__(self):
        pass

    def train(self, X, y):
        """X is N x D, y is 1-dimension of size N"""
        self.Xtr = X
        self.ytr = y

    def predict(self, X):
        """Predict an array of patterns at one time."""
        start = time.time()
        num_test = X.shape[0]
        yPred = np.zeros(num_test, dtype=self.ytr.dtype)
        for i in range(num_test):
            print("\r{:.2f}%".format(i * 100 / num_test), end="")
            distances = np.sum(np.abs(self.Xtr - X[i, :]), axis=1)  # L1
            # L2
            # distances = np.linalg.norm(self.Xtr - X[i, :], ord=2, axis=1)
            # cosine (note: this computes similarity, so argmin below picks the
            # least similar sample, which explains the poor accuracy in the table)
            # distances = np.zeros(self.Xtr.shape[0])
            # for j in range(len(distances)):
            #     numerator = np.dot(X[i], self.Xtr[j])
            #     denominator = math.sqrt(np.sum(np.square(X[i])))
            #     denominator *= math.sqrt(np.sum(np.square(self.Xtr[j])))
            #     distances[j] = numerator / denominator
            min_index = np.argmin(distances)
            yPred[i] = self.ytr[min_index]
        end = time.time()
        print(f"\nelapsed {end - start}s")
        return yPred

nn = NearestNeighbor()
nn.train(x_train, y_train)
yPred = nn.predict(x_valid)
num_wrong = 0
for i in range(yPred.shape[0]):
    if yPred[i] != y_valid[i]:
        num_wrong += 1
print(num_wrong)
print("accuracy is {:.2f}%".format((1 - num_wrong / x_valid.shape[0]) * 100))
```
| Method | L1 norm | L2 norm | Cosine distance |
| --- | --- | --- | --- |
| Accuracy (%) | 25.30 | 20.90 | 9.30 |
```python
import numpy as np
import time
import tensorflow as tf

cifar10 = tf.keras.datasets.cifar10
(x_train, y_train), (x_valid, y_valid) = cifar10.load_data()
x_train = x_train.reshape((50000, 32 * 32 * 3))
x_valid = x_valid.reshape((10000, 32 * 32 * 3))
x_valid = x_valid[0:1000]
y_valid = y_valid[0:1000]

class KNearestNeighbor:
    def __init__(self):
        pass

    def train(self, X, y, k):
        """X is N x D, y is 1-dimension of size N"""
        self.Xtr = X
        self.ytr = y
        self.k = k

    def getIndex(self, distances):
        # majority vote over the nearest samples
        # (pivot is inclusive, so k+1 samples vote, plus any ties)
        n = len(distances)
        cnts = np.zeros(10)
        pivot = np.partition(distances, self.k)[self.k]
        for i in range(n):
            if distances[i] <= pivot:
                cnts[self.ytr[i]] += 1
        return np.argmax(cnts)

    def predict(self, X):
        """Predict an array of patterns at one time."""
        start = time.time()
        num_test = X.shape[0]
        yPred = np.zeros(num_test, dtype=self.ytr.dtype)
        for i in range(num_test):
            print("\r{:.2f}%".format(100 * i / num_test), end="")
            distances = np.sum(np.abs(self.Xtr - X[i, :]), axis=1)
            yPred[i] = self.getIndex(distances)
        end = time.time()
        print(f"elapsed {end - start}s")
        return yPred

nn = KNearestNeighbor()
for i in range(2, 10):  # define range by your own
    nn.train(x_train, y_train, i)
    yPred = nn.predict(x_valid)
    num_wrong = 0
    for j in range(yPred.shape[0]):
        if yPred[j] != y_valid[j]:
            num_wrong += 1
    print(num_wrong)
    print("accuracy is {:.2f}%".format((1 - num_wrong / x_valid.shape[0]) * 100))
```
Using the L1 norm as the distance metric:
| K | Accuracy (%) |
| --- | --- |
| 2 | 23.90 |
| 3 | 24.40 |
| 4 | 25.40 |
| 5 | 23.70 |
| 6 | 24.60 |
| 7 | 25.00 |
| 8 | 25.20 |
| 9 | 25.00 |
| 25 | 24.10 |
| 50 | 22.80 |
| 100 | 21.40 |
| 500 | 19.10 |
| 1000 | 17.80 |
| 10000 | 12.90 |
Training improves markedly once batch normalization is used.
GaussNoise produces the noisy version; GaussianBlur produces the blurred version.
```python
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import keras
from keras.layers import Input, Dense, Conv2D, GlobalAvgPool2D, Reshape, Conv2DTranspose, BatchNormalization, ReLU
from keras.models import Model
from keras.optimizers import Adam
import albumentations as A
from keras.callbacks import Callback
import random

mnist = keras.datasets.mnist
(x_train, y_train), (x_valid, y_valid) = mnist.load_data()
x_train = np.expand_dims(x_train, -1)
x_valid = np.expand_dims(x_valid, -1)

# transform = A.Compose([A.GaussNoise(var_limit=(10, 1000), p=1)])
transform = A.Compose([A.GaussianBlur(p=1)])
before = x_train[0]
after = transform(image=before)["image"]
plt.subplot(121)
plt.imshow(np.squeeze(after), cmap='gray')
plt.subplot(122)
plt.imshow(np.squeeze(before), cmap='gray')
plt.show()
print(after.shape)

def process_data(x):
    def augment_function(imgs):
        aug_imgs = []
        for img in imgs:
            aug_data = transform(image=img)
            aug_imgs.append(aug_data['image'])
        aug_imgs = tf.cast(aug_imgs, tf.float32) / 255.0  # normalization
        return aug_imgs
    aug_imgs = tf.numpy_function(func=augment_function, inp=[x], Tout=[tf.float32])
    targets = tf.cast(x, tf.float32) / 255.0
    return aug_imgs, targets

train_ds = tf.data.Dataset.from_tensor_slices(x_train)
valid_ds = tf.data.Dataset.from_tensor_slices(x_valid)
AUTOTUNE = tf.data.experimental.AUTOTUNE
train_ds = train_ds.shuffle(len(train_ds))
train_ds = train_ds.batch(100)
train_ds = train_ds.map(lambda x: tf.py_function(process_data, [x], [tf.float32, tf.float32]),
                        num_parallel_calls=AUTOTUNE)
valid_ds = valid_ds.batch(100)
valid_ds = valid_ds.map(lambda x: tf.py_function(process_data, [x], [tf.float32, tf.float32]),
                        num_parallel_calls=AUTOTUNE)
aug, ori = next(iter(train_ds))

class Convnet(Model):
    def __init__(self):
        super(Convnet, self).__init__()

    def downsample(self, filters, size=(3, 3), stride=(2, 2), padding='same', name=''):
        model = tf.keras.Sequential(name=name)
        model.add(Conv2D(filters, size, strides=2, padding=padding))
        model.add(BatchNormalization())  # use batch normalization
        model.add(ReLU())
        return model

    def upsample(self, filters, size=(3, 3), strides=(2, 2), padding='same', name=''):
        model = tf.keras.Sequential(name=name)
        model.add(Conv2DTranspose(filters, size, strides=strides, padding=padding))
        model.add(BatchNormalization())
        model.add(ReLU())
        return model

    def build(self):
        input = Input(shape=(28, 28, 1), name='input')
        h = self.downsample(16, name='downblock_1')(input)
        h = self.downsample(32, name='downblock_2')(h)
        h = self.downsample(64, name='downblock_3')(h)
        h = self.downsample(128, name='downblock_4')(h)
        encoder_out = GlobalAvgPool2D(name='gap')(h)
        h = Dense(2 * 2 * 128)(encoder_out)
        h = Reshape((2, 2, 128))(h)
        h = self.upsample(64, name='upblock_1')(h)
        h = self.upsample(32, (4, 4), (1, 1), 'valid', name='upblock_2')(h)
        h = self.upsample(16, name='upblock_3')(h)
        output = Conv2DTranspose(1, (3, 3), (2, 2), 'same', activation='sigmoid', name='output')(h)
        return Model(inputs=input, outputs=output)

autoencoder = Convnet().build()
autoencoder.summary()
# tf.keras.utils.plot_model(autoencoder, show_shapes=True, dpi=72)

class ShowImage(Callback):
    def on_epoch_end(self, epoch, logs=None):
        aug, ori = next(iter(valid_ds))
        aug = aug.numpy()
        ori = ori.numpy()
        index = np.array([random.choice(range(len(aug))) for i in range(8)])  # indices within the batch
        aug = aug[index]
        ori = ori[index]
        gen = autoencoder(aug, training=False).numpy()
        plt.subplots_adjust(hspace=0.5)
        for i in range(len(gen)):
            plt.subplot(4, 6, i * 3 + 1)
            plt.title('noise')
            plt.imshow(np.squeeze(aug[i]), cmap='gray')
            plt.subplot(4, 6, i * 3 + 2)
            plt.title('original')
            plt.imshow(np.squeeze(ori[i]), cmap='gray')
            plt.subplot(4, 6, i * 3 + 3)
            plt.title('generated')
            plt.imshow(np.squeeze(gen[i]), cmap='gray')
        plt.show()

autoencoder.compile(Adam(0.001), 'MSE', 'MAE')
autoencoder.fit(train_ds, epochs=5, callbacks=[ShowImage()], validation_data=valid_ds)
autoencoder.save('denoise_convnet.h5')

model = keras.models.load_model('denoise_convnet.h5')
model.summary()
denoiseResult = model.predict(np.expand_dims(after / 255, 0))
plt.subplot(121)
plt.imshow(np.squeeze(after), cmap='gray')
plt.subplot(122)
plt.imshow(np.squeeze(denoiseResult), cmap='gray')
plt.show()
```
An ordinary convolutional network loses positional information; passing it directly to the generating network through shortcut connections before it is lost yields more accurate generated images.
```python
import tensorflow as tf
import keras
import numpy as np
import albumentations as A
import copy
from keras.models import Model
from keras.layers import Input, Conv2D, BatchNormalization, Conv2DTranspose, GlobalAvgPool2D, Dense, Reshape, Concatenate, ReLU
import matplotlib.pyplot as plt
from keras.optimizers import Adam
from keras.callbacks import Callback
import random

mnist = keras.datasets.mnist
(x_train, y_train), (x_valid, y_valid) = mnist.load_data()
x_train = np.expand_dims(x_train, -1)
x_valid = np.expand_dims(x_valid, -1)

transform = A.Compose([
    A.GaussNoise((0, 10000), p=1)  # could this overflow?
])

train_ds = tf.data.Dataset.from_tensor_slices(x_train)
valid_ds = tf.data.Dataset.from_tensor_slices(x_valid)
train_ds = train_ds.shuffle(len(train_ds))
train_ds = train_ds.batch(100)
valid_ds = valid_ds.batch(100)

def process(x):
    # print(x.shape)  # arrives as a mini-batch: (100, 28, 28, 1)
    def augment_function(imgs):
        aug_imgs = copy.deepcopy(imgs)
        for i in range(len(imgs)):
            aug_imgs[i] = transform(image=imgs[i])['image']
        aug_imgs = tf.cast(aug_imgs, tf.float32) / 255.0
        return aug_imgs
    aug_imgs = tf.numpy_function(augment_function, [x], [tf.float32])
    ori_imgs = tf.cast(x, tf.float32) / 255.0
    return aug_imgs, ori_imgs

train_ds = train_ds.map(lambda x: tf.py_function(process, [x], Tout=[tf.float32, tf.float32]))
valid_ds = valid_ds.map(lambda x: tf.py_function(process, [x], Tout=[tf.float32, tf.float32]))

class Unet(Model):
    # same:  o_w = ceil(i_w / s_w)
    # valid: o_w = ceil((i_w - f_w + 1) / s_w) = ceil((i_w - f_w) / s_w) + 1
    def __init__(self):
        super(Unet, self).__init__()
        self.unet = self.mybuild()

    def downsample(self, num_filter, name='', size_filter=(3, 3), padding='same'):
        model = keras.Sequential(name=name)
        model.add(Conv2D(num_filter, size_filter, (2, 2), padding, activation='relu'))
        model.add(BatchNormalization())
        # model.add(ReLU())  # adding ReLU() as a layer vs. activation='relu' changes the
        # displayed layer count but not the training result; verified via breakpoint inspection
        return model

    def upsample(self, num_filter, name='', size_filter=(3, 3), padding='same', strides=(2, 2)):
        model = keras.Sequential(name=name)
        model.add(Conv2DTranspose(num_filter, size_filter, strides, padding))
        model.add(BatchNormalization())
        model.add(ReLU())
        return model

    def mybuild(self):
        input = Input((28, 28, 1))
        h1 = self.downsample(16, 'db1')(input)
        h2 = self.downsample(32, 'db2')(h1)
        h3 = self.downsample(64, 'db3')(h2)
        h4 = self.downsample(128, 'db4')(h3)
        gap = GlobalAvgPool2D()(h4)
        h5 = Dense(512, activation='relu')(gap)
        h6 = Reshape((2, 2, 128))(h5)
        h7 = Concatenate()([h4, h6])
        h8 = self.upsample(64, 'ub1')(h7)
        h9 = Concatenate()([h3, h8])
        h10 = self.upsample(32, 'ub2', (4, 4), 'valid', (1, 1))(h9)
        h11 = Concatenate()([h2, h10])
        h12 = self.upsample(16, 'ub3')(h11)
        h13 = Concatenate()([h1, h12])
        output = self.upsample(1, 'output')(h13)  # activation=sigmoid?
        return Model(inputs=input, outputs=output)

unet = Unet().unet
unet.compile(optimizer=Adam(learning_rate=0.001), loss='mse', metrics=['mae'])

# Custom callback for showing images at the end of each epoch
class ShowImage(Callback):
    def on_epoch_end(self, epoch, logs=None):
        org_imgs = np.array([random.choice(x_valid) for i in range(8)], dtype=np.uint8)
        noise_data = transform(image=org_imgs)
        noise_img = noise_data['image']
        noise_img = tf.cast(noise_img, tf.float32) / 255.0
        # imgs = tf.expand_dims(imgs, axis=-1)
        print(noise_img.shape)
        gens = unet(noise_img, training=False)
        fig = plt.figure(figsize=(14, 10))
        plt.subplots_adjust(hspace=0.4)
        for i, img in enumerate(gens.numpy()):
            plt.subplot(4, 6, i * 3 + 1)
            plt.title('noise')
            plt.imshow(np.squeeze(noise_img[i]), cmap='gray')
            plt.subplot(4, 6, i * 3 + 2)
            plt.title('original')
            plt.imshow(np.squeeze(org_imgs[i]), cmap='gray')
            plt.subplot(4, 6, i * 3 + 3)
            plt.title('generated')
            plt.imshow(np.squeeze(img), cmap='gray')
        plt.show()

unet.fit(train_ds, epochs=5, validation_data=valid_ds, callbacks=[ShowImage()])

before = x_train[0]
after = transform(image=before)['image']
denoiseresult = unet.predict(np.expand_dims(after / 255, axis=0))
print(denoiseresult.shape)
plt.subplot(121)
plt.imshow(np.squeeze(after), cmap='gray')
plt.subplot(122)
# predict output keeps the batch dimension
plt.imshow(np.squeeze(denoiseresult[0]), cmap='gray')
```
Neither the U-Net nor the Convnet above can be trained into an error-detecting autoencoder.
Even when trained for only 5 epochs on a single digit class, the network can still denoise digits of the other classes, which shows that the resulting denoiser is class-agnostic.
```python
before = data
after = transform(image=before)['image']
denoiseresult = unet.predict(after / 255)
pred = denoiseresult
plt.figure(figsize=(20, 3))
plt.subplots_adjust(wspace=0.8, hspace=1.0)
for i, img in enumerate(after):
    plt.subplot(2, 10, i + 1)
    plt.title(label[i])
    plt.imshow(np.squeeze(img))
    plt.subplot(2, 10, i + 11)
    # np.matrix requires 2-D input, so squeeze the channel dimension first
    error = tf.reduce_mean(np.linalg.norm(np.matrix(np.squeeze(data[i] - pred[i])), 2))
    plt.title("{:.2}".format(float(error)))
    plt.imshow(np.squeeze(pred[i]))
```
However, once a Convnet with fewer layers is used, i.e. when the embedding vector is long, the resulting denoising autoencoder becomes class-dependent. The structural semantics of the digits end up stored in the shallow-structured DNN.
```python
import tensorflow as tf
import keras
import numpy as np
import albumentations as A
import copy
from keras.models import Model
from keras.layers import Input, Conv2D, BatchNormalization, Conv2DTranspose, GlobalAvgPool2D, Dense, Reshape, ReLU
import matplotlib.pyplot as plt
from keras.optimizers import Adam
from keras.callbacks import Callback
import random

mnist = keras.datasets.mnist
(x_train, y_train), (x_valid, y_valid) = mnist.load_data()

# pick one validation sample of each digit for the evaluation plot
idx = np.zeros(10, int)
index = 0
for i in range(10):
    while True:
        if y_valid[index] == i:
            idx[i] = index
            break
        else:
            index += 1
data = x_valid[idx]
data = np.expand_dims(data, -1)
label = y_valid[idx]

# keep only one digit class for training
detect = 3
idx = [i for i, label in enumerate(y_train) if label == detect]
x3_train = np.array([x_train[i] for i in idx])
idx = [i for i, label in enumerate(y_valid) if label == detect]
x3_valid = np.array([x_valid[i] for i in idx])
x_train = x3_train
x_valid = x3_valid
x_train = np.expand_dims(x_train, -1)
x_valid = np.expand_dims(x_valid, -1)

transform = A.Compose([
    A.GaussNoise((0, 10000), p=1)  # could this overflow?
])

train_ds = tf.data.Dataset.from_tensor_slices(x_train)
valid_ds = tf.data.Dataset.from_tensor_slices(x_valid)
train_ds = train_ds.shuffle(len(train_ds))
train_ds = train_ds.batch(100)
valid_ds = valid_ds.batch(100)

def process(x):
    def augment_function(imgs):
        aug_imgs = copy.deepcopy(imgs)
        for i, img in enumerate(imgs):
            aug_imgs[i] = transform(image=img)['image']
        aug_imgs = tf.cast(aug_imgs, tf.float32) / 255.0
        return aug_imgs
    aug_imgs = tf.numpy_function(augment_function, [x], [tf.float32])
    ori_imgs = tf.cast(x, tf.float32) / 255.0
    return aug_imgs, ori_imgs

train_ds = train_ds.map(lambda x: tf.py_function(process, [x], Tout=[tf.float32, tf.float32]))
valid_ds = valid_ds.map(lambda x: tf.py_function(process, [x], Tout=[tf.float32, tf.float32]))

class Convnet(Model):
    def __init__(self):
        super(Convnet, self).__init__()

    def downsample(self, filters, size=(3, 3), stride=(2, 2), padding='same', name=''):
        model = tf.keras.Sequential(name=name)
        model.add(Conv2D(filters, size, strides=2, padding=padding))
        model.add(BatchNormalization())
        model.add(ReLU())
        return model

    def upsample(self, filters, size=(3, 3), strides=(2, 2), padding='same', name=''):
        model = tf.keras.Sequential(name=name)
        model.add(Conv2DTranspose(filters, size, strides=strides, padding=padding))
        model.add(BatchNormalization())
        model.add(ReLU())
        return model

    def build(self):
        input = Input(shape=(28, 28, 1), name='input')
        h = self.downsample(16, name='downblock_1')(input)
        h = self.downsample(32, name='downblock_2')(h)
        encoder_out = GlobalAvgPool2D(name='gap')(h)
        h = Dense(7 * 7 * 32)(encoder_out)
        h = Reshape((7, 7, 32))(h)
        h = self.upsample(16, name='upblock_3')(h)
        output = Conv2DTranspose(1, (3, 3), (2, 2), 'same', activation='sigmoid', name='output')(h)
        return Model(inputs=input, outputs=output)

autoencoder = Convnet().build()

# Custom callback for showing images at the end of each epoch
class ShowImage(Callback):
    def on_epoch_end(self, epoch, logs=None):
        org_imgs = np.array([random.choice(x_valid) for i in range(8)], dtype=np.uint8)
        noise_data = transform(image=org_imgs)
        noise_img = noise_data['image']
        noise_img = tf.cast(noise_img, tf.float32) / 255.0
        print(noise_img.shape)
        gens = autoencoder(noise_img, training=False)
        fig = plt.figure(figsize=(14, 10))
        plt.subplots_adjust(hspace=0.4)
        for i, img in enumerate(gens.numpy()):
            plt.subplot(4, 6, i * 3 + 1)
            plt.title('noise')
            plt.imshow(np.squeeze(noise_img[i]), cmap='gray')
            plt.subplot(4, 6, i * 3 + 2)
            plt.title('original')
            plt.imshow(np.squeeze(org_imgs[i]), cmap='gray')
            plt.subplot(4, 6, i * 3 + 3)
            plt.title('generated')
            plt.imshow(np.squeeze(img), cmap='gray')
        plt.show()

autoencoder.compile(Adam(0.001), 'MSE', 'MAE')
autoencoder.fit(train_ds, epochs=5, callbacks=[ShowImage()], validation_data=valid_ds)
```
```python
before = data
after = transform(image=before)['image']
denoiseresult = autoencoder.predict(after / 255)
pred = denoiseresult
plt.figure(figsize=(20, 3))
plt.subplots_adjust(wspace=0.8, hspace=1.0)
for i, img in enumerate(after):
    plt.subplot(2, 10, i + 1)
    plt.title(label[i])
    plt.imshow(np.squeeze(img))
    plt.subplot(2, 10, i + 11)
    # np.matrix requires 2-D input, so squeeze the channel dimension first
    error = tf.reduce_mean(np.linalg.norm(np.matrix(np.squeeze(data[i] - pred[i])), 2))
    plt.title("{:.2}".format(float(error)))
    plt.imshow(np.squeeze(pred[i]))
```
Results after 100 epochs:
Variational inference is a common method in statistics. In a VAE, the encoder produces a latent space, and latent variables are obtained by sampling in that space. Let $x$ be the random variable of the input data and $z$ the latent variable. From the joint probability $p(x,z)=p(x)p_\phi(z|x)=p(z)p_\theta(x|z)$ we get $-\log\frac{p(z)p(x|z)}{p(x)p(z|x)}=0$, and $\frac{p(z)p(x|z)}{p(z|x)}=p(x)\le 1$. Since the probability of the input sample is independent of the parameter-optimization process, the loss function is taken as $-\log\frac{p(z)p(x|z)}{p(z|x)}$, with the probabilities replaced by probability density functions during computation. Assume $Z|X\sim N(\mu_\phi(x),\Sigma_\phi(x))$, $Z\sim N(0,1)$, $X|Z\sim N(\mu_\theta(z),\Sigma_\theta(z))$. During computation the reparameterization trick replaces the conditional sampling of $z$ with $\mu_\phi(x)+\epsilon\Sigma_\phi(x)$, where $\epsilon$ is sampled from $N(0,1)$, so that the computation graph supports backpropagation. Since the reconstructed image is generated deterministically by the network, which does not compute $\mu_\theta(z),\Sigma_\theta(z)$, the KL divergence is used in place of $-\log p(x|z)$: $D_{KL}(X||X')=H_X(X')-H(X)$, i.e. cross entropy minus self-information entropy, which is further replaced by the cross entropy alone.
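Writing out the loss above makes the two terms in the code explicit (standard ELBO algebra, not specific to this implementation):

$-\log\frac{p(z)p_\theta(x|z)}{p_\phi(z|x)}=\big(\log p_\phi(z|x)-\log p(z)\big)-\log p_\theta(x|z)$

Taking the expectation over $z\sim p_\phi(z|x)$ gives $D_{KL}\big(p_\phi(z|x)\,\|\,p(z)\big)-E[\log p_\theta(x|z)]$, i.e. a KL regularizer on the latent code plus a reconstruction term. In the code below, the KL term is estimated from a single sample of $z$ as `log_qz_x - log_pz`, and the reconstruction term is replaced by the pixel-wise cross entropy.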
```python
import tensorflow as tf
import keras
import numpy as np
from keras.models import Model
from keras.layers import Input, Conv2D, Flatten, Dense, BatchNormalization, Reshape, Conv2DTranspose
import matplotlib.pyplot as plt
from keras.optimizers import Adam
import random

mnist = keras.datasets.mnist
(x_train, y_train), (x_valid, y_valid) = mnist.load_data()
x_train = np.expand_dims(np.float32(x_train), -1) / 255.0
x_valid = np.expand_dims(np.float32(x_valid), -1) / 255.0
train_ds = tf.data.Dataset.from_tensor_slices(x_train)
train_ds = train_ds.shuffle(len(train_ds))
train_ds = train_ds.batch(100)

class VAE(Model):
    def __init__(self, n_zdim):
        super(VAE, self).__init__()
        self.n_zdim = n_zdim
        self.encoder = self.encoder_model()
        self.decoder = self.decoder_model()
        self.loss_tracker = keras.metrics.Mean(name="loss")

    def encoder_model(self):
        input = Input((28, 28, 1))
        # how does padding affect the parameter count?
        h = Conv2D(32, (3, 3), (2, 2), "valid", activation="relu")(input)
        h = BatchNormalization()(h)
        h = Conv2D(64, (3, 3), (2, 2), "valid", activation="relu")(h)
        h = BatchNormalization()(h)
        h = Flatten()(h)
        z_mean = Dense(self.n_zdim)(h)
        z_log_stddev = Dense(self.n_zdim)(h)
        return Model(inputs=input, outputs=[z_mean, z_log_stddev])

    def decoder_model(self):
        # more deconv layers and fewer dense units reduce the total parameter
        # count, keeping it below the encoder's
        input = Input(self.n_zdim)
        h = Dense(7 * 7 * 32, activation='relu')(input)
        h = Reshape((7, 7, 32))(h)
        h = BatchNormalization()(h)
        h = Conv2DTranspose(64, (3, 3), (2, 2), "same", activation='relu')(h)
        h = BatchNormalization()(h)
        h = Conv2DTranspose(32, (3, 3), (2, 2), "same", activation='relu')(h)
        h = BatchNormalization()(h)
        # the last activation must be linear; otherwise the loss stays high, because
        # early generated values are all negative, relu outputs all zeros, and
        # backpropagation cannot proceed effectively
        output = Conv2DTranspose(1, (3, 3), (1, 1), "same", activation="linear")(h)  # would Conv2D also work?
        return Model(input, output)

    def reparameterization(self, z_mean, z_log_stddev):
        # was tf.shape(self.n_zdim), which yields a scalar shape; sample one epsilon per latent entry
        epsilon = tf.random.normal(tf.shape(z_mean))
        return z_mean + epsilon * tf.exp(z_log_stddev)

    def encode(self, x):
        mean, log_stddev = self.encoder(x)
        return mean, log_stddev

    def decode(self, z):
        return self.decoder(z)

    @tf.function
    def log_normal_pdf(self, sample, mean, log_stddev, raxis=1):
        # log N(sample; mean, exp(log_stddev)^2), summed over the latent axis
        # (the original dropped the -0.5 factor on log2pi and the standalone -log_stddev term)
        log2pi = tf.math.log(2. * np.pi)
        return tf.reduce_sum(
            -0.5 * log2pi - log_stddev
            - 0.5 * (sample - mean) ** 2. * tf.exp(-2. * log_stddev),
            axis=raxis
        )

    @tf.function
    def train_step(self, data):
        with tf.GradientTape() as tape:
            z_mean, z_log_stddev = self.encoder(data)
            z = self.reparameterization(z_mean, z_log_stddev)
            reconstruct = self.decoder(z)
            cross_entropy = tf.nn.sigmoid_cross_entropy_with_logits(
                labels=data, logits=reconstruct)                      # (None, 28, 28, 1)
            cross_entropy = tf.reduce_sum(cross_entropy, axis=[1, 2, 3])  # (None,)
            cross_entropy = tf.reduce_mean(cross_entropy)
            log_pz = self.log_normal_pdf(z, 0, 0)  # N(0,1), shape (None,)
            # log_px_z = -cross_entropy (per sample) could be used instead:
            # total_loss = -tf.reduce_mean(log_pz + log_px_z - log_qz_x)
            log_qz_x = self.log_normal_pdf(z, z_mean, z_log_stddev)
            total_loss = -tf.reduce_mean(log_pz - log_qz_x) + cross_entropy
        gradients = tape.gradient(total_loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
        self.loss_tracker.update_state(total_loss)
        return {'loss': self.loss_tracker.result()}

model = VAE(100)
model.encoder.summary()
model.decoder.summary()
model.compile(optimizer=Adam(0.001))
model.fit(train_ds, epochs=5)

idx = random.randint(0, 9998)   # select image index
data = x_valid[idx:idx + 2]     # extract two images
z_mu, z_log_stddev = model.encode(data)

# interpolate between the two latent codes
w_mu = []
w_log_stddev = []
for i in range(11):
    w_mu.append((1.0 - i * 0.1) * z_mu[0] + (i * 0.1) * z_mu[1])
    w_log_stddev.append((1.0 - i * 0.1) * z_log_stddev[0] + (i * 0.1) * z_log_stddev[1])  # was w_logvar (undefined)
w_mu = tf.cast(w_mu, tf.float32)
w_log_stddev = tf.cast(w_log_stddev, tf.float32)
z = model.reparameterization(w_mu, w_log_stddev)  # latent vectors from the encoder output
outputs = model.decode(z)                         # generate images from the latent vectors

def to_numpy(tftensorimg):
    npy = tftensorimg.numpy().reshape(28, 28)
    max = npy.max()
    min = npy.min()
    npy = (npy - min) / (max - min) * 255.
    return np.uint8(npy)

fig = plt.figure(figsize=(22, 2), dpi=72)
plt.subplots_adjust(wspace=0.5)
for i in range(11):
    img = to_numpy(outputs[i])
    ax = fig.add_subplot(1, 11, i + 1)
    if i == 0:
        ax.set_title(str(y_valid[idx]))
    elif i == 10:
        ax.set_title(str(y_valid[idx + 1]))
    ax.imshow(img, cmap='gray')
```
This is essentially a zero-sum game. Write the discriminator's payoff function as $V(D,G)$; its goal is to maximize the payoff, and its controllable quantity is the discrimination function it produces: $\max \limits_D V(D,G)$. The generator's payoff is $-V(D,G)$, so the game overall becomes $\min \limits_G \max \limits_D V(D,G)$. One usually sets $V(D,G)=E_{X \sim p_{data}(x)}[\log D(X)]+E_{Z \sim p_Z(z)}[\log(1-D(G(Z)))]$. In training, the generator's loss can therefore be set to the negative binary cross entropy $-H_{Z \sim p_Z(z)}(1-D(G(z)))$, where $Z$ is a random variable, here distributed as $N(0,1)$. The generator thus wants this cross entropy to be as large as possible, i.e. $p_Z(z)$ and $1-D(G(z))$ as different as possible, i.e. $p_Z(z)$ and $D(G(z))$ as similar as possible, with $p_Z(z) \equiv 1$. Intuitively, the generator hopes the discriminator always outputs 1, so that its fake images pass the test. In practice we use randomly generated images to ensure this, and need not care about the discriminator's response to real images at this point. The generator's loss is finally set to $1\times(-\log D(G(z)))+0\times(-\log(1-D(G(z))))$, i.e. $bce(1, D(G(z)))$. The discriminator cares about the results on both real and fake images, so its loss is $bce(1, D(x))+bce(0, D(G(z)))$.
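As a supporting step, the classical GAN result (not specific to this implementation) makes the "D should output 1" intuition precise: for a fixed $G$, maximizing the integrand of $V(D,G)$ pointwise,

$\frac{d}{dD}\big[p_{data}(x)\log D+p_g(x)\log(1-D)\big]=0 \Rightarrow D^*(x)=\frac{p_{data}(x)}{p_{data}(x)+p_g(x)}$

so $D^*\equiv\frac{1}{2}$ exactly when $p_g=p_{data}$, which is the equilibrium the two $bce$ losses push toward.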
```python
import tensorflow as tf
import keras
import numpy as np
from keras.models import Model
from keras.layers import Input, Conv2D, Flatten, Dense, BatchNormalization, Reshape, Conv2DTranspose, LeakyReLU, MaxPooling2D, Dropout
from keras.losses import BinaryCrossentropy
import matplotlib.pyplot as plt
from keras.optimizers import Adam
from keras.callbacks import Callback, TensorBoard

mnist = keras.datasets.mnist
(x_train, y_train), (x_valid, y_valid) = mnist.load_data()
x_train = np.expand_dims(np.float32(x_train) / 255.0, -1)
train_ds = tf.data.Dataset.from_tensor_slices(x_train)
train_ds = train_ds.shuffle(len(train_ds))
train_ds = train_ds.batch(100)

class DCGAN(Model):
    def __init__(self):
        super(DCGAN, self).__init__()
        self.noise_dim = 128
        self.bce = BinaryCrossentropy(False)  # prediction is not a logits
        self.generator = self.generator_model()
        self.discriminator = self.discriminator_model()
        self.generator.optimizer = Adam(1e-4)
        self.discriminator.optimizer = Adam(1e-4)
        self.loss_gen_tracker = tf.keras.metrics.Mean(name="loss_gen")
        self.loss_dis_tracker = tf.keras.metrics.Mean(name="loss_dis")

    def deconv(self, filters, size=(3, 3), strides=(2, 2), padding='same', name=''):
        model = tf.keras.Sequential(name=name)
        model.add(Conv2DTranspose(filters, size, strides=strides, padding=padding))
        model.add(BatchNormalization())
        model.add(LeakyReLU())
        return model

    def generator_model(self):
        input = Input(self.noise_dim, name="input")
        h = Dense(7 * 7 * 256)(input)
        h = BatchNormalization()(h)
        h = LeakyReLU()(h)
        h = Reshape((7, 7, 256))(h)
        h = self.deconv(128, strides=(1, 1))(h)
        h = self.deconv(64)(h)
        h = self.deconv(32)(h)
        output = Conv2DTranspose(1, (3, 3), (1, 1), "same", activation="sigmoid")(h)
        return Model(input, output)

    def conv(self, filters, size=(3, 3), strides=(1, 1), padding='same', name=''):
        model = keras.Sequential(name=name)
        model.add(Conv2D(filters, size, strides, padding, activation="relu"))
        model.add(MaxPooling2D((2, 2)))
        model.add(Dropout(0.3))  # dropout is necessary for the discriminator
        return model

    def discriminator_model(self):
        input = Input((28, 28, 1))
        h = self.conv(32)(input)
        h = self.conv(64)(h)
        h = self.conv(128)(h)
        h = Flatten()(h)
        h = Dense(256, "relu")(h)
        output = Dense(1, "sigmoid")(h)
        return Model(input, output)

    @tf.function
    def train_step(self, data):
        noise = tf.random.normal((len(data), self.noise_dim))
        with tf.GradientTape() as gen_tape, tf.GradientTape() as dis_tape:
            fake_images = self.generator(noise)
            fake_output = self.discriminator(fake_images)
            loss_gen = self.bce(tf.ones_like(fake_output), fake_output)
            real_output = self.discriminator(data)
            loss_real = self.bce(tf.ones_like(real_output), real_output)
            loss_fake = self.bce(tf.zeros_like(fake_output), fake_output)
            loss_dis = loss_real + loss_fake
        gen_gradients = gen_tape.gradient(loss_gen, self.generator.trainable_variables)
        dis_gradients = dis_tape.gradient(loss_dis, self.discriminator.trainable_variables)
        self.generator.optimizer.apply_gradients(zip(gen_gradients, self.generator.trainable_variables))
        self.discriminator.optimizer.apply_gradients(zip(dis_gradients, self.discriminator.trainable_variables))
        self.loss_gen_tracker.update_state(loss_gen)
        self.loss_dis_tracker.update_state(loss_dis)
        return {"loss_gen": self.loss_gen_tracker.result(),
                "loss_dis": self.loss_dis_tracker.result()}

    @property
    def metrics(self):
        return [self.loss_gen_tracker, self.loss_dis_tracker]

model = DCGAN()
model.generator.summary()
model.discriminator.summary()

# Test the models: show a generated image for the untrained model
noise = tf.random.normal([1, 128])
gen_out = model.generator(noise, training=False)
gen_img = np.squeeze(gen_out.numpy())
plt.imshow(gen_img, cmap='gray')
fake_out = model.discriminator(gen_out)
# x_train already has a channel dim and is scaled to [0, 1], so only the batch dim
# is added (the original double expand_dims plus 127.5 rescale was a bug)
r_img = tf.expand_dims(x_train[0], axis=0)
real_out = model.discriminator(r_img, training=False)
tf.print('fake:', fake_out, 'real:', real_out)

model.compile()
tb_cb = TensorBoard("dcgan_logs", 1)

class GenerateImage(Callback):
    def on_epoch_end(self, epoch, logs=None):
        noise = tf.random.normal([16, 128])
        gen_out = model.generator(noise, training=False)
        gen_imgs = gen_out.numpy()
        fig = plt.figure(figsize=(4, 4))
        for i in range(16):
            plt.subplot(4, 4, i + 1)
            plt.imshow(np.squeeze(gen_imgs[i]), cmap='gray')
        # plt.savefig('image_at_epoch_{:04d}.png'.format(epoch))
        plt.show()

model.fit(train_ds, epochs=20, callbacks=[tb_cb, GenerateImage()])

# Test
noise = tf.random.normal([16, 128])
gen_out = model.generator(noise, training=False)
gen_imgs = gen_out.numpy()
fig = plt.figure(figsize=(4, 4))
plt.subplots_adjust(wspace=0.2)
for i in range(16):
    ax = fig.add_subplot(4, 4, i + 1)
    ax.set_title(f"{i + 1}")
    ax.imshow(np.squeeze(gen_imgs[i]), cmap='gray')
```
epoch=20. The first few epochs tend to produce all-zero images, possibly because the zero-valued background dominates. If a bad random initialization makes the discriminator too strong, so that the generator outputs a pure black background for a long time without change, simply retrain.
```python
import random
import tensorflow as tf
import keras
import numpy as np
from keras.models import Model
from keras.layers import Input, Conv2D, Flatten, Dense, BatchNormalization, Reshape, Conv2DTranspose, LeakyReLU, \
    MaxPooling2D, Dropout, Concatenate
from keras.losses import BinaryCrossentropy
import matplotlib.pyplot as plt
from keras.optimizers import Adam
from keras.callbacks import Callback, TensorBoard

mnist = keras.datasets.mnist
(x_train, y_train), (x_valid, y_valid) = mnist.load_data()
x_train = np.expand_dims(np.float32(x_train) / 255.0, -1)
train_ds = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_ds = train_ds.shuffle(len(train_ds))
train_ds = train_ds.batch(100)

class C_DCGAN(Model):
    def __init__(self):
        super(C_DCGAN, self).__init__()
        self.noise_dim = 128
        self.bce = BinaryCrossentropy(False)  # prediction is not a logits
        self.initializer = tf.keras.initializers.HeNormal(seed=0)
        self.loss_gen_tracker = tf.keras.metrics.Mean(name="loss_gen")
        self.loss_dis_tracker = tf.keras.metrics.Mean(name="loss_dis")
        self.generator = self.generator_model()
        self.discriminator = self.discriminator_model()
        self.generator.optimizer = Adam(1e-4)
        self.discriminator.optimizer = Adam(1e-4)

    def deconv(self, filters, size=(3, 3), strides=(2, 2), padding='same', name=''):
        model = tf.keras.Sequential(name=name)
        model.add(Conv2DTranspose(filters, size, strides, padding, kernel_initializer=self.initializer))
        model.add(BatchNormalization())
        model.add(LeakyReLU())
        return model

    def generator_model(self):
        input_noise = Input(self.noise_dim)
        input_one_hot_label = Input(10)
        h = Dense(7 * 7 * 128, kernel_initializer=self.initializer)(input_noise)
        h = BatchNormalization()(h)
        h = LeakyReLU()(h)
        h = Reshape((7, 7, 128))(h)
        h1 = Dense(7 * 7 * 128, kernel_initializer=self.initializer)(input_one_hot_label)
        h1 = BatchNormalization()(h1)
        h1 = LeakyReLU()(h1)
        h1 = Reshape((7, 7, 128))(h1)
        h = Concatenate()([h, h1])
        h = self.deconv(128, strides=(1, 1))(h)
        h = self.deconv(64)(h)
        h = self.deconv(32)(h)
        output = Conv2DTranspose(1, (3, 3), (1, 1), "same", activation="sigmoid",
                                 kernel_initializer=self.initializer)(h)
        return Model([input_noise, input_one_hot_label], output)

    def conv(self, filters, size=(3, 3), strides=(1, 1), padding='same', name=''):
        model = keras.Sequential(name=name)
        model.add(Conv2D(filters, size, strides, padding, activation="relu", kernel_initializer=self.initializer))
        model.add(MaxPooling2D((2, 2)))
        model.add(Dropout(0.3))  # dropout is necessary for the discriminator
        return model

    def discriminator_model(self):
        input_x = Input((28, 28, 1))
        input_one_hot_label = Input(10)
        h = Dense(28 * 28 * 1, "relu", kernel_initializer=self.initializer)(input_one_hot_label)
        h = Reshape((28, 28, 1))(h)
        # mix: the label's semantics are already contained in the image itself
        h = keras.layers.add([input_x, h])
        h = self.conv(32)(h)
        h = self.conv(64)(h)
        h = self.conv(128)(h)
        h = Flatten()(h)
        h = Dense(256, "relu", kernel_initializer=self.initializer)(h)
        output = Dense(1, "sigmoid", kernel_initializer=tf.keras.initializers.glorot_normal())(h)
        return Model([input_x, input_one_hot_label], output)

    @tf.function
    def train_step(self, data):
        # data arrives as the (x, y) tuple passed to from_tensor_slices;
        # the batch dimension lives inside each element
        x, y = data
        one_hot = tf.one_hot(y, 10)  # (None, 10)
        noise = tf.random.normal((len(x), self.noise_dim))
        with tf.GradientTape() as gen_tape, tf.GradientTape() as dis_tape:
            fake_images = self.generator((noise, one_hot))
            print(fake_images.shape)
            fake_output = self.discriminator((fake_images, one_hot))
            loss_gen = self.bce(tf.ones_like(fake_output), fake_output)
            real_output = self.discriminator((x, one_hot))
            loss_real = self.bce(tf.ones_like(real_output), real_output)
            loss_fake = self.bce(tf.zeros_like(fake_output), fake_output)
            loss_dis = loss_real + loss_fake
        gen_gradients = gen_tape.gradient(loss_gen, self.generator.trainable_variables)
        dis_gradients = dis_tape.gradient(loss_dis, self.discriminator.trainable_variables)
        self.generator.optimizer.apply_gradients(zip(gen_gradients, self.generator.trainable_variables))
        self.discriminator.optimizer.apply_gradients(zip(dis_gradients, self.discriminator.trainable_variables))
        self.loss_gen_tracker.update_state(loss_gen)
        self.loss_dis_tracker.update_state(loss_dis)
        return {"loss_gen": self.loss_gen_tracker.result(),
                "loss_dis": self.loss_dis_tracker.result()}

    @property
    def metrics(self):
        return [self.loss_gen_tracker, self.loss_dis_tracker]

model = C_DCGAN()
model.generator.summary()
model.discriminator.summary()
model.compile()
tb_cb = TensorBoard("c_dcgan_logs", 1)

# Test the models: show a generated image for the untrained model
label = np.random.randint(0, 10, (1))
onehot = tf.one_hot(label, depth=10)
noise = tf.random.normal([1, 128])
gen_out = model.generator((noise, onehot), training=False)
gen_img = np.squeeze(gen_out.numpy())
plt.imshow(gen_img, cmap='gray')
fake_out = model.discriminator((gen_out, onehot))
# x_train is already scaled to [0, 1] with a channel dim, so only add the batch dim
# (the original double expand_dims plus 127.5 rescale was a bug)
r_img = tf.expand_dims(x_train[0], axis=0)
real_out = model.discriminator((r_img, onehot), training=False)
tf.print('fake:', fake_out, 'real:', real_out)

# Custom callback
class GenerateImage(Callback):
    def on_epoch_end(self, epoch, logs=None):
        noise = tf.random.normal([100, 128])
        onehot = tf.one_hot(tf.repeat(tf.range(10), repeats=10), depth=10)
        gen_out = model.generator((noise, onehot), training=False)
        gen_imgs = gen_out.numpy()
        fig = plt.figure(figsize=(12, 12))
        for i in range(100):
            plt.subplot(10, 10, i + 1)
            plt.imshow(np.squeeze(gen_imgs[i]), cmap='gray')
        # plt.savefig('image_at_epoch_{:04d}.png'.format(epoch))
        plt.show()

model.fit(train_ds, epochs=20, callbacks=[tb_cb, GenerateImage()])

noise = tf.random.normal([100, 128])
onehot = tf.one_hot(tf.repeat(tf.range(10), repeats=10), depth=10)
gen_out = model.generator((noise, onehot), training=False)
gen_imgs = gen_out.numpy()
fig = plt.figure(figsize=(12, 12))
for i in range(100):
    plt.subplot(10, 10, i + 1)
    plt.imshow(np.squeeze(gen_imgs[i]), cmap='gray')
plt.show()
```
epoch=18
epoch=20
This code comes from the Bilibili uploader 耿大哥讲算法.
The code may be rewritten later along these lines.
```python
import numpy as np
from PIL import Image
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt
import matplotlib
matplotlib.rc('font', family='Microsoft YaHei')

# load the image and flatten it onto a white RGB background
img = Image.open('洞庭湖.png')
p = Image.new('RGB', img.size, (255, 255, 255))
p.paste(img, (0, 0, img.size[0], img.size[1]), img)
img_old = np.array(p)
hang, lie, dims = img_old.shape  # rows, columns, channels
print('The pixel matrix of this image has ' + str(hang) + ' rows, ' + str(lie) + ' columns, and ' + str(dims) + ' channels.')

# build the original pixel matrix
R = img_old[:, :, 0]; G = img_old[:, :, 1]; B = img_old[:, :, 2]
# flatten the pixels
list_R, list_G, list_B = [], [], []
for i in range(hang):
    for j in range(lie):
        list_R.append(R[i, j]); list_G.append(G[i, j]); list_B.append(B[i, j])
print('Pixel flattening done!')

# build the original data matrix A
A = np.zeros(shape=(dims, len(list_G)))
for i in range(A.shape[1]):
    A[0, i] = list_R[i]; A[1, i] = list_G[i]; A[2, i] = list_B[i]

# k-means clustering
k = 4
kmeans = KMeans(n_clusters=k, init='random', n_init=50, tol=0.001)
kmeans.fit(A.T)
label = list(kmeans.labels_)
core = np.array([[0, 255, 255, 255],
                 [255, 0, 102, 204],
                 [255, 255, 0, 153]])  # one RGB color per cluster (columns)
print('k-means clustering done!')

# build the data matrix B (recolored pixels)
B = np.zeros(shape=(A.shape[0], A.shape[1]))
for i in range(A.shape[0]):
    for j in range(A.shape[1]):
        B[i, j] = core[i, label[j]]
R_new = np.zeros(shape=(hang, lie))
G_new = np.zeros(shape=(hang, lie))
B_new = np.zeros(shape=(hang, lie))
for i in range(hang):
    for j in range(lie):
        R_new[i, j] = B[0, lie * i + j]; G_new[i, j] = B[1, lie * i + j]; B_new[i, j] = B[2, lie * i + j]
img_new = np.zeros(shape=(hang, lie, dims))
for i in range(hang):
    for j in range(lie):
        img_new[i, j, 0] = R_new[i, j]; img_new[i, j, 1] = G_new[i, j]; img_new[i, j, 2] = B_new[i, j]
print('New pixel matrix composed!')

print('The total image area is roughly ' + str(60 * 50) + ' square kilometers.')
print('The farmland area is roughly ' + str('%.2f' % (60 * 50 / (hang * lie) * label.count(label[1])) + ' square kilometers.'))
print('The lake (洪泽湖) area is roughly ' + str('%.2f' % (60 * 50 - 60 * 50 / (hang * lie) * label.count(label[1])) + ' square kilometers.'))

# show the result
plt.subplot(1, 2, 1); plt.xlabel('image before segmentation', fontsize=18); plt.imshow(img_old / 255)
plt.subplot(1, 2, 2); plt.xlabel('image after segmentation', fontsize=18); plt.imshow(img_new / 255)
plt.show()
```
3-axis acceleration time series of tennis swings.
```python
import pathlib
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.sequence import pad_sequences
from keras.layers import Input, Masking, LSTM, Dropout, Dense
from keras.models import Model
from keras.regularizers import l2
from keras.losses import SparseCategoricalCrossentropy
from keras.metrics import SparseCategoricalAccuracy
from keras.callbacks import TensorBoard, EarlyStopping, ReduceLROnPlateau
import matplotlib.pyplot as plt
from keras.optimizers import Adam
from keras import initializers

def mk_file_list(base_dir):
    path_base_dir = pathlib.Path(base_dir)
    return [str(p) for p in sorted(path_base_dir.glob('*.txt'))]

list_train = mk_file_list('tennis/data/train')
list_valid = mk_file_list('tennis/data/valid')

# Try to visualize a swing data
data_file = list_train[10]
with open(data_file, 'r') as f:
    lines = f.readlines()
data = [l.strip().split()[1:] for l in lines]  # load data
data = np.array(data, dtype=np.float32)        # to NumPy
plt.plot(data[:, 0], label='x')  # X axis
plt.plot(data[:, 1], label='y')  # Y
plt.plot(data[:, 2], label='z')  # Z
plt.legend()
plt.grid()
plt.title(data_file)
plt.show()

def get_label(filename):
    # could be optimized to int8; LABEL is defined below, before the datasets are built
    return LABEL.index(pathlib.Path(filename).name.split('_')[0])

def make_padding_data(data_files):
    batch_data = []
    label_data = []
    for d in data_files:
        filename = bytes.decode(d.numpy())
        with open(filename, 'r') as f:
            lines = f.readlines()
        data = [l.strip().split()[1:] for l in lines]  # split() defaults to any whitespace
        data = np.array(data, dtype=np.float32)
        data = (data - np.mean(data)) / np.std(data)
        batch_data.append(data)
        label_data.append(get_label(filename))
    batch_data = pad_sequences(batch_data, padding="post", value=0, dtype=np.float32)
    return batch_data, label_data

LABEL = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J']
batch_size = 20
train_ds = tf.data.Dataset.list_files(list_train)
valid_ds = tf.data.Dataset.list_files(list_valid)
train_ds = train_ds.shuffle(len(list_train))
valid_ds = valid_ds.shuffle(len(list_valid))
train_ds = train_ds.batch(batch_size)
valid_ds = valid_ds.batch(batch_size)
train_ds = train_ds.map(lambda x: tf.py_function(
    make_padding_data, [x], Tout=[tf.float32, tf.int32]
))
valid_ds = valid_ds.map(lambda x: tf.py_function(
    make_padding_data, [x], Tout=[tf.float32, tf.int32]
))

input = Input((None, 3), name="input")
h = Masking(0)(input)
# h = LSTM(128, return_sequences=False, kernel_regularizer=l2(1e-4))(h)
h = LSTM(128, return_sequences=False, kernel_regularizer=l2(1e-4),
         kernel_initializer=initializers.HeNormal())(h)
h = Dropout(0.5)(h)
h = Dense(128, activation="relu", kernel_regularizer=l2(1e-4),
          kernel_initializer=initializers.GlorotNormal())(h)
h = Dropout(0.5)(h)
output = Dense(10, activation="softmax", kernel_regularizer=l2(1e-4))(h)
model = Model(input, output)
model.summary()
model.compile(Adam(1e-4), SparseCategoricalCrossentropy(), [SparseCategoricalAccuracy()])
cb_tb = TensorBoard(log_dir="logs_swing", histogram_freq=1)
cb_es = EarlyStopping("val_loss", patience=10)
reduce_lr = ReduceLROnPlateau("val_loss", 0.5, 5, 0, min_lr=1e-5)
model.fit(train_ds, epochs=100, validation_data=valid_ds,
          callbacks=[cb_tb, cb_es, reduce_lr], use_multiprocessing=True)

file = 'tennis/data/valid/A_10.txt'
with open(file, 'r') as f:
    lines = f.readlines()
data = [l.strip().split()[1:] for l in lines]
data = np.expand_dims(np.array(data, dtype=np.float32), axis=0)
data = (data - np.mean(data)) / np.std(data)
pred = model.predict(data)
LABEL[pred.argmax()]
```
```python
!pip install tensorflow_addons
import numpy as np
import csv
import tensorflow as tf
from keras.layers import Input, Conv2D, BatchNormalization, Dropout, Reshape, LSTM, Dense
from keras.regularizers import l2
from keras.models import Model
import tensorflow_addons as tfa
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from tensorflow.keras.metrics import SparseCategoricalAccuracy
from tensorflow.keras.callbacks import TensorBoard, LearningRateScheduler

history = 100      # use 100 days of history for prediction
N = 10             # predict the exchange rate 10 days after the last day of the history
exclude_days = 30  # the last 30 days are excluded for testing

data = []  # for storing the loaded data
with open('usdjpy_d.csv', 'r') as f:
    reader = csv.reader(f)  # CSV file open
    data = [row[1:] for row in reader]  # load data from the CSV file
data.pop(0)  # remove the header
data = np.array(data, dtype=np.float32)  # convert to NumPy
orgdata = data
data = (data - data.min()) / (data.max() - data.min())  # normalization

train_idx = np.arange(history + N, len(data) - 1 - exclude_days).astype(np.int32)  # make index file
np.random.shuffle(train_idx)  # random shuffle
# split the index list 90%/10% (the original used int(len(data) * 0.9), which over-allocates training data)
train_idx, valid_idx = np.split(train_idx, [int(len(train_idx) * 0.9)])

def make_inputdata(idx):
    inputs = []
    targets = []
    for i in idx:
        his_data = data[i.numpy() - N - history: i.numpy() - N]
        inputs.append(his_data)
        if data[i.numpy()][-1] < data[i.numpy() - N][-1]:
            targets.append(0)
        else:
            targets.append(1)
    inputs = np.expand_dims(np.array(inputs), -1)
    return inputs, targets

train_ds = tf.data.Dataset.from_tensor_slices(train_idx)
train_ds = train_ds.batch(50)
train_ds = train_ds.map(lambda x: tf.py_function(
    make_inputdata, [x], Tout=[tf.float32, tf.float32]
))
valid_ds = tf.data.Dataset.from_tensor_slices(valid_idx)
valid_ds = valid_ds.batch(100)
valid_ds = valid_ds.map(lambda x: tf.py_function(
    make_inputdata, [x], Tout=[tf.float32, tf.float32]
))

def ExchangeModel(history=100, u_units=256, seed=0):
    init = tf.keras.initializers.HeNormal(seed)
    input = Input((history, 4, 1), name="input")
    h = Conv2D(32, (20, 2), (4, 1), activation="linear", kernel_initializer=init,
               kernel_regularizer=l2(1e-4), name="cnn1")(input)
    h = BatchNormalization()(h)
    h = Dropout(0.5)(h)
    h = Conv2D(64, (10, 2), (1, 1), activation="linear", kernel_initializer=init,
               kernel_regularizer=l2(1e-4), name="cnn2")(h)
    h = BatchNormalization()(h)
    h = Dropout(0.5)(h)
    h = Conv2D(128, (5, 2), (1, 1), activation="linear", kernel_initializer=init,
               kernel_regularizer=l2(1e-4), name="cnn3")(h)
    h = BatchNormalization()(h)
    h = Dropout(0.5)(h)
    h = Reshape((8, 128), input_shape=(8, 1, 128))(h)
    h = LSTM(u_units, return_sequences=True, kernel_initializer=init,
             kernel_regularizer=l2(1e-4), name="lstm1")(h)
    h = Dropout(0.5)(h)
    h = LSTM(u_units, return_sequences=False, kernel_initializer=init,
             kernel_regularizer=l2(1e-4), name="lstm2")(h)
    h = Dense(u_units, "linear", kernel_initializer=init, kernel_regularizer=l2(1e-4), name="fc")(h)
    h = Dropout(0.5)(h)
    output = Dense(2, "softmax", kernel_initializer=init, kernel_regularizer=l2(1e-4), name="output")(h)
    model = Model(input, output)
    return model

model = ExchangeModel()
model.compile(optimizer=tfa.optimizers.LAMB(0.0001),
              loss=SparseCategoricalCrossentropy(),
              metrics=SparseCategoricalAccuracy())
model.summary()

def lr_schedule(epoch, lr):
    # note: epoch starts at zero
    # the original condition (epoch % 800 or epoch % 1000) == 0 never fires due to precedence
    if epoch != 0 and (epoch % 800 == 0 or epoch % 1000 == 0):
        return lr * 0.5
    else:
        return lr

# dynamic learning-rate change depending on epoch (defined but not registered in fit below)
lr_cb = LearningRateScheduler(lr_schedule, verbose=1)
tb_cb = TensorBoard(log_dir='exchange_logs', histogram_freq=1)
model.fit(train_ds, initial_epoch=0, epochs=1200, validation_data=valid_ds,
          callbacks=[tb_cb], use_multiprocessing=True)

data = []
days = []
with open('usdjpy_d.csv', 'r') as f:
    reader = csv.reader(f)
    for row in reader:
        days.append(row[0])
        data.append(row[1:])
data.pop(0)  # remove the header
days.pop(0)
data = np.array(data, dtype=np.float32)  # compare as numbers; the original compared strings, which is a bug

evaldata = []
label = []
cls = ['up', 'down']
lastidx = len(data) - 1
for i in range(lastidx - exclude_days + 1, lastidx + 1):  # from before exclude_days to the latest
    his_data = data[i - history - N: i - N]
    if data[i - N][-1] > data[i][-1]:
        label.append(0)  # yen up
    else:
        label.append(1)  # yen down
    eval = np.expand_dims(np.array(his_data), -1)
    evaldata.append(eval)
evaldata = np.array(evaldata, dtype=np.float32)
num_corr = 0  # num. of correct predictions
evaldata = (evaldata - evaldata.min()) / (evaldata.max() - evaldata.min())  # normalization
for i, result in enumerate(model.predict(evaldata)):
    est = np.argmax(result)
    if est == label[i]:  # if correct
        num_corr += 1
    print(f'Result for {days[lastidx-exclude_days+1+i]}: {cls[est]} (True: {cls[label[i]]})')
print('=== Prediction Result ===')
print(f'Num. of evaluation data: {i+1}, correction: {num_corr}, correct rate {num_corr/(i+1)*100}%')
```
```python
import keras.backend
import tensorflow as tf
from keras.layers import StringLookup
import os
import glob
from scipy.io.wavfile import read
import librosa
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.layers import Input, Dense, Dropout, LSTM, Bidirectional, Masking
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import Callback, TensorBoard
import numpy as np
from jiwer import cer  # used by ASR_Test below (this import was missing in the original)

# The set of characters accepted in the transcription:
# a limited alphabet covering the number words "one" to "nine"
characters = [x for x in 'efghinorstuvwx']
# Mapping characters to integers
char_to_num = StringLookup(vocabulary=characters, oov_token='')
# Mapping integers back to original characters
num_to_char = StringLookup(vocabulary=char_to_num.get_vocabulary(), oov_token="", invert=True)
print(
    f'The vocabulary set is: {char_to_num.get_vocabulary()}'
    f' (size={char_to_num.vocabulary_size()})'
)

def get_filelist_and_label(dir):
    filelist = []
    labels = []
    for d in os.listdir(dir):
        for f in glob.glob(os.path.join(dir, d, "*.wav")):
            filelist.append(f)
            labels.append(d)
    return filelist, labels

train_list, train_labels = get_filelist_and_label(r"numvoice\train")
valid_list, valid_labels = get_filelist_and_label(r"numvoice\test")

def wav2mfcc(wav_files, labels):
    x = []
    y = []
    for wav in wav_files:
        file = bytes.decode(wav.numpy())
        rate, samples = read(file)
        feat = librosa.feature.mfcc(y=np.float32(samples), sr=rate, n_mfcc=13, n_fft=512, hop_length=160)
        f = np.average(feat)
        s = np.std(feat)
        feat = (feat.T - f) / s
        x.append(feat)
    for label in labels:
        label = tf.strings.unicode_split(label, "UTF-8")
        y.append(char_to_num(label))
    x = pad_sequences(x, padding="post", value=.0, dtype=np.float32)
    y = pad_sequences(y, padding="post", value=0, dtype=np.int32)
    return x, y

batch_size = 20
train_ds = tf.data.Dataset.from_tensor_slices((train_list, train_labels))
valid_ds = tf.data.Dataset.from_tensor_slices((valid_list, valid_labels))
train_ds = train_ds.shuffle(len(train_ds))
train_ds = train_ds.padded_batch(batch_size)  # make minibatch
train_ds = train_ds.map(lambda wav_files, labels: tf.py_function(
    wav2mfcc, [wav_files, labels], Tout=[tf.float32, tf.int32]
))
# the original never batched/mapped valid_ds, which ASR_Test and fit() require
valid_ds = valid_ds.padded_batch(batch_size)
valid_ds = valid_ds.map(lambda wav_files, labels: tf.py_function(
    wav2mfcc, [wav_files, labels], Tout=[tf.float32, tf.int32]
))
# x, y = next(iter(train_ds))
# single_wav2mfcc(x, y)

def asr_model():
    input = Input((None, 13), name="input")
    h = Masking(mask_value=0.0, input_shape=(None, 13))(input)
    h = Bidirectional(LSTM(256, return_sequences=True), name="bilstm")(h)
    h = Dropout(0.5)(h)
    h = Dense(512, "relu", name="fc")(h)
    h = Dropout(0.5)(h)
    output = Dense(16, "softmax", name="output")(h)
    return Model(input, output)

model = asr_model()
model.summary()

def CTC_Loss(y_true, y_pred):
    len_batch = tf.cast(tf.shape(y_true)[0], dtype="int32")
    len_input = tf.cast(tf.shape(y_pred)[1], dtype="int32")
    len_label = tf.cast(tf.shape(y_true)[1], dtype="int32")
    # every batch element must be told its own length: shape (len_batch, 1)
    len_input = len_input * tf.ones((len_batch, 1), dtype="int32")
    len_label = len_label * tf.ones((len_batch, 1), dtype="int32")
    loss = keras.backend.ctc_batch_cost(y_true, y_pred, len_input, len_label)
    return loss

# Decoding (best path search) from the output of the model
def decode_batch_predictions(pred):
    input_len = np.ones(pred.shape[0]) * pred.shape[1]
    # Use greedy search to get the best path
    results = keras.backend.ctc_decode(pred, input_length=input_len, greedy=True)[0][0]
    # Iterate over the results in the minibatch and get back the text
    output_text = []
    for result in results:
        result = tf.strings.reduce_join(num_to_char(result)).numpy().decode("utf-8")
        output_text.append(result)
    return output_text

# Callback for evaluation at each epoch end
class ASR_Test(Callback):
    def on_epoch_end(self, epoch, logs=None):
        predictions = []  # store prediction results
        targets = []      # ground truth
        for batch in valid_ds:  # evaluate all the sentences in the validation dataset
            x, y = batch
            preds = model.predict(x)                 # mini-batch prediction
            preds = decode_batch_predictions(preds)  # decode the model output
            predictions.extend(preds)
            for label in y:
                label = tf.strings.reduce_join(num_to_char(label)).numpy().decode("utf-8")
                targets.append(label)
        cer_score = cer(targets, predictions)  # character error rate via the jiwer library
        print("-" * 100)
        print(f"Character Error Rate: {cer_score:.4f}")
        print("-" * 100)
        # Randomly select 5 sentences from the validation dataset
        for i in np.random.randint(0, len(predictions), 5):
            print(f"Target    : {targets[i]}")
            print(f"Prediction: {predictions[i]}")
            print("-" * 100)

model.compile(optimizer=Adam(0.0001), loss=CTC_Loss)  # set an optimizer and the CTC loss
# run training
model.fit(train_ds, validation_data=valid_ds, initial_epoch=0, epochs=50, callbacks=[ASR_Test()])

# ASR test
file = 'numvoice/test/eight/15574821_nohash_1.wav'

def getmfcc(wav_file):
    rate, samples = read(wav_file)
    feat = librosa.feature.mfcc(y=np.float32(samples), sr=rate, n_mfcc=13, n_fft=512, hop_length=160)
    f = np.average(feat)
    s = np.std(feat)
    feat = (feat.T - f) / s  # transformation and normalization
    return feat

feat = getmfcc(file)
feat = np.expand_dims(feat, axis=0)       # add batch dim.
preds = model.predict(feat)               # was model2 (undefined)
preds = decode_batch_predictions(preds)   # decode the model output
print('ASR result:', preds)
```
```python
import tensorflow as tf
import numpy as np
import librosa
import matplotlib.pyplot as plt
from tensorflow import keras
from tensorflow.keras.layers import Input, Dense, Dropout, LSTM, Bidirectional, Masking, MultiHeadAttention, LayerNormalization
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import StringLookup  # a preprocessing layer mapping string features to integer indices
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.callbacks import Callback, TensorBoard
from scipy.io.wavfile import read
import os
import glob
from jiwer import cer  # for calculating character error rate

tf.test.gpu_device_name()  # confirm GPU availability

# The set of characters accepted in the transcription:
# a limited alphabet covering the number words "one" to "nine" (plus 'z' for "zero")
characters = [x for x in 'efghinorstuvwxz']
# Mapping characters to integers
char_to_num = StringLookup(vocabulary=characters, oov_token='')
# Mapping integers back to original characters
num_to_char = StringLookup(vocabulary=char_to_num.get_vocabulary(), oov_token="", invert=True)
print(
    f'The vocabulary set is: {char_to_num.get_vocabulary()}'
    f' (size={char_to_num.vocabulary_size()})'
)

# Get the file list and labels
def get_filelist_and_label(dir):
    file_list = []
    label_list = []
    for d in os.listdir(dir):
        for f in glob.glob(os.path.join(dir, d, '*.wav')):
            file_list.append(f)
            label_list.append(d)
    return file_list, label_list

train_list, train_labels = get_filelist_and_label('numvoice/train')
valid_list, valid_labels = get_filelist_and_label('numvoice/test')
print(train_labels)

# wave file loading and MFCC calculation
def wav2mfcc(wav_files, labels):
    x = []  # MFCC data
    y = []  # label data
    for wav in wav_files:
        file = bytes.decode(wav.numpy())
        rate, samples = read(file)
        mfcc = librosa.feature.mfcc(y=np.float32(samples), sr=rate, n_mfcc=13, n_fft=512, hop_length=160)
        m = np.average(mfcc)
        s = np.std(mfcc)
        mfcc = (mfcc.T - m) / s  # transformation and normalization
        x.append(mfcc)
    for label in labels:
        label = tf.strings.unicode_split(label, input_encoding='UTF-8')
        y.append(char_to_num(label))
    x = pad_sequences(x, padding='post', value=0.0, dtype=np.float32)  # zero padding
    y = pad_sequences(y, padding='post', value=0, dtype=np.int32)      # zero padding
    return x, y

batch_size = 20
train_ds = tf.data.Dataset.from_tensor_slices((train_list, train_labels))
valid_ds = tf.data.Dataset.from_tensor_slices((valid_list, valid_labels))
train_ds = train_ds.shuffle(len(train_ds))
train_ds = train_ds.padded_batch(batch_size)
train_ds = train_ds.map(lambda wav_files, labels: tf.py_function(
    wav2mfcc, [wav_files, labels], Tout=[tf.float32, tf.int32]
))
valid_ds = valid_ds.padded_batch(batch_size)
valid_ds = valid_ds.map(lambda wav_files, labels: tf.py_function(
    wav2mfcc, [wav_files, labels], Tout=[tf.float32, tf.int32]
))

def asr_model():
    input = Input((None, 13), name="input")
    h = Masking(mask_value=0.0, input_shape=(None, 13))(input)  # was (None, 3): shape typo
    h = Dense(128)(h)
    # one transformer encoder block: self-attention + residual + layer norm
    mha = MultiHeadAttention(num_heads=4, key_dim=128)(h, h)
    mha = Dropout(0.1)(mha)
    out1 = LayerNormalization(epsilon=1e-6)(mha + h)  # residual connection mha+h
    ffn = Dense(128)(out1)
    ffn = Dropout(0.1)(ffn)
    out2 = LayerNormalization(epsilon=1e-6)(ffn + out1)
    h = Bidirectional(LSTM(128, return_sequences=True))(out2)
    h = Dropout(0.1)(h)
    h = Dense(128, "relu")(h)
    h = Dropout(0.1)(h)
    output = Dense(16, "softmax")(h)
    return Model(input, output)

model = asr_model()
model.summary()
tf.keras.utils.plot_model(model, show_shapes=True, show_layer_names=True, to_file='asr_transformer.png')

# Definition of CTC loss
def CTC_Loss(y_true, y_pred):
    # Compute the training-time loss value
    batch_len = tf.cast(tf.shape(y_true)[0], dtype='int32')
    input_length = tf.cast(tf.shape(y_pred)[1], dtype='int32')
    label_length = tf.cast(tf.shape(y_true)[1], dtype='int32')
    input_length = input_length * tf.ones(shape=(batch_len, 1), dtype='int32')
    label_length = label_length * tf.ones(shape=(batch_len, 1), dtype='int32')
    # CTC loss calculation: using the ctc function
    loss = keras.backend.ctc_batch_cost(y_true, y_pred, input_length, label_length)
    return loss

# Decoding (best path search) from the output of the model
def decode_batch_predictions(pred):
    input_len = np.ones(pred.shape[0]) * pred.shape[1]
    # Use greedy search
    results = keras.backend.ctc_decode(pred, input_length=input_len, greedy=True)[0][0]
    # Iterate over the results in the minibatch and get back the text
    output_text = []
    for result in results:
        result = tf.strings.reduce_join(num_to_char(result)).numpy().decode("utf-8")
        output_text.append(result)
    return output_text

# Callback for evaluation at each epoch end
class ASR_Test(Callback):
    def on_epoch_end(self, epoch, logs=None):
        predictions = []  # store prediction results
        targets = []      # ground truth
        for batch in valid_ds:  # evaluate all the sentences in the validation dataset
            x, y = batch
            preds = model.predict(x)                 # mini-batch prediction
            preds = decode_batch_predictions(preds)  # decode the model output
            predictions.extend(preds)
            for label in y:
                label = tf.strings.reduce_join(num_to_char(label)).numpy().decode("utf-8")
                targets.append(label)
        cer_score = cer(targets, predictions)  # character error rate via the jiwer library
        print("-" * 100)
        print(f"Character Error Rate: {cer_score:.4f}")
        print("-" * 100)
        # Randomly select 5 sentences from the validation dataset
        for i in np.random.randint(0, len(predictions), 5):
            print(f"Target    : {targets[i]}")
            print(f"Prediction: {predictions[i]}")
            print("-" * 100)

# training loop
model.compile(optimizer=Adam(0.0001), loss=CTC_Loss)
model.fit(train_ds, validation_data=valid_ds, initial_epoch=0, epochs=50, callbacks=[ASR_Test()])
```