当前位置:   article > 正文

一、TF2.x数据加载处理pipeline_tf2 dataset map

tf2 dataset map

前言

  • 本文内容主要搬运自官方文档
  • 意在梳理相关内容,解决工作中遇到的问题:提升数据加载、处理效率
  • 本文主要围绕tf.record、tf.data、tf.keras展开梳理,会涉及一些个人demo

1. tf.record & tf.Example

  • 对数据进行序列化并将其存储在一组可线性读取的文件(每个文件 100-200MB)中。这尤其适用于通过网络进行流式传输的数据。这种做法对缓冲任何数据预处理也十分有用。
  • tf.Example 消息(或 protobuf)是一种灵活的消息类型,表示 {“string”: value} 映射。
  • 如果使用tf.data并且读取数据是目前训练的瓶颈,官方介绍了一些优化方法

1.1 tf.Example基本概念

从根本上讲,tf.Example 是 {“string”: tf.train.Feature} 映射

  • tf.train.Feature 消息类型可以接受以下三种类型
类型可转换自
tf.train.BytesListString Byte
tf.train.FloatListfloat double
tf.train.Int64Listbool enum int32 uint32 int64 unint64
  • 可以通过快捷函数将标量的输入值转化为上述三个类型之一
    • def _bytes_feature(value)
    • def _float_feature(value)
    • def _int64_feature(value)
  • 对于非标量,可转换为二进制字符串后再输入
  • 最后,使用.SerializeToString方法将所有的消息,序列化成二进制字符串
import tensorflow as tf
import numpy as np

## !!单条数据转换!!
def _bytes_feature(value):
  """Returns a bytes_list from a string / byte."""
  if isinstance(value, type(tf.constant(0))): #  tensorflow.python.framework.ops.EagerTensor
    value = value.numpy() # BytesList won't unpack a string from an EagerTensor.
  return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def _float_feature(value):
  """Returns a float_list from a float / double."""
  return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

def _int64_feature(value):
  """Returns an int64_list from a bool / enum / int / uint."""
  return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

# 测试
print(_bytes_feature(b'test_string'))
print(_bytes_feature(u'test_bytes'.encode('utf-8')))

print(_float_feature(np.exp(1)))

print(_int64_feature(True))
print(_int64_feature(1))

feature = _float_feature(np.exp(1))
feature.SerializeToString()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

1.2 如何创建tf.Example消息

  • 目标:如何把一个平平无奇的观测值(文本、整数、浮点数…)转化为tf.Example消息 以及 解析
  • 我们的原始数据可能来自任何地方、有各种存储格式,但是从单个观测值来看,创建tf.Example消息的过程是相同的
  • tf.Example 消息只是 Features 消息外围的包装器
# 创建一个包含若干数据类型的数据集
n_observations = int(1e4)
feature0 = np.random.choice([False, True], n_observations) # bool
feature1 = np.random.randint(0, 5, n_observations) # int
strings = np.array([b'cat', b'dog', b'chicken', b'horse', b'goat'])
feature2 = strings[feature1]  # byte
feature3 = np.random.randn(n_observations) # float

def serialize_example(feature0, feature1, feature2, feature3):
  """
  Creates a tf.Example message ready to be written to a file.
  """
  # Create a dictionary mapping the feature name to the tf.Example-compatible
  # data type.
  feature = {
      'feature0': _int64_feature(feature0),
      'feature1': _int64_feature(feature1),
      'feature2': _bytes_feature(feature2),
      'feature3': _float_feature(feature3),
  }

  # Create a Features message using tf.train.Example.

  example_proto = tf.train.Example(features=tf.train.Features(feature=feature))
  return example_proto.SerializeToString()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 解码消息:tf.train.Example.FromString
example_observation = []
serialized_example = serialize_example(False, 4, b'goat', 0.9876)
example_proto = tf.train.Example.FromString(serialized_example)  # 和.SerializeToString()对应
example_proto
  • 1
  • 2
  • 3
  • 4

1.3 TFRecord格式信息

  • 一个TFRecord文件包含一系列记录,每个记录包含:字节字符串、数据长度、CRC32C哈希值
  • 适用于:非结构化数据,文本、图像,或序列化张量
  • 如何加载:使用tf.io.serialize_tensor、tf.io.parse_tensor

1.4 使用 tf.data 的 TFRecord 文件

