当前位置:   article > 正文

第十二节,TensorFlow读取数据的几种方法以及队列的使用

tf.fifoqueue(3, [tf.int16], shared_name=queue_name)

TensorFlow程序读取数据一共有3种方法:

  • 供给数据(Feeding): 在TensorFlow程序运行的每一步, 让Python代码来供给数据。
  • 从文件读取数据: 在TensorFlow图的起始, 让一个输入管道从文件中读取数据。
  • 预加载数据: 在TensorFlow图中定义常量或变量来保存所有数据(仅适用于数据量比较小的情况)。

一 预加载数据

  1. import tensorflow as tf
  2. x1 = tf.constant([2,3,4])
  3. x2 = tf.constant([4,0,1])
  4. y = tf.add(x1,x2)
  5. with tf.Session() as sess:
  6. print(sess.run(y))

在这里使用x1,x2保存具体的值,即将数据直接内嵌到图中,再将图传入会话中执行,当数据量较大时,图的输出会遇到效率问题。

二 供给数据

  1. import tensorflow as tf
  2. x1 = tf.placeholder(tf.int32)
  3. x2 = tf.placeholder(tf.int32)
  4. #用python产生数据
  5. v1 = [2,3,4]
  6. v2 = [4,0,1]
  7. y = tf.add(x1,x2)
  8. with tf.Session() as sess:
  9. print(sess.run(y,feed_dict={x1:v1,x2:v2}))

在这里x1,x2只是占位符,没有具体的值,那么运行的时候去哪取值呢?这时候就要用到sess.run()的feed_dict参数,将python产生的数据传入,并计算y。

以上两种方法都很方便,但是遇到大型数据的时候就会很吃力,即使是Feed_dict,中间环节的增加也是不小的开销,因为数据量大的时候,TensorFlow程序运行的每一步,我们都需要使用python代码去从文件中读取数据,并对读取到的文件数据进行解码。最优的方案就是在图中定义好文件读取的方法,让TF自己从文件中读取数据,并解码成可用的样本集。

三 TensorFlow中的队列机制

从文件中读取数据的方法有很多,比如可以在一个文本里面写入图片数据的路径和标签,然后用tensorflow的read_file()读入图片;也可以将图片和标签的值直接存放在CSV或者txt文件。

我们会在后面陆续介绍以下几种读取文件的方式:

  • 从字典结构的数据文件读取
  • 从bin文件读取
  • 从CSV(TXT)读取
  • 从原图读取
  • TFRecord格式文件的读取

在讲解文件的读取之前,我们需要先了解一下TensorFlow中的队列机制,后面也会详细介绍。

TensorFlow提供了一个队列机制,通过多线程将读取数据与计算数据分开。因为在处理海量数据集的训练时,无法把数据集一次全部载入到内存中,需要一边从硬盘中读取,一边进行训练,为了加快训练速度,我们可以采用多个线程读取数据,一个线程消耗数据。

下面简要介绍一下,TensorFlow里与Queue有关的概念和用法。详细内容点击原文

其实概念只有三个:

  • Queue是TF队列和缓存机制的实现
  • QueueRunner是TF中对操作Queue的线程的封装
  • Coordinator是TF中用来协调线程运行的工具

虽然它们经常同时出现,但这三样东西在TensorFlow里面是可以单独使用的,不妨先分开来看待。

1.Queue

据实现的方式不同,分成具体的几种类型,例如:

  • tf.FIFOQueue :按入列顺序出列的队列
  • tf.RandomShuffleQueue :随机顺序出列的队列
  • tf.PaddingFIFOQueue :以固定长度批量出列的队列
  • tf.PriorityQueue :带优先级出列的队列
  • ... ...

这些类型的Queue除了自身的性质不太一样外,创建、使用的方法基本是相同的。

创建函数的参数:

  1. tf.FIFOQueue(capacity, dtypes, shapes=None, names=None,
  2. shared_name=None, name="fifo_queue")
Queue主要包含 入列(enqueue)出列(dequeue)两个操作。队列本身也是图中的一个节点。其他节点(enqueue, dequeue)可以修改队列节点中的内容。enqueue操作返回计算图中的一个Operation节点,dequeue操作返回一个Tensor值。Tensor在创建时同样只是一个定义(或称为“声明”),需要放在Session中运行才能获得真正的数值。下面是一个单独使用Queue的例子:
  1. #创建的图:一个先入先出队列,以及初始化,出队,+1,入队操作
  2. q = tf.FIFOQueue(3, "float")
  3. init = q.enqueue_many(([0.1, 0.2, 0.3],))
  4. x = q.dequeue()
  5. y = x + 1
  6. q_inc = q.enqueue([y])
  7. #开启一个session,session是会话,会话的潜在含义是状态保持,各种tensor的状态保持
  8. with tf.Session() as sess:
  9. sess.run(init)
  10. for i in range(2):
  11. sess.run(q_inc)
  12. quelen = sess.run(q.size())
  13. for i in range(quelen):
  14. print (sess.run(q.dequeue()))

2. QueueRunner

之前的例子中,入队操作都在主线程中进行,Session中可以多个线程一起运行。 在数据输入的应用场景中,入队操作从硬盘上读取,入队操作是从硬盘中读取输入,放到内存当中,速度较慢。 使用QueueRunner可以创建一系列新的线程进行入队操作,让主线程继续使用数据。如果在训练神经网络的场景中,就是训练网络和读取数据是异步的,主线程在训练网络,另一个线程在将数据从硬盘读入内存。

  1. '''
  2. QueueRunner()的使用
  3. '''
  4. q = tf.FIFOQueue(10, "float")
  5. counter = tf.Variable(0.0) #计数器
  6. # 给计数器加一
  7. increment_op = tf.assign_add(counter, 1.0)
  8. # 将计数器加入队列
  9. enqueue_op = q.enqueue(counter)
  10. # 创建QueueRunner
  11. # 用多个线程向队列添加数据
  12. # 这里实际创建了4个线程,两个增加计数,两个执行入队
  13. qr = tf.train.QueueRunner(q, enqueue_ops=[increment_op, enqueue_op] * 2)
  14. #主线程
  15. with tf.Session() as sess:
  16. sess.run(tf.initialize_all_variables())
  17. #启动入队线程
  18. enqueue_threads = qr.create_threads(sess, start=True)
  19. #主线程
  20. for i in range(10):
  21. print (sess.run(q.dequeue()))

能正确输出结果,但是最后会报错,ERROR:tensorflow:Exception in QueueRunner: Session has been closed.也就是说,当循环结束后,该Session就会自动关闭,相当于main函数已经结束了。

  1. '''
  2. QueueRunner()的使用
  3. '''
  4. q = tf.FIFOQueue(10, "float")
  5. counter = tf.Variable(0.0) #计数器
  6. # 给计数器加一
  7. increment_op = tf.assign_add(counter, 1.0)
  8. # 将计数器加入队列
  9. enqueue_op = q.enqueue(counter)
  10. # 创建QueueRunner
  11. # 用多个线程向队列添加数据
  12. # 这里实际创建了4个线程,两个增加计数,两个执行入队
  13. qr = tf.train.QueueRunner(q, enqueue_ops=[increment_op, enqueue_op] * 2)
  14. '''
  15. #主线程
  16. with tf.Session() as sess:
  17. sess.run(tf.initialize_all_variables())
  18. #启动入队线程
  19. enqueue_threads = qr.create_threads(sess, start=True)
  20. #主线程
  21. for i in range(10):
  22. print (sess.run(q.dequeue()))
  23. '''
  24. # 主线程
  25. sess = tf.Session()
  26. sess.run(tf.initialize_all_variables())
  27. # 启动入队线程
  28. enqueue_threads = qr.create_threads(sess, start=True)
  29. # 主线程
  30. for i in range(0, 10):
  31. print(sess.run(q.dequeue()))

不使用with tf.Session,那么Session就不会自动关闭。

