赞
踩
完成框架设计文档中列出的基础类和需要在基础类中实现的接口。使用最简的单多层感知机(Multi-Layer Perceptron)模型对框架进行初步验证, 因此, 除了框架的核心部分外, 还要实现一个全连接层,一个激活函数,一个优化器和一个损失函数。
我把这个框架命名为cute-dl, 已经上传到github上: https://github.com/brandonlyg/cute-dl.
目录结构为:
– cutedl: 框架实现代码
– example: 示例
– test: 单元测试
MLP示例位于 example/mlp目录下。
相关代码在model.py中.
LayerParam只有属性的定义, 没什么逻辑在里面:
lass LayerParam(object): ''' layer_name: 所属层的的名字 name: 参数名 value: 参数值 ''' def __init__(self, layer_name, name, value): self.__name = layer_name+"/"+name self.value = value #梯度 self.gradient = None #更新次数 self.udt = 0 @property def name(self): return self.__name def reset(self): self.gradient = None self.udt = 0
其中参数名字是使用树形结构, 例如: “1-MyLayer/W”, 是"1-MyLayer"层的"W"参数的名字。其中"1"是层在模型中的唯一ID, "MyLayer"是层的标签(tag), "W"是参数在这个层中的唯一名字。
Layer需要实现两个方法: 一个是__init__方法,一个是join方法. 其他方法不需要实现,只需按设计文档中的描述给出定义即可。
先来看看__init__方法:
''' outshape: 输出形状 2 或者 (2,3) kargs: activation: 激活函数的名字 inshape: 输入形状 ''' def __init__(self, *outshape, **kargs) #输出形状 if len(outshape) == 1 and type(outshape[0]) == type(()): self.__outshape = outshape[0] else: self.__outshape = outshape #输入形状 self.__inshape = None #得到激活函数 self.__activation = activations.get('linear') #层在模型中的id, 是层在模型中的索引 self.__id = 0 #层的名字 self.__name = '/%d-%s'%(self.__id, self.tag) #得到可选参数 #print("Layer kargs:", kargs) if 'inshape' in kargs: self.__inshape = kargs['inshape'] if type(self.__inshape) != type(()): self.__inshape = (self.__inshape,) #print("------inshape:", self.__inshape) if 'activation' in kargs: self.__activation = activations.get(kargs['activation']) if self.__inshape is not None: self.init_params()
实现的时主要处理这么几个问题:
join方法实现:
''' 加入到模型中 pre_layer: 前一个层 *inshape: 输入形状 ''' def join(self, pre_layer, *inshape): if self.__outshape == (-1,): self.__inshape = pre_layer.inshape self.__outshape = pre_layer.outshape else: self.__inshape = pre_layer.outshape if len(inshape) != 0: self.__inshape = inshape self.__id = pre_layer.layer_id + 1 self.__name = '/%d-%s'%(self.__id, self.tag) self.init_params()
这个方法主要功能是把当前层和另一层连接在一起, 让另一个层成为当前层的(在模型中的)前一层。这里的"连接"主要体现在: 把另一个层的输出作为输入。对层ID的处理上, 使用简单的累加保证层ID在模型中是唯一的, 同时还能通过ID的值知道层位于模型中的什么位置。 有了输入输出形状, 就可以调用子类实现的init_params方法初始化参数了。
激活函数代码在activation.py中。
接口定义:
'''
激活函数
'''
class Activation(object):
name=''
def __call__(self, in_batch):
raise Exception("__call__ not implement")
'''
求梯度
gradient: 该函数输出值的梯度
'''
def grad(self, gradient):
raise Exception("gradient not implement")
其中类属性name作为激活函数的名字。
实现线性激活函数, 作为默认激活函数:
'''
线性激活函数, 没有激活
'''
class Linear(Activation):
name='linear'
def __call__(self, in_batch):
return in_batch
def grad(self, gradient):
return gradient
实现最常用的relu激活函数:
''' relu 激活函数 ''' class Relu(Activation): name='relu' def __init__(self): self.__grad = None def __call__(self, in_batch): #得到 <= 0的数据的索引 indices = in_batch <= 0 in_batch[indices] = 0 self.__grad = indices return in_batch def grad(self, gradient): gradient[self.__grad] = 0 self.__grad = None return gradient
实现用名字(name)获取激活函数:
act_dict = {
Linear.name: Linear,
Relu.name: Relu
}
#创建激活函数
def get(name):
#print(act_dict)
#print('name:', name)
ACT = act_dict[name]
return ACT()
首先需要向模型中添加层
''' layers: Layer list ''' def __init__(self, layers=None): self.__layers = layers ''' 添加层 layer: Layer类型的对象 ''' def add(self, layer): if self.__layers is None: self.__layers = [] self.__layers.append(layer) return self
__init__和add方法都能实现这个功能。
然后是层的的访问能力:
''' 得到一个Layer对象 idx: Layer对象的索引 ''' def get_layer(self, index): self.__check() if len(self.__layers) <= index: raise Exception("index out of range %d"%len(self.__layers)) return self.__layers[index] @property def layer_count(self): return len(self.__layers) ''' 得到层的迭代器 ''' def layer_iterator(self): self.__check() for ly in self.__layers: yield ly
接下来是组装模型:
''' 组装模型 ''' def assemble(self): self.__check() count = len(self.__layers) #输入层必须要有输入形状 ly_0 = self.__layers[0] if ly_0.inshape is None or len(ly_0.inshape) == 0: raise Exception("input layer miss inshape") #把每一层的输入形状设置为上一层的输出形状, #设置输入形状的同时, 要求该层自动初始化参数(如果有参数的话) pre_ly = ly_0 for ly in self.__layers[1:]: ly.join(pre_ly) pre_ly = ly
向前传播:
'''
使用模型预测
in_batch: 一批输入数据
'''
def predict(self, in_batch, training=False):
self.__check()
out = in_batch
for ly in self.__layers:
out = ly.forward(out, training)
return out
反向传播:
'''
反向传播梯度
'''
def backward(self, gradient):
g = gradient
#pdb.set_trace()
count = len(self.__layers)
for i in range(count-1, -1, -1):
ly = self.__layers[i]
g = ly.backward(g)
Session代码在session.py中。
初始化__init__:
'''
model: Model对象
loss: Loss对象
optimizer: Optimizer对象
'''
def __init__(self, model, loss, optimizer):
self.__model = model
self.__loss = loss
self.__optimizer = optimizer
会话主要维护模型, 损失函数和优化器。这些对一个简单的MLP模型来说已经足够,至于genoptimizer以后再添加。
训练模型:
''' 分批训练 ''' def batch_train(self, data, label): #使用模型预测 out = self.__model.predict(data, training=True) #使用损失函数评估误差 loss = self.__loss(out, label) grad = self.__loss.gradient #pdb.set_trace() #反向传播梯度 self.__model.backward(self.__loss.gradient) #更新模型参数 self.__optimizer(self.__model) return loss
保存会话:
''' 保存session fpath: 保存的文件路径 fpath+'.s.pkl' 是保存session的文件 fpath+'.m.pkl' 是保存model的文件 ''' def save(self, fpath): model = self.__model self.__model = None model.save(fpath) realfp = fpath + ".s.pkl" with open(realfp, 'wb') as f: pickle.dump(self, f)
这里把模型和会话分开保存, 是为了以后可以灵活地选择只加载模型或加载整个会话。下面是模型的保存方法, 在Model中实现:
'''
保存模型
'''
def save(self, fpath):
dir = os.path.dirname(fpath)
if not os.path.exists(dir):
os.mkdir(dir)
self.reset()
realfp = fpath + ".m.pkl"
with open(realfp, 'wb') as f:
pickle.dump(self, f)
加载会话:
''' 加载session ''' @classmethod def load(cls, fpath): realfp = fpath + ".s.pkl" if not os.path.exists(realfp): return None sess = None with open(realfp, 'rb') as f: sess = pickle.load(f) model = Model.load(fpath) sess.set_model(model) return sess
损失函数代码在loss.py中。首先定义接口:
''' 损失函数 ''' class Loss(object): ''' 梯度属性 ''' @property def gradient(self): raise Exception("gradient not impliment") ''' 计算误差和梯度 y_true 数据的真实标签 y_pred 模型预测的标签 return 误差值 ''' def __call__(self, y_true, y_pred): raise Exception("__call__ not impliment")
接下来给出均方误差损失函数实现:
''' 均方误差损失函数 ''' class Mse(Loss): def __init__(self): self.__grad = None def __call__(self, y_true, y_pred): err = y_true - y_pred loss = (err**2).mean(axis=0)/2 n = y_true.shape[0] self.__grad = err/n #pdb.set_trace() return loss.sum() @property def gradient(self): return self.__grad
优化器代码在optimizer.py中。
定义接口:
'''
学习率优化器
'''
class Optimizer(object):
'''
更新参数
'''
def __call__(self, model):
raise Exception('not implement')
实现一个固定学习率优化器, 没有用任何参数优化算法。
''' 固定学习率优化器 ''' class Fixed(Optimizer): ''' lt: 学习率 ''' def __init__(self, lt=0.01): self.__lt = lt def __call__(self, model): #pdb.set_trace() for ly in model.layer_iterator(): for p in ly.params: p.value -= self.__lt * p.gradient p.udt += 1
到目前为止,一个能够支持最简单MLP模型的框架已经完成。接下来用一个MLP示例来验证一下。
使用MLP模型完成一个广义线性回归的任务, 代码在examples/mlp/linear-regression.py中。
假设这个任务是拟合一个二次多项式函数:
'''
任务目标函数
'''
def target_func(x):
##加入服从参数(0, 0.25^2)正态分布噪声
y = (x - 2)**2 + 0.25 * np.random.randn(len(x))
return y
看一下这个函数的图像:
从使用这个函数采样得到数据集:
''' 生成数据集 返回: train_x, train_y, test_x, test_y train_x, train_y 训练数据集的数据和标签 test_x, test_y 验证数据解的数据和标签 ''' def generate_dataset(): ''' 生成200条数据, 随机取出80%条作为训练数据集, 剩余数据为测试数据集 ''' fpath = "./ds.pkl" if os.path.exists(fpath): with open(fpath, 'rb') as f: ds = pickle.load(f) return ds count = 200 x = np.linspace(-1, 5, count) y = target_func(x) #打乱顺序 indices = np.arange(count) np.random.shuffle(indices) #训练数据集 split = int(count*0.8) idxs = indices[:split] train_x = x[idxs].reshape((-1,1)) train_y = y[idxs].reshape((-1,1)) #测试数据集 idxs = sorted(indices[split:]) test_x = x[idxs].reshape((-1, 1)) shape = test_x.shape test_y = y[idxs].reshape((-1, 1)) ds = { 'train_x': train_x, 'train_y': train_y, 'test_x': test_x, 'test_y': test_y } with open(fpath, 'wb') as f: pickle.dump(ds, f) return ds #得到数据集 ds_0 = generate_dataset() print("train shape:", ds_0['train_x'].shape) print("test shape:", ds_0['test_x'].shape) #训练集只取一部分 count = 100 ds_1 = { 'train_x': ds_0['train_x'][:16], 'train_y': ds_0['train_y'][:16], 'test_x': ds_0['test_x'], 'test_y': ds_0['test_y'] }
这里得到两个数据集, 一个数据集中有160条训练数据, 40条验证数据。另一个中有16条训练数据和40条验证数据。
分批训练模型:
''' 训练模型 ''' def train(epochs, ds, model=None, batch_size=64, record_epochs=1): #加载/构建session sess = None if model is None: sess = Session.load(model_path) else: sess = Session(model, loss=losses.Mse(), optimizer = optimizers.Fixed() ) train_x = ds['train_x'] train_y = ds['train_y'] test_x = ds['test_x'] test_y = ds['test_y'] batchs = int(train_x.shape[0]/batch_size) print("epochs:%d, batchs=%d"%(epochs, batchs)) #记录训练历史 history = { 'loss': [], 'val_loss': [], 'epochs': [], 'val_x': test_x, 'val_y': test_y, 'val_pred': None } print("start training ") t_start = time.time() steps = epochs * batchs epoch = 1 #循环训练 for step in range(steps): start = (step % batchs) * batch_size end = start + batch_size batch_x = train_x[start:end] batch_y = train_y[start:end] loss = sess.batch_train(batch_x, batch_y) cur_epoch = int(step/batchs) + 1 #每轮打印一次 if step > 0 and step % batchs == 0: print((('epoch:%05d/%d loss=%f'%(cur_epoch, epochs, loss))+' '*50)[:50], end='\r') #记录 if step % batchs == 0 and (cur_epoch - epoch == record_epochs or cur_epoch == epochs): epoch = cur_epoch y_pred = sess.model.predict(test_x) val_loss = sess.loss(test_y, y_pred) history['loss'].append(loss) history['val_loss'].append(val_loss) history['epochs'].append(epoch) history['val_pred'] = y_pred print((('epoch:%05d/%d loss=%f, val_loss=%f'%(cur_epoch, epochs, loss, val_loss))+' '*50)[:50], end='\r') print("") sess.save(model_path) print("training finished cost:%f" % (time.time() - t_start)) return history
通过这段代码可以看出,框架虽然看起来可用, 但训练模型是仍然需要不少代码,不够友好。不过没关系,目前先通过示例积累经验,以后在把分批训练的功能加入到Session中。要牢记现阶段的主要任务: 对框架进行初步验证。
#欠拟合示例
def fit_1():
model = Model([
nnlys.Dense(32, inshape=1),
nnlys.Dense(1)
])
model.assemble()
#这个模型是一个线性模型, 用来拟合非线性函数, 模型复杂度不够,一定会表现出欠拟合
history = train(20000, ds_0, model, record_epochs=100)
fit_report(history, report_path+'01.png')
拟合报告:
可以看到不论是训练误差还是验证误差都很大, 下面的拟合图形更是惨不忍睹。模型呈欠拟合。
#使用增加模型复杂度解决欠拟合问题
def fit_2():
model = Model([
nnlys.Dense(32, inshape=1, activation='relu'),
nnlys.Dense(1)
])
model.assemble()
#使用了relu激活函数模型变成了非线性的, 增加了模型的复杂度
history = train(30000, ds_0, model, record_epochs=300)
history['loss'] = history['loss'][5:]
history['val_loss'] = history['val_loss'][5:]
history['epochs'] = history['epochs'][5:]
fit_report(history, report_path+'02.png')
拟合报告:
拟合情况比较理想。
#过拟合
def fit_3():
model = Model([
nnlys.Dense(512, inshape=1, activation='relu'),
nnlys.Dense(128, activation='relu'),
nnlys.Dense(1)
])
model.assemble()
#使用数据集ds_1, 只有16条训练数据
history = train(30000, ds_1, model, batch_size=16, record_epochs=300)
history['loss'] = history['loss'][20:]
history['val_loss'] = history['val_loss'][20:]
history['epochs'] = history['epochs'][20:]
fit_report(history, report_path+'03.png')
拟合报告:
可以看到训练误差持续降低, 而验证误差先低后高, 说明随着训练轮次的增加,模型过多地学习到了训练数据的模式, 导致泛化误差增大,呈现过过拟合。
#减少参数数量缓解过拟合
def fit_4():
model = Model([
nnlys.Dense(128, inshape=1, activation='relu'),
nnlys.Dense(64, activation='relu'),
nnlys.Dense(1)
])
model.assemble()
history = train(30000, ds_1, model, batch_size=16, record_epochs=300)
history['loss'] = history['loss'][20:]
history['val_loss'] = history['val_loss'][20:]
history['epochs'] = history['epochs'][20:]
fit_report(history, report_path+'04.png')
拟合报告:
可以看到过拟合现象有所缓解,到25000左右才出现过拟合现象,拟合图形变得稍微好一点, 过拟合只是略有缓解。
目前已经实现了一个最简单可运行的深度学习框架。从验证情况看,它已经达到预期,能够支持简单的MLP模型, 但很直接地暴露出两个问题:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。