tf.data 模块还提供用于在 TensorFlow 中读取和写入数据的工具

1) 写入 TFRecord 文件
  • 普通数据(tuple list 1D 2D)放入数据集dataset中,可使用tf.data.Dataset.from_slice_tensor方法
  • 使用 tf.data.Dataset.map 方法可将函数应用于 Dataset 的每个元素,比如封装好的tf.Example消息。但是,映射函数必须在计算图模式下运行,即在tf.Tensors上运算,因此对普通的python函数可以用tf.py_function封装一下,以此兼容
# 普通数据转成dataset后使用map方式解析
features_dataset = tf.data.Dataset.from_tensor_slices((feature0, feature1, feature2, feature3))
for f0,f1,f2,f3 in features_dataset.take(1):
    print(f0, f1, f2, f3)

def tf_serialize_example(f0,f1,f2,f3):
    tf_string = tf.py_function(
        serialize_example,
        (f0,f1,f2,f3),  # pass these args to the above function.
        tf.string)      # the return type is `tf.string`.
    return tf.reshape(tf_string, ()) # The result is a scalar

serialized_features_dataset = features_dataset.map(tf_serialize_example)
serialized_features_dataset
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • generator 式处理
    • 使用 TFRecordDataset 对于标准化输入数据和优化性能十分有用
# 做一个generator,将数据写入tfrecord中
def generator():
	for features in features_dataset:
        yield serialize_example(*features)

serialized_features_dataset = tf.data.Dataset.from_generator(
    generator, output_types=tf.string, output_shapes=())

# 将处理好的生成器-dataset写入TFRecord
filename = 'test.tfrecord'
writer = tf.data.experimental.TFRecordWriter(filename)
writer.write(serialized_features_dataset)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
2) 读取TFRecord文件
  • tf.data使用TFRecord实例
  • 使用 tf.data.TFRecordDataset 类来读取 TFRecord 文件

注:在 tf.data.Dataset 上进行迭代仅在启用了 Eager Execution 时有效。

# 创建TFRecordDataset,每个文件是一个TFRecord
filenames = [filename]
raw_dataset = tf.data.TFRecordDataset(filenames)
for raw_record in raw_dataset.take(10):
  print(repr(raw_record))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 使用tf.io来解析
    • 数据集采用计算图执行,需要描述数据形状和类型签名
    • 思考:观测值是数组时,怎么解析??
# Create a description of the features.
feature_description = {
    'feature0': tf.io.FixedLenFeature([], tf.int64, default_value=0),
    'feature1': tf.io.FixedLenFeature([], tf.int64, default_value=0),
    'feature2': tf.io.FixedLenFeature([], tf.string, default_value=''),
    'feature3': tf.io.FixedLenFeature([], tf.float32, default_value=0.0),
}

# 单条解析
def _parse_function(example_proto):
  # Parse the input `tf.Example` proto using the dictionary above.
  return tf.io.parse_single_example(example_proto, feature_description)

parsed_dataset = raw_dataset.map(_parse_function)
for parsed_record in parsed_dataset.take(10):
  print(repr(parsed_record))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

1.5 使用Python写入读取TFRecord

  • tf.io模块
1) 写入:将每个观测转化为tf.Example
# for循环写入数据
with tf.io.TFRecordWriter(filename) as writer:
  for i in range(n_observations):
    example = serialize_example(feature0[i], feature1[i], feature2[i], feature3[i])
    writer.write(example)
  • 1
  • 2
  • 3
  • 4
  • 5
2) 读取: tf.train.Example.ParseFromString 轻松解析序列化张量
filenames = [filename]
raw_dataset = tf.data.TFRecordDataset(filenames)
for raw_record in raw_dataset.take(1):
  example = tf.train.Example()
  example.ParseFromString(raw_record.numpy())
  print(example)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

1.6 小结

上述读写操作,粗看类似“八股文”,但在实际操作过程中会遇到不少问题,个人总结了几点需要注意的地方

  • 数据单条写入?目前我在实际使用中,采取spark并行化逐条写入tfrecord文件,发现文件所占空间非常大。尝试批量写入,体积显著降低,至于后续解析等操作还没有尝试过,有兴趣的朋友可以试试
  • 解析TFRecord:上述介绍了两种读取、解析TFRecord的方式(tf.io.parse_single_example、example.ParseFromString()),那么具体用哪个更合适呢?我目前使用的是tf.io.parse_single_example
  • “翻译”tfrecord过程中的各种格式问题
  • 重点关注:tf.io.FixedLenFeature的几个参数
  • tf.io.parse_single_example解析结果的后续处理,可以参考下述demo
  • 多输入、多输出的模型结构,应该如何组合x, y
