This experiment uses Baidu's Paddle (PaddlePaddle) framework and was run on AI Studio.
# 1. Libraries and data preparation

```python
import sys
sys.path.append('/home/aistudio/external-libraries')

## Define the data readers
import paddle
import paddle.fluid as fluid
from paddle.fluid.dygraph import Conv2D, Pool2D, Linear, Conv2DTranspose
import numpy as np
import matplotlib.pyplot as plt

# Noise dimension
Z_DIM = 100
BATCH_SIZE = 128
# BATCH_SIZE = 3 # debug

# Noise reader: the fake images are generated from this noise input.
def z_reader():
    while True:
        yield np.random.normal(0.0, 1.0, (Z_DIM, 1, 1)).astype('float32')

# Reader for real images
mnist_generator = paddle.batch(
    paddle.reader.shuffle(paddle.dataset.mnist.train(), 30000), batch_size=BATCH_SIZE)

# Reader for the noise used to generate fake images
z_generator = paddle.batch(z_reader, batch_size=BATCH_SIZE)
```

```python
## import matplotlib.pyplot as plt
%matplotlib inline

# Peek at one batch of real images and one batch of noise
data_tmp = next(mnist_generator())
print('Shape of one batch of image data: batch_size =', len(data_tmp),
      ', data_shape =', data_tmp[0][0].shape, ', num = ', data_tmp[0][1])
plt.imshow(data_tmp[0][0].reshape(28, 28))
plt.show()

z_tmp = next(z_generator())
print('Shape of one batch of noise z: batch_size =', len(z_tmp), ', data_shape =', z_tmp[0].shape)
```
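For reference, with the default BATCH_SIZE of 128 the two print statements above should report shapes along these lines (the sampled digit and the rendered image will of course vary from run to run). The MNIST reader yields flattened 784-element images, which is why the display code reshapes them to 28×28:

```
Shape of one batch of image data: batch_size = 128 , data_shape = (784,) , num = <digit label>
Shape of one batch of noise z: batch_size = 128 , data_shape = (100, 1, 1)
```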
## Define the CGAN

```python
# Concatenate the condition (label) onto a feature map
def conv_concatenate(x, y):
    # print('---', x.shape, y.shape)
    # y = fluid.dygraph.to_variable(y.numpy().astype('float32'))
    if len(x.shape) == 2:    # append the label to the output of a fully-connected layer
        y = fluid.layers.reshape(y, shape=[x.shape[0], 1])
        ones = fluid.layers.fill_constant(y.shape, dtype='float32', value=1.0)
    elif len(x.shape) == 4:  # append the label to the output of a convolutional layer
        y = fluid.layers.reshape(y, shape=[x.shape[0], 1, 1, 1])
        ones = fluid.layers.fill_constant(x.shape, dtype='float32', value=1.0)
    x = fluid.layers.concat([x, ones * y], axis=1)
    # print(ones.shape, x.shape, y.shape, '---')
    return x

# Define the generator
class G(fluid.dygraph.Layer):
    def __init__(self, name_scope):
        super(G, self).__init__(name_scope)
        name_scope = self.full_name()
        # First fully-connected + BN block
        self.fc1 = Linear(input_dim=100+1, output_dim=1024)
        self.bn1 = fluid.dygraph.BatchNorm(num_channels=1024, act='relu')
        # Second fully-connected + BN block
        self.fc2 = Linear(input_dim=1024+1, output_dim=128*7*7)
        self.bn2 = fluid.dygraph.BatchNorm(num_channels=128*7*7, act='relu')
        # First transposed-convolution block
        self.convtrans1 = Conv2DTranspose(256, 64, 4, stride=2, padding=1)
        self.bn3 = fluid.dygraph.BatchNorm(64, act='relu')
        # Second transposed-convolution block
        self.convtrans2 = Conv2DTranspose(128, 1, 4, stride=2, padding=1, act='relu')

    def forward(self, z, label):
        z = fluid.layers.reshape(z, shape=[-1, 100])
        z = conv_concatenate(z, label)   # concatenate the noise and the label
        y = self.fc1(z)
        y = self.bn1(y)
        y = conv_concatenate(y, label)   # concatenate the feature map and the label
        y = self.fc2(y)
        y = self.bn2(y)
        y = fluid.layers.reshape(y, shape=[-1, 128, 7, 7])
        y = conv_concatenate(y, label)   # concatenate the feature map and the label
        y = self.convtrans1(y)
        # print('G first transposed conv:', y.shape)
        y = self.bn3(y)
        y = conv_concatenate(y, label)   # concatenate the feature map and the label
        y = self.convtrans2(y)
        # print('G second transposed conv:', y.shape)
        return y

# Define the discriminator
class D(fluid.dygraph.Layer):
    def __init__(self, name_scope):
        super(D, self).__init__(name_scope)
        name_scope = self.full_name()
        # First convolution + pooling block
        self.conv1 = Conv2D(num_channels=2, num_filters=64, filter_size=3)
        self.bn1 = fluid.dygraph.BatchNorm(num_channels=64, act='leaky_relu')
        self.pool1 = Pool2D(pool_size=2, pool_stride=2)
        # Second convolution + pooling block
        self.conv2 = Conv2D(num_channels=128, num_filters=128, filter_size=3)
        self.bn2 = fluid.dygraph.BatchNorm(num_channels=128, act='leaky_relu')
        self.pool2 = Pool2D(pool_size=2, pool_stride=2)
        # Fully-connected output layers
        self.fc1 = Linear(input_dim=128*5*5+1, output_dim=1024)
        self.bnfc1 = fluid.dygraph.BatchNorm(num_channels=1024, act='leaky_relu')
        self.fc2 = Linear(input_dim=1024+1, output_dim=1)

    def forward(self, img, label):
        y = conv_concatenate(img, label)  # concatenate the input image and the label
        y = self.conv1(y)
        y = self.bn1(y)
        y = self.pool1(y)
        y = conv_concatenate(y, label)    # concatenate the feature map and the label
        y = self.conv2(y)
        y = self.bn2(y)
        y = self.pool2(y)
        y = fluid.layers.reshape(y, shape=[-1, 128*5*5])
        y = conv_concatenate(y, label)    # concatenate the feature map and the label
        y = self.fc1(y)
        # print('D fc1 output:', y.shape)
        y = self.bnfc1(y)
        y = conv_concatenate(y, label)    # concatenate the feature map and the label
        y = self.fc2(y)
        # print('D fc2 output:', y.shape)
        return y
```

## Test the generator G and the discriminator D

```python
with fluid.dygraph.guard():
    g_tmp = G('G')
    l_tmp = fluid.dygraph.to_variable(np.array([x[1] for x in data_tmp]).astype('float32'))
    tmp_g = g_tmp(fluid.dygraph.to_variable(np.array(z_tmp)), l_tmp).numpy()
    print('Shape of the images produced by generator G:', tmp_g.shape)
    plt.imshow(tmp_g[0][0])
    plt.show()

    d_tmp = D('D')
    tmp_d = d_tmp(fluid.dygraph.to_variable(tmp_g), l_tmp).numpy()
    print('Shape of the probabilities produced by discriminator D for the generated images:', tmp_d.shape)
```
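One detail worth tracing is where the discriminator's flattened size of 128*5*5 comes from: conv1 and conv2 use 3×3 kernels with no padding, and each pooling layer halves the spatial size. A minimal sketch of the arithmetic (the helper conv_out below is hypothetical and introduced only for this check, it is not part of the notebook):

```python
# Hypothetical helper, used only to trace the discriminator's spatial sizes.
def conv_out(size, kernel, stride=1, padding=0):
    return (size + 2 * padding - kernel) // stride + 1

s = 28                 # MNIST input is 28x28
s = conv_out(s, 3)     # conv1: 3x3, no padding -> 26
s = conv_out(s, 2, 2)  # pool1: 2x2, stride 2   -> 13
s = conv_out(s, 3)     # conv2: 3x3, no padding -> 11
s = conv_out(s, 2, 2)  # pool2: 2x2, stride 2   -> 5
print(s)               # 5, hence the flatten size of 128 * 5 * 5
```

Note also that conv_concatenate doubles the channel count of a 4-D feature map (the label is broadcast against a block of ones shaped like x), which is why conv1 declares num_channels=2 for the 1-channel input image and conv2 declares num_channels=128 after the 64-channel pooled feature map.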
## Define a function that displays images: it builds a 16×n grid (n = batch_size/16) and prints the generated images in the notebook.

```python
## import matplotlib.pyplot as plt
%matplotlib inline

def show_image_grid(images, batch_size=128, pass_id=None):
    fig = plt.figure(figsize=(8, batch_size/32))
    fig.suptitle("Pass {}".format(pass_id))
    gs = plt.GridSpec(int(batch_size/16), 16)
    gs.update(wspace=0.05, hspace=0.05)
    for i, image in enumerate(images):
        ax = plt.subplot(gs[i])
        plt.axis('off')
        ax.set_xticklabels([])
        ax.set_yticklabels([])
        ax.set_aspect('equal')
        plt.imshow(image[0], cmap='Greys_r')
    plt.show()
```
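As an optional quick check of the helper (assuming the generator/discriminator test cell above has been run so that tmp_g is still in scope), the untrained generator's output can be shown as a grid:

```python
# Sanity check: display the fake batch from the untrained generator test above.
show_image_grid(tmp_g, batch_size=BATCH_SIZE, pass_id=0)
```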
## Train the CGAN

```python
from visualdl import LogWriter
import time
import random

def train(mnist_generator, epoch_num=10, batch_size=128, use_gpu=True, load_model=False):
    # with fluid.dygraph.guard():
    place = fluid.CUDAPlace(0) if use_gpu else fluid.CPUPlace()
    with fluid.dygraph.guard(place):
        # Path for saving the model
        model_path = './output/'

        d = D('D')
        d.train()
        g = G('G')
        g.train()

        # Create the optimizers
        g_optimizer = fluid.optimizer.AdamOptimizer(learning_rate=2e-4, parameter_list=g.parameters())
        d_optimizer = fluid.optimizer.AdamOptimizer(learning_rate=2e-4, parameter_list=d.parameters())

        # Load the previously saved model
        if load_model == True:
            g_para, g_opt = fluid.load_dygraph(model_path+'g')
            d_para, d_opt = fluid.load_dygraph(model_path+'d')
            g.load_dict(g_para)
            g_optimizer.set_dict(g_opt)
            d.load_dict(d_para)
            d_optimizer.set_dict(d_opt)

        iteration_num = 0
        print('Start time :', time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), 'start step:', iteration_num + 1)
        for epoch in range(epoch_num):
            for i, real_data in enumerate(mnist_generator()):
                # Drop batches that do not fill a whole batch_size
                if(len(real_data) != BATCH_SIZE):
                    continue
                iteration_num += 1

                '''
                The discriminator d is optimized by minimizing the loss between its output on real
                images and the "real" labels (ones), which increases the probability that d
                classifies the real images (real_image) as real (ones).
                '''
                # Read the MNIST images into real_image and initialize the "real" labels (ones) with 1
                ri = np.array([x[0] for x in real_data]).reshape(-1, 1, 28, 28)
                rl = np.array([x[1] for x in real_data]).astype('float32')
                real_image = fluid.dygraph.to_variable(np.array(ri))
                real_label = fluid.dygraph.to_variable(rl)
                ones = fluid.dygraph.to_variable(np.ones([len(real_image), 1]).astype('float32'))
                # Probability that d assigns to the real images
                p_real = d(real_image, real_label)
                # Loss for classifying real images as real
                # real_cost = fluid.layers.sigmoid_cross_entropy_with_logits(p_real, ones)
                real_cost = (p_real - ones) ** 2  # lsgan
                real_avg_cost = fluid.layers.mean(real_cost)

                '''
                The discriminator d is also optimized by minimizing the loss between its output on the
                fake images g(z) produced by the generator and the "fake" labels (zeros), which
                increases the probability that d classifies the generated images g(z) as fake (zeros).
                '''
                # Create Gaussian noise z and initialize the "fake" labels (zeros) with 0
                z = next(z_generator())
                z = fluid.dygraph.to_variable(np.array(z))
                zeros = fluid.dygraph.to_variable(np.zeros([len(real_image), 1]).astype('float32'))
                # Probability that d assigns to the fake images produced by g
                p_fake = d(g(z, real_label), real_label)
                # fl = rl
                # for i in range(batch_size):
                #     fl[i] = random.randint(0, 9)
                # fake_label = fluid.dygraph.to_variable(fl)
                # p_fake = d(g(z, fake_label), fake_label)
                # Loss for classifying the generated images as fake
                # fake_cost = fluid.layers.sigmoid_cross_entropy_with_logits(p_fake, zeros)
                fake_cost = (p_fake - zeros) ** 2  # lsgan
                fake_avg_cost = fluid.layers.mean(fake_cost)

                # Update the parameters of the discriminator d
                d_loss = real_avg_cost + fake_avg_cost
                d_loss.backward()
                d_optimizer.minimize(d_loss)
                d.clear_gradients()

                '''
                The generator g is optimized by minimizing the loss between d's output on the fake
                images g(z) and the "real" labels (ones), which increases the probability that the
                images g(z) produced by the generator are classified as real (ones) by d.
                '''
                # The generator turns the input Gaussian noise z into fake images
                fake = g(z, real_label)
                # Probability that d assigns to the fake images produced by g
                p_fake = d(fake, real_label)
                # Loss between d's output on the fake images and the "real" labels (ones)
                # g_cost = fluid.layers.sigmoid_cross_entropy_with_logits(p_fake, ones)
                g_cost = (p_fake - ones) ** 2  # lsgan
                g_avg_cost = fluid.layers.mean(g_cost)
                # Backpropagate and update the parameters of the generator g
                g_avg_cost.backward()
                g_optimizer.minimize(g_avg_cost)
                g.clear_gradients()

                if(iteration_num % 100 == 0):
                    print('epoch =', epoch, ', batch =', i, ', d_loss =', d_loss.numpy(), 'g_loss =', g_avg_cost.numpy())
                    show_image_grid(fake.numpy(), BATCH_SIZE, epoch)

        print('End time :', time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), 'End Step:', iteration_num)
        # Save the model and optimizer states
        fluid.save_dygraph(g.state_dict(), model_path+'g')
        fluid.save_dygraph(g_optimizer.state_dict(), model_path+'g')
        fluid.save_dygraph(d.state_dict(), model_path+'d')
        fluid.save_dygraph(d_optimizer.state_dict(), model_path+'d')

# train(mnist_generator, epoch_num=1, batch_size=BATCH_SIZE, use_gpu=True)
# train(mnist_generator, epoch_num=1, batch_size=BATCH_SIZE, use_gpu=True, load_model=True)
train(mnist_generator, epoch_num=20, batch_size=BATCH_SIZE, use_gpu=True, load_model=True)  # ~11 min
# train(mnist_generator, epoch_num=800, batch_size=BATCH_SIZE, use_gpu=True, load_model=True)  # ~440 min
```
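Written out, the squared-error terms used above (in place of the commented-out sigmoid cross-entropy) are the least-squares GAN (LSGAN) form of the losses. With $x$ a real image, $y$ its digit label, and $z$ the noise:

$$
L_D = \mathbb{E}\big[(D(x, y) - 1)^2\big] + \mathbb{E}\big[D(G(z, y), y)^2\big],
\qquad
L_G = \mathbb{E}\big[(D(G(z, y), y) - 1)^2\big]
$$

Here the target 1 corresponds to the `ones` tensor, the target 0 to `zeros`, and `fluid.layers.mean` takes the expectation over the batch.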
## Use the CGAN to generate each of the digits 0–9

```python
def infer(batch_size=128, num=0, use_gpu=True):
    place = fluid.CUDAPlace(0) if use_gpu else fluid.CPUPlace()
    with fluid.dygraph.guard(place):
        # Path of the saved model
        model_path = './output/'

        g = G('G')
        g.eval()

        # Load the previously saved generator
        g_para, g_opt = fluid.load_dygraph(model_path+'g')
        g.load_dict(g_para)
        # g_optimizer.set_dict(g_opt)

        z = next(z_generator())
        z = fluid.dygraph.to_variable(np.array(z))
        label = fluid.layers.fill_constant([batch_size], dtype='float32', value=float(num))

        fake = g(z, label)
        show_image_grid(fake.numpy(), batch_size, -1)

for i in range(10):
    infer(batch_size=BATCH_SIZE, num=i)
```
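Each call above conditions the whole batch on a single digit, so every grid shows 128 variations of one number. As a small variation (a sketch, not part of the original notebook; infer_mixed is a name introduced here and it assumes batch_size equals BATCH_SIZE so that the noise reader matches), the same generator can be conditioned on a different digit per sample, cycling through 0–9 within one batch:

```python
# Sketch: condition each sample on a cycling digit 0-9 instead of one fixed digit.
def infer_mixed(batch_size=128, use_gpu=True):
    place = fluid.CUDAPlace(0) if use_gpu else fluid.CPUPlace()
    with fluid.dygraph.guard(place):
        g = G('G')
        g.eval()
        g_para, _ = fluid.load_dygraph('./output/g')
        g.load_dict(g_para)

        # One noise vector per sample (assumes batch_size == BATCH_SIZE)
        z = fluid.dygraph.to_variable(np.array(next(z_generator())))
        # Labels 0,1,...,9,0,1,... so each row of the grid mixes all ten digits
        labels = fluid.dygraph.to_variable(
            np.array([i % 10 for i in range(batch_size)]).astype('float32'))
        show_image_grid(g(z, labels).numpy(), batch_size, -1)

infer_mixed(batch_size=BATCH_SIZE)
```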
This is just a record of a personal deep-learning experiment; corrections of any shortcomings or mistakes in the write-up are welcome.
Writing this up took some effort, so please give it a like!