
Distributed deep learning frameworks: environment setup and test code for Baidu PaddlePaddle


1. Setting up the environment

The environment for distributed Paddle is very similar to the TensorFlow environment, which I already described in detail in an earlier post.

My setup is Python 3.6 + CUDA 9 + cuDNN 7 + Ubuntu 16.04.

Blog link: https://blog.csdn.net/qq_28626909/article/details/85007363

Once the environment is ready, we can install paddlepaddle.

Official website: http://www.paddlepaddle.org/

      

My configuration is NVIDIA driver 384 + CUDA 9.0 + cuDNN 7.0, so I can simply run pip install paddlepaddle-gpu.

For CUDA 8 + cuDNN 7.0, use pip install paddlepaddle-gpu==1.2.0.post87.

For CUDA 8 + cuDNN 5.0, use pip install paddlepaddle-gpu==1.2.0.post85.
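Before moving on, a quick sanity check like the one below can confirm that the GPU build imports and sees the GPU. This snippet is my own addition, not from the original post; it assumes a CUDA-capable device 0 is visible and that the version attribute is available in your build.

# Quick sanity check for the paddlepaddle-gpu install (my own addition).
import paddle
import paddle.fluid as fluid

print(paddle.__version__)  # should print the installed version, e.g. 1.2.x

# Running an (empty) startup program on the GPU confirms that CUDA and cuDNN
# are wired up correctly; it raises an error if they are not.
place = fluid.CUDAPlace(0)
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
print("paddlepaddle-gpu is working")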

2. Testing single-machine Paddle

To check that Paddle is installed correctly, here is some code that you can run directly.

#!D:/workplace/python
# -*- coding: utf-8 -*-
# @File  : first_test.py
# @Author: WangYe
# @Date  : 2019/1/10
# @Software: PyCharm
import paddle
import paddle.fluid as fluid
import numpy
import six


# Configure the neural network.
def net(x, y):
    y_predict = fluid.layers.fc(input=x, size=1, act=None)
    cost = fluid.layers.square_error_cost(input=y_predict, label=y)
    avg_cost = fluid.layers.mean(cost)
    return y_predict, avg_cost


# Define train function.
def train(save_dirname):
    x = fluid.layers.data(name='x', shape=[13], dtype='float32')
    y = fluid.layers.data(name='y', shape=[1], dtype='float32')
    y_predict, avg_cost = net(x, y)
    sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001)
    sgd_optimizer.minimize(avg_cost)
    train_reader = paddle.batch(
        paddle.reader.shuffle(paddle.dataset.uci_housing.train(), buf_size=500),
        batch_size=20)
    place = fluid.CPUPlace()
    exe = fluid.Executor(place)

    def train_loop(main_program):
        feeder = fluid.DataFeeder(place=place, feed_list=[x, y])
        exe.run(fluid.default_startup_program())

        PASS_NUM = 1000
        for pass_id in range(PASS_NUM):
            total_loss_pass = 0
            for data in train_reader():
                avg_loss_value, = exe.run(
                    main_program, feed=feeder.feed(data), fetch_list=[avg_cost])
                total_loss_pass += avg_loss_value
                if avg_loss_value < 5.0:
                    if save_dirname is not None:
                        fluid.io.save_inference_model(
                            save_dirname, ['x'], [y_predict], exe)
                    return
            print("Pass %d, total avg cost = %f" % (pass_id, total_loss_pass))

    train_loop(fluid.default_main_program())


# Infer by using provided test data.
def infer(save_dirname=None):
    place = fluid.CPUPlace()
    exe = fluid.Executor(place)
    inference_scope = fluid.core.Scope()
    with fluid.scope_guard(inference_scope):
        [inference_program, feed_target_names, fetch_targets] = (
            fluid.io.load_inference_model(save_dirname, exe))
        test_reader = paddle.batch(paddle.dataset.uci_housing.test(), batch_size=20)

        test_data = six.next(test_reader())
        test_feat = numpy.array(list(map(lambda x: x[0], test_data))).astype("float32")
        test_label = numpy.array(list(map(lambda x: x[1], test_data))).astype("float32")

        results = exe.run(inference_program,
                          feed={feed_target_names[0]: numpy.array(test_feat)},
                          fetch_list=fetch_targets)
        print("infer results: ", results[0])
        print("ground truth: ", test_label)


# Run train and infer.
if __name__ == "__main__":
    save_dirname = "fit_a_line.inference.model"
    train(save_dirname)
    infer(save_dirname)

Most of this code should be readable for anyone who has used TensorFlow; paddlepaddle is still more concise than TensorFlow.

Screenshot of the run results (omitted).

3. Testing distributed Paddle

Distributed Paddle has a lot in common with distributed TensorFlow. I made a few small changes to the code from the official website so that it is easier to test.

TensorFlow calls the parameter server a ps and the compute node a worker;

Paddle calls the parameter server a PSERVER and the compute node a TRAINER.

Here is the first script, i.e. the PSERVER code, which I named:

   dist_train_ps.py
#!D:/workplace/python
# -*- coding: utf-8 -*-
# @File  : dist_train_ps.py
# @Author: WangYe
# @Date  : 2019/1/11
# @Software: PyCharm
import os
import paddle
import paddle.fluid as fluid

# train reader
BATCH_SIZE = 20
EPOCH_NUM = 30
BATCH_SIZE = 8  # overrides the value above; the effective batch size is 8

train_reader = paddle.batch(
    paddle.reader.shuffle(
        paddle.dataset.uci_housing.train(), buf_size=500),
    batch_size=BATCH_SIZE)


def train():
    y = fluid.layers.data(name='y', shape=[1], dtype='float32')
    x = fluid.layers.data(name='x', shape=[13], dtype='float32')
    y_predict = fluid.layers.fc(input=x, size=1, act=None)

    loss = fluid.layers.square_error_cost(input=y_predict, label=y)
    avg_loss = fluid.layers.mean(loss)
    opt = fluid.optimizer.SGD(learning_rate=0.001)
    opt.minimize(avg_loss)

    place = fluid.CPUPlace()
    feeder = fluid.DataFeeder(place=place, feed_list=[x, y])
    exe = fluid.Executor(place)

    # fetch distributed training environment setting
    training_role = os.getenv("PADDLE_TRAINING_ROLE", "PSERVER")
    port = os.getenv("PADDLE_PSERVER_PORT", "6174")
    pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "localhost")
    trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
    eplist = []
    for ip in pserver_ips.split(","):
        eplist.append(':'.join([ip, port]))
    pserver_endpoints = ",".join(eplist)
    trainers = int(os.getenv("PADDLE_TRAINERS", "1"))
    current_endpoint = os.getenv("PADDLE_CURRENT_IP", "localhost") + ":" + port

    t = fluid.DistributeTranspiler()
    t.transpile(
        trainer_id=trainer_id,
        pservers=pserver_endpoints,
        trainers=trainers)

    if training_role == "PSERVER":
        pserver_prog = t.get_pserver_program(current_endpoint)
        startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
        exe.run(startup_prog)
        exe.run(pserver_prog)
    elif training_role == "TRAINER":
        trainer_prog = t.get_trainer_program()
        exe.run(fluid.default_startup_program())

        for epoch in range(EPOCH_NUM):
            for batch_id, batch_data in enumerate(train_reader()):
                avg_loss_value, = exe.run(trainer_prog,
                                          feed=feeder.feed(batch_data),
                                          fetch_list=[avg_loss])
                if (batch_id + 1) % 10 == 0:
                    print("Epoch: {0}, Batch: {1}, loss: {2}".format(
                        epoch, batch_id, avg_loss_value[0]))
        # destroy the resources held for this trainer node on the pserver node
        exe.close()
    else:
        raise AssertionError("PADDLE_TRAINING_ROLE should be one of [TRAINER, PSERVER]")


train()

Below is the second script, i.e. the TRAINER code, which I named:

   dist_train_worker.py
#!D:/workplace/python
# -*- coding: utf-8 -*-
# @File  : dist_train_worker.py
# @Author: WangYe
# @Date  : 2019/1/11
# @Software: PyCharm
import os
import paddle
import paddle.fluid as fluid

# train reader
BATCH_SIZE = 20
EPOCH_NUM = 30
BATCH_SIZE = 8  # overrides the value above; the effective batch size is 8

train_reader = paddle.batch(
    paddle.reader.shuffle(
        paddle.dataset.uci_housing.train(), buf_size=500),
    batch_size=BATCH_SIZE)


def train():
    y = fluid.layers.data(name='y', shape=[1], dtype='float32')
    x = fluid.layers.data(name='x', shape=[13], dtype='float32')
    y_predict = fluid.layers.fc(input=x, size=1, act=None)

    loss = fluid.layers.square_error_cost(input=y_predict, label=y)
    avg_loss = fluid.layers.mean(loss)
    opt = fluid.optimizer.SGD(learning_rate=0.001)
    opt.minimize(avg_loss)

    place = fluid.CPUPlace()
    feeder = fluid.DataFeeder(place=place, feed_list=[x, y])
    exe = fluid.Executor(place)

    # fetch distributed training environment setting
    training_role = os.getenv("PADDLE_TRAINING_ROLE", "TRAINER")
    port = os.getenv("PADDLE_PSERVER_PORT", "6174")
    pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "localhost")
    trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
    eplist = []
    for ip in pserver_ips.split(","):
        eplist.append(':'.join([ip, port]))
    pserver_endpoints = ",".join(eplist)
    trainers = int(os.getenv("PADDLE_TRAINERS", "1"))
    current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port

    t = fluid.DistributeTranspiler()
    t.transpile(
        trainer_id=trainer_id,
        pservers=pserver_endpoints,
        trainers=trainers)

    if training_role == "PSERVER":
        pserver_prog = t.get_pserver_program(current_endpoint)
        startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
        exe.run(startup_prog)
        exe.run(pserver_prog)
    elif training_role == "TRAINER":
        trainer_prog = t.get_trainer_program()
        exe.run(fluid.default_startup_program())

        for epoch in range(EPOCH_NUM):
            for batch_id, batch_data in enumerate(train_reader()):
                avg_loss_value, = exe.run(trainer_prog,
                                          feed=feeder.feed(batch_data),
                                          fetch_list=[avg_loss])
                if (batch_id + 1) % 10 == 0:
                    print("Epoch: {0}, Batch: {1}, loss: {2}".format(
                        epoch, batch_id, avg_loss_value[0]))
        # destroy the resources held for this trainer node on the pserver node
        exe.close()
    else:
        raise AssertionError("PADDLE_TRAINING_ROLE should be one of [TRAINER, PSERVER]")


train()

A few notes on these two scripts:

1. The distributed test uses the same data as the single-machine test in Part 2, and that data is downloaded automatically. In other words, once you have run the single-machine code above, the data for the distributed run is already in place.

2. As with TensorFlow, the two distributed scripts have to be launched separately. The changes I made are baked into the code, so no extra command-line arguments are needed when running them (the original post shows screenshots of the runs here; see the launcher sketch after these notes).

3. Possible problems: apart from the environment issues covered in the link I posted at the beginning, I ran into a segmentation fault, which turned out to be caused by wrongly configured ps and worker IP addresses (the sketch below shows where those IPs are set).
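Since the original screenshots are not reproduced here, the following is a minimal sketch of how the PSERVER script could be launched with explicit environment variables. The file name launch_pserver.py and the IP address are my own placeholders, not values from the post; dist_train_ps.py is the script listed above. An analogous launcher with PADDLE_TRAINING_ROLE set to "TRAINER" and PADDLE_PSERVER_IPS pointing at the pserver's address would start dist_train_worker.py on the trainer machine.

# launch_pserver.py -- hypothetical helper, not part of the original post.
# It overrides the environment variables that dist_train_ps.py reads with
# os.getenv(); replace the example IP with the real address of your pserver
# (wrong IPs here are exactly what caused the segmentation fault mentioned above).
import os
import subprocess

env = dict(os.environ)
env.update({
    "PADDLE_TRAINING_ROLE": "PSERVER",     # this machine acts as the parameter server
    "PADDLE_PSERVER_IPS": "192.168.1.10",  # example IP of the pserver
    "PADDLE_PSERVER_PORT": "6174",         # every node must use the same port
    "PADDLE_CURRENT_IP": "192.168.1.10",   # IP of this pserver node
    "PADDLE_TRAINERS": "1",                # total number of trainer nodes
    "PADDLE_TRAINER_ID": "0",              # always 0 on the pserver side
})
subprocess.run(["python", "dist_train_ps.py"], env=env, check=True)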

4. Key parameters for distributed Paddle

  

training_role = os.getenv("PADDLE_TRAINING_ROLE", "TRAINER")
port = os.getenv("PADDLE_PSERVER_PORT", "6174")
pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "localhost")
trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
eplist = []
for ip in pserver_ips.split(","):
    eplist.append(':'.join([ip, port]))
pserver_endpoints = ",".join(eplist)
trainers = int(os.getenv("PADDLE_TRAINERS", "1"))
current_endpoint = os.getenv("PADDLE_CURRENT_IP", "localhost") + ":" + port

Let's go through these one by one, from top to bottom, using the worker code above as an example (Paddle calls the worker a trainer); a small worked example of the endpoint construction follows the list.

PADDLE_TRAINING_ROLE (str): the role of this process (TRAINER or PSERVER).

PADDLE_PSERVER_PORT (int): the port. Unlike TensorFlow, the same port must be used everywhere, i.e. every trainer and pserver uses 6174 here.

PADDLE_PSERVER_IPS (str): the IP address(es) of the pserver(s); separate multiple addresses with commas.

PADDLE_TRAINER_ID (int): the index of each trainer; on the pserver side it is always 0.

PADDLE_TRAINERS (int): the number of trainers, i.e. how many workers there would be in TensorFlow.

PADDLE_CURRENT_IP (str): the IP address of the current pserver node; a worker does not need to set it.
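As a small worked example (with made-up IP addresses of my own), this is how the snippet above turns PADDLE_PSERVER_IPS and PADDLE_PSERVER_PORT into the endpoint list that is handed to the transpiler:

# Endpoint construction, extracted from the snippet above; the IPs are
# illustrative values only, not from the original post.
port = "6174"
pserver_ips = "192.168.1.10,192.168.1.11"  # two parameter servers

eplist = []
for ip in pserver_ips.split(","):
    eplist.append(':'.join([ip, port]))
pserver_endpoints = ",".join(eplist)

print(pserver_endpoints)  # 192.168.1.10:6174,192.168.1.11:6174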

Official documentation: http://www.paddlepaddle.org/documentation/docs/zh/1.2/user_guides/howto/training/cluster_quick_start.html

Screenshot of the official parameter documentation (omitted).

Screenshot of the distributed run (omitted).

OK, and we're done!

 

 