并不是我们设想的1,2,3,4,本质原因是增加计数的进程会不停的后台运行,执行入队的进程会先执行10次(因为队列长度只有10),然后主线程开始消费数据,当一部分数据消费被后,入队的进程又会开始执行。最终主线程消费完10个数据后停止,但其他线程继续运行,程序不会结束。

经验:因为tensorflow是在图上进行计算,要驱动一张图进行计算,必须要送入数据,如果说数据没有送进去,那么sess.run(),就无法执行,tf也不会主动报错,提示没有数据送进去,其实tf也不能主动报错,因为tf的训练过程和读取数据的过程其实是异步的。tf会一直挂起,等待数据准备好。现象就是tf的程序不报错,但是一直不动,跟挂起类似。 

  1. '''
  2. QueueRunner()的使用
  3. '''
  4. q = tf.FIFOQueue(10, "float")
  5. counter = tf.Variable(0.0) #计数器
  6. # 给计数器加一
  7. increment_op = tf.assign_add(counter, 1.0)
  8. # 将计数器加入队列
  9. enqueue_op = q.enqueue(counter)
  10. # 创建QueueRunner
  11. # 用多个线程向队列添加数据
  12. # 这里实际创建了4个线程,两个增加计数,两个执行入队
  13. qr = tf.train.QueueRunner(q, enqueue_ops=[increment_op, enqueue_op] * 2)
  14. #主线程
  15. with tf.Session() as sess:
  16. sess.run(tf.initialize_all_variables())
  17. #启动入队线程
  18. enqueue_threads = qr.create_threads(sess, start=True)
  19. #主线程
  20. for i in range(10):
  21. print (sess.run(q.dequeue()))

上图将生成数据的线程注释掉,程序就会卡在sess.run(q.dequeue()),等待数据的到来QueueRunner是用来启动入队线程用的。

3.Coordinator

Coordinator是个用来保存线程组运行状态的协调器对象,它和TensorFlow的Queue没有必然关系,是可以单独和Python线程使用的。例如:

  1. '''
  2. Coordinator
  3. '''
  4. import threading, time
  5. # 子线程函数
  6. def loop(coord, id):
  7. t = 0
  8. while not coord.should_stop():
  9. print(id)
  10. time.sleep(1)
  11. t += 1
  12. # 只有1号线程调用request_stop方法
  13. if (t >= 2 and id == 0):
  14. coord.request_stop()
  15. # 主线程
  16. coord = tf.train.Coordinator()
  17. # 使用Python API创建10个线程
  18. threads = [threading.Thread(target=loop, args=(coord, i)) for i in range(10)]
  19. # 启动所有线程,并等待线程结束
  20. for t in threads: t.start()
  21. coord.join(threads)

将这个程序运行起来,会发现所有的子线程执行完两个周期后都会停止,主线程会等待所有子线程都停止后结束,从而使整个程序结束。由此可见,只要有任何一个线程调用了Coordinator的request_stop方法,所有的线程都可以通过should_stop方法感知并停止当前线程。

将QueueRunner和Coordinator一起使用,实际上就是封装了这个判断操作,从而使任何一个出现异常时,能够正常结束整个程序,同时主线程也可以直接调用request_stop方法来停止所有子线程的执行。

简要 介绍完了TensorFlow中队列机制后,我们再来看一下如何从文件中读取数据。

四 从文件中读取数据

1.从字典结构的数据文件读取(python数据格式)

(1)在介绍字典结构的数据文件的读取之前,我们先来介绍怎么创建字典结构的数据文件。

  • 先要准备好图片文件,我们使用Open CV3进行图像读取。
  • 把cv2.imread()读取到的图像进行裁切,扭曲,等处理。
  • 使用numpy才对数据进行处理,比如维度合并。
  • 把处理好的每一张图像的数据和标签分别存放在对应的list(或者ndarray)中。
  • 创建一个字典,包含两个元素‘data’和'labels',并分别赋值为上面的list。
  • 使用pickle模块对字典进行序列化,并保存到文件中。

具体代码我们查看如下文章:图片存储为cifar的Python数据格式

如果针对图片比较多的情况,我们不太可能把所有图像都写入个文件,我们可以分批把图像写入几个文件中。

(2)cifar10数据有三种版本,分别是MATLAB,Python和bin版本 数据下载链接: http://www.cs.toronto.edu/~kriz/cifar.html 

其中Python版本的数据即是以字典结构存储的数据 。

针对字典结构的数据文件读取,我在AlexNet那节中有详细介绍,主要就是通过pickle模块对文件进行反序列化,获取我们所需要的数据。

2.从bin文件读取

在官网的cifar的例子中就是从bin文件中读取的。bin文件需要以一定的size格式存储,比如每个样本的值占多少字节,label占多少字节,且这对于每个样本都是固定的,然后一个挨着一个存储。这样就可以使用tf.FixedLengthRecordReader 类来每次读取固定长度的字节,正好对应一个样本存储的字节(包括label)。并且用tf.decode_raw进行解析。

(1)制作bin file 

如何将自己的图片存为bin file,可以看看下面这篇博客,这篇博客使用C++和opencv将图片存为二进制文件: http://blog.csdn.net/code_better/article/details/53289759 

(2)从bin file读入 
在后面会详细讲解如何从二进制记录文件中读取数据,并以cifar10_input.py作为案例。

  1. def read_cifar10(filename_queue):
  2. """Reads and parses examples from CIFAR10 data files.
  3. Recommendation: if you want N-way read parallelism, call this function
  4. N times. This will give you N independent Readers reading different
  5. files & positions within those files, which will give better mixing of
  6. examples.
  7. Args:
  8. filename_queue: A queue of strings with the filenames to read from.
  9. Returns:
  10. An object representing a single example, with the following fields:
  11. height: number of rows in the result (32)
  12. width: number of columns in the result (32)
  13. depth: number of color channels in the result (3)
  14. key: a scalar string Tensor describing the filename & record number
  15. for this example.
  16. label: an int32 Tensor with the label in the range 0..9.
  17. uint8image: a [height, width, depth] uint8 Tensor with the image data
  18. """
  19. class CIFAR10Record(object):
  20. pass
  21. result = CIFAR10Record()
  22. # Dimensions of the images in the CIFAR-10 dataset.
  23. # See http://www.cs.toronto.edu/~kriz/cifar.html for a description of the
  24. # input format.
  25. label_bytes = 1 # 2 for CIFAR-100
  26. result.height = 32
  27. result.width = 32
  28. result.depth = 3
  29. image_bytes = result.height * result.width * result.depth
  30. # Every record consists of a label followed by the image, with a
  31. # fixed number of bytes for each.
  32. record_bytes = label_bytes + image_bytes
  33. # Read a record, getting filenames from the filename_queue. No
  34. # header or footer in the CIFAR-10 format, so we leave header_bytes
  35. # and footer_bytes at their default of 0.
  36. reader = tf.FixedLengthRecordReader(record_bytes=record_bytes)
  37. result.key, value = reader.read(filename_queue)
  38. # Convert from a string to a vector of uint8 that is record_bytes long.
  39. record_bytes = tf.decode_raw(value, tf.uint8)
  40. # The first bytes represent the label, which we convert from uint8->int32.
  41. result.label = tf.cast(
  42. tf.strided_slice(record_bytes, [0], [label_bytes]), tf.int32)
  43. # The remaining bytes after the label represent the image, which we reshape
  44. # from [depth * height * width] to [depth, height, width].
  45. depth_major = tf.reshape(
  46. tf.strided_slice(record_bytes, [label_bytes],
  47. [label_bytes + image_bytes]),
  48. [result.depth, result.height, result.width])
  49. # Convert from [depth, height, width] to [height, width, depth].
  50. result.uint8image = tf.transpose(depth_major, [1, 2, 0])
  51. return result

这段代码如果看不懂,可以先跳过。

3.从CSV(TXT)文件读取

