The complete code for this article is available at: https://github.com/cdj0311/two_tower_recommendation_system
Implementing distributed training with a TensorFlow Estimator is straightforward: shard the data appropriately across the workers, feed it to the model, and distributed training essentially comes for free. The code below is a complete recommendation-model template; adapt the data-reading and model-structure parts to your own needs. It targets tensorflow==1.13.1.
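tf.estimator.train_and_evaluate reads the cluster layout from the TF_CONFIG environment variable that the submission platform sets on every node. Below is only a sketch of what that variable typically looks like; the host addresses and the task entry are placeholders, and the TF_INDEX / TF_ROLE variables read in main.py further down are extras specific to the author's platform.

# Sketch: TF_CONFIG as tf.estimator expects it on each node (placeholder hosts).
import json, os

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "chief": ["host0:2222"],
        "worker": ["host1:2222", "host2:2222"],
        "ps": ["host3:2222"],
    },
    "task": {"type": "worker", "index": 0},   # differs on every node
})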
1. Feature processing: feature_processing.py
#coding:utf-8
import tensorflow as tf
from tensorflow import feature_column as fc
import config

FLAGS = config.FLAGS

class FeatureConfig(object):
    def __init__(self):
        self.user_columns = dict()
        self.item_columns = dict()
        self.feature_spec = dict()

    def create_features_columns(self):
        # Dense vector features
        user_vector = fc.numeric_column(key="user_vector", shape=(128,), default_value=[0.0] * 128, dtype=tf.float32)
        item_vector = fc.numeric_column(key="item_vector", shape=(128,), default_value=[0.0] * 128, dtype=tf.float32)

        # Bucketized feature
        age = fc.numeric_column(key="age", shape=(1,), default_value=[0], dtype=tf.int64)
        age = fc.bucketized_column(age, boundaries=[0, 10, 20, 30, 40, 50, 60, 70, 80])
        age = fc.embedding_column(age, dimension=32, combiner='mean')

        # Categorical feature
        city = fc.categorical_column_with_identity(key="city", num_buckets=1000, default_value=0)
        city = fc.embedding_column(city, dimension=32, combiner='mean')

        # Hashed features
        device_id = fc.categorical_column_with_hash_bucket(key="device_id",
                                                           hash_bucket_size=1000000, dtype=tf.int64)
        device_id = fc.embedding_column(device_id, dimension=32, combiner='mean')

        item_id = fc.categorical_column_with_hash_bucket(key="item_id",
                                                         hash_bucket_size=1000000, dtype=tf.int64)
        item_id = fc.embedding_column(item_id, dimension=32, combiner='mean')

        self.user_columns["user_vector"] = user_vector
        self.user_columns["age"] = age
        self.user_columns["city"] = city
        self.user_columns["device_id"] = device_id
        self.item_columns["item_vector"] = item_vector
        self.item_columns["item_id"] = item_id

        # Parsing spec for tf.Example, built from all user and item columns
        self.feature_spec = tf.feature_column.make_parse_example_spec(
            list(self.user_columns.values()) + list(self.item_columns.values()))

        return self
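As a quick illustration (not part of the repository), the feature_spec built by make_parse_example_spec can be fed straight into tf.io.parse_example to decode a batch of serialized tf.Example protos; the placeholder name "examples" is arbitrary:

# Sketch: parse a batch of serialized tf.Example protos with the spec above.
import tensorflow as tf
from feature_processing import FeatureConfig

feature_configs = FeatureConfig().create_features_columns()
serialized = tf.placeholder(tf.string, shape=[None], name="examples")
features = tf.io.parse_example(serialized, feature_configs.feature_spec)
# e.g. features["user_vector"] has shape [batch, 128], features["age"] has shape [batch, 1]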

2. Model: base_model.py
# coding:utf-8
import tensorflow as tf
from tensorflow import feature_column as fc
import config
import feature_processing as fe

FLAGS = config.FLAGS


def build_user_model(features, mode, params):
    # Feature inputs
    feature_inputs = []
    for key, value in params["feature_configs"].user_columns.items():
        feature_inputs.append(fc.input_layer(features, value))
    # Concatenate all user-side features
    net = tf.concat(feature_inputs, axis=1)
    # Fully connected layers (names must differ from the item tower to avoid variable collisions)
    for idx, units in enumerate(params["hidden_units"]):
        net = tf.layers.dense(net, units=units, activation=tf.nn.leaky_relu, name="user_fc_layer_%s" % idx)
        net = tf.layers.dropout(net, 0.4, training=(mode == tf.estimator.ModeKeys.TRAIN))
    # Final user embedding
    net = tf.layers.dense(net, units=128, name="user_output_layer")
    return net

def build_item_model(features, mode, params):
    # Feature inputs
    feature_inputs = []
    for key, value in params["feature_configs"].item_columns.items():
        feature_inputs.append(fc.input_layer(features, value))
    # Concatenate all item-side features
    net = tf.concat(feature_inputs, axis=1)
    # Fully connected layers
    for idx, units in enumerate(params["hidden_units"]):
        net = tf.layers.dense(net, units=units, activation=tf.nn.leaky_relu, name="item_fc_layer_%s" % idx)
        net = tf.layers.dropout(net, 0.4, training=(mode == tf.estimator.ModeKeys.TRAIN))
    # Final item embedding
    net = tf.layers.dense(net, units=128, name="item_output_layer")
    return net

def model_fn(features, labels, mode, params):
    user_net = build_user_model(features, mode, params)
    item_net = build_item_model(features, mode, params)
    # Dot product of the two tower outputs, squashed to a click probability
    dot = tf.reduce_sum(tf.multiply(user_net, item_net), axis=1, keepdims=True)
    pred = tf.sigmoid(dot)
    if mode == tf.estimator.ModeKeys.PREDICT:
        return tf.estimator.EstimatorSpec(mode, predictions={"output": pred})
    labels = tf.cast(labels, tf.float32)  # log_loss expects float labels
    if mode == tf.estimator.ModeKeys.EVAL:
        loss = tf.losses.log_loss(labels, pred)
        metrics = {"auc": tf.metrics.auc(labels=labels, predictions=pred)}
        return tf.estimator.EstimatorSpec(mode, loss=loss, eval_metric_ops=metrics)
    if mode == tf.estimator.ModeKeys.TRAIN:
        loss = tf.losses.log_loss(labels, pred)
        global_step = tf.train.get_global_step()
        learning_rate = tf.train.exponential_decay(params["learning_rate"], global_step, 100000, 0.9, staircase=True)
        train_op = tf.train.AdagradOptimizer(learning_rate).minimize(loss, global_step=global_step)
        return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)
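If the two towers are later served separately (for example, pre-computing item embeddings for ANN retrieval), the PREDICT branch above can also expose the 128-d tower outputs. This is only a sketch of an optional change, not part of the original model_fn:

# Optional sketch: also return both tower embeddings in PREDICT mode.
if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {
        "output": pred,               # click probability
        "user_embedding": user_net,   # 128-d user tower output
        "item_embedding": item_net,   # 128-d item tower output
    }
    return tf.estimator.EstimatorSpec(mode, predictions=predictions)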

3. Data input: data_input.py
# coding:utf-8
import tensorflow as tf
import config

FLAGS = config.FLAGS

def parse_exp(example):
    features_def = {}
    features_def["label"] = tf.io.FixedLenFeature([1], tf.int64)
    """
    Data parsing logic: add the remaining feature definitions here.
    """
    features = tf.io.parse_single_example(example, features_def)
    label = features["label"]
    return features, label


def train_input_fn(filenames=None, batch_size=128):
    # "filenames" is a text file that lists the training file patterns
    with tf.gfile.Open(filenames) as f:
        filenames = f.read().split()
    files_all = []
    for f in filenames:
        files_all += tf.gfile.Glob(f)
    # Shard the file list so that each worker reads a disjoint subset
    train_worker_num = len(FLAGS.worker_hosts.split(","))
    hash_id = FLAGS.task_index if FLAGS.job_name == "worker" else train_worker_num - 1
    files_shard = [files for i, files in enumerate(files_all) if i % train_worker_num == hash_id]
    dataset = tf.data.TFRecordDataset(files_shard)
    dataset = dataset.shuffle(batch_size * 100)
    dataset = dataset.map(parse_exp, num_parallel_calls=8)
    dataset = dataset.repeat().batch(batch_size).prefetch(1)
    return dataset

def eval_input_fn(filenames=None, batch_size=128):
    with tf.gfile.Open(filenames) as f:
        filenames = f.read().split()
    files = tf.data.Dataset.list_files(filenames)
    dataset = files.apply(tf.contrib.data.parallel_interleave(
        lambda filename: tf.data.TFRecordDataset(filename),
        buffer_output_elements=batch_size * 20, cycle_length=10))
    dataset = dataset.map(parse_exp, num_parallel_calls=4)
    dataset = dataset.batch(batch_size)
    return dataset
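To make the sharding scheme concrete, here is a tiny standalone illustration (with hypothetical file names) of how the modulo split in train_input_fn distributes files across two workers:

# Standalone illustration of the modulo-based file sharding used in train_input_fn.
files_all = ["part-%05d" % i for i in range(6)]   # hypothetical file names
train_worker_num = 2
for hash_id in range(train_worker_num):
    files_shard = [f for i, f in enumerate(files_all) if i % train_worker_num == hash_id]
    print(hash_id, files_shard)
# 0 ['part-00000', 'part-00002', 'part-00004']
# 1 ['part-00001', 'part-00003', 'part-00005']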

4. Main entry point: main.py
# encoding:utf-8
import os
import json
import tensorflow as tf
from tensorflow import feature_column as fc
from feature_processing import FeatureConfig
import base_model
import data_input
import config

FLAGS = config.FLAGS

if FLAGS.run_on_cluster:
    cluster = json.loads(os.environ["TF_CONFIG"])
    task_index = int(os.environ["TF_INDEX"])
    task_type = os.environ["TF_ROLE"]


def main(unused_argv):
    feature_configs = FeatureConfig().create_features_columns()
    classifier = tf.estimator.Estimator(model_fn=base_model.model_fn,
                                        config=tf.estimator.RunConfig(model_dir=FLAGS.model_dir,
                                                                      save_checkpoints_steps=FLAGS.save_checkpoints_steps,
                                                                      keep_checkpoint_max=3),
                                        params={"feature_configs": feature_configs,
                                                "hidden_units": list(map(int, FLAGS.hidden_units.split(","))),
                                                "learning_rate": FLAGS.learning_rate}
                                        )

    def train_eval_model():
        train_spec = tf.estimator.TrainSpec(input_fn=lambda: data_input.train_input_fn(FLAGS.train_data, FLAGS.batch_size),
                                            max_steps=FLAGS.train_steps)
        eval_spec = tf.estimator.EvalSpec(input_fn=lambda: data_input.eval_input_fn(FLAGS.eval_data, FLAGS.batch_size),
                                          start_delay_secs=60,
                                          throttle_secs=30,
                                          steps=1000)
        tf.estimator.train_and_evaluate(classifier, train_spec, eval_spec)

    def export_model():
        feature_spec = feature_configs.feature_spec
        feature_map = {}
        for key, feature in feature_spec.items():
            if key not in feature_configs.user_columns and key not in feature_configs.item_columns:
                continue
            if isinstance(feature, tf.io.VarLenFeature):  # variable-length feature
                feature_map[key] = tf.placeholder(dtype=feature.dtype, shape=[1], name=key)
            elif isinstance(feature, tf.io.FixedLenFeature):  # fixed-length feature
                feature_map[key] = tf.placeholder(dtype=feature.dtype, shape=[None, feature.shape[0]], name=key)
        serving_input_receiver_fn = tf.estimator.export.build_raw_serving_input_receiver_fn(feature_map)
        export_dir = classifier.export_saved_model(FLAGS.output_model, serving_input_receiver_fn)

    # Train and evaluate
    train_eval_model()

    # Export the model; on a cluster only the chief exports, and only once
    if FLAGS.run_on_cluster:
        if task_type == "chief":
            export_model()
    else:
        export_model()


if __name__ == "__main__":
    os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
    tf.logging.set_verbosity(tf.logging.INFO)
    tf.app.run(main=main)

5. config.py
import json, os, re, codecs
import tensorflow as tf

flags = tf.app.flags

flags.DEFINE_boolean("run_on_cluster", True, "Whether the cluster info needs to be passed in as input")

flags.DEFINE_string("train_dir", "", "")
flags.DEFINE_string("data_dir", "", "")
flags.DEFINE_string("log_dir", "", "")
flags.DEFINE_string("ps_hosts", "", "Comma-separated list of parameter server hosts")
flags.DEFINE_string("worker_hosts", "", "Comma-separated list of worker hosts")
flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
flags.DEFINE_integer("task_index", 0, "Index of task within the job")
flags.DEFINE_string("model_dir", "hdfs:/user/ranker/ckpt/", "Base directory for the model.")
flags.DEFINE_string("output_model", "hdfs:/user/ranker/model/", "Saved model.")
flags.DEFINE_string("train_data", "./train_data.txt", "File that lists the training data paths/patterns.")
flags.DEFINE_string("eval_data", "./eval_data.txt", "File that lists the evaluation data paths/patterns.")
flags.DEFINE_string("hidden_units", "512,256,128", "hidden units.")
flags.DEFINE_integer("train_steps", 1000000, "Number of (global) training steps to perform")
flags.DEFINE_integer("batch_size", 256, "Training batch size")
flags.DEFINE_float("learning_rate", 0.0001, "learning rate")
flags.DEFINE_integer("save_checkpoints_steps", 10000, "Save checkpoints every this many steps")

FLAGS = flags.FLAGS

6. Finally, the job-submission command. Each company has its own submission platform, so it is not included here.
Of course, the code also runs on a single machine without any changes; just set run_on_cluster to False.
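For reference, a single-machine run could be launched roughly like this, using the flags defined in config.py (all paths are placeholders):

python main.py \
    --run_on_cluster=False \
    --model_dir=./ckpt \
    --output_model=./saved_model \
    --train_data=./train_data.txt \
    --eval_data=./eval_data.txt \
    --hidden_units=512,256,128 \
    --batch_size=256 \
    --learning_rate=0.0001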