赞
踩
Dask源码剖析是一个专栏,更多章节请点击文章列表查看。后续我会更新更多内容上来。
对于Bag数据模型,其实从Dask官方进行的用户调研情况来看,这种数据模型较其他数据模型使用的情况是最少的:
但是Bag是较为简单的数据模型,对于理解其他数据模型,比如DataFrame、Array,我认为是有帮助的。所以咱们本节就先了解一下Bag。
从源码目录里,我们可以很快的定位到Bag的包:
就在dask源码包的下面,有个bag的包。而其核心代码都在core.py中。
还是老规矩,先构建一些简单的demo,然后根据demo通过debug的形式看下是如何实现的。(例子来自dask-tutorial)
import dask.bag as db
# 从序列创建bag
b = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], npartitions=2)
b.take(3)
上述例子从长度为10的整数数组,npartitions=2即分成两片,创建了一个bag。然后.take(3)即从bag中获取3个元素。
在from_sequence插入断点,我们开始深入源码,step into,可以看到:
# dask/bag/core.py def from_sequence(seq, partition_size=None, npartitions=None): """ 从序列创建一个dask.Bag。 如果非用这个方法,这个序列在内存中不要太大,最好让Dask Bag自己加载你的数据。比如我们加载一个文件名序列放入Bag中,然后使用“.map”打开它们。 Parameters ---------- seq: Iterable 一个可迭代的序列,导入dask中 partition_size: int (可选) 每个分片的大小 npartitions: int (可选) 分片的数量 最好提供 ``partition_size`` 或 ``npartitions``参数,但别两个参数都都填 另外参考: -------- read_text: 从文本文件创建bag """ seq = list(seq) if npartitions and not partition_size: partition_size = int(math.ceil(len(seq) / npartitions)) # 如果两个参数都不给,就按100的分片大小去切割元素 if npartitions is None and partition_size is None: if len(seq) < 100: partition_size = 1 else: partition_size = int(len(seq) / 100) # 这里是对序列进行切分的地方,这里还是用了toolz三方库, # 在客户端把seq序列按partition_size(此处为2)进行切分 parts = list(partition_all(partition_size, seq)) # 这时候parts=[(1, 2, 3, 4, 5), (6, 7, 8, 9, 10)] # 参考上一节Delayed介绍,这里应该是要开始转异步任务了,先创建了任务的name: name = "from_sequence-" + tokenize(seq, partition_size) # name = 'from_sequence-4206ac43a7f088cbbf77f5dc46ca024c' if len(parts) > 0: # enumerate不熟的看下https://www.runoob.com/python/python-func-enumerate.html # 这里实际就是把name和parts的序号作为key,part内容作为value,存到字典里。 d = dict(((name, i), list(part)) for i, part in enumerate(parts)) # d长这样: # {('from_sequence-4206ac43a7f088cbbf77f5dc46ca024c', 0): [1, 2, 3, 4, 5], # ('from_sequence-4206ac43a7f088cbbf77f5dc46ca024c', 1): [6, 7, 8, 9, 10]} else: d = {(name, 0): []} # 最后创建Bag,这里先不跟进细看了,我们先看完其他的构建方法。 return Bag(d, name, len(d))
上面的从客户端(往往是在笔记本上)内存创建的bag,受限于客户端的硬件配置,不适合大数据的加载。所以dask提供了read_text更为常见的数据加载方式。这里还是用了教程给的例子:
# 构建Demo数据:
%run prep.py -d accounts
这个时候在教程文件夹的data文件夹下,多了很多accounts.*.json.gz的数据:
总共50个吧。每个都是经过GZIP压缩的。所以扩展名是gz。每个大小在500KB左右。我们开始加载数据:
import os
b = db.read_text(os.path.join('data', 'accounts.*.json.gz'))
b.take(1)
通过debug,我们看下read_text内容:
# dask/bag/text.py def read_text( urlpath, blocksize=None, compression="infer", encoding=system_encoding, errors="strict", linedelimiter=os.linesep, collection=True, storage_options=None, files_per_partition=None, include_path=False, ): """ 从文本文件中加载 参数列表: ---------- urlpath : string 或 list 类型 可以支持:绝对或相对路径、带有协议前缀的url,比如``s3://``。其他类型我们过会看代码吧。 blocksize: None, int, or str 这个比较有意思,把文件切割成多大的块(按bytes为单位)。None的话会根据流(如http的报文流)大小来切。 也可以传个整数类型,或者"128MiB"这样的字符串 compression: string 文件的压缩格式,默认是根据文件自适应。 encoding: string errors: string linedelimiter: string collection: bool, optional 如果是True则返回dask.bag , 否则返回 delayed 数组 storage_options: dict 这块对于适配各种大数据平台比较有用,比如hdfs或s3的一些密码、host、port等等 files_per_partition: None or int 不设的话,一个文件一个分片(partition ),设了就按输入文件group 后再分片。这个参数和blocksize互斥。 include_path: bool 是否在Bag里包含path,是的话按元组 (line, path)构建bag,默认是不带path的 例子 -------- >>> b = read_text('myfiles.1.txt') # doctest: +SKIP >>> b = read_text('myfiles.*.txt') # doctest: +SKIP >>> b = read_text('myfiles.*.txt.gz') # doctest: +SKIP >>> b = read_text('s3://bucket/myfiles.*.txt') # doctest: +SKIP >>> b = read_text('s3://key:secret@bucket/myfiles.*.txt') # doctest: +SKIP >>> b = read_text('hdfs://namenode.example.com/myfiles.*.txt') # doctest: +SKIP 将未压缩字节数(blocksize)加载到分片: >>> b = read_text('largefile.txt', blocksize='10MB') # doctest: +SKIP include_path=True的情况: >>> b = read_text('myfiles.*.txt', include_path=True) # doctest: +SKIP >>> b.take(1) # doctest: +SKIP (('first line of the first file', '/home/dask/myfiles.0.txt'),) 返回值 ------- dask.bag.Bag 或 list dask.bag.Bag 或 Delayed 列表。取决于collection传True还是False """ # 这两个参数互斥,注释里提到过 if blocksize is not None and files_per_partition is not None: raise ValueError("Only one of blocksize or files_per_partition can be set") if isinstance(blocksize, str): blocksize = parse_bytes(blocksize) # 这里用了fsspec三方库,通过官方文档可以知道,这是一个key支持多种存储后端的文件操作库 # 查了下fsspec的官方文档:https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations # 大概支持hdfs、gcs、s3、ftp等协议,当然也支持本地绝对路径与相对路径。 files = open_files( urlpath, mode="rt", encoding=encoding, errors=errors, compression=compression, **(storage_options or {}) ) # 下面的逻辑是按照blocksize 或files_per_partition 进行分片。 if blocksize is None: if files_per_partition is None: blocks = [ delayed(list)(delayed(partial(file_to_blocks, include_path))(fil)) for fil in files ] else: blocks = [] for start in range(0, len(files), files_per_partition): block_files = files[start : (start + files_per_partition)] block_lines = delayed(concat)( delayed(map)(partial(file_to_blocks, include_path), block_files,) ) blocks.append(block_lines) else: o = read_bytes( urlpath, delimiter=linedelimiter.encode(), blocksize=blocksize, sample=False, compression=compression, include_path=include_path, **(storage_options or {}) ) raw_blocks = o[1] blocks = [delayed(decode)(b, encoding, errors) for b in concat(raw_blocks)] if include_path: paths = list( concat([[path] * len(raw_blocks[i]) for i, path in enumerate(o[2])]) ) blocks = [ delayed(attach_path)(entry, path) for entry, path in zip(blocks, paths) ] if not blocks: raise ValueError("No files found", urlpath) # 是否把block的delayed转成Bag对象。 if collection: blocks = from_delayed(blocks) return block
按照上述的例子加载50个json.gz,read_text我们实际上会按照一个文件一个block,最后把delayed构建成Bag对象中。用图表示Graph(DAG)如下:
回顾文本文件创建Bag的过程:
def file_to_blocks(include_path, lazy_file):
with lazy_file as f:
for line in f:
yield (line, lazy_file.path) if include_path else line
通过上述两种构建方法,Bag对象实际存储的是一个graph、它的name,还有就是它的分片(partition)数。而本身的数据,要么是内存中的元素数组,要么是获取元素的Delayed对象。
上面例子中,最先被使用的,便是take方法了:
import dask.bag as db
b = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], npartitions=2)
# 获取3个元素
b.take(3)
import os
b = db.read_text(os.path.join('data', 'accounts.*.json.gz'))
# 获取1个元素
b.take(1)
下面我们看一下take,源码在dask/bag/core.py,这里就补贴源码了。
take接受4个参数:
通过源码走读可以看到,take内部会创建一个新的Bag对象,而新的Bag对象(子Bag)是依赖于总的Bag的。如果获取元素的个数>1,会将结果通过toolz的concat函数拼接起来。子Bag的分片固定为1(毕竟take是想要获取结果了)。
那么问题来了,既然take创建的子Bag依赖于父Bag,那么是不是会把所有的元素(例如第二个例子里的50个json.gz)都加载到dask后,才能take呢?
如果从子Bag构建的graph看,是需要的,毕竟take依赖于父bag的bag-from-delayed-abd0…,而父Bag是会读取所有partitions的。但如果Dask真要这么搞,对大数据处理来说绝对是一场灾难,后面我也不会写下去了。通过跟源码我们发现,dask还是很智能的为我们解决了这个问题,其秘密就在take最后进行compute的时候,有optimize_graph这个选项。会对graph进行优化,以下截图自debug跟进到compute的时候:
优化后的dsk对象(要计算的graph)只会加载1个分片的文件。collections_to_dsk这一步是对其进行优化的关键,跟进去:
# dask/base.py def collections_to_dsk(collections, optimize_graph=True, **kwargs): """ Convert many collections into a single dask graph, after optimization """ optimizations = kwargs.pop("optimizations", None) or config.get("optimizations", []) # 对graph进行优化 # 假如optimize_graph改成False,那么是会对所有的Bag分片进行加载的。 if optimize_graph: # 这里的optimization_function代码我粘贴到本函数后面. # 可以看到,真实的优化函数,会定义在__dask_optimize__属性里。也就说, # Bag模块有自己的优化函数 groups = groupby(optimization_function, collections) _opt_list = [] for opt, val in groups.items(): _graph_and_keys = _extract_graph_and_keys(val) groups[opt] = _graph_and_keys # 调用优化函数,进行graph优化 _opt_list.append(opt(_graph_and_keys[0], _graph_and_keys[1], **kwargs)) for opt in optimizations: _opt_list = [] group = {} for k, (dsk, keys) in groups.items(): group[k] = (opt(dsk, keys), keys) _opt_list.append(opt(dsk, keys, **kwargs)) groups = group dsk = merge(*map(ensure_dict, _opt_list,)) else: dsk, _ = _extract_graph_and_keys(collections) return dsk def optimization_function(x): return getattr(x, "__dask_optimize__", dont_optimize)
Bag的优化函数:
# dask/bag/core.py
def optimize(dsk, keys, fuse_keys=None, rename_fused_keys=None, **kwargs):
""" Optimize a dask from a dask Bag. """
dsk = ensure_dict(dsk)
# 调用的公用的优化方法cull,就是在这里,Bag移除掉了不必要的加载。
dsk2, dependencies = cull(dsk, keys)
kwargs = {}
if rename_fused_keys is not None:
kwargs["rename_keys"] = rename_fused_keys
dsk3, dependencies = fuse(dsk2, keys + (fuse_keys or []), dependencies, **kwargs)
dsk4 = inline_singleton_lists(dsk3, keys, dependencies)
dsk5 = lazify(dsk4)
return dsk5
dask公用的优化方法:
# dask/optimization.py def cull(dsk, keys): """ Return new dask with only the tasks required to calculate keys. In other words, remove unnecessary tasks from dask. ``keys`` may be a single key or list of keys. Examples -------- >>> d = {'x': 1, 'y': (inc, 'x'), 'out': (add, 'x', 10)} >>> dsk, dependencies = cull(d, 'out') # doctest: +SKIP >>> dsk # doctest: +SKIP {'x': 1, 'out': (add, 'x', 10)} >>> dependencies # doctest: +SKIP {'x': set(), 'out': set(['x'])} Returns ------- dsk: culled dask graph dependencies: Dict mapping {key: [deps]}. Useful side effect to accelerate other optimizations, notably fuse. """ if not isinstance(keys, (list, set)): keys = [keys] seen = set() dependencies = dict() out = {} work = list(set(flatten(keys))) while work: new_work = [] for k in work: # 根据key找到真实依赖的任务 # 由于take-xxxx实际依赖的并不是bag-from-delayed-xxx,而是 # (bag-from-delayed-xxx,0),也就是说是可以定位到具体一个分片的 # 所以可以理论上是可以对dask的graph进行裁剪的,裁剪方法大致逻辑就是用 # (bag-from-delayed-xxx,0)作为key,在dsk字典里索引对应的子任务 # 例如list、再索引到file-to-block dependencies_k = get_dependencies(dsk, k, as_list=True) # fuse needs lists out[k] = dsk[k] dependencies[k] = dependencies_k for d in dependencies_k: if d not in seen: seen.add(d) new_work.append(d) work = new_work return out, dependencie
如果我们对optimize_graph改成False,那么dask是会把所有分片都加载到内存的。感兴趣的可以修改take的源码试一下:
# dask/bag/core.py
# def take(...):
graph = HighLevelGraph.from_collections(name, dsk, dependencies=[self])
b = Bag(graph, name, 1)
# 改为False试一下
if compute:
return tuple(b.compute(optimize_graph=False))
else:
return b
filter比较好理解,按照predicate(谓词)进行过滤,谓词函数会返回True或False,满足的结果会返回。
def filter(self, predicate): """ Filter elements in collection by a predicate function. >>> def iseven(x): ... return x % 2 == 0 >>> import dask.bag as db >>> b = db.from_sequence(range(5)) >>> list(b.filter(iseven)) # doctest: +SKIP [0, 2, 4] """ # 可以看到,又是组件graph的三步,name、dsk、HighLevelGraph name = "filter-{0}-{1}".format(funcname(predicate), tokenize(self, predicate)) dsk = dict( ((name, i), (reify, (filter, predicate, (self.name, i)))) for i in range(self.npartitions) ) graph = HighLevelGraph.from_collections(name, dsk, dependencies=[self]) # 最后返回的仍是Bag对象,名字改为filter-xxx-xxx,分片数不变 return type(self)(graph, name, self.npartitions)
map其实是更通用一点的filter,如果说filter是调用了系统的filter结合传入的谓词函数实现的过滤,那么map也可以做类似的事情。
原理上也是根据传入的函数以及参数,构成graph。具体代码不再展开。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。