有的时候在数据量不是很大的时候,可以从CSV或者TXT文件进行读取。 

(1)制作CSV(TXT)数据文本 
CSV (TXT)一般是一行存一个样本(包括样本值和label),用逗号隔开。用python的普通文本写入即可。

(2)读取的时候tf.TextLineReader 类来每次读取一行,并使用tf.decode_csv来对每一行进行解析。 
这里主要介绍一下:

 tf.decode_csv(records, record_defaults, field_delim=None, name=None) 
  • 首先records与第二种方法中相同,为reader读到的内容,这里为CSV (TXT)的一行。
  • 一般一行里面的值会用逗号或者空格隔开,这里第三个输入参数就是指定用什么来进行分割,默认为逗号。
  • 第二个输入参数是指定分割后每个属性的类型,比如分割后会有三列,那么第二个参数就应该是[[‘int32’], [], [‘string’]], 可不指定类型(设为空[])也可以。如果分割后的属性比较多,比如有100个,可以用[ []*100 ]来表示 
col= tf.decode_csv(records, record_defaults=[ [ ]*100 ], field_delim=' ', name=None)

返回的col是长度为100的list。 
需要注意的是,当数据量比较大的时候,存成CSV或TXT文件要比BIN文件大的多,因此在TF中读取的速度也会慢很多。因此尽量不要读取大的CSV的方式来输入。

在后面我们会详细讲解如何从CSV文件中读取数据,并有具体的案例。

4 从原图中读取

(1)制作数据路径文件 
一行一例,每例包括该样本的地址和label,用逗号分割开,用python普通文件写入即可 
(2)读取 
很多情况下我们的图片训练集就是原始图片本身,并没有像cifar dataset那样存成bin等格式。因此我们需要根据一个train_list列表,去挨个读取图片。这里我用到的方法是首先将train_list.txt中的image list(也就是每一行有图片的路径和label组成)读入队列中,那么对每次dequeue的内容中可以提取当前图片的路径和label。

  1. filename = os.path.join(data_dir, trainfilename)
  2. with open(filename) as fid:
  3. content = fid.read()
  4. content = content.split('\n')
  5. content = content[:-1]
  6. valuequeue = tf.train.string_input_producer(content,shuffle=True)
  7. value = valuequeue.dequeue()
  8. dir, labels = tf.decode_csv(records=value, record_defaults=[["string"], [""]], field_delim=" ")
  9. labels = tf.string_to_number(labels, tf.int32)
  10. imagecontent = tf.read_file(dir)
  11. image = tf.image.decode_png(imagecontent, channels=3, dtype=tf.uint8)
  12. image = tf.cast(image, tf.float32)
  13. #将图片统一为32*32大小的
  14. image = tf.image.resize_images(image,[32,32])
  15. image = tf.reshape(image,[result.depth, result.height, result.width])
  16. # Convert from [depth, height, width] to [height, width, depth].
  17. result.uint8image = tf.transpose(image, [1, 2, 0])

不过这个方法对电脑输入输出要求比较高,如果机械硬盘有坏道,就会报Input/Output error,出现这种情况,要修复机械硬盘坏道。

5.从TFRecord文件读取

TFrecord是Tensorflow推荐的数据集格式,与Tensorflow框架紧密结合。在TensorFlow中提供了一系列接口可以访问TFRecord格式。后面会详细介绍如何将原始图片文件转换为TFRecord格式,然后在运行中通过多线程的方式来读取。

五 QueueRunner和Coordinator结合方式一

在TensorFlow中用Queue的经典模式有两种,都是配合了QueueRunner和Coordinator一起使用的。

这里先介绍第一种方法,显式的创建QueueRunner,然后调用它的create_threads方法启动线程。例如下面这段代码:

  1. '''
  2. 配合使用
  3. '''
  4. import numpy as np
  5. # 10004维输入向量,每个数取值为1-10之间的随机数
  6. data = 10 * np.random.randn(1000, 4) + 1
  7. # 1000个随机的目标值,值为01
  8. target = np.random.randint(0, 2, size=1000)
  9. # 创建Queue,队列中每一项包含一个输入数据和相应的目标值
  10. queue = tf.FIFOQueue(capacity=50, dtypes=[tf.float32, tf.int32], shapes=[[4], []])
  11. # 批量入列数据(这是一个Operation)
  12. enqueue_op = queue.enqueue_many([data, target])
  13. # 出列数据(这是一个Tensor定义)
  14. data_sample, label_sample = queue.dequeue()
  15. # 创建包含4个线程的QueueRunner
  16. qr = tf.train.QueueRunner(queue, [enqueue_op] * 4)
  17. with tf.Session() as sess:
  18. # 创建Coordinator
  19. coord = tf.train.Coordinator()
  20. # 启动QueueRunner管理的线程
  21. enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
  22. # 主线程,消费100个数据
  23. for step in range(100):
  24. if coord.should_stop():
  25. break
  26. data_batch, label_batch = sess.run([data_sample, label_sample])
  27. # 主线程计算完成,停止所有采集数据的进程
  28. coord.request_stop()
  29. coord.join(enqueue_threads)

六 QueueRunner和Coordinator结合方式二

这一小节我们会使用QueueRunner和Coordinator来实现bin文件,以及csv文件、TFRecord格式文件的读取,不过这里我们采用隐式创建线程的方法。在讲解具体代码之前,我们需要先来讲解关于TensorFlow中的文件队列机制和内存队列机制。

首先需要思考的一个问题是,什么是数据读取?以图像数据为例,读取数据的过程可以用下图来表示:

假设我们的硬盘中有一个图片数据集0001.jpg,0002.jpg,0003.jpg……我们只需要把它们读取到内存中,然后提供给GPU或是CPU进行计算就可以了。这听起来很容易,但事实远没有那么简单。事实上,我们必须要把数据先读入后才能进行计算,假设读入用时0.1s,计算用时0.9s,那么就意味着每过1s,GPU都会有0.1s无事可做,这就大大降低了运算的效率。

如何解决这个问题?方法就是将读入数据和计算分别放在两个线程中,将数据读入内存的一个队列,如下图所示:

 

读取线程源源不断地将文件系统中的图片读入到一个内存的队列中,而负责计算的是另一个线程,计算需要数据时,直接从内存队列中取就可以了。这样就可以解决GPU因为IO而空闲的问题!

而在tensorflow中,为了方便管理,在内存队列前又添加了一层所谓的“文件名队列”。

为什么要添加这一层文件名队列?我们首先得了解机器学习中的一个概念:epoch。对于一个数据集来讲,运行一个epoch就是将这个数据集中的图片全部计算一遍。如一个数据集中有三张图片A.jpg、B.jpg、C.jpg,那么跑一个epoch就是指对A、B、C三张图片都计算了一遍。两个epoch就是指先对A、B、C各计算一遍,然后再全部计算一遍,也就是说每张图片都计算了两遍。

tensorflow使用文件名队列+内存队列双队列的形式读入文件,可以很好地管理epoch。下面我们用图片的形式来说明这个机制的运行方式。如下图,还是以数据集A.jpg, B.jpg, C.jpg为例,假定我们要跑一个epoch,那么我们就在文件名队列中把A、B、C各放入一次,并在之后标注队列结束。

程序运行后,内存队列首先读入A(此时A从文件名队列中出队):

 

再依次读入B和C:

 

此时,如果再尝试读入,系统由于检测到了“结束”,就会自动抛出一个异常(OutOfRange)。外部捕捉到这个异常后就可以结束程序了。这就是tensorflow中读取数据的基本机制。如果我们要跑2个epoch而不是1个epoch,那只要在文件名队列中将A、B、C依次放入两次再标记结束就可以了。

典型的文件数据读取会包含下面这些步骤:

1.文件名列表

