
Deep Learning Algorithm Practice 12 --- Convolutional Neural Network (CNN) Implementation


Having worked through the principles of convolutional neural networks (CNNs), in this post we discuss implementing the algorithm with Theano. Using MNIST handwritten digit recognition as the running example, we will build a CNN, train it, and bring the recognition error below 1%.

First we need to read in the MNIST handwritten digit training set, for which we define a utility class:

```python
from __future__ import print_function

__docformat__ = 'restructedtext en'

import six.moves.cPickle as pickle
import gzip
import os
import sys
import timeit

import numpy

import theano
import theano.tensor as T


class MnistLoader(object):
    def load_data(self, dataset):
        data_dir, data_file = os.path.split(dataset)
        if data_dir == "" and not os.path.isfile(dataset):
            # Look for the file in ../data relative to this script.
            new_path = os.path.join(
                os.path.split(__file__)[0],
                "..",
                "data",
                dataset
            )
            if os.path.isfile(new_path) or data_file == 'mnist.pkl.gz':
                dataset = new_path
        if (not os.path.isfile(dataset)) and data_file == 'mnist.pkl.gz':
            from six.moves import urllib
            origin = (
                'http://www.iro.umontreal.ca/~lisa/deep/data/mnist/mnist.pkl.gz'
            )
            print('Downloading data from %s' % origin)
            urllib.request.urlretrieve(origin, dataset)
        print('... loading data')
        # Load the dataset: three (x, y) pairs for train/validation/test.
        with gzip.open(dataset, 'rb') as f:
            try:
                train_set, valid_set, test_set = pickle.load(f, encoding='latin1')
            except Exception:
                # Python 2's pickle.load has no encoding argument.
                train_set, valid_set, test_set = pickle.load(f)

        def shared_dataset(data_xy, borrow=True):
            # Keep the data in shared variables so Theano can copy it to
            # GPU memory in one shot instead of one minibatch at a time.
            data_x, data_y = data_xy
            shared_x = theano.shared(numpy.asarray(data_x,
                                                   dtype=theano.config.floatX),
                                     borrow=borrow)
            shared_y = theano.shared(numpy.asarray(data_y,
                                                   dtype=theano.config.floatX),
                                     borrow=borrow)
            # Labels are used as indices, so cast them back to int32.
            return shared_x, T.cast(shared_y, 'int32')

        test_set_x, test_set_y = shared_dataset(test_set)
        valid_set_x, valid_set_y = shared_dataset(valid_set)
        train_set_x, train_set_y = shared_dataset(train_set)
        rval = [(train_set_x, train_set_y), (valid_set_x, valid_set_y),
                (test_set_x, test_set_y)]
        return rval
```
We have used this class in earlier posts, so we will not walk through it in detail here. We keep it in a class of its own because, when we switch to a different problem, loading the new training data only requires changing this one class, which minimizes the modifications to the rest of the program.
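A minimal usage sketch (the module name mnist_loader matches the import used by the engine code later in this post):

```python
from mnist_loader import MnistLoader

loader = MnistLoader()
datasets = loader.load_data('mnist.pkl.gz')
train_set_x, train_set_y = datasets[0]   # 50,000 training images
valid_set_x, valid_set_y = datasets[1]   # 10,000 validation images
test_set_x, test_set_y = datasets[2]     # 10,000 test images

# Each *_x is a Theano shared variable holding a (n_examples, 784) matrix;
# get_value() exposes the underlying numpy array.
print(train_set_x.get_value(borrow=True).shape)  # (50000, 784)
```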

Our approach feeds the image through the convolutional network first, then into the hidden layer of a BP (feed-forward) network, and finally into a logistic regression output layer. We therefore start by defining the hidden layer and the logistic regression output layer of a multilayer feed-forward network. The hidden layer is defined as follows:

```python
from __future__ import print_function

__docformat__ = 'restructedtext en'

import os
import sys
import timeit

import numpy

import theano
import theano.tensor as T


class HiddenLayer(object):
    def __init__(self, rng, input, n_in, n_out, W=None, b=None,
                 activation=T.tanh):
        self.input = input
        if W is None:
            # Glorot/Bengio uniform initialization:
            # U[-sqrt(6/(n_in+n_out)), sqrt(6/(n_in+n_out))] for tanh units.
            W_values = numpy.asarray(
                rng.uniform(
                    low=-numpy.sqrt(6. / (n_in + n_out)),
                    high=numpy.sqrt(6. / (n_in + n_out)),
                    size=(n_in, n_out)
                ),
                dtype=theano.config.floatX
            )
            if activation == theano.tensor.nnet.sigmoid:
                # Sigmoid units saturate differently, so scale up by 4.
                W_values *= 4
            W = theano.shared(value=W_values, name='W', borrow=True)
        if b is None:
            b_values = numpy.zeros((n_out,), dtype=theano.config.floatX)
            b = theano.shared(value=b_values, name='b', borrow=True)
        self.W = W
        self.b = b
        lin_output = T.dot(input, self.W) + self.b
        self.output = (
            lin_output if activation is None
            else activation(lin_output)
        )
        # Parameters of the model
        self.params = [self.W, self.b]
```
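As a quick check that the layer behaves as expected, the following sketch (not from the original post; it assumes the class above is saved as hidden_layer.py, matching the import used later) pushes one random batch through it:

```python
import numpy
import theano
import theano.tensor as T
from hidden_layer import HiddenLayer

rng = numpy.random.RandomState(1234)
x = T.matrix('x')
layer = HiddenLayer(rng, input=x, n_in=784, n_out=500, activation=T.tanh)

# Compile the forward pass and evaluate it on a random minibatch.
f = theano.function([x], layer.output)
batch = numpy.random.rand(20, 784).astype(theano.config.floatX)
print(f(batch).shape)  # (20, 500)
```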
Next we define the logistic regression class:

```python
from __future__ import print_function

__docformat__ = 'restructedtext en'

import six.moves.cPickle as pickle
import gzip
import os
import sys
import timeit

import numpy

import theano
import theano.tensor as T


class LogisticRegression(object):
    def __init__(self, input, n_in, n_out):
        # Weights and biases start at zero; softmax regression is convex,
        # so the initialization does not matter here.
        self.W = theano.shared(
            value=numpy.zeros(
                (n_in, n_out),
                dtype=theano.config.floatX
            ),
            name='W',
            borrow=True
        )
        self.b = theano.shared(
            value=numpy.zeros(
                (n_out,),
                dtype=theano.config.floatX
            ),
            name='b',
            borrow=True
        )
        # Class-membership probabilities and the most probable class.
        self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W) + self.b)
        self.y_pred = T.argmax(self.p_y_given_x, axis=1)
        self.params = [self.W, self.b]
        self.input = input

    def negative_log_likelihood(self, y):
        # Mean log-probability of the correct class, negated.
        return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]), y])

    def errors(self, y):
        if y.ndim != self.y_pred.ndim:
            raise TypeError(
                'y should have the same shape as self.y_pred',
                ('y', y.type, 'y_pred', self.y_pred.type)
            )
        if y.dtype.startswith('int'):
            return T.mean(T.neq(self.y_pred, y))
        else:
            raise NotImplementedError()
```

This code was discussed in detail in the earlier post on logistic regression, so we will not repeat it here; interested readers can refer to that post (Logistic Regression Algorithm Implementation).
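As a quick sanity check, a sketch like the following (not from the original post) compiles the layer's error rate on a random batch; with untrained zero weights the softmax is uniform and argmax predicts class 0 everywhere, so the error rate is about 90%:

```python
import numpy
import theano
import theano.tensor as T
from logistic_regression import LogisticRegression

x = T.matrix('x')
y = T.ivector('y')
classifier = LogisticRegression(input=x, n_in=784, n_out=10)

# Compile the symbolic error rate into a callable function.
errors = theano.function([x, y], classifier.errors(y))
batch_x = numpy.random.rand(20, 784).astype(theano.config.floatX)
batch_y = numpy.random.randint(0, 10, size=20).astype('int32')
print(errors(batch_x, batch_y))  # roughly 0.9 for untrained weights
```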

With this groundwork in place, we can begin the convolutional neural network (CNN) implementation itself.

We first define a convolution-plus-pooling layer based on a simplified LeNet-5, as shown below:

```python
from __future__ import print_function

import os
import sys
import timeit

import numpy

import theano
import theano.tensor as T
from theano.tensor.signal import pool
from theano.tensor.nnet import conv2d


class LeNetConvPoolLayer(object):
    def __init__(self, rng, input, filter_shape, image_shape, poolsize=(2, 2)):
        # The number of input feature maps must match between the image
        # and the filters.
        assert image_shape[1] == filter_shape[1]
        self.input = input
        # fan_in: inputs to each hidden unit; fan_out: gradient contribution
        # per unit in the layer below, scaled down by the pooling area.
        fan_in = numpy.prod(filter_shape[1:])
        fan_out = (filter_shape[0] * numpy.prod(filter_shape[2:]) //
                   numpy.prod(poolsize))
        # Initialize weights uniformly in [-W_bound, W_bound].
        W_bound = numpy.sqrt(6. / (fan_in + fan_out))
        self.W = theano.shared(
            numpy.asarray(
                rng.uniform(low=-W_bound, high=W_bound, size=filter_shape),
                dtype=theano.config.floatX
            ),
            borrow=True
        )
        # One bias per output feature map.
        b_values = numpy.zeros((filter_shape[0],), dtype=theano.config.floatX)
        self.b = theano.shared(value=b_values, borrow=True)
        # Convolve the input feature maps with the filter bank.
        conv_out = conv2d(
            input=input,
            filters=self.W,
            filter_shape=filter_shape,
            input_shape=image_shape
        )
        # Downsample each feature map with max pooling. Note: in newer
        # Theano releases the `ds` argument was renamed to `ws`.
        pooled_out = pool.pool_2d(
            input=conv_out,
            ds=poolsize,
            ignore_border=True
        )
        # Add the bias (broadcast across the batch and spatial dimensions)
        # and apply the tanh nonlinearity.
        self.output = T.tanh(pooled_out + self.b.dimshuffle('x', 0, 'x', 'x'))
        self.params = [self.W, self.b]
        self.input = input
```
The code above convolves the input signal with the filter bank and then applies max pooling to the result.
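Before wiring the layer into the full network, a short sketch (not from the original post; it assumes the class is saved as lenet_conv_pool_layer.py, matching the import used later) shows its input and output shapes on one random minibatch:

```python
import numpy
import theano
import theano.tensor as T
from lenet_conv_pool_layer import LeNetConvPoolLayer

rng = numpy.random.RandomState(23455)
x = T.tensor4('x')
layer = LeNetConvPoolLayer(
    rng,
    input=x,
    image_shape=(20, 1, 28, 28),     # batch of 20 single-channel images
    filter_shape=(20, 1, 5, 5),      # 20 filters of size 5x5
    poolsize=(2, 2)
)
f = theano.function([x], layer.output)
batch = numpy.random.rand(20, 1, 28, 28).astype(theano.config.floatX)
print(f(batch).shape)  # (20, 20, 12, 12): (28-5+1)/2 = 12
```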

Next, let's look at how to initialize the LeNet layers and how a LeNet layer's output signal becomes the input signal of the MLP hidden layer. The code is as follows:

```python
layer0 = LeNetConvPoolLayer(
    rng,
    input=layer0_input,
    image_shape=(batch_size, 1, 28, 28),
    filter_shape=(nkerns[0], 1, 5, 5),
    poolsize=(2, 2)
)
```
As shown above, the input consists of 28*28 grayscale images, and since we train in minibatches the input shape is declared as (batch_size, 1, 28, 28). We convolve with 5*5 filters, so by the definition of a "valid" convolution the output feature maps are (28-5+1, 28-5+1) = (24, 24) "images". We then apply 2*2 max pooling, taking the maximum of each 2*2 block of pixels as the new pixel value, so this layer finally outputs 12*12 feature maps.
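This shape arithmetic is easy to get wrong when experimenting with other filter or pooling sizes; here is a tiny helper (an illustration, not part of the original code) that encodes it:

```python
def conv_pool_output_size(in_size, filter_size, pool_size):
    """Output side length after a 'valid' convolution and max pooling."""
    conv_size = in_size - filter_size + 1   # valid convolution
    return conv_size // pool_size           # non-overlapping pooling

print(conv_pool_output_size(28, 5, 2))  # 12, matching layer0 above
```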

Next, we feed this output signal into a second LeNet convolution-and-pooling layer:

```python
layer1 = LeNetConvPoolLayer(
    rng,
    input=layer0.output,
    image_shape=(batch_size, nkerns[0], 12, 12),
    filter_shape=(nkerns[1], nkerns[0], 5, 5),
    poolsize=(2, 2)
)
```
As shown above, the input signal is now a 12*12 "image" per feature map. Using 5*5 filters again, we obtain (12-5+1, 12-5+1) = (8, 8) feature maps, and 2*2 max pooling reduces them to (4, 4). Calling layer1.output.flatten(2) then collapses this into a one-dimensional signal per example, which can be fed into the MLP hidden layer.
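To see concretely what flatten(2) does, a quick sketch (illustration only): it keeps the first (batch) dimension and collapses the rest, turning the layer1 output of shape (batch_size, nkerns[1], 4, 4) into (batch_size, nkerns[1] * 16):

```python
import numpy
import theano
import theano.tensor as T

t = T.tensor4('t')
flat = theano.function([t], t.flatten(2))
out = flat(numpy.zeros((500, 50, 4, 4), dtype=theano.config.floatX))
print(out.shape)  # (500, 800) == (batch_size, nkerns[1] * 4 * 4)
```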

Next we define the LeNet engine, which loads the data, defines the network model, and trains the network. The code is as follows:

```python
from __future__ import print_function

import os
import sys
import timeit

import numpy
import six.moves.cPickle as pickle  # needed below to save/load best_model.pkl

import theano
import theano.tensor as T

from mnist_loader import MnistLoader
from logistic_regression import LogisticRegression
from hidden_layer import HiddenLayer
from lenet_conv_pool_layer import LeNetConvPoolLayer


class LenetMnistEngine(object):
    def __init__(self):
        print("create LenetMnistEngine")

    def train_model(self):
        learning_rate = 0.1
        n_epochs = 200
        dataset = 'mnist.pkl.gz'
        nkerns = [20, 50]
        batch_size = 500
        (n_train_batches, n_test_batches, n_valid_batches,
         train_model, test_model, validate_model, classifier) = \
            self.build_model(learning_rate, n_epochs,
                             dataset, nkerns, batch_size)
        self.train(n_epochs, n_train_batches, n_test_batches,
                   n_valid_batches, train_model, test_model,
                   validate_model, classifier)

    def run(self):
        print("run the model")
        # Load the pickled output layer saved during training and predict
        # labels for the first ten test images. Caveat: theano.function
        # requires classifier.input to be a free input variable; for the
        # full CNN graph the more robust approach is to rebuild the model
        # and reload the saved parameter values.
        classifier = pickle.load(open('best_model.pkl', 'rb'))
        predict_model = theano.function(
            inputs=[classifier.input],
            outputs=classifier.y_pred
        )
        dataset = 'mnist.pkl.gz'
        loader = MnistLoader()
        datasets = loader.load_data(dataset)
        test_set_x, test_set_y = datasets[2]
        test_set_x = test_set_x.get_value()
        predicted_values = predict_model(test_set_x[:10])
        print("Predicted values for the first 10 examples in test set:")
        print(predicted_values)

    def build_model(self, learning_rate=0.1, n_epochs=200,
                    dataset='mnist.pkl.gz',
                    nkerns=[20, 50], batch_size=500):
        rng = numpy.random.RandomState(23455)
        loader = MnistLoader()
        datasets = loader.load_data(dataset)
        train_set_x, train_set_y = datasets[0]
        valid_set_x, valid_set_y = datasets[1]
        test_set_x, test_set_y = datasets[2]
        # Number of minibatches per split.
        n_train_batches = train_set_x.get_value(borrow=True).shape[0]
        n_valid_batches = valid_set_x.get_value(borrow=True).shape[0]
        n_test_batches = test_set_x.get_value(borrow=True).shape[0]
        n_train_batches //= batch_size
        n_valid_batches //= batch_size
        n_test_batches //= batch_size
        index = T.lscalar()  # minibatch index
        x = T.matrix('x')    # rasterized images
        y = T.ivector('y')   # image labels
        print('... building the model')
        # Reshape each flat 784-vector back into a 1-channel 28x28 image.
        layer0_input = x.reshape((batch_size, 1, 28, 28))
        # (28-5+1)/2 = 12: output is (batch_size, nkerns[0], 12, 12).
        layer0 = LeNetConvPoolLayer(
            rng,
            input=layer0_input,
            image_shape=(batch_size, 1, 28, 28),
            filter_shape=(nkerns[0], 1, 5, 5),
            poolsize=(2, 2)
        )
        # (12-5+1)/2 = 4: output is (batch_size, nkerns[1], 4, 4).
        layer1 = LeNetConvPoolLayer(
            rng,
            input=layer0.output,
            image_shape=(batch_size, nkerns[0], 12, 12),
            filter_shape=(nkerns[1], nkerns[0], 5, 5),
            poolsize=(2, 2)
        )
        # Flatten to (batch_size, nkerns[1] * 4 * 4) for the hidden layer.
        layer2_input = layer1.output.flatten(2)
        layer2 = HiddenLayer(
            rng,
            input=layer2_input,
            n_in=nkerns[1] * 4 * 4,
            n_out=500,
            activation=T.tanh
        )
        layer3 = LogisticRegression(input=layer2.output, n_in=500, n_out=10)
        cost = layer3.negative_log_likelihood(y)
        test_model = theano.function(
            [index],
            layer3.errors(y),
            givens={
                x: test_set_x[index * batch_size: (index + 1) * batch_size],
                y: test_set_y[index * batch_size: (index + 1) * batch_size]
            }
        )
        validate_model = theano.function(
            [index],
            layer3.errors(y),
            givens={
                x: valid_set_x[index * batch_size: (index + 1) * batch_size],
                y: valid_set_y[index * batch_size: (index + 1) * batch_size]
            }
        )
        # Plain minibatch SGD over the parameters of all four layers.
        params = layer3.params + layer2.params + layer1.params + layer0.params
        grads = T.grad(cost, params)
        updates = [
            (param_i, param_i - learning_rate * grad_i)
            for param_i, grad_i in zip(params, grads)
        ]
        train_model = theano.function(
            [index],
            cost,
            updates=updates,
            givens={
                x: train_set_x[index * batch_size: (index + 1) * batch_size],
                y: train_set_y[index * batch_size: (index + 1) * batch_size]
            }
        )
        # Also return the output layer so train() can pickle the best model.
        return (n_train_batches, n_test_batches, n_valid_batches,
                train_model, test_model, validate_model, layer3)

    def train(self, n_epochs, n_train_batches, n_test_batches, n_valid_batches,
              train_model, test_model, validate_model, classifier):
        print('... training')
        # Early stopping: examine at least `patience` minibatches, and extend
        # the patience whenever validation improves significantly.
        patience = 10000
        patience_increase = 2
        improvement_threshold = 0.995
        validation_frequency = min(n_train_batches, patience // 2)
        best_validation_loss = numpy.inf
        best_iter = 0
        test_score = 0.
        start_time = timeit.default_timer()
        epoch = 0
        done_looping = False
        while (epoch < n_epochs) and (not done_looping):
            epoch = epoch + 1
            for minibatch_index in range(n_train_batches):
                iter = (epoch - 1) * n_train_batches + minibatch_index
                if iter % 100 == 0:
                    print('training @ iter = ', iter)
                cost_ij = train_model(minibatch_index)
                if (iter + 1) % validation_frequency == 0:
                    validation_losses = [validate_model(i) for i
                                         in range(n_valid_batches)]
                    this_validation_loss = numpy.mean(validation_losses)
                    print('epoch %i, minibatch %i/%i, validation error %f %%' %
                          (epoch, minibatch_index + 1, n_train_batches,
                           this_validation_loss * 100.))
                    if this_validation_loss < best_validation_loss:
                        if this_validation_loss < best_validation_loss * \
                                improvement_threshold:
                            patience = max(patience, iter * patience_increase)
                        best_validation_loss = this_validation_loss
                        best_iter = iter
                        test_losses = [
                            test_model(i)
                            for i in range(n_test_batches)
                        ]
                        test_score = numpy.mean(test_losses)
                        # Save the best output layer seen so far.
                        with open('best_model.pkl', 'wb') as f:
                            pickle.dump(classifier, f)
                        print(('     epoch %i, minibatch %i/%i, test error of '
                               'best model %f %%') %
                              (epoch, minibatch_index + 1, n_train_batches,
                               test_score * 100.))
                if patience <= iter:
                    done_looping = True
                    break
        end_time = timeit.default_timer()
        print('Optimization complete.')
        print('Best validation score of %f %% obtained at iteration %i, '
              'with test performance %f %%' %
              (best_validation_loss * 100., best_iter + 1, test_score * 100.))
        print(('The code for file ' +
               os.path.split(__file__)[1] +
               ' ran for %.2fm' % ((end_time - start_time) / 60.)), file=sys.stderr)
```
The code above closely follows the earlier MLP training code, so we will not discuss it again here. On my Mac laptop the run took about six hours and reached an error rate below 1%. (Note that the original listing used the name `classifier` inside train() without defining it; the version above fixes this by having build_model also return the output layer and passing it through.)
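For completeness, a minimal driver sketch (the module name lenet_mnist_engine is an assumption; adjust it to wherever the engine class is saved):

```python
from lenet_mnist_engine import LenetMnistEngine

if __name__ == '__main__':
    engine = LenetMnistEngine()
    engine.train_model()   # trains the CNN and saves best_model.pkl
    # engine.run()         # then predicts with the saved model
```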
