赞
踩
目录
本文首先介绍Ray分布式的原理,对原理有了一定认识后,具体到使用就相对简单了,因此本文只会对使用做简要介绍,最后再说一下Ray的不足。文章的目的是让大家对Ray能有基本的了解,供大家判断Ray是否适合用于解决手头的问题。
Ray是一个开源统一框架,为机器学习和大数据处理相关程序提供了用于并行处理的计算层,降低了大规模端到端机器学习工作流的开发难度。
Ray提供了在数据预处理、分布式训练、超参数调优、模型服务和强化学习几个方面的并行方法,分别对应上图的Data、Train、Tune、Serve和RLib模块,用户只需要在原有代码中新增几行代码,即可实现方法的并行操作。
同时,通过KubeRay可以将Ray程序很容易的移植到Kubernetes集群中,利用云原生生态中的基础能力对Ray任务进行更精细的管理。
Ray通常是以集群的方式部署在多台服务器上。Head node是主节点,Worker node是工作节点,上图展示了由1个Head node和2个Worker node组成的Ray集群。
Ray任务在执行的过程中,可以根据用户在代码中的定义,判断将哪些无状态方法(Task)或者有状态类(Actor)进行分布式处理。
以方法举例,在方法上面加入一行@ray.remote装饰器,普通方法便成为了remote方法,可以被多个worker进程同时处理。并且可以指定每个worker进程所需要的资源,比如@ray.remote(num_cpus=4, num_gpus=2),指定需要worker具备4个cpu核心和2个gpu,同时可以指定小于1的资源,比如@ray.remote(num_cpus=0.5, num_gpus=0.5)。
- # By adding the `@ray.remote` decorator, a regular Python function
- # becomes a Ray remote function.
- @ray.remote
- def my_function():
- return 1
现在介绍一下worker进程,这是帮助理解Ray工作原理的核心概念。首先要把worker进程和worker node区分开,worker node指的是服务器,而worker进程是worker node上运行的进程,一个worker node可以运行多个worker进程。
Ray v2 Architecture中如下介绍worker进程:
One or more worker processes, responsible for task submission and execution. A worker process is either stateless (can be reused to execute any @ray.remote function) or an actor (can only execute methods according to its @ray.remote class). Each worker process is associated with a specific job. The default number of initial workers is equal to the number of CPUs on the machine. Each worker stores:
An ownership table. System metadata for the objects to which the worker has a reference, e.g., to store ref counts and object locations.
An in-process store, used to store small objects.
一个任务需要多个worker进程来执行,worker进程可以是无状态方法task或者有状态类actor,每个worker进程都属于某个任务,每个worker节点上默认的worker初始进程数等于CPU核数。每个worker进程存储着方法或者变量调用的关系和small objects。object指的是Ray中的变量,比如task的返回值或者用户自定义put的变量。
Object - An application value. These are values that are returned by a task or created through `ray.put`. Objects are immutable: they cannot be modified once created. A worker can refer to an object using an `ObjectRef`.
- from ray.train import ScalingConfig
-
- scaling_config = ScalingConfig(
- num_workers=2,
- resources_per_worker={
- "CPU": 4,
- "GPU": 2,
- },
- use_gpu=True,
- )
以Ray Train模型训练模块为例,代码设置了2个woker进程,每个woker设置了4个cpu和2个gpu,设置完成后,代码具体执行流程如下图所示,实现多个worker同时执行训练操作。
最后说明一下,分布式多机多卡训练得到的结果和单机单卡训练的结果肯定会有一定差异,差异在可接受的范围内就可以忽略。
根据请求的资源,将Woker Node分为如下几种:
可行:节点具有运行任务或参与者所需的资源。根据这些资源的当前可用性,有两种子状态:
可用:节点拥有所需的资源,并且这些资源现在是免费的。
不可用:节点具有所需的资源,但它们当前正被其他任务或参与者使用。
不可行:节点没有所需的资源。例如,仅包含 CPU 的节点对于 GPU 任务是不可行的。
下图介绍了一个通常的申请资源进行节点选择的过程。首先判断集群中local节点是否有足够的资源并且worker可用,如果有则进行分配,如果没有,则在remote节点进行重新选择。还涉及节点亲和性相关的概念,具体请参考Ray v2 Architecture和Scheduling — Ray 2.9.3。
相信有了前面的介绍,大家应该可以猜出Ray中弹性伸缩是如何实现的。~~思考一分钟。
- from ray import serve
-
-
- @serve.deployment(
- ray_actor_options={"num_cpus": 1},
- max_concurrent_queries=5,
- autoscaling_config={
- "target_num_ongoing_requests_per_replica": 1,
- "min_replicas": 0,
- "initial_replicas": 0,
- "max_replicas": 200,
- },
- )
Ray是通过对worker进程数量的增减实现弹性伸缩的,弹性伸缩主要用在模型服务中,对应的是Ray Serve模块,通过4个参数对其进行控制。
上图是Ray Serve中弹性伸缩模块的设计图,对于每个任务,弹性伸缩程序会定期检查DeploymentHandle副本上的队列和正在进行的查询,以决定是否缩放副本数量。每个都DeploymentHandle不断轮询控制器以检查新的部署副本。每当发现新副本时,它都会向副本发送任何缓冲的或新的查询,直到max_concurrent_queries到达为止。
使用KubeRay可以很容易地在Kubernetes集群中启动关闭Ray集群、提交任务。KubeRay提供了三种CRD: RayCluster、RayJob、RayService。
在Kubernetes中的每个Ray集群包括一个Head Pod和多个Worker Pod,需要为每个Pod设置所需的cpu、gpu资源,这里的Pod等价于Ray服务器集群中的worker进程,用来实现任务的分布式处理和弹性伸缩。官方建议创建少量大资源pod优于创建多个小资源pod,因为同一个Pod可共享内部的对象存储,还可以降低pods之间的通信次数。
本节以Ray Serve简要介绍Ray的使用。
可以用@serve.deployment()设置worker进程的数量和资源,如下:
- import ray
- from ray import serve
- from fastapi import FastAPI
-
- from transformers import pipeline
-
- app = FastAPI()
-
-
- @serve.deployment(num_replicas=2, ray_actor_options={"num_cpus": 0.2, "num_gpus": 0})
- @serve.ingress(app)
- class Translator:
- def __init__(self):
- # Load model
- self.model = pipeline("translation_en_to_fr", model="t5-small")
-
- @app.post("/")
- def translate(self, text: str) -> str:
- # Run inference
- model_output = self.model(text)
-
- # Post-process output to return only the translation text
- translation = model_output[0]["translation_text"]
-
- return translation
-
-
- translator_app = Translator.bind()
利用@serve.deployment可以将多个模型进行组合编排,下一个模型可以读取前一个模型的结果,如下:
- # File name: hello.py
- from ray import serve
- from ray.serve.handle import DeploymentHandle
-
-
- @serve.deployment
- class LanguageClassifer:
- def __init__(
- self, spanish_responder: DeploymentHandle, french_responder: DeploymentHandle
- ):
- self.spanish_responder = spanish_responder
- self.french_responder = french_responder
-
- async def __call__(self, http_request):
- request = await http_request.json()
- language, name = request["language"], request["name"]
-
- if language == "spanish":
- response = self.spanish_responder.say_hello.remote(name)
- elif language == "french":
- response = self.french_responder.say_hello.remote(name)
- else:
- return "Please try again."
-
- return await response
-
-
- @serve.deployment
- class SpanishResponder:
- def say_hello(self, name: str):
- return f"Hola {name}"
-
-
- @serve.deployment
- class FrenchResponder:
- def say_hello(self, name: str):
- return f"Bonjour {name}"
-
-
- spanish_responder = SpanishResponder.bind()
- french_responder = FrenchResponder.bind()
- language_classifier = LanguageClassifer.bind(spanish_responder, french_responder)
Ray提供了数据预处理、分布式训练、超参数调优、模型服务多种功能,覆盖了机器学习的全流程,但是使用过程中也发现了几点不足,如下:
以上列了Ray的一些问题,但是这并不是说Ray不好,每个产品都有其专注的点,Ray已经在其领域做到了顶尖,并且有着非常活跃的社区和详细的使用文档,推荐同学们都去尝试一下。
Ray: A Distributed Framework for Emerging AI Applications
本文介绍了Ray的原理,欢迎大家拍砖交流。如果大家关注MLOps相关的技术,欢迎大家点赞关注
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。