可以使用字符串张量(比如["file0", "file1"][("file%d" % i) for i in range(2)], [("file%d" % i) for i in range(2)]) 或者tf.train.match_filenames_once ()函数来产生文件名列表。

  1. filenames = [os.path.join(data_dir, 'data_batch_%d.bin' % i)
  2. for i in xrange(1, 6)]
2.文件名队列

对于文件名队列,我们使用tf.train.string_input_producer()函数。这个函数需要传入一个文件名list,系统会自动将它转为一个先入先出的文件名队列, 文件阅读器会需要它来读取数据。

  1. # 同时打开多个文件,显示创建Queue,同时隐含了QueueRunner的创建
  2. filename_queue = tf.train.string_input_producer(filenames)
3.可配置的 文件名乱序(shuffling),可配置的最大训练迭代数(epoch limit)

tf.train.string_input_producer还有两个重要的参数,一个是num_epochs,它就是我们上文中提到的epoch数。另外一个就是shuffle,shuffle是指在一个epoch内文件的顺序是否被打乱。若设置shuffle=False,如下图,每个epoch内,数据还是按照A、B、C的顺序进入文件名队列,这个顺序不会改变:

如果设置shuffle=True,那么在一个epoch内,数据的前后顺序就会被打乱,如下图所示:

在tensorflow中,内存队列不需要我们自己建立,我们只需要使用reader对象从文件名队列中读取数据就可以了。

4.针对输入文件格式的阅读器

根据你的文件格式, 选择对应的文件阅读器, 然后将文件名队列提供给阅读器的read()方法。阅读器的read()方法会输出一个key来表征输入的文件和其中的纪录(对于调试非常有用),同时得到一个字符串标量, 这个字符串标量可以被一个或多个解析器,或者转换操作将其解码为张量并且构造成为样本。

CSV 文件读取

从CSV文件中读取数据, 需要使用TextLineReader()decode_csv() 操作, 如下面的例子所示:我们需要从iris_0.csv和iris_1.csv文件读取数据,iris_0.csv和iris_1.csv文件数据是从iris数据集中选取的部分数据,内容如下:总共有21条记录。

  1. # 同时打开多个文件(文件格式必须一样),隐式示创建Queue,同时隐含了QueueRunner的创建
  2. filename_queue = tf.train.string_input_producer(["iris_0.csv","iris_1.csv"])
  3. reader = tf.TextLineReader()
  4. # Tensorflow的Reader对象可以直接接受一个Queue作为输入 每次read的执行都会从文件中读取一行内容
  5. key, value = reader.read(filename_queue)
  6. # 如果某一列为空,指定默认值,同时指定了默认列的类型
  7. record_defaults = [[0.0], [0.0], [0.0], [0.0], [0]]
  8. # decode_csv 操作会解析读取的一行内容并将其转为张量列表
  9. col1, col2, col3, col4, col5 = tf.decode_csv(value, record_defaults=record_defaults)
  10. features = [col1, col2, col3, col4]
  11. #获取一行数据
  12. #row = tf.decode_csv(value, record_defaults=record_defaults)
  13. with tf.Session() as sess:
  14. coord = tf.train.Coordinator()
  15. # 启动计算图中所有的队列线程 调用tf.train.start_queue_runners来将文件名填充到队列,否则read操作会被阻塞到文件名队列中有值为止。
  16. threads = tf.train.start_queue_runners(coord=coord)
  17. # 主线程,消费50个数据
  18. for _ in range(50):
  19. example, label = sess.run([features, col5])
  20. print('Step {0} {1} {2}'.format(_,example,label))
  21. # 主线程计算完成,停止所有采集数据的进程
  22. coord.request_stop()   # 指定等待某个线程结束
  23. coord.join(threads)
  • 在这个例子中,tf.train.string_input_produecer()会将一个隐含的QueueRunner添加到全局图中(类似的操作还有tf.train.shuffle_batch()等)。由于没有显式地返回QueueRunner()调用create_threads()启动线程,这里使用了tf.train.start_queue_runners()方法直接启动tf.GraphKeys.QUEUE_RUNNERS集合中的所有队列线程。初学者会经常在代码中看到tf.train.start_queue_runners()这个函数,但往往很难理解它的用处,在这里,有了上面的铺垫后,我们就可以解释这个函数的作用了。

在我们使用tf.train.string_input_producer创建文件名队列后,整个系统其实还是处于“停滞状态”的,也就是说,我们文件名并没有真正被加入到队列中(如下图所示)。此时如果我们开始计算,因为内存队列中什么也没有,计算单元就会一直等待,导致整个系统被阻塞。

而使用tf.train.start_queue_runners()之后,才会启动填充队列的线程,这时系统就不再“停滞”。此后计算单元就可以拿到数据并进行计算,整个程序也就跑起来了,这就是函数tf.train.start_queue_runners的用处。

每次read()的执行都会从文件中读取一行内容, decode_csv() 操作会解析这一行内容并将其转为张量列表。如果输入的参数有缺失,record_default参数可以根据张量的类型来设置默认值。

5.纪录解析器(从bin文件读入)

二进制bin文件中读取固定长度纪录, 可以使用tf.FixedLengthRecordReadertf.decode_raw操作。decode_raw操作可以将一个字符串转换为一个uint8的张量。

举例来说,the CIFAR-10 dataset的文件格式定义是:每条记录的长度都是固定的,一个字节的标签,后面是3072字节的图像数据。uint8的张量的标准操作就可以从中获取图像片并且根据需要进行重组。 例子代码可以在tensorflow/models/image/cifar10/cifar10_input.py找到。

  1. def read_cifar10(filename_queue):
  2. """Reads and parses examples from CIFAR10 data files.
  3. Recommendation: if you want N-way read parallelism, call this function
  4. N times. This will give you N independent Readers reading different
  5. files & positions within those files, which will give better mixing of
  6. examples.
  7. Args:
  8. filename_queue: A queue of strings with the filenames to read from.
  9. Returns:
  10. An object representing a single example, with the following fields:
  11. height: number of rows in the result (32)
  12. width: number of columns in the result (32)
  13. depth: number of color channels in the result (3)
  14. key: a scalar string Tensor describing the filename & record number
  15. for this example.
  16. label: an int32 Tensor with the label in the range 0..9.
  17. uint8image: a [height, width, depth] uint8 Tensor with the image data
  18. """
  19. class CIFAR10Record(object):
  20. pass
  21. result = CIFAR10Record()
  22. # Dimensions of the images in the CIFAR-10 dataset.
  23. # See http://www.cs.toronto.edu/~kriz/cifar.html for a description of the
  24. # input format.
  25. label_bytes = 1 # 2 for CIFAR-100
  26. result.height = 32
  27. result.width = 32
  28. result.depth = 3
  29. image_bytes = result.height * result.width * result.depth
  30. # Every record consists of a label followed by the image, with a
  31. # fixed number of bytes for each.
  32. record_bytes = label_bytes + image_bytes
  33. # Read a record, getting filenames from the filename_queue. No
  34. # header or footer in the CIFAR-10 format, so we leave header_bytes
  35. # and footer_bytes at their default of 0.
  36. reader = tf.FixedLengthRecordReader(record_bytes=record_bytes)
  37. result.key, value = reader.read(filename_queue)
  38. # Convert from a string to a vector of uint8 that is record_bytes long.
  39. record_bytes = tf.decode_raw(value, tf.uint8)
  40. # The first bytes represent the label, which we convert from uint8->int32.
  41. result.label = tf.cast(
  42. tf.strided_slice(record_bytes, [0], [label_bytes]), tf.int32)
  43. # The remaining bytes after the label represent the image, which we reshape
  44. # from [depth * height * width] to [depth, height, width].
  45. depth_major = tf.reshape(
  46. tf.strided_slice(record_bytes, [label_bytes],
  47. [label_bytes + image_bytes]),
  48. [result.depth, result.height, result.width])
  49. # Convert from [depth, height, width] to [height, width, depth].
  50. result.uint8image = tf.transpose(depth_major, [1, 2, 0])
  51. return result

