赞
踩
分布式paddle的环境和tensorflow的环境很相似,在我之前的博客中已经写得非常清楚了
我的是 python3.6+cuda9+cudnn7+ubuntu16.04
博客链接:https://blog.csdn.net/qq_28626909/article/details/85007363
搭建好环境知道我们开始安装paddlepaddle,
官方网站链接:http://www.paddlepaddle.org/
我的配置是NVIDIA384+cuda9.0+cudnn7.0,所以我直接pip install paddlepaddle-gpu即可
cuda8+cudnn7.0是pip install paddlepaddle-gpu==1.2.0.post87
cuda8+cudnn5.0是pip install paddlepaddle-gpu==1.2.0.post85
为了测试我们的paddle是否安装成功,我们这里放出代码,可以直接跑的。
- #!D:/workplace/python
- # -*- coding: utf-8 -*-
- # @File : first_test.py
- # @Author: WangYe
- # @Date : 2019/1/10
- # @Software: PyCharm
- import paddle
- import paddle.fluid as fluid
- import numpy
- import six
-
-
- # Configure the neural network.
- def net(x, y):
- y_predict = fluid.layers.fc(input=x, size=1, act=None)
- cost = fluid.layers.square_error_cost(input=y_predict, label=y)
- avg_cost = fluid.layers.mean(cost)
- return y_predict, avg_cost
-
-
- # Define train function.
- def train(save_dirname):
- x = fluid.layers.data(name='x', shape=[13], dtype='float32')
- y = fluid.layers.data(name='y', shape=[1], dtype='float32')
- y_predict, avg_cost = net(x, y)
- sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001)
- sgd_optimizer.minimize(avg_cost)
- train_reader = paddle.batch(
- paddle.reader.shuffle(paddle.dataset.uci_housing.train(), buf_size=500),
- batch_size=20)
- place = fluid.CPUPlace()
- exe = fluid.Executor(place)
-
- def train_loop(main_program):
- feeder = fluid.DataFeeder(place=place, feed_list=[x, y])
- exe.run(fluid.default_startup_program())
-
- PASS_NUM = 1000
- for pass_id in range(PASS_NUM):
- total_loss_pass = 0
- for data in train_reader():
- avg_loss_value, = exe.run(
- main_program, feed=feeder.feed(data), fetch_list=[avg_cost])
- total_loss_pass += avg_loss_value
- if avg_loss_value < 5.0:
- if save_dirname is not None:
- fluid.io.save_inference_model(
- save_dirname, ['x'], [y_predict], exe)
- return
- print("Pass %d, total avg cost = %f" % (pass_id, total_loss_pass))
-
- train_loop(fluid.default_main_program())
-
-
- # Infer by using provided test data.
- def infer(save_dirname=None):
- place = fluid.CPUPlace()
- exe = fluid.Executor(place)
- inference_scope = fluid.core.Scope()
- with fluid.scope_guard(inference_scope):
- [inference_program, feed_target_names, fetch_targets] = (
- fluid.io.load_inference_model(save_dirname, exe))
- test_reader = paddle.batch(paddle.dataset.uci_housing.test(), batch_size=20)
-
- test_data = six.next(test_reader())
- test_feat = numpy.array(list(map(lambda x: x[0], test_data))).astype("float32")
- test_label = numpy.array(list(map(lambda x: x[1], test_data))).astype("float32")
-
- results = exe.run(inference_program,
- feed={feed_target_names[0]: numpy.array(test_feat)},
- fetch_list=fetch_targets)
- print("infer results: ", results[0])
- print("ground truth: ", test_label)
-
-
- # Run train and infer.
- if __name__ == "__main__":
- save_dirname = "fit_a_line.inference.model"
- train(save_dirname)
- infer(save_dirname)
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
代码的很多部分应该用过tensorflow的人都能看懂,paddlepaddle还是比tensorflow更简洁的
运行结果截图:
分布式paddle和tensorflow是有很多相似之处,我这里对官方网站的代码进行了一点改动,这样测试起来就更为方便
tensorflow把参数服务器叫ps,计算节点叫worker
padddle把参数服务器叫PSERVER,计算节点叫TRAINER
这是第一段,也就是PSERVER的代码,我把代码命名为:
dist_train_ps.py
- #!D:/workplace/python
- # -*- coding: utf-8 -*-
- # @File : dist_train_ps.py
- # @Author: WangYe
- # @Date : 2019/1/11
- # @Software: PyCharm
-
- import os
- import paddle
- import paddle.fluid as fluid
-
- # train reader
- BATCH_SIZE = 20
- EPOCH_NUM = 30
- BATCH_SIZE = 8
-
- train_reader = paddle.batch(
- paddle.reader.shuffle(
- paddle.dataset.uci_housing.train(), buf_size=500),
- batch_size=BATCH_SIZE)
-
- def train():
- y = fluid.layers.data(name='y', shape=[1], dtype='float32')
- x = fluid.layers.data(name='x', shape=[13], dtype='float32')
- y_predict = fluid.layers.fc(input=x, size=1, act=None)
-
- loss = fluid.layers.square_error_cost(input=y_predict, label=y)
- avg_loss = fluid.layers.mean(loss)
- opt = fluid.optimizer.SGD(learning_rate=0.001)
- opt.minimize(avg_loss)
-
- place = fluid.CPUPlace()
- feeder = fluid.DataFeeder(place=place, feed_list=[x, y])
- exe = fluid.Executor(place)
-
- # fetch distributed training environment setting
- training_role = os.getenv("PADDLE_TRAINING_ROLE", "PSERVER")
- port = os.getenv("PADDLE_PSERVER_PORT", "6174")
- pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "localhost")
- trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
- eplist = []
- for ip in pserver_ips.split(","):
- eplist.append(':'.join([ip, port]))
- pserver_endpoints = ",".join(eplist)
- trainers = int(os.getenv("PADDLE_TRAINERS","1"))
- current_endpoint = os.getenv("PADDLE_CURRENT_IP", "localhost") + ":" + port
-
- t = fluid.DistributeTranspiler()
- t.transpile(
- trainer_id = trainer_id,
- pservers = pserver_endpoints,
- trainers = trainers)
-
- if training_role == "PSERVER":
- pserver_prog = t.get_pserver_program(current_endpoint)
- startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
- exe.run(startup_prog)
- exe.run(pserver_prog)
- elif training_role == "TRAINER":
- trainer_prog = t.get_trainer_program()
- exe.run(fluid.default_startup_program())
-
- for epoch in range(EPOCH_NUM):
- for batch_id, batch_data in enumerate(train_reader()):
- avg_loss_value, = exe.run(trainer_prog,
- feed=feeder.feed(batch_data),
- fetch_list=[avg_loss])
- if (batch_id + 1) % 10 == 0:
- print("Epoch: {0}, Batch: {1}, loss: {2}".format(
- epoch, batch_id, avg_loss_value[0]))
- # destory the resource of current trainer node in pserver server node
- exe.close()
- else:
- raise AssertionError("PADDLE_TRAINING_ROLE should be one of [TRAINER, PSERVER]")
-
- train()
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
下面的是第二段,也就是的TRAINER,我把代码命名为
dist_train_worker.py
- #!D:/workplace/python
- # -*- coding: utf-8 -*-
- # @File : dist_train_ps.py
- # @Author: WangYe
- # @Date : 2019/1/11
- # @Software: PyCharm
-
- import os
- import paddle
- import paddle.fluid as fluid
-
- # train reader
- BATCH_SIZE = 20
- EPOCH_NUM = 30
- BATCH_SIZE = 8
-
- train_reader = paddle.batch(
- paddle.reader.shuffle(
- paddle.dataset.uci_housing.train(), buf_size=500),
- batch_size=BATCH_SIZE)
-
- def train():
- y = fluid.layers.data(name='y', shape=[1], dtype='float32')
- x = fluid.layers.data(name='x', shape=[13], dtype='float32')
- y_predict = fluid.layers.fc(input=x, size=1, act=None)
-
- loss = fluid.layers.square_error_cost(input=y_predict, label=y)
- avg_loss = fluid.layers.mean(loss)
- opt = fluid.optimizer.SGD(learning_rate=0.001)
- opt.minimize(avg_loss)
-
- place = fluid.CPUPlace()
- feeder = fluid.DataFeeder(place=place, feed_list=[x, y])
- exe = fluid.Executor(place)
-
- # fetch distributed training environment setting
- training_role = os.getenv("PADDLE_TRAINING_ROLE", "TRAINER")
- port = os.getenv("PADDLE_PSERVER_PORT", "6174")
- pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "localhost")
- trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
- eplist = []
- for ip in pserver_ips.split(","):
- eplist.append(':'.join([ip, port]))
- pserver_endpoints = ",".join(eplist)
- trainers = int(os.getenv("PADDLE_TRAINERS","1"))
- current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port
-
- t = fluid.DistributeTranspiler()
- t.transpile(
- trainer_id = trainer_id,
- pservers = pserver_endpoints,
- trainers = trainers)
-
- if training_role == "PSERVER":
- pserver_prog = t.get_pserver_program(current_endpoint)
- startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
- exe.run(startup_prog)
- exe.run(pserver_prog)
- elif training_role == "TRAINER":
- trainer_prog = t.get_trainer_program()
- exe.run(fluid.default_startup_program())
-
- for epoch in range(EPOCH_NUM):
- for batch_id, batch_data in enumerate(train_reader()):
- avg_loss_value, = exe.run(trainer_prog,
- feed=feeder.feed(batch_data),
- fetch_list=[avg_loss])
- if (batch_id + 1) % 10 == 0:
- print("Epoch: {0}, Batch: {1}, loss: {2}".format(
- epoch, batch_id, avg_loss_value[0]))
- # destory the resource of current trainer node in pserver server node
- exe.close()
- else:
- raise AssertionError("PADDLE_TRAINING_ROLE should be one of [TRAINER, PSERVER]")
-
- train()
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
对于这两端代码,有一下几点说明
1.分布式测试代码用到的数据是上面第二部分单机测试的数据,它的数据是自动下载的。也就是说运行过上面的单机代码后,就自动有了分布式的数据
2.分布式代码需要分开执行,同tensorflow一样,不过我做的改动在代码里了,所以执行代码之后命令行不用添加参数,如下截图
3.可能遇到的问题:
除开我刚开始放出的链接中遇到的环境问题,我遇到了一个 段错误的一个问题,那是因为ps和worker的ip写的有问题
- training_role = os.getenv("PADDLE_TRAINING_ROLE", "TRAINER")
- port = os.getenv("PADDLE_PSERVER_PORT", "6174")
- pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "localhost")
- trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
- eplist = []
- for ip in pserver_ips.split(","):
- eplist.append(':'.join([ip, port]))
- pserver_endpoints = ",".join(eplist)
- trainers = int(os.getenv("PADDLE_TRAINERS","1"))
- current_endpoint = os.getenv("PADDLE_CURRENT_IP", "localhost") + ":" + port
我们一个一个来看,以上面的worker的代码为例(paddle中把worker成为tariner),从上往下讲
paddle_tarining_role(str):训练对象(trainer还是pserver)
paddle_pserver_port(int):端口(这里和tensorflow不同的是必须指定同一个端口,即所有tariner和ps都是6174)
paddle_pserver_ips(str):ps的ip地址,如果多个就用逗号隔开
paddle_trainer_id(int):tariner各自的编号,在ps中全部填写0
paddle_trainers(int):trainer的数量,即tensorflow中多少个worker
paddle_current_ip(str):当前ps节点的ip地址,如果是worker,则可以不指定
官方参数解读网站截图:
运行截图:
ok,大功告成!!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。