赞
踩
DASK
一、Dask简介
Dask是一个并行计算库,能在集群中进行分布式计算,能以一种更方便简洁的方式处理大数据量,与Spark这些大数据处理框架相比较,Dask更轻。Dask更侧重与其他框架,如:Numpy,Pandas,Scikit-learning相结合,从而使其能更加方便进行分布式并行计算。
二、Dask数据结构
Dask存在三种最基本的数据结构,分别是:Arrays、Dataframes以及Bags,接下来详细介绍这三种数据结构以及其应用场景。
2.1. Arrays
Dask中的Arrays(位于包dask.arrays下),其实就是对Numpy中的ndarray的部分接口进行了改进,从而方便处理大数据量。对于大数据集,特别是其大小大于内存时,如果我们要对其计算,按照传统的方式,,我们会将其全部塞进内存里,那么这就会报Out-Of-Memory错误,当然,我们也可以一次读取一部分数据,那么我们是否可以提前将大数据集进行分块处理了,我们只需要控制每块数据集不超过内存,从而满足In-Memory计算了?Dask就是这样做的。盗用Dask官网的一张图:
从上图中可以看出,Dask Arrays里面的每块其实就是Numpy Array,那么我们很好奇,Dask是如何分块的了,我们从dask.array里面的from_array(dask.array.core)方法来解析一下:
我们发现对于一个Numpy Array,Dask对其分块的方式一共有6种,总结来看就是:对一个多维Numpy Array,假设其维度为:(D1, D2, …, Dn),那么chunks表示沿着不同Axis对其进行划分的块数,Dask会对这六种不同的方式转换成一种统一的方式(统一转换成第三种方式),如下所示:
其转换方式如下所示:
这里有一点需要注意,当chunks的值是类似‘1kiB’、‘1MiB’这种表示块大小的时候,那么在划分的时候,需要保证每块大小不能超过chunks限制的大小,例如:
因为 1kiB = 1000字节,而np.float32是4个字节,也就是将shape=(2000,)的数组分块时,没块的大小不能超过1000字节,如果将其分成8块,这每块的大小为250*4=1000,刚好满足块大小的限制。具体是怎么计算的了?
我们假设chunks=1000 Byte,待分块的Numpy Array dtype=float32, 即该数组每个元素占用了4个字节,因此我们知道,块中的元素个数不能超过 1000/4 = 250个,因此我们只需从最大块开始划分。
2.2. Dataframes
Dataframe是基于Pandas Dataframe改进的一个可以并行处理大数据量的数据结构,即使对大于内存的数据也是能够处理的(注意:dask.array并不能直接处理大于内存的处理,从其源码中可以看出从Numpy Array转为Dask Array时,首先需要将Numpy Array放入内存)。
Dask Dataframe的分区如下所示:
即以行来进行分区划分。我们接下来以dask.dataframe.read_csv(dask.dataframe.io.csv)来解析一下Dask是如何进行分区的。
接着通过read_pandas方法,然后通过read_bytes方法基于blocksize对每个文件进行一行一行的块划分(注意这里返回的是Delayed对象),最后通过text_blocks_to_pandas转成Dask Dataframe,那么为什么即使是Out-Of-Memory的数据仍然可以正常导入了,其实我们从下面的代码里面可以看出:
在这里有一个重要的概念——Delayed(延迟计算),这个之后会讲解,这里就暂时将其看作是一个指针,其并没有真的将所有分块后的数据读入内存,只是在内存中存放了一个指针指向了这些块数据。
通过以上的步骤,我们知道了当我读取一个csv文件时,其实Dask会将其转换成一个Delayed List,列表中的每一个Delayed对象计算后的大小都不会超过blocksize。那么当我们有了一个csv文件的Delayed对象后,我们怎么将其转换成Dask Dataframe了?我们接着分析:
在from_delayed(dask.dataframe.io.io.py)方法中我们可以看到:
因为我们知道这里的dfs其实是一个Delayed列表,那么就需要了解Delayed.dask是什么东西?为此我们进入到Delayed(dask.delayed.py)类中的dask属性:
在这里只要知道这里其实就是构建了一个任务图,接着创建Dask Dataframe:
具体过程如下所示(dask.dataframe.core.py):
我们发现最后其实就是基于构建的任务依赖图创建一个Dataframe。
2.3. Bags
对于Bags,其最主要的是用于半结构化的大数据集,比如日志或者博客等等,我们从其read_text(dask.bag.text.py)中来解析如何创建一个Dask Bag对象:
从里面的delayed方法我们可以看出,该方法最后返回的是一个Delayed对象。针对Delayed对象我们之后再介绍。
三、High Level Graph简介
正如我们在前一节中看到的,创建Dask Dataframe时,其实是通过HighLevelGraph构建了一个任务图,那么这个任务图是什么了?其实他本质上就是一个字典结构(Dict),从组成元素来看,一共由两部分组成,一个是动作(可看做是Task Graph中的节点),一个是依赖(可看做是Task Graph中的边),其定义为:
我们以官网上的一个例子来详细了解一下:
对于这个例子中的df,其指向的是最后一层的输出:
四、Dask分布式简介
Dask之所以能够高效的处理大数据量,在于其可进行分布式计算,对于分布式计算,那么就面临着两大基本问题:
1.如何通信或者说数据如何传输?
2.如何调度或者说Scheduler如何将Task分发给Worker?
在Dask 分布式中(也可以是伪分布式,即在本机中通过线程或者进程来并行处理,在本文中不作介绍),共有三种角色:Client端,Scheduler端以及Worker端,其中Client负责提交Task给Scheduler,Scheduler负责对提交的Task按照一定的策略分发给Worker,Worker进行实际的计算、数据存储,在此期间,Scheduler时刻关注着Worker的状态。
接下来我们将就这两个方面说明Dask是如何处理的。
1.如何通信问题
说到通信,无非需要解决两个问题:通信各节点怎么能互相找到,在Dask中,也就是Client端、Scheduler端及Worker端怎么能相互发现?首先在Dask中,需要通信的各个节点都需要有自己的一个身份ID,在Dask中是通过URI来表示的,例如tcp://192.168.0.1:35072,在Dask中,通常是通过TCP进行通信(也可以用TLS何Inproc);另一个问题,怎么传输信息,也就是待传输的数据格式如何定义?我们知道当Client提交一个Task给Scheduler时,最后的Task计算是由Worker来做的,同时需要考虑网络带宽,因此节点之间传输的数据量尽可能的少,同时传输速度尽可能快。因此,我们需要一个协议(Protocol)来指导数据的编码和解码,这里的数据不仅仅指Python Object,也指Python Function。
在Dask中,需要传输的原始Message是通过字典来表示的,如下所示:
接下来就是将其进行序列化,在Dask中目前是通过MsgPack与Pickle和CloudPickle进行序列化及反序列化。
2.如何调度问题
当在Client端提交了一批Task,那么Scheduler如何调度,这直接影响到整个分布式计算的效率。我们首先了解一下整体流程如下图所示:
当整个提交的Task完成后,用于可以通过Scheduler从Worker中获取Task运行结果。
在整个任务调度中,对于每个Worker,会经历接受新Task -> 等待依赖Task完成 -> 计算Task -> 保存数据 -> Garbage回收。而Scheduler的任务就是追踪所有的Worker,在Dask中,Scheduler会对下面三种实体的状态进行记录:
1.分解后的Task Graph。每个任务及对应的状态都是会被Scheduler记录
2.与Worker的连接
3.与Client的连接
对于每个任务,在Scheduler中有多个不同的状态表示,以及对应的状态转移过程,通过Scheduler中的TaskState类可以获取Scheduler管理的所有任务当前状态信息。
对于每个Worker,可以通过Scheduler中的WorkerState获取Scheduler管理的所有Worker。
对于Scheduler,为了能够更好的对Task进行调度,他会维护两个结构:一个用来收集那些资源占用情况占比过大(大大超过了平均值)的Worker,另一个用来收集那些资源占用率不高的Worker,以便可以立即执行Task。
对于每个Client,可以通过Scheduler中的ClientState获取Scheduler管理的所有Client。
在Scheduler中,监听着Client和Worker的状态,基于以下的动作从而更新Scheduler对于Client或Worker的状态:
对于Worker:
1.Task完成了
2.Task失败了
3.Task缺少必要的数据
4.添加Worker
5.移除Worker
对于Client:
1.更新Task Graph,添加更多Task
2.释放结果Key值,即不再需要Scheduler返回Task Graph运行结果了
从上面的介绍中,我们知道了Scheduler记录了关于所有Task、Client和Worker的信息,那么基于这些信息,该如何有效的进行调度?或者说Scheduler怎么从一个任务池中选择待分发的任务,又如何从候选Worker中选择可以接受任务的Worker?
对于Worker的选择,Scheduler会根据当前待分配任务的限制(比如是否限制GPU等)得到一个满足基本条件的Worker列表,然后会基于Worker的当前资源占用信息(比如当前内存占用率,当前运行Task数),还会综合考虑分配至哪个Worker上,在Worker间传输的数据最少,具体的Worker选择策略见 decide_worker方法。
对于Task的选择,Dask有自己的一套准则:
1.保证相对公平,即多个Client提交的Task,尽量保证先到先服务的原则
2.尽量优先运行重要的Task序列,从而减少总体运行时间和集群负载
3.尽量优先运行依赖了多个Task的任务,因为只有运行了这种任务,才能释放其所依赖Task占用的资源
4.尽量优先运行相关较高的Task
要保证上面的设计原则都完全满足是很困难的,最终Dask采用了一些启发原则,具体见dask/order.py。
五、延迟计算与即时计算
在Dask中有两种方式的计算,一种是延迟计算,称为Delayed,另一种是即时计算,称为Future,这两种计算是什么意思了?我们先从Delayed进行介绍。
1.Delayed
我们只需要在一个普通的Python Function上面通过dask.delayed函数进行封装,就能得到一个Delayed对象,如下所示:
所示Delayed只是构建一个Task Graph,并没有进行实际的计算,只有调用compute的时候,才开始进行计算。这对于处理那些可进行并行处理的操作有着很大的作用,不论是对于大数据量或是CPU密集型任务。
2.Future
对于Delayed,其并没有立即计算,只是构建了一个Task Graph,而对于Future是立即执行的,可以通过submit、map方法将一个Function提交给Scheduler,在后台,Scheduler会对提交的任务进行处理并分发给Workers进行实际的计算。当任务提交后,会返回一个指向任务运行结果的Key值,即Future对象,我们可以跟踪其当前状态,当然我们也可以通过result和gather方法等待任务完成后从而将结果收集到本地。
六、Dask机器学习库简介
目前Dask-ML对Sklearn支持较好,对Xgboost也能直接调用使用,对Tensorflow,由于TF已经在分布式处理上非常优秀了,所有Dask-ML并没有对其分布式过程进行过多的处理,其主要是在通过TF训练模型过程中,通过start_tensorflow指定集群的Parameter Server和 Worker Server,这些Server对应的就是Dask中的Workers。针对Dask-ML,如下图所示:
七、Dask与Tensorflow的结合
对于Dask与TF的结合,分为两个部分,一个是训练阶段,一个是预测阶段。对于训练阶段,相对来说较复杂些,可以参考这个链接:
http://matthewrocklin.com/blog/work/2017/02/11/dask-tensorflow
而对于预测阶段,就相对容易一些:
总体步骤就是:
1.将样本封装成Dask样本数据。这里有个问题需要注意,对于csv文本数据,我们可以很容易通过read_csv这样的方式得到Dask Dataframe格式数据,但是对于Image类型数据,因为Dask Dataframe仅支持二维Numpy Array内部数据,所以对于图片的处理需要借助Dask Array和Delayed进行处理。
2.通过client提交predict任务。
3.在predict任务中进行模型加载及实际预测任务。
八、Dask分布式性能诊断
TO DO
九、Dask与K8s的结合
目前Dask与K8s的结合,主要有如下几种方式:
目前自己测试,主要是基于Helm Repo中的stable/dask为基础镜像,然后添加一些必须的包依赖,构建的一个镜像,大小大约1.83G。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。