read_cifar10()函数需要传入一个文件名队列,这个函数主要做了以下事情:

  • 计算每个记录(样本)包含多少字节。一张图像所占字节数 + 图像标签所占字节数。
  • 每执行一次read()的执行都会从文件中读取一行内容,decode_raw() 操作会解析这一行内容并将其转为张量。
  • 提取第一个字节,即标签,并把类型从uint8->int32
  • 提取剩下的字节,即图像。
  • 把图像数据转换为[height,width,depth]形状。
  • 返回一个对象resulit。包含两个属性(都是张量),result.uint8image包含一张形状为[height,width,depth]的图像,result.label存储该图像对应的标签。
6.可配置的预处理器

你可以对输入的样本进行任意的预处理, 这些预处理不依赖于训练参数,比如上面read_cifar10()的函数,获取一张图像数据张量后,我们可以对图像进行处理,比如裁切,水平翻转,以及对图片进行归一化处理等等。我们可以在tensorflow/models/image/cifar10/cifar10.py找到数据归一化, 提取随机数据片,增加噪声或失真等等预处理的例子。

7.批处理(TFRecord格式文件读写)

在数据输入管道的末端, 我们需要有内存队列来执行输入样本的批量读取。我们使用tf.train.shuffle_batch() 函数来对内存队列中的样本进行乱序处理。 

 我们用一个具体的例子来演示一下tf.train.shuffle_batch()函数的使用。如图,假设我们在当前文件夹中已经有A.、B.、C三个子文件夹。并在每个文件夹下下面放置对应的图片。