# 原始观测值:0维
def parse_exmp(example_proto, column_list, data_type, default_value):
    fea_description = {}
    for name in column_list:
        fea_description[name] = tf.io.FixedLenFeature(
            					shape=(), dtype=data_type, 
            					default_value=default_value)
    out = tf.io.parse_single_example(example_proto, fea_description) 
    fea_list = []
    for name in col_list:
        fea_list.append(tf.reshape(out[name], (1,)))
    out = tf.concat(fea_list, axis=0)
    return out

# 原始观测值:1*5维的数组
def parse_exmp_array(example_proto, column_list, data_type, default_value):
    fea_description = {}
    for name in column_list:
        fea_description[name] = tf.io.FixedLenSequenceFeature(
            					shape=[5], dtype=data_type, 
                                allow_missing=True, default_value=-1)
    out = tf.io.parse_single_example(example_proto, fea_description) 
    fea_list = []
    for name in col_list:
        fea_list.append(tf.reshape(out[name], (1,5)))
    out = tf.concat(fea_list, axis=0)
    return out

file_names = [file_name]
dataset = tf.data.TFRecordDataset(file_names)
num_fea = dataset.map(lambda x: parse_exmp(x, numeric_columns, tf.float32, 0.0))
cate_fea = dataset.map(lambda x: parse_exmp(x, cate_columns, tf.int64, 0))
arr_fea = dataset.map(lambda x: parse_exmp_array(x, arr_columns, tf.int64, -1))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

2. tf.data 构建Tensorflow输入管道

  • 上文1. tf.record & tf.Example 主要介绍数据源为TFRecord时,数据的读取、解析、tf.data输入管道的构建。这小节简单梳理一下数据源为其他格式时的数据管道构建过程,具体操作见官方文档

2.1 基本概念

1. 数据来源

Dataset对象是一个 Python 可迭代对象,创建数据集(dataset)有两种不同的方法:

  • 来自内存/文件:数据源Dataset从存储在内存中或一个或多个文件中的数据构造一个
    • tf.data.Dataset.from_tensors()
    • tf.data.Dataset.from_tensor_slices()
    • tf.data.TFRecordDataset()
    • … …
  • 来自已存在的Dataset对象:从一个或多个tf.data.Dataset对象构造数据集
    • Dataset.map()
    • Dataset.batch()
    • Dataset.reduce()
    • … …
2. 数据结构
  • 查看dataset结构:Dataset.element_spec
  • 查看元素类型:Dataset.element_spec.value_type

2.2 读取数据

1)Numpy数据

如果输入数据可以放在内存中,创建Dataset的最简单方法是使用Dataset.from_tensor_slices()
在这里插入图片描述

2)Python 生成器
  • 注意:可移植性和可扩展性有限,必须在创建生成器的同一个 python 进程中运行,效率受限
  • output_shapes 参数需要注意,批量处理非定长数据时,需要利用.padded_batch

在这里插入图片描述

3)TFRecord数据
  • tf.data.TFRecordDataset类以一个或多个TFRecord文件的内容流作为输入管道的一部分
  • 使用tf.Example消息解析
  • 上文详述过,不再赘述

4)文本数据
  • tf.data.TextLineDataset:提供了一种简单的方法来提取一个或多个文本文件行。给定一个或多个文件名,TextLineDataset将在这些文件的每一行生成一个字符串值元素
  • Dataset.interleave:将文件混合在一起,便于并行化处理文件

  • 一些必要的文本读取处理:
    • Dataset.skip(k):跳过前k行
    • Dataset.filter(func):筛选行

5)CSV数据
  • 从内存转化:tf.data.Dataset.from_tensor_slices(dict(df))
  • 从磁盘加载:
    • tf.data模块提供了从一个或多个符合RFC 4180的CSV 文件中提取记录的方法
    • 比较高级的API:tf.data.experimental.make_csv_dataset,支持batch、select columns、指定label列
    • 较低级的API:tf.data.experimental.CsvDataset

