赞
踩
全栈工程师开发手册 (作者:栾鹏)
一站式云原生机器学习平台
开源地址:https://github.com/data-infra/cube-studio
cube studio 开源的国内最热门的一站式机器学习mlops/大模型训练平台,支持多租户,sso单点登录,支持在线镜像调试,在线ide开发,数据集管理,图文音标注和自动化标注,任务模板自定义,拖拉拽任务流,模型分布式多机多卡训练,超参搜索,模型管理,推理服务弹性伸缩,支持ml/tf/pytorch/onnx/tensorrt/llm模型0代码服务发布,以及配套资源监控和算力,存储资源管理。支持机器学习,深度学习,大模型 开发训练推理发布全链路。支持元数据管理,维表,指标,sqllab,数据etl等数据中台对接功能。支持多集群,边缘集群,serverless集群方式部署。支持计量计费,资源额度限制,支持vgpu,rdma,国产gpu,arm64架构。
aihub模型市场:支持AI hub模型市场,支持400+开源模型应用一键开发,一键微调,一键部署。
gpt大模型:支持40+开源大模型部署一键部署,支持ray,volcano,spark等分布式计算框架,支持tf,pytorch,mxnet,mpi,paddle,mindspre分布式多机多卡训练框架,支持deepspeed,colossalai,horovod分布式加速框架,支持llama chatglm baichuan qwen系列大模型微调。支持llama-factory 100+llm微调,支持大模型vllm推理加速,支持智能体私有知识库,智能机器人。
Volcano是一个基于Kubernetes的云原生批量计算平台,也是CNCF的首个批量计算项目。
volcano是华为开源出的分布式训练架构,github官方网址:https://github.com/volcano-sh/volcano
有时候单台机器多进程也无法快速完成代码运行,这个时候就需要多机器实现:
1、单机器算力有限,核数不足
2、有些运行有机器白名单显示,需要多台机器ip增加并发处理
volcano主要为我们提供index job, 也就是启动多个pod,并为每个pod提供index,role,以及其他role的访问地址。这样我们就可以用这些信息来做事情。
为了方便的实现一个volcano多机分布式集群,这里直接使用
https://github.com/tencentmusic/cube-studio 开源的云原生一站式机器学习平台。
使用volcano这个模板,填上自己的worker数量,每个worker的镜像和启动命令就可以了
部署分布式volcano集群 平台已经我们实现了,我们只需要编写分布式的代码。 要想针对实现并发操作
1、通过环境变量VC_WORKER_NUM 有多少个worker
2、通过环境变量VC_TASK_INDEX实现当前pod是第几个worker
3、每个worker里面都判别一遍总共需要处理的数据,和当前worker需要处理的数据。
4、代码根据当前是第几个worker处理自己该做的工作。
保留单机的代码,添加识别集群信息的代码(多少个worker,当前worker是第几个),添加分工(只处理归属于当前worker的任务),
import time, datetime, json, requests, io, os from multiprocessing import Pool from functools import partial import os, random, sys WORLD_SIZE = int(os.getenv('VC_WORKER_NUM', '1')) # 总worker的数目 RANK = int(os.getenv("VC_TASK_INDEX", '0')) # 当前是第几个worker 从0开始 print(WORLD_SIZE, RANK) # 子进程要执行的代码 def task(key): print(datetime.datetime.now(),'worker:', RANK, ', task:', key, flush=True) time.sleep(1) if __name__ == '__main__': # if os.path.exists('./success%s' % RANK): # os.remove('./success%s' % RANK) input = range(300) # 所有要处理的数据 local_task = [] # 当前worker需要处理的任务 for index in input: if index % WORLD_SIZE == RANK: local_task.append(index) # 要处理的数据均匀分配到每个worker # 每个worker内部还可以用多进程,线程池之类的并发操作。 pool = Pool(10) # 开辟包含指定数目线程的线程池 pool.map(partial(task), local_task) # 当前worker,只处理分配给当前worker的任务 pool.close() pool.join() # 添加文件标识,当前worker结束 # open('./success%s' % RANK, mode='w').close() # # rank0做聚合操作 # while (RANK == 0): # success = [x for x in range(WORLD_SIZE) if os.path.exists('./success%s' % x)] # if len(success) != WORLD_SIZE: # time.sleep(5) # else: # # 所有worker全部结束,worker0开始聚合操作 # print('begin reduce') # break
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。