赞
踩
全栈工程师开发手册 (作者:栾鹏)
python教程全解
并行和分布式计算是现代应用程序的主要内容。我们需要利用多个核心或多台机器来加速应用程序或大规模运行它们。网络爬虫和搜索所使用的基础设施并不是在某人笔记本电脑上运行的单线程程序,而是相互通信和交互的服务的集合。
ray的api接口教程:https://ray.readthedocs.io/en/latest/api.html
很多教程解释了如何使用 Python 的多进程模块(https://docs.python.org/2/library/multiprocessing.html)。遗憾的是,多进程模块在处理现代应用程序的要求方面存在严重的短板。这些要求包括以下这些内容:
在多台计算机上运行相同的代码。
构建有状态且可以与之通信的微服务和 actor。
优雅地处理机器故障。
有效处理大对象和数值数据。
Ray(https://github.com/ray-project/ray)解决了所有这些问题,在保持简单性的同时让复杂的行为成为可能。
与深度学习框架的关系: Ray与TensorFlow,PyTorch和MXNet等深度学习框架完全兼容,在许多应用中与Ray一起使用一个或多个深度学习框架是很自然的(例如,我们的强化学习库使用TensorFlow和PyTorch)。
与其他分布式系统的关系:今天使用了许多流行的分布式系统,但是其中大多数并不是用AI应用程序构建的,并且缺乏支持所需的性能以及表示AI应用程序的API。从今天的分布式系统来看,它们缺少以下功能(以各种组合方式):
支持毫秒级任务和每秒数百万个任务
嵌套并行(在任务内并行化任务,例如超参数搜索内部的并行模拟)(见下图)
在运行时动态确定任意任务依赖关系(例如,为了避免等待缓慢的工作人员)
在共享可变状态下运行的任务(例如,神经网络权重或模拟器)
支持异构资源(CPU,GPU等)
有两种使用Ray的主要方法:通过其较低级别的API和更高级别的库。较高级别的库建立在较低级别的API之上。目前这些包括Ray RLlib,一个可扩展的强化学习库和Ray.tune,一个高效的分布式超参数搜索库。
ray.init() 命令将启动所有相关的 Ray 进程。在切换到集群时,这是唯一需要更改的行(我们需要传入redis的地址)。
ray.init(redis_address="123.45.67.89:6379")
这些过程包括:
有很多 worker 进程并行执行 Python 函数(大概是每个 CPU 核心对应一个 worker)。
用于将“任务”分配给 worker(以及其他计算机)的调度程序进程。任务是 Ray 调度的工作单元,对应于一个函数调用或方法调用。
共享内存对象存储库,用于在 worker 之间有效地共享对象(无需创建副本)。
内存数据库,用于存储在发生机器故障时重新运行任务所需的元数据。
Ray worker 是独立的进程,而不是线程,因为在 Python 中存在全局解释器锁,所以对多线程的支持非常有限。
Ray API的目标是自然地表达非常普遍的计算模式和应用程序,而不受像MapReduce这样的固定模式的限制。
Ray应用程序或作业中的基础基元是一个动态任务图。这与TensorFlow中的计算图非常不同。而在TensorFlow中,一个计算图代表一个神经网络,并且在单个应用程序中执行多次,在Ray中,任务图代表整个应用程序,并且只执行一次。任务图不是事先知道的。它是在应用程序运行时动态构建的,执行一个任务可能会触发创建更多任务。
任意的Python函数都可以作为任务执行,并且可以任意依赖其他任务的输出。下面的例子给出了说明。
要将 Python 函数 f 转换为一个“远程函数”(可以远程和异步执行的函数),可以使用 @ray.remote 装饰器来声明这个函数。然后函数调用 f.remote() 将立即返回一个 future(future 是对最终输出的引用),实际的函数执行将在后台进行(我们将这个函数执行称为任务)。
要将一个任务的输出作为输入提供给后续任务,只需将第一个任务返回的 future 作为参数传给第二个任务。Ray 的调度程序会自动考虑任务依赖关系。在第一个任务完成之前不会执行第二个任务,第一个任务的输出将自动被发送给执行第二个任务的机器。
import ray import time,datetime # Start Ray. ray.init() import numpy as np # 定义两个远程函数。 # 这些函数的调用创建了远程执行的任务 @ray.remote def create_matrix(size): return np.random.normal(size=size) @ray.remote def multiply_matrices(x, y): return np.dot(x, y) result_ids = [] for i in range(400): # 开始两个并行的任务,这些会立即返回futures并在后台执行 x_id = create_matrix.remote([1000, 1000]) print(datetime.datetime.now()) y_id = create_matrix.remote([1000, 1000]) print(datetime.datetime.now()) # 开始第三个任务,但这并不会被提前计划,直到前两个任务都完成了. result_ids.append(multiply_matrices.remote(x_id, y_id)) print(datetime.datetime.now()) # 获取结果。这个结果直到第三个任务完成才能得到。只有get创建以后所有的任务才开始创建执行。 z_id = ray.get(result_ids) print(z_id)
可以看看机器的性能损耗
下图是两个聚合过程和相应的函数。以线性方式聚合值与以树形结构方式聚合值的对比
右图方式的聚合函数会比左图方式的聚合更高校,因为在一个任务
import time @ray.remote def add(x, y): time.sleep(1) return x + y # Aggregate the values slowly. This approach takes O(n) where n is the # number of values being aggregated. In this case, 7 seconds. id1 = add.remote(1, 2) id2 = add.remote(id1, 3) id3 = add.remote(id2, 4) id4 = add.remote(id3, 5) id5 = add.remote(id4, 6) id6 = add.remote(id5, 7) id7 = add.remote(id6, 8) result = ray.get(id7) # Aggregate the values in a tree-structured pattern. This approach # takes O(log(n)). In this case, 3 seconds. id1 = add.remote(1, 2) id2 = add.remote(3, 4) id3 = add.remote(5, 6) id4 = add.remote(7, 8) id5 = add.remote(id1, id2) id6 = add.remote(id3, id4) id7 = add.remote(id5, id6) result = ray.get(id7)
# Slow approach.
values = [1, 2, 3, 4, 5, 6, 7, 8]
while len(values) > 1:
values = [add.remote(values[0], values[1])] + values[2:]
result = ray.get(values[0])
# Fast approach.
values = [1, 2, 3, 4, 5, 6, 7, 8]
while len(values) > 1:
values = values[2:] + [add.remote(values[0], values[1])]
result = ray.get(values[0])
首先来看一下最简单的Ray程序是如何编写的。
# 导入ray,并初始化执行环境 import ray ray.init() # 定义ray remote函数 @ray.remote def hello(): return "Hello world !" # 异步执行remote函数,返回结果id object_id = hello.remote() # 同步获取计算结果 hello = ray.get(object_id) # 输出计算结果 print hello
在Ray里,通过Python注解@ray.remote
定义remote函数。使用此注解声明的函数都会自带一个默认的方法remote
,通过此方法发起的函数调用都是以提交分布式任务的方式异步执行的,函数的返回值是一个对象id,使用ray.get
内置操作可以同步获取该id对应的对象。熟悉Java里的Future机制的话对此应该并不陌生,或许会有人疑惑这和普通的异步函数调用没什么大的区别,但是这里最大的差异是,函数hello是分布式异步执行的。
remote函数是Ray分布式计算抽象中的核心概念,通过它开发者拥有了动态定制计算依赖(任务DAG)的能力。比如:
@ray.remote def A(): return "A" @ray.remote def B(): return "B" @ray.remote def C(a, b): return "C" a_id = A.remote() b_id = B.remote() c_id = C.remote(a_id, b_id) print ray.get(c_id)
例子代码中,对函数A、B的调用是完全并行执行的,但是对函数C的调用依赖于A、B函数的返回结果。Ray可以保证函数C需要等待A、B函数的结果真正计算出来后才会执行。如果将函数A、B、C类比为DAG的节点的话,那么DAG的边就是函数C参数对函数A、B计算结果的依赖,自由的函数调用方式允许Ray可以自由地定制DAG的结构和计算依赖关系。另外,提及一点的是Python的函数可以定义函数具有多个返回值,这也使得Python的函数更天然具备了DAG节点多入和多出的特点。
Ray是使用什么样的架构对分布式计算做出如上抽象的呢,一下给出了Ray的系统架构(来自Ray论文,参考文献1)。
作为分布式计算系统,Ray仍旧遵循了典型的Master-Slave的设计:Master负责全局协调和状态维护,Slave执行分布式计算任务。不过和传统的分布式计算系统不同的是,Ray使用了混合任务调度的思路。在集群部署模式下,Ray启动了以下关键组件:
需要说明的是,Ray的论文中提及,全局调度器可以启动一到多个,而目前Ray的实现文档里讨论的内容都是基于一个全局调度器的情况。我猜测可能是Ray尚在建设中,一些机制还未完善,后续读者可以留意此处的细节变化。
Ray的任务也是通过类似Spark中Driver的概念的方式进行提交的,有所不同的是:
论文给出的架构图里并未画出Driver的概念,因此我在其基础上做了一些修改和扩充。
Ray的Driver节点和和Slave节点启动的组件几乎相同,不过却有以下区别:
基于以上架构,我们简单讨论一下Ray中关键的操作和流程。
在PythonShell中,使用ray.init()
可以在本地启动ray,包括Driver、HeadNode(Master)和若干Slave。
import ray
ray.init()
如果是直连已有的Ray集群,只需要指定RedisServer的地址即可。
ray.init(redis_address="<redis-address>")
本地启动Ray得到的输出如下:
>>> ray.init()
Waiting for redis server at 127.0.0.1:58807 to respond...
Waiting for redis server at 127.0.0.1:23148 to respond...
Allowing the Plasma store to use up to 13.7439GB of memory.
Starting object store with directory /tmp and huge page support disabled
Starting local scheduler with 8 CPUs, 0 GPUs
======================================================================
View the web UI at http://localhost:8888/notebooks/ray_ui62614.ipynb?token=7c253b0fd66fe41294d9f2c6739e3f002c1e76f6f59b99f5
======================================================================
{'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store73540254', manager_name='/tmp/plasma_manager78072648', manager_port=39874)], 'redis_address': '127.0.0.1:58807', 'local_scheduler_socket_names': ['/tmp/scheduler98624129'], 'webui_url': 'http://localhost:8888/notebooks/ray_ui62614.ipynb?token=7c253b0fd66fe41294d9f2c6739e3f002c1e76f6f59b99f5', 'node_ip_address': '127.0.0.1'}
>>>
本地启动Ray时,可以看到Ray的WebUI的访问地址。
使用ray.put()
可以将Python对象存入本地ObjectStore,并且异步返回一个唯一的ObjectID。通过该ID,Ray可以访问集群中任一个节点上的对象(远程对象通过查阅Master的对象表获得)。
对象一旦存入ObjectStore便不可更改,Ray的remote函数可以将直接将该对象的ID作为参数传入。使用ObjectID作为remote函数参数,可以有效地减少函数参数的写ObjectStore的次数。
@ray.remote
def f(x):
pass
x = "hello"
# 对象x往ObjectStore拷贝里10次
[f.remote(x) for _ in range(10)]
# 对象x仅往ObjectStore拷贝1次
x_id = ray.put(x)
[f.remote(x_id) for _ in range(10)]
使用ray.get()
可以通过ObjectID获取ObjectStore内的对象并将之转换为Python对象。对于数组类型的对象,Ray使用共享内存机制减少数据的拷贝成本。而对于其它对象则需要将数据从ObjectStore拷贝到进程的堆内存中。
如果调用ray.get()
操作时,对象尚未创建好,则get操作会阻塞,直到对象创建完成后返回。get操作的关键流程如下:
另外,ray.get()
可以一次性读取多个对象的数据:
result_ids = [ray.put(i) for i in range(10)]
ray.get(result_ids) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Ray中使用注解@ray.remote
可以声明一个remote function。remote函数时Ray的基本任务调度单元,remote函数定义后会立即被序列化存储到RedisServer中,并且分配了一个唯一的ID,这样就保证了集群的所有节点都可以看到这个函数的定义。
不过,这样对remote函数定义有了一个潜在的要求,即remote函数内如果调用了其它的用户函数,则必须提前定义,否则remote函数无法找到对应的函数定义内容。
remote函数内也可以调用其它的remote函数,Driver和Slave每次调用remote函数时,其实都是向集群提交了一个计算任务,从这里也可以看到Ray的分布式计算的自由性。
Ray中调用remote函数的关键流程如下:
ray.put()
操作存入ObjectStore然后返回ObjectID)、函数返回值对象的ID。@ray.remote
注解有一个参数num_return_vals
用于声明remote函数的返回值个数,基于此实现remote函数的多返回值机制。
@ray.remote(num_return_vals=2)
def f():
return 1, 2
x_id, y_id = f.remote()
ray.get(x_id) # 1
ray.get(y_id) # 2
@ray.remote
注解的另一个参数num_gpus
可以为任务指定GPU的资源。使用内置函数ray.get_gpu_ids()
可以获取当前任务可以使用的GPU信息。
@ray.remote(num_gpus=1)
def gpu_method():
return "This function is allowed to use GPUs {}.".format(ray.get_gpu_ids())
ray.wait()
操作支持批量的任务等待,基于此可以实现一次性获取多个ObjectID对应的数据。
# 启动5个remote函数调用任务
results = [f.remote(i) for i in range(5)]
# 阻塞等待4个任务完成,超时时间为2.5s
ready_ids, remaining_ids = ray.wait(results, num_returns=4, timeout=2500)
上述例子中,results包含了5个ObjectID,使用ray.wait
操作可以一直等待有4个任务完成后返回,并将完成的数据对象放在第一个list类型返回值内,未完成的ObjectID放在第二个list返回值内。如果设置了超时时间,那么在超时时间结束后仍未等到预期的返回值个数,则已超时完成时的返回值为准。
使用ray.error_info()可以获取任务执行时产生的错误信息。
>>> import time >>> @ray.remote >>> def f(): >>> time.sleep(5) >>> raise Exception("This task failed!!") >>> f.remote() Remote function __main__.f failed with: Traceback (most recent call last): File "<stdin>", line 4, in f Exception: This task failed!! You can inspect errors by running ray.error_info() If this driver is hanging, start a new one with ray.init(redis_address="127.0.0.1:65452") >>> ray.error_info() [{'type': 'task', 'message': 'Remote function \x1b[31m__main__.f\x1b[39m failed with:\n\nTraceback (most recent call last):\n File "<stdin>", line 4, in f\nException: This task failed!!\n', 'data': '{\'function_id\': "Hm\\xde\\x93\'\\x91\\xce\\x13ld\\xf4O\\xd7\\xce\\xc2\\xe1\\x151\\x1e3", \'function_name\': u\'__main__.f\'}'}]
Ray的remote函数只能处理无状态的计算需求,有状态的计算需求需要使用Ray的Actor实现。在Python的class定义前使用@ray.remote
可以声明Actor。
@ray.remote
class Counter(object):
def __init__(self):
self.value = 0
def increment(self):
self.value += 1
return self.value
使用如下方式创建Actor对象。
a1 = Counter.remote()
a2 = Counter.remote()
Ray创建Actor的流程为:
从流程可以看出,Actor对象的创建时并行的。
通过调用Actor对象的方法使用Actor。
a1.increment.remote() # ray.get returns 1
a2.increment.remote() # ray.get returns 1
调用Actor对象的方法的流程为:
为了保证Actor状态的一致性,对同一个Actor的方法调用是串行执行的。
如果只是使用Ray,可以使用如下命令直接安装。
pip intall ray
如果需要编译Ray的最新源码进行安装,按照如下步骤进行(MaxOS):
# 更新编译依赖包 brew update brew install cmake pkg-config automake autoconf libtool boost wget pip install numpy cloudpickle funcsigs click colorama psutil redis flatbuffers cython --ignore-installed six # 下载源码编译安装 git clone https://github.com/ray-project/ray.git cd ray/python python setup.py install # 测试 python test/runtest.py # 安装WebUI需要的库[可选] pip install jupyter ipywidgets bokeh # 编译Ray文档[可选] cd ray/doc pip install -r requirements-doc.txt make html open _build/html/index.html
我在MacOS上安装jupyter时,遇到了Python的setuptools库无法升级的情况,原因是MacOS的安全性设置问题,可以使用如下方式解决:
Command+R
进入Mac保护模式。csrutils disable
关闭系统安全策略。csrutils enable
,再次重启即可。进入PythonShell,输入代码本地启动Ray:
import ray
ray.init()
浏览器内打开WebUI界面如下:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。