Implementing a Two-Tower Recommendation Model with the TensorFlow Estimator API

The full code for this post is available at: https://github.com/cdj0311/two_tower_recommendation_system

TensorFlow Estimator makes distributed training straightforward: shard the input data across workers and feed each shard to the model. The code below is a complete recommendation-model template; adapt the data-reading and model-structure parts to your own needs. It targets tensorflow==1.13.1.
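
On most platforms, the cluster topology reaches the job through environment variables; main.py below reads TF_CONFIG, TF_INDEX, and TF_ROLE. A minimal sketch of what the platform might set, assuming a one-chief, two-worker, one-ps cluster (hostnames and the exact variable names are platform-specific assumptions):

    import json, os

    # Hypothetical cluster spec; real values come from the submission platform.
    os.environ["TF_CONFIG"] = json.dumps({
        "cluster": {"chief": ["host0:2222"],
                    "worker": ["host1:2222", "host2:2222"],
                    "ps": ["host3:2222"]},
        "task": {"type": "worker", "index": 0},
    })
    os.environ["TF_INDEX"] = "0"      # this job's task index
    os.environ["TF_ROLE"] = "worker"  # this job's role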

1. Feature processing, feature_processing.py

    #coding:utf-8
    import tensorflow as tf
    from tensorflow import feature_column as fc
    import config

    FLAGS = config.FLAGS

    class FeatureConfig(object):
        def __init__(self):
            self.user_columns = dict()
            self.item_columns = dict()
            self.feature_spec = dict()

        def create_features_columns(self):
            # Dense vector features
            user_vector = fc.numeric_column(key="user_vector", shape=(128,), default_value=[0.0] * 128, dtype=tf.float32)
            item_vector = fc.numeric_column(key="item_vector", shape=(128,), default_value=[0.0] * 128, dtype=tf.float32)
            # Bucketized feature
            age = fc.numeric_column(key="age", shape=(1,), default_value=[0], dtype=tf.int64)
            age = fc.bucketized_column(age, boundaries=[0, 10, 20, 30, 40, 50, 60, 70, 80])
            age = fc.embedding_column(age, dimension=32, combiner='mean')
            # Categorical feature
            city = fc.categorical_column_with_identity(key="city", num_buckets=1000, default_value=0)
            city = fc.embedding_column(city, dimension=32, combiner='mean')
            # Hashed features
            device_id = fc.categorical_column_with_hash_bucket(key="device_id", hash_bucket_size=1000000, dtype=tf.int64)
            device_id = fc.embedding_column(device_id, dimension=32, combiner='mean')
            item_id = fc.categorical_column_with_hash_bucket(key="item_id", hash_bucket_size=1000000, dtype=tf.int64)
            item_id = fc.embedding_column(item_id, dimension=32, combiner='mean')
            self.user_columns["user_vector"] = user_vector
            self.user_columns["age"] = age
            self.user_columns["city"] = city
            self.user_columns["device_id"] = device_id
            self.item_columns["item_vector"] = item_vector
            self.item_columns["item_id"] = item_id
            # list() so the two dict views can be concatenated under Python 3
            self.feature_spec = fc.make_parse_example_spec(list(self.user_columns.values()) + list(self.item_columns.values()))
            return self
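
As a quick sanity check, the generated feature_spec can be fed straight into tf.io.parse_example; a minimal sketch:

    feature_configs = FeatureConfig().create_features_columns()
    serialized = tf.placeholder(tf.string, shape=[None], name="examples")
    # parsed is a dict of Tensors / SparseTensors keyed by feature name
    parsed = tf.io.parse_example(serialized, feature_configs.feature_spec)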

2. The model, base_model.py

    # coding:utf-8
    import tensorflow as tf
    from tensorflow import feature_column as fc
    import config
    import feature_processing as fe

    FLAGS = config.FLAGS

    def build_user_model(features, mode, params):
        # Build the input layer from the user-side feature columns
        feature_inputs = []
        for key, value in params["feature_configs"].user_columns.items():
            feature_inputs.append(fc.input_layer(features, value))
        # Concatenate all features
        net = tf.concat(feature_inputs, axis=1)
        # Fully connected layers; tower-specific names avoid variable collisions with the item tower
        for idx, units in enumerate(params["hidden_units"]):
            net = tf.layers.dense(net, units=units, activation=tf.nn.leaky_relu, name="user_fc_layer_%s" % idx)
            net = tf.layers.dropout(net, 0.4, training=(mode == tf.estimator.ModeKeys.TRAIN))
        # Final output layer
        net = tf.layers.dense(net, units=128, name="user_output_layer")
        return net

    def build_item_model(features, mode, params):
        # Build the input layer from the item-side feature columns
        feature_inputs = []
        for key, value in params["feature_configs"].item_columns.items():
            feature_inputs.append(fc.input_layer(features, value))
        # Concatenate all features
        net = tf.concat(feature_inputs, axis=1)
        # Fully connected layers
        for idx, units in enumerate(params["hidden_units"]):
            net = tf.layers.dense(net, units=units, activation=tf.nn.leaky_relu, name="item_fc_layer_%s" % idx)
            net = tf.layers.dropout(net, 0.4, training=(mode == tf.estimator.ModeKeys.TRAIN))
        # Final output layer
        net = tf.layers.dense(net, units=128, name="item_output_layer")
        return net

    def model_fn(features, labels, mode, params):
        user_net = build_user_model(features, mode, params)
        item_net = build_item_model(features, mode, params)
        # Score = sigmoid of the dot product of the two tower outputs
        dot = tf.reduce_sum(tf.multiply(user_net, item_net), axis=1, keepdims=True)
        pred = tf.sigmoid(dot)
        if mode == tf.estimator.ModeKeys.PREDICT:
            return tf.estimator.EstimatorSpec(mode, predictions={"output": pred})
        if mode == tf.estimator.ModeKeys.EVAL:
            loss = tf.losses.log_loss(labels, pred)
            metrics = {"auc": tf.metrics.auc(labels=labels, predictions=pred)}
            return tf.estimator.EstimatorSpec(mode, loss=loss, eval_metric_ops=metrics)
        if mode == tf.estimator.ModeKeys.TRAIN:
            loss = tf.losses.log_loss(labels, pred)
            global_step = tf.train.get_global_step()
            learning_rate = tf.train.exponential_decay(params["learning_rate"], global_step, 100000, 0.9, staircase=True)
            train_op = tf.train.AdagradOptimizer(learning_rate).minimize(loss, global_step=global_step)
            return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)
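
In production, the two towers are usually served separately: item vectors are precomputed and indexed for ANN retrieval, and only the user tower runs online. One way to support this, sketched here as an extension of the PREDICT branch (not part of the original code), is to expose both tower outputs in the predictions dict:

    if mode == tf.estimator.ModeKeys.PREDICT:
        predictions = {"output": pred,
                       "user_vector": user_net,   # computed online per request
                       "item_vector": item_net}   # precomputed offline for indexing
        return tf.estimator.EstimatorSpec(mode, predictions=predictions)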

3. Data input, data_input.py

    # coding:utf-8
    import tensorflow as tf
    import config

    FLAGS = config.FLAGS

    def parse_exp(example):
        features_def = {}
        features_def["label"] = tf.io.FixedLenFeature([1], tf.int64)
        """
        Parsing logic for the remaining features goes here
        (an illustrative sketch follows this listing).
        """
        features = tf.io.parse_single_example(example, features_def)
        label = features["label"]
        return features, label

    def train_input_fn(filenames=None, batch_size=128):
        # The argument is a text file listing one file pattern per line
        with tf.gfile.Open(filenames) as f:
            filenames = f.read().split()
        files_all = []
        for f in filenames:
            files_all += tf.gfile.Glob(f)
        # Shard the files across workers so each worker trains on a disjoint subset
        train_worker_num = len(FLAGS.worker_hosts.split(","))
        hash_id = FLAGS.task_index if FLAGS.job_name == "worker" else train_worker_num - 1
        files_shard = [files for i, files in enumerate(files_all) if i % train_worker_num == hash_id]
        dataset = tf.data.TFRecordDataset(files_shard)
        dataset = dataset.shuffle(batch_size * 100)
        dataset = dataset.map(parse_exp, num_parallel_calls=8)
        dataset = dataset.repeat().batch(batch_size).prefetch(1)
        return dataset

    def eval_input_fn(filenames=None, batch_size=128):
        with tf.gfile.Open(filenames) as f:
            filenames = f.read().split()
        files = tf.data.Dataset.list_files(filenames)
        # Interleave reads across files; the lambda must consume its own argument
        dataset = files.apply(tf.contrib.data.parallel_interleave(
            lambda filename: tf.data.TFRecordDataset(filename),
            buffer_output_elements=batch_size * 20, cycle_length=10))
        dataset = dataset.map(parse_exp, num_parallel_calls=4)
        dataset = dataset.batch(batch_size)
        return dataset
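
The feature-parsing logic inside parse_exp is left open above. Purely as an illustration, a features_def matching the columns from feature_processing.py might look like the following (shapes and dtypes are assumptions about your TFRecord layout; hash-bucket columns correspond to VarLenFeature in the parse spec):

    features_def["user_vector"] = tf.io.FixedLenFeature([128], tf.float32)
    features_def["item_vector"] = tf.io.FixedLenFeature([128], tf.float32)
    features_def["age"] = tf.io.FixedLenFeature([1], tf.int64)
    features_def["city"] = tf.io.FixedLenFeature([1], tf.int64)
    features_def["device_id"] = tf.io.VarLenFeature(tf.int64)
    features_def["item_id"] = tf.io.VarLenFeature(tf.int64)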

4. Main entry point, main.py

    # encoding:utf-8
    import os
    import json
    import tensorflow as tf
    from tensorflow import feature_column as fc
    from feature_processing import FeatureConfig
    import base_model
    import data_input
    import config

    FLAGS = config.FLAGS

    if FLAGS.run_on_cluster:
        # Cluster metadata injected by the submission platform
        cluster = json.loads(os.environ["TF_CONFIG"])
        task_index = int(os.environ["TF_INDEX"])
        task_type = os.environ["TF_ROLE"]

    def main(unused_argv):
        feature_configs = FeatureConfig().create_features_columns()
        classifier = tf.estimator.Estimator(
            model_fn=base_model.model_fn,
            config=tf.estimator.RunConfig(model_dir=FLAGS.model_dir,
                                          save_checkpoints_steps=FLAGS.save_checkpoints_steps,
                                          keep_checkpoint_max=3),
            params={"feature_configs": feature_configs,
                    "hidden_units": list(map(int, FLAGS.hidden_units.split(","))),
                    "learning_rate": FLAGS.learning_rate})

        def train_eval_model():
            train_spec = tf.estimator.TrainSpec(
                input_fn=lambda: data_input.train_input_fn(FLAGS.train_data, FLAGS.batch_size),
                max_steps=FLAGS.train_steps)
            eval_spec = tf.estimator.EvalSpec(
                input_fn=lambda: data_input.eval_input_fn(FLAGS.eval_data, FLAGS.batch_size),
                start_delay_secs=60,
                throttle_secs=30,
                steps=1000)
            tf.estimator.train_and_evaluate(classifier, train_spec, eval_spec)

        def export_model():
            # feature_spec already contains exactly the model's feature keys
            feature_spec = feature_configs.feature_spec
            feature_map = {}
            for key, feature in feature_spec.items():
                if isinstance(feature, tf.io.VarLenFeature):  # variable-length feature
                    feature_map[key] = tf.placeholder(dtype=feature.dtype, shape=[None, 1], name=key)
                elif isinstance(feature, tf.io.FixedLenFeature):  # fixed-length feature
                    feature_map[key] = tf.placeholder(dtype=feature.dtype, shape=[None, feature.shape[0]], name=key)
            serving_input_receiver_fn = tf.estimator.export.build_raw_serving_input_receiver_fn(feature_map)
            export_dir = classifier.export_saved_model(FLAGS.output_model, serving_input_receiver_fn)

        # Train and evaluate
        train_eval_model()
        # Export the model; on a cluster, only the chief exports
        if FLAGS.run_on_cluster:
            if task_type == "chief":
                export_model()
        else:
            export_model()

    if __name__ == "__main__":
        os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
        tf.logging.set_verbosity(tf.logging.INFO)
        tf.app.run(main=main)
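
After export, the SavedModel can be smoke-tested locally. A minimal sketch using tf.contrib.predictor from TF 1.x (the export directory and all feature values below are placeholders):

    import numpy as np
    from tensorflow.contrib import predictor

    predict_fn = predictor.from_saved_model("path/to/exported_model")  # hypothetical path
    result = predict_fn({"user_vector": np.zeros((1, 128), np.float32),
                         "item_vector": np.zeros((1, 128), np.float32),
                         "age": np.array([[25]], np.int64),
                         "city": np.array([[10]], np.int64),
                         "device_id": np.array([[12345]], np.int64),
                         "item_id": np.array([[678]], np.int64)})
    print(result["output"])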

5. config.py

    import json, os, re, codecs
    import tensorflow as tf

    flags = tf.app.flags
    flags.DEFINE_boolean("run_on_cluster", True, "Whether the cluster info needs to be passed in as input")
    flags.DEFINE_string("train_dir", "", "")
    flags.DEFINE_string("data_dir", "", "")
    flags.DEFINE_string("log_dir", "", "")
    flags.DEFINE_string("ps_hosts", "", "")
    flags.DEFINE_string("worker_hosts", "", "")
    flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
    flags.DEFINE_integer("task_index", 0, "Index of task within the job")
    flags.DEFINE_string("model_dir", "hdfs:/user/ranker/ckpt/", "Base directory for the model.")
    flags.DEFINE_string("output_model", "hdfs:/user/ranker/model/", "Saved model.")
    flags.DEFINE_string("train_data", "./train_data.txt", "Path to the training data.")
    flags.DEFINE_string("eval_data", "./eval_data.txt", "Path to the evaluation data.")
    flags.DEFINE_string("hidden_units", "512,256,128", "Hidden units.")
    flags.DEFINE_integer("train_steps", 1000000, "Number of (global) training steps to perform")
    flags.DEFINE_integer("batch_size", 256, "Training batch size")
    flags.DEFINE_float("learning_rate", 0.0001, "Learning rate")
    flags.DEFINE_integer("save_checkpoints_steps", 10000, "Save checkpoints every this many steps")
    FLAGS = flags.FLAGS

6. The last step is the job-submission command. Every company has its own submission platform, so it is not shown here.

Of course, the code also runs on a single machine without any changes; just set run_on_cluster to False.
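
For example (paths and flag values are illustrative):

    python main.py --run_on_cluster=False --model_dir=./ckpt --output_model=./saved_model --train_data=./train_data.txt --eval_data=./eval_data.txt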