6)常用dataset操作API
  • batch、unbatch、padded_batch、shuffle、repeat
  • 数据预处理:Dataset.map(func) 、 tf.py_function、zip、window、flat_map
  • 重采样:处理类非常不平衡的数据集
    • eg: tf.data.experimental.sample_from_datasets([negative_ds, positive_ds], [0.5, 0.5]).batch(10)
    • tf.data.experimental.rejection_resample
    • 有需求再仔细看看

3. 案例demo:使用TFRecord + tf.data + tf.keras

  • 没有实际数据输入,不保证能运行hhhhh
  • 确定数据来源自 2.2
  • 使用.fit() .evaluate() .predict() 完成模型训练、验证、预测
# 原始观测值:0维
def parse_exmp(example_proto, column_list, data_type, default_value):
    fea_description = {}
    for name in column_list:
        fea_description[name] = tf.io.FixedLenFeature(
            					shape=(), dtype=data_type, 
            					default_value=default_value)
    out = tf.io.parse_single_example(example_proto, fea_description) 
    fea_list = []
    for name in col_list:
        fea_list.append(tf.reshape(out[name], (1,)))
    out = tf.concat(fea_list, axis=0)
    return out

# 原始观测值:1*5维的数组
def parse_exmp_array(example_proto, column_list, data_type, default_value):
    fea_description = {}
    for name in column_list:
        fea_description[name] = tf.io.FixedLenSequenceFeature(
            					shape=[5], dtype=data_type, 
                                allow_missing=True, default_value=-1)
    out = tf.io.parse_single_example(example_proto, fea_description) 
    fea_list = []
    for name in col_list:
        fea_list.append(tf.reshape(out[name], (1,5)))
    out = tf.concat(fea_list, axis=0)
    return out

file_names = [file_name]
dataset = tf.data.TFRecordDataset(file_names)
num_fea = dataset.map(lambda x: parse_exmp(x, numeric_columns, tf.float32, 0.0))
cate_fea = dataset.map(lambda x: parse_exmp(x, cate_columns, tf.int64, 0))
arr_fea = dataset.map(lambda x: parse_exmp_array(x, arr_columns, tf.int64, -1))

## 把输入整理成模型需要的格式
data_set = tf.data.Dataset.zip(((num_fea, cate_fea), arr_fea))
data_set = data_set.shuffle(buffer_size = buffer_size).batch(batch_size)


print("_____开始构建模型_____")

## 双输入
inputs_num_col = Input(shape=(len(numeric_columns),), name = "num_col", dtype='float32')
inputs_cate_col = Input(shape=(len(cate_columns),), name = "cate_col", dtype='int32')

## 自定义的Embedding层,忽略
emb = MyEmb(emb_map, hash_emb_map)(inputs_cate_col)
feature_input = tf.concat([emb, inputs_num_col], axis = -1, 
                          name='concat_cate_num_feature')

## 自定义的MMOE层,忽略
mmoe_layers = MMoE(units=layer_units, num_experts=num_experts, num_tasks=num_tasks)(feature_input)

## 自定义Tower层,忽略
CTR_logits = Tower(layer_num=tower_num_layer, layer_units=tower_num_layer_units, 
                   activation='relu', name='CTR_logits')(mmoe_layers[0])
CTCVR_logits = Tower(layer_num=tower_num_layer, layer_units=tower_num_layer_units, 
                     activation='relu', name='CVR_logits')(mmoe_layers[1])
output = tf.concat([CTR_logits, CTCVR_logits], axis = 1, name='output')
model = tf.keras.Model(inputs=[inputs_num_col, inputs_cate_col], outputs=output)

model.compile(loss='binary_crossentropy',
              optimizer=tf.keras.optimizers.Adam(learning_rate),
              metrics=['accuracy'])

## 模型训练
model.fit(x=data_set, epochs=epoch)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67

原文链接:https://www.yuque.com/docs/share/518e1fc9-b530-4e9e-a38e-6611f5c71346?# 《一、TF2.x数据加载处理pipeline》
内容多搬运自官方文档,建议具体细节、最新更新直接点击链接查看官方说明
作者邮箱:hu.yf@outlook.com 欢迎交流~

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/812235
推荐阅读
相关标签
  

闽ICP备14008679号