针对这些文件我们需要做下面几步处理:

  • 读取所有图片文件,并存为TFRecord格式文件。
  • 我们读取记录文中的数据。使用tf.TFRecordReader类创建一个文件读取器,每执行一次read()方法会读取一个样本。
  • 使用tf.train.shuffle_batch()函数每次读取batch_size张图像数据。
  1. '''
  2. shuffle_batch()的使用
  3. '''
  4. import os
  5. import cv2
  6. def write_binary():
  7. '''
  8. 将默认路径下的所有图片存为TFRecord格式 保存到文件data.tfrecord中
  9. '''
  10. #获取当前工作目录
  11. cwd = os.getcwd()
  12. #当前目录下的子目录 一共有12张图片
  13. classes=['A','B','C']
  14. #创建对象 用于向记录文件写入记录
  15. writer = tf.python_io.TFRecordWriter('data.tfrecord')
  16. #遍历每一个子文件夹
  17. for index, name in enumerate(classes):
  18. class_path = os.path.join(cwd,name)
  19. #遍历子目录下的每一个文件
  20. for img_name in os.listdir(class_path):
  21. #每一个图片全路径
  22. img_path = os.path.join(class_path , img_name)
  23. #读取图像
  24. img = cv2.imread(img_path)
  25. #缩放
  26. img1 = cv2.resize(img,(250,250))
  27. #将图片转化为原生bytes
  28. img_raw = img1.tobytes()
  29. #将数据整理成 TFRecord 需要的数据结构
  30. example = tf.train.Example(features=tf.train.Features(feature={
  31. 'img_raw': tf.train.Feature(bytes_list=tf.train.BytesList(value=[img_raw])),
  32. "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[index]))}))
  33. #序列化
  34. serialized = example.SerializeToString()
  35. #写入文件
  36. writer.write(serialized)
  37. writer.close()
  38. def read_and_decode(filename):
  39. '''
  40. 读取TFRecord格式格式文件,返回读取到的一张图像以及对应的标签
  41. args:
  42. filename:TFRecord格式文件路径
  43. '''
  44. #创建文件队列,不限读取的数量
  45. filename_queue = tf.train.string_input_producer([filename],shuffle=False)
  46. #创建一个文件读取器 从队列文件中读取数据
  47. reader = tf.TFRecordReader()
  48. #reader从 TFRecord 读取内容并保存到 serialized_example中
  49. _, serialized_example = reader.read(filename_queue)
  50. # 读取serialized_example的格式
  51. features = tf.parse_single_example(
  52. serialized_example,
  53. features={
  54. 'label': tf.FixedLenFeature([], tf.int64),
  55. 'img_raw': tf.FixedLenFeature([], tf.string)
  56. }
  57. )
  58. # 解析从 serialized_example 读取到的内容
  59. img=tf.decode_raw(features['img_raw'],tf.uint8)
  60. img = tf.reshape(img, [250, 250, 3])
  61. label = tf.cast(features['label'], tf.int32)
  62. return img,label
  63. #将默认路径下的所有图片存为TFRecord格式 保存到文件data.tfrecord中
  64. write_binary()
  65. #读取TFRecord格式格式文件,返回读取到的一张图像以及对应的标签
  66. img,label = read_and_decode('data.tfrecord')
  67. '''
  68. 读取批量数据 这里设置batch_size=12,即一次从内存队列中随机读取12张图片,这读取到的图片可能有重复的,
  69. 这主要是因为设置内存队列最小元素个数为100,最大元素个数为2000,而我们总共只有12张图片,所以队列中有许多重复的图片
  70. '''
  71. img_batch, label_batch = tf.train.shuffle_batch([img,label], batch_size=12, capacity=2000, min_after_dequeue=100, num_threads=2)
  72. with tf.Session() as sess:
  73. sess.run(tf.global_variables_initializer())
  74. #创建一个协调器,管理线程
  75. coord = tf.train.Coordinator()
  76. #启动QueueRunner, 此时文件名才开始进队。
  77. threads=tf.train.start_queue_runners(sess=sess,coord=coord)
  78. img, label = sess.run([img_batch, label_batch])
  79. for i in range(12):
  80. cv2.imwrite('%d_%d_p.jpg'%(i,label[i]),img[i])
  81. #终止线程
  82. coord.request_stop()
  83. coord.join(threads)

运行后:

我们在读取TFRecord文件时,一次读取12张图片,这主要是因为我们设置batch_size=12,并且我们可以看到读取到的12张图片是随机,这里出现了重复的。

8.实验

我们再用一个具体的例子感受tensorflow中的数据读取。如图,假设我们在当前文件夹中已经有A.jpg、B.jpg、C.jpg三张图片,我们希望读取这三张图片5个epoch并且把读取的结果重新存到read文件夹中。

  1. '''
  2. 测试
  3. '''
  4. tf.reset_default_graph()
  5. # 新建一个Session
  6. with tf.Session() as sess:
  7. # 我们要读三幅图片A.jpg, B.jpg, C.jpg
  8. filename = ['A.jpg', 'B.jpg', 'C.jpg']
  9. # string_input_producer会产生一个文件名队列
  10. filename_queue = tf.train.string_input_producer(filename, shuffle=True, num_epochs=5)
  11. # reader从文件名队列中读数据。对应的方法是reader.read
  12. reader = tf.WholeFileReader()
  13. key, value = reader.read(filename_queue)
  14. # tf.train.string_input_producer定义了一个epoch变量,要对它进行初始化
  15. tf.local_variables_initializer().run()
  16. # 使用start_queue_runners之后,才会开始填充队列
  17. threads = tf.train.start_queue_runners(sess=sess)
  18. for i in range(15):
  19. # 获取图片数据并保存
  20. image_data = sess.run(value)
  21. with open('read/test_%d.jpg' % (i+1), 'wb') as f:
  22. f.write(image_data)

我们这里使用filename_queue = tf.train.string_input_producer(filename, shuffle=False, num_epochs=5)建立了一个会跑5个epoch的文件名队列。并使用reader读取,reader每次读取一张图片并保存。

运行代码后,我们得到就可以看到read文件夹中的图片,正好是按顺序的5个epoch:

如果我们设置filename_queue = tf.train.string_input_producer(filename, shuffle=False, num_epochs=5)中的shuffle=True,那么在每个epoch内图像就会被打乱,如图所示:

我们这里只是用三张图片举例,实际应用中一个数据集肯定不止3张图片,不过涉及到的原理都是共通的。

完整代码:

  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on Wed May 2 20:39:25 2018
  4. @author: zy
  5. """
  6. import tensorflow as tf
  7. '''
  8. TensorFlow中队列的使用
  9. '''
  10. '''
  11. 下面是一个单独使用Queue的例子:
  12. '''
  13. #创建的图:一个先入先出队列,以及初始化,出队,+1,入队操作
  14. q = tf.FIFOQueue(3, "float")
  15. init = q.enqueue_many(([0.1, 0.2, 0.3],))
  16. x = q.dequeue()
  17. y = x + 1
  18. q_inc = q.enqueue([y])
  19. #开启一个session,session是会话,会话的潜在含义是状态保持,各种tensor的状态保持
  20. with tf.Session() as sess:
  21. sess.run(init)
  22. for i in range(2):
  23. sess.run(q_inc)
  24. quelen = sess.run(q.size())
  25. for i in range(quelen):
  26. print (sess.run(q.dequeue()))
  27. '''
  28. QueueRunner()的使用
  29. '''
  30. q = tf.FIFOQueue(10, "float")
  31. counter = tf.Variable(0.0) #计数器
  32. # 给计数器加一
  33. increment_op = tf.assign_add(counter, 1.0)
  34. # 将计数器加入队列
  35. enqueue_op = q.enqueue(counter)
  36. # 创建QueueRunner
  37. # 用多个线程向队列添加数据
  38. # 这里实际创建了4个线程,两个增加计数,两个执行入队
  39. qr = tf.train.QueueRunner(q, enqueue_ops=[increment_op, enqueue_op] * 2)
  40. '''
  41. #主线程
  42. with tf.Session() as sess:
  43. sess.run(tf.initialize_all_variables())
  44. #启动入队线程
  45. enqueue_threads = qr.create_threads(sess, start=True)
  46. #主线程
  47. for i in range(10):
  48. print (sess.run(q.dequeue()))
  49. '''
  50. # 主线程
  51. sess = tf.Session()
  52. sess.run(tf.initialize_all_variables())
  53. # 启动入队线程
  54. enqueue_threads = qr.create_threads(sess, start=True)
  55. # 主线程
  56. for i in range(0, 10):
  57. print(sess.run(q.dequeue()))
  58. '''
  59. Coordinator
  60. '''
  61. import threading, time
  62. # 子线程函数
  63. def loop(coord, id):
  64. t = 0
  65. while not coord.should_stop():
  66. print(id)
  67. time.sleep(1)
  68. t += 1
  69. # 只有1号线程调用request_stop方法
  70. if (t >= 2 and id == 0):
  71. coord.request_stop()
  72. # 主线程
  73. coord = tf.train.Coordinator()
  74. # 使用Python API创建10个线程
  75. threads = [threading.Thread(target=loop, args=(coord, i)) for i in range(10)]
  76. # 启动所有线程,并等待线程结束
  77. for t in threads: t.start()
  78. coord.join(threads)
  79. '''
  80. QueueRunner和Coordinator结合方式一
  81. '''
  82. '''
  83. import numpy as np
  84. # 1000个4维输入向量,每个数取值为1-10之间的随机数
  85. data = 10 * np.random.randn(1000, 4) + 1
  86. # 1000个随机的目标值,值为0或1
  87. target = np.random.randint(0, 2, size=1000)
  88. # 创建Queue,队列中每一项包含一个输入数据和相应的目标值
  89. queue = tf.FIFOQueue(capacity=50, dtypes=[tf.float32, tf.int32], shapes=[[4], []])
  90. # 批量入列数据(这是一个Operation)
  91. enqueue_op = queue.enqueue_many([data, target])
  92. # 出列数据(这是一个Tensor定义)
  93. data_sample, label_sample = queue.dequeue()
  94. # 创建包含4个线程的QueueRunner
  95. qr = tf.train.QueueRunner(queue, [enqueue_op] * 4)
  96. with tf.Session() as sess:
  97. # 创建Coordinator
  98. coord = tf.train.Coordinator()
  99. # 启动QueueRunner管理的线程
  100. enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
  101. # 主线程,消费100个数据
  102. for step in range(100):
  103. if coord.should_stop():
  104. break
  105. data_batch, label_batch = sess.run([data_sample, label_sample])
  106. # 主线程计算完成,停止所有采集数据的进程
  107. coord.request_stop()
  108. coord.join(enqueue_threads)
  109. '''
  110. '''
  111. QueueRunner和Coordinator结合的数据读取机制 读取CSV文件
  112. '''
  113. tf.reset_default_graph()
  114. # 同时打开多个文件(文件格式必须一样),隐式创建Queue,同时隐含了QueueRunner的创建
  115. filename_queue = tf.train.string_input_producer(["iris_0.csv","iris_1.csv"])
  116. reader = tf.TextLineReader()
  117. # Tensorflow的Reader对象可以直接接受一个Queue作为输入 每次read的执行都会从文件中读取一行内容
  118. key, value = reader.read(filename_queue)
  119. # 如果某一列为空,指定默认值,同时指定了默认列的类型
  120. record_defaults = [[0.0], [0.0], [0.0], [0.0], [0]]
  121. # decode_csv 操作会解析读取的一行内容并将其转为张量列表
  122. col1, col2, col3, col4, col5 = tf.decode_csv(value, record_defaults=record_defaults)
  123. features = [col1, col2, col3, col4]
  124. #获取一行数据
  125. #row = tf.decode_csv(value, record_defaults=record_defaults)
  126. with tf.Session() as sess:
  127. coord = tf.train.Coordinator()
  128. # 启动计算图中所有的队列线程 调用tf.train.start_queue_runners来将文件名填充到队列,否则read操作会被阻塞到文件名队列中有值为止。
  129. threads = tf.train.start_queue_runners(coord=coord)
  130. # 主线程,消费50个数据
  131. for _ in range(50):
  132. example, label = sess.run([features, col5])
  133. print('Step {0} {1} {2}'.format(_,example,label))
  134. # 主线程计算完成,停止所有采集数据的进程
  135. coord.request_stop()
  136. #指定等待某个线程结束
  137. coord.join(threads)
  138. '''
  139. shuffle_batch()的使用
  140. '''
  141. import os
  142. import cv2
  143. def write_binary():
  144. '''
  145. 将默认路径下的所有图片存为TFRecord格式 保存到文件data.tfrecord中
  146. '''
  147. #获取当前工作目录
  148. cwd = os.getcwd()
  149. #当前目录下的子目录 一共有12张图片
  150. classes=['A','B','C']
  151. #创建对象 用于向记录文件写入记录
  152. writer = tf.python_io.TFRecordWriter('data.tfrecord')
  153. #遍历每一个子文件夹
  154. for index, name in enumerate(classes):
  155. class_path = os.path.join(cwd,name)
  156. #遍历子目录下的每一个文件
  157. for img_name in os.listdir(class_path):
  158. #每一个图片全路径
  159. img_path = os.path.join(class_path , img_name)
  160. #读取图像
  161. img = cv2.imread(img_path)
  162. #缩放
  163. img1 = cv2.resize(img,(250,250))
  164. #将图片转化为原生bytes
  165. img_raw = img1.tobytes()
  166. #将数据整理成 TFRecord 需要的数据结构
  167. example = tf.train.Example(features=tf.train.Features(feature={
  168. 'img_raw': tf.train.Feature(bytes_list=tf.train.BytesList(value=[img_raw])),
  169. "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[index]))}))
  170. #序列化
  171. serialized = example.SerializeToString()
  172. #写入文件
  173. writer.write(serialized)
  174. writer.close()
  175. def read_and_decode(filename):
  176. '''
  177. 读取TFRecord格式格式文件,返回读取到的一张图像以及对应的标签
  178. args:
  179. filename:TFRecord格式文件路径
  180. '''
  181. #创建文件队列,不限读取的数量
  182. filename_queue = tf.train.string_input_producer([filename],shuffle=False)
  183. #创建一个文件读取器 从队列文件中读取数据
  184. reader = tf.TFRecordReader()
  185. #reader从 TFRecord 读取内容并保存到 serialized_example中
  186. _, serialized_example = reader.read(filename_queue)
  187. # 读取serialized_example的格式
  188. features = tf.parse_single_example(
  189. serialized_example,
  190. features={
  191. 'label': tf.FixedLenFeature([], tf.int64),
  192. 'img_raw': tf.FixedLenFeature([], tf.string)
  193. }
  194. )
  195. # 解析从 serialized_example 读取到的内容
  196. img=tf.decode_raw(features['img_raw'],tf.uint8)
  197. img = tf.reshape(img, [250, 250, 3])
  198. label = tf.cast(features['label'], tf.int32)
  199. return img,label
  200. tf.reset_default_graph()
  201. #将默认路径下的所有图片存为TFRecord格式 保存到文件data.tfrecord中
  202. write_binary()
  203. #读取TFRecord格式格式文件,返回读取到的一张图像以及对应的标签
  204. img,label = read_and_decode('data.tfrecord')
  205. '''
  206. 读取批量数据 这里设置batch_size=12,即一次从内存队列中随机读取12张图片,这读取到的图片可能有重复的,
  207. 这主要是因为设置内存队列最小元素个数为100,最大元素个数为2000,而我们总共只有12张图片,所以队列中有许多重复的图片
  208. '''
  209. img_batch, label_batch = tf.train.shuffle_batch([img,label], batch_size=12, capacity=2000, min_after_dequeue=100, num_threads=2)
  210. with tf.Session() as sess:
  211. sess.run(tf.global_variables_initializer())
  212. #创建一个协调器,管理线程
  213. coord = tf.train.Coordinator()
  214. #启动QueueRunner, 此时文件名才开始进队。
  215. threads=tf.train.start_queue_runners(sess=sess,coord=coord)
  216. img, label = sess.run([img_batch, label_batch])
  217. for i in range(12):
  218. cv2.imwrite('%d_%d_p.jpg'%(i,label[i]),img[i])
  219. #终止线程
  220. coord.request_stop()
  221. coord.join(threads)
  222. '''
  223. 测试
  224. '''
  225. tf.reset_default_graph()
  226. # 新建一个Session
  227. with tf.Session() as sess:
  228. # 我们要读三幅图片A.jpg, B.jpg, C.jpg
  229. filename = ['A.jpg', 'B.jpg', 'C.jpg']
  230. # string_input_producer会产生一个文件名队列
  231. filename_queue = tf.train.string_input_producer(filename, shuffle=True, num_epochs=5)
  232. # reader从文件名队列中读数据。对应的方法是reader.read
  233. reader = tf.WholeFileReader()
  234. key, value = reader.read(filename_queue)
  235. # tf.train.string_input_producer定义了一个epoch变量,要对它进行初始化
  236. tf.local_variables_initializer().run()
  237. # 使用start_queue_runners之后,才会开始填充队列
  238. threads = tf.train.start_queue_runners(sess=sess)
  239. for i in range(15):
  240. # 获取图片数据并保存
  241. image_data = sess.run(value)
  242. with open('read/test_%d.jpg' % (i+1), 'wb') as f:
  243. f.write(image_data)
View Code

七 读取原始图片转换为小批量大小的样本数据

假如我们现在需要对猫和狗的图片进行分类,我们已经收集了许多猫和狗的图片,首先我们需要建立一个文件夹命名为data,在该文件夹下面创建两个子文件夹train,test分别用于保存测试集和训练集图片,然后还需要在每个文件夹下面创建两个文件夹,分别命名cat和dog,用来存放对应类别的图片。

有了这些图片之后,我们想每次读取指定batch_size大小得数据样本,并且这些样本是打乱的。

  • 我们把这些文件保存为TFRecord格式文件
  • 从TFRecord文件中读取batch_size样本集。

代码如下:

  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on Thu May 10 16:48:17 2018
  4. @author: zy
  5. """
  6. '''
  7. 如何将给定的数据原始图片以及标签保存成TFRecord格式的文件
  8. 并使用队列每次读取batch_size大小样本集
  9. '''
  10. import tensorflow as tf
  11. import os
  12. import cv2
  13. import random
  14. import numpy as np
  15. #随机种子,使得每次运行结果一样
  16. random.seed(0)
  17. def get_files(dirpath):
  18. '''
  19. 获取文件相对路径和标签(非one_hot) 返回一个元组
  20. args:
  21. dirpath:数据所在的目录 记做父目录
  22. 假设有10类数据,则父目录下有10个子目录,每个子目录存放着对应的图片
  23. '''
  24. #保存读取到的的文件和标签
  25. image_list = []
  26. label_list = []
  27. #遍历子目录
  28. classes = [x for x in os.listdir(dirpath) if os.path.isdir(dirpath)]
  29. #遍历每一个子文件夹
  30. for index, name in enumerate(classes):
  31. #子文件夹路径
  32. class_path = os.path.join(dirpath,name)
  33. #遍历子目录下的每一个文件
  34. for img_name in os.listdir(class_path):
  35. #每一个图片全路径
  36. img_path = os.path.join(class_path , img_name)
  37. #追加
  38. image_list.append(img_path)
  39. label_list.append(index)
  40. #保存打乱后的文件和标签
  41. images = []
  42. labels = []
  43. # 打乱文件顺序 连续打乱两次
  44. indices = list(range(len(image_list)))
  45. random.shuffle(indices)
  46. for i in indices:
  47. images.append(image_list[i])
  48. labels.append(label_list[i])
  49. random.shuffle([images,labels])
  50. print('样本长度为:',len(images))
  51. #print(images[0:10],labels[0:10])
  52. return images, labels
  53. '''
  54. 生成数据的格式
  55. 先生成 TFRecord 格式的样例数据,Example 的结构如下,表示第1个文件中的第1个数据
  56. {
  57. 'i':0,
  58. 'j':0
  59. }
  60. '''
  61. def WriteTFRecord(dirpath,dstpath='.',train_data=True,IMAGE_HEIGHT=227,IMAGE_WIDTH=227):
  62. '''
  63. 把指定目录下的数据写入同一个TFRecord格式文件中
  64. args:
  65. dirpath:数据所在的目录 记做父目录
  66. 假设有10类数据,则父目录下有10个子目录,每个子目录存放着对应的图片
  67. dstpath:保存TFRecord文件的目录
  68. train_data:表示传入的是不是训练集文件所在路径
  69. IMAGE_HEIGHT:保存的图片数据高度
  70. IMAGE_WIDTH:保存的图片数据宽度
  71. '''
  72. if not os.path.isdir(dstpath):
  73. os.mkdir(dstpath)
  74. #获取所有数据文件路径,以及对应标签
  75. image_list, label_list = get_files(dirpath)
  76. #把海量数据写入多个TFrecord文件
  77. length_per_shard = 10000 #每个记录文件的样本长度
  78. num_shards = int(np.ceil(len(image_list) / length_per_shard))
  79. print('记录文件个数:',num_shards)
  80. '''
  81. 当所有数类别图片都在一个文件夹下面时,可以将数据写入不同的文件
  82. 但是如果同一类别的图片放在相同的文件下,就不可以将数据写入不同的文件
  83. 这主要是因为后者保存的TFRecord文件中都是同一类别,而队列取数据时,是从一个文件读取完,才会读取另一个文件,
  84. 这样会导致一次读取的batch_size图像都是同一类别,对训练不利
  85. 因此我们必须想个办法让一个TFRecord格式的文件包含各种类别的图片,并且顺序是打乱的
  86. '''
  87. #依次写入每一个TFRecord文件
  88. for index in range(num_shards):
  89. #按0000n-of-0000m的后缀区分文件。n代表当前文件标号,没代表文件总数
  90. if train_data:
  91. filename = os.path.join(dstpath,'train_data.tfrecord-%.5d-of-%.5d'%(index,num_shards))
  92. else:
  93. filename = os.path.join(dstpath,'test_data.tfrecord-%.5d-of-%.5d'%(index,num_shards))
  94. print(filename)
  95. #创建对象 用于向记录文件写入记录
  96. writer = tf.python_io.TFRecordWriter(filename)
  97. #起始索引
  98. idx_start = index*length_per_shard
  99. #结束索引
  100. idx_end = np.min([(index+1)*length_per_shard - 1,len(image_list)])
  101. #遍历子目录下的每一个文件
  102. for img_path,label in zip(image_list[idx_start:idx_end], label_list[idx_start:idx_end]):
  103. #读取图像
  104. img = cv2.imread(img_path)
  105. '''
  106. 在这里可以对图片进行处理,也可以扩大数据集,或者归一化输入等待,不过我在这里不对原始图片进行其它处理,只是把图片大小设置为固定的
  107. '''
  108. #缩放
  109. img = cv2.resize(img,(IMAGE_HEIGHT,IMAGE_WIDTH))
  110. #将图片转化为原生bytes
  111. image = img.tobytes()
  112. #将数据整理成 TFRecord 需要的数据结构
  113. example = tf.train.Example(features=tf.train.Features(feature={
  114. 'image': tf.train.Feature(bytes_list=tf.train.BytesList(value=[image])),
  115. "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[label]))}))
  116. #序列化
  117. serialized = example.SerializeToString()
  118. #写入文件
  119. writer.write(serialized)
  120. writer.close()
  121. def read_and_decode(filename,num_epochs = None,IMAGE_HEIGHT=227,IMAGE_WIDTH=227):
  122. '''
  123. 读取TFRecord格式格式文件,返回读取到的一张图像以及对应的标签
  124. args:
  125. filename:TFRecord格式文件路径 list列表
  126. num_epochs:每个数据集文件迭代轮数
  127. IMAGE_HEIGHT:保存的图片数据高度
  128. IMAGE_WIDTH:保存的图片数据宽度
  129. '''
  130. '''
  131. 创建文件队列,通过设置 shuffle 参数为 True,将文件的入队顺序打乱,所以出队顺序是随机的。随机打乱文件顺序和入队操作
  132. 会跑在一个单独的线程上,不会影响出队的速度.
  133. 当输入队列中的所有文件都处理完后,它会将文件列表中的文件重新加入队列。可以通过设置 num_epochs 参数来限制加载初始
  134. 文件列表的最大轮数
  135. '''
  136. filename_queue = tf.train.string_input_producer(filename,shuffle=False,num_epochs = num_epochs)
  137. #创建一个文件读取器 从队列文件中读取数据
  138. reader = tf.TFRecordReader()
  139. #reader从 TFRecord 读取内容并保存到 serialized_example中
  140. _, serialized_example = reader.read(filename_queue)
  141. # 读取serialized_example的格式
  142. features = tf.parse_single_example(
  143. serialized_example,
  144. features={
  145. 'image': tf.FixedLenFeature([], tf.string),
  146. 'label': tf.FixedLenFeature([], tf.int64)
  147. }
  148. )
  149. # 解析从 serialized_example 读取到的内容
  150. img = tf.decode_raw(features['image'],tf.uint8)
  151. img = tf.reshape(img, [IMAGE_HEIGHT, IMAGE_WIDTH, 3])
  152. '''
  153. 在这里可以对读取到的图片数据进行预处理,比如归一化输入,PCA处理等,但是不可以增加数据
  154. '''
  155. label = tf.cast(features['label'], tf.int32)
  156. return img,label
  157. def input_data(filenames,num_epochs=None,batch_size=256, capacity=4096, min_after_dequeue=1024, num_threads=10):
  158. '''
  159. 读取小批量batch_size数据
  160. args:
  161. filenames:TFRecord文件路径组成的list
  162. num_epochs:每个数据集文件迭代轮数
  163. batch_size:小批量数据大小
  164. capacity:内存队列元素最大个数
  165. min_after_dequeue:内存队列元素最小个数
  166. num_threads:线城数
  167. '''
  168. '''
  169. 读取批量数据 这里设置batch_size,即一次从内存队列中随机读取batch_size张图片,这里设置内存队列最小元素个数为1024,最大元素个数为4096
  170. shuffle_batch 函数会将数据顺序打乱
  171. bacth 函数不会将数据顺序打乱
  172. '''
  173. img,label = read_and_decode(filenames,num_epochs)
  174. images_batch, labels_batch = tf.train.shuffle_batch([img,label], batch_size=batch_size, capacity=capacity, min_after_dequeue=batch_size*5, num_threads=num_threads)
  175. return images_batch,labels_batch
  176. def file_match(s,root='.'):
  177. '''
  178. 寻找指定目录下(不包含子目录)中的文件名含有指定字符串的文件,并打印出其相对路径
  179. args:
  180. s:要匹配的字符串
  181. root : 指定要搜索的目录
  182. return:返回符合条件的文件列表
  183. '''
  184. #用来保存目录
  185. dirs=[]
  186. #用来保存匹配字符串的文件
  187. matchs=[]
  188. for current_name in os.listdir(root):
  189. add_root_name = os.path.join(root,current_name)
  190. if os.path.isdir(add_root_name):
  191. dirs.append(add_root_name)
  192. elif os.path.isfile(add_root_name) and s in add_root_name:
  193. matchs.append(add_root_name)
  194. '''
  195. #这里用来递归搜索子目录的
  196. for dir in dirs:
  197. file_match(s,dir)
  198. '''
  199. return matchs
  200. '''
  201. 测试
  202. '''
  203. if __name__ == '__main__':
  204. #训练集数据所在的目录
  205. dirpath = './data/train'
  206. training_step = 1
  207. '''
  208. 判断训练测试集TFRecord格式文件是否存在,不存在则生成
  209. 如果存在,直接读取
  210. '''
  211. # 获取当前目录下包含指定字符串的文件列表
  212. files = file_match('train_data.tfrecord')
  213. #判断数据集是否存在
  214. if len(files) == 0:
  215. print('开始读图片文件并写入TFRecord格式文件中.........')
  216. #将指定路径下所有图片存为TFRecord格式 保存到文件data.tfrecord中
  217. WriteTFRecord(dirpath)
  218. print('写入完毕!\n')
  219. #正则表达式匹配
  220. files = tf.train.match_filenames_once('./train_data.tfrecord')
  221. #读取TFRecord格式格式文件,返回读取到的batch_size图像以及对应的标签
  222. images_batch, labels_batch = input_data(files)
  223. with tf.Session() as sess:
  224. sess.run(tf.global_variables_initializer())
  225. #创建一个协调器,管理线程
  226. coord = tf.train.Coordinator()
  227. #启动QueueRunner, 此时文件名才开始进队
  228. threads = tf.train.start_queue_runners(sess=sess,coord=coord)
  229. print('开始训练!\n')
  230. for step in range(training_step):
  231. img, label = sess.run([images_batch, labels_batch])
  232. print('setp :',step)
  233. for i in range(256):
  234. cv2.imwrite('%d_%d_p.jpg'%(i,label[i]),img[i])
  235. #终止线程
  236. coord.request_stop()
  237. coord.join(threads)

程序运行后,会生成三个TFRecord文件,如下:

并且我们生成了了一个小批量样本大小的样本图片:

 参考文章

[1]Tensorflow读取数据的4种方式(8)---《深度学习》

[2]数据读取

[3]tensorflow学习——tfreader格式,队列读取数据tf.train.shuffle_batch()

[4]十图详解tensorflow数据读取机制(附代码)

[5]Tensorflow10下如何玩转TFRecord?!比赛学习必备,有源码!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/123849
推荐阅读
相关标签
  

闽ICP备14008679号