当前位置:   article > 正文

02 Dask源码剖析-Dask的数据模型-Bag_dask bag

dask bag

Dask源码剖析是一个专栏,更多章节请点击文章列表查看。后续我会更新更多内容上来。

Collection:Bag

在这里插入图片描述
对于Bag数据模型,其实从Dask官方进行的用户调研情况来看,这种数据模型较其他数据模型使用的情况是最少的:
在这里插入图片描述
但是Bag是较为简单的数据模型,对于理解其他数据模型,比如DataFrame、Array,我认为是有帮助的。所以咱们本节就先了解一下Bag。

从源码目录里,我们可以很快的定位到Bag的包:
在这里插入图片描述
就在dask源码包的下面,有个bag的包。而其核心代码都在core.py中。

还是老规矩,先构建一些简单的demo,然后根据demo通过debug的形式看下是如何实现的。(例子来自dask-tutorial)

Bag的创建

从内存序列创建(from_sequence)

import dask.bag as db
# 从序列创建bag
b = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], npartitions=2)
b.take(3)
  • 1
  • 2
  • 3
  • 4

上述例子从长度为10的整数数组,npartitions=2即分成两片,创建了一个bag。然后.take(3)即从bag中获取3个元素。
在from_sequence插入断点,我们开始深入源码,step into,可以看到:

# dask/bag/core.py
def from_sequence(seq, partition_size=None, npartitions=None):
    """ 从序列创建一个dask.Bag。
    如果非用这个方法,这个序列在内存中不要太大,最好让Dask Bag自己加载你的数据。比如我们加载一个文件名序列放入Bag中,然后使用“.map”打开它们。

    Parameters
    ----------
    seq: Iterable
        一个可迭代的序列,导入dask中
    partition_size: int (可选)
        每个分片的大小
    npartitions: int (可选)
        分片的数量

    最好提供 ``partition_size`` 或 ``npartitions``参数,但别两个参数都都填

    另外参考:
    --------
    read_text: 从文本文件创建bag
    """
    seq = list(seq)
    if npartitions and not partition_size:
        partition_size = int(math.ceil(len(seq) / npartitions))
    # 如果两个参数都不给,就按100的分片大小去切割元素
    if npartitions is None and partition_size is None:
        if len(seq) < 100:
            partition_size = 1
        else:
            partition_size = int(len(seq) / 100)
    # 这里是对序列进行切分的地方,这里还是用了toolz三方库,
    # 在客户端把seq序列按partition_size(此处为2)进行切分
    parts = list(partition_all(partition_size, seq))
    # 这时候parts=[(1, 2, 3, 4, 5), (6, 7, 8, 9, 10)]
    # 参考上一节Delayed介绍,这里应该是要开始转异步任务了,先创建了任务的name:
    name = "from_sequence-" + tokenize(seq, partition_size)
    # name = 'from_sequence-4206ac43a7f088cbbf77f5dc46ca024c'
    if len(parts) > 0:
        # enumerate不熟的看下https://www.runoob.com/python/python-func-enumerate.html
        # 这里实际就是把name和parts的序号作为key,part内容作为value,存到字典里。
        d = dict(((name, i), list(part)) for i, part in enumerate(parts))
        # d长这样:
        # {('from_sequence-4206ac43a7f088cbbf77f5dc46ca024c', 0): [1, 2, 3, 4, 5],
        # ('from_sequence-4206ac43a7f088cbbf77f5dc46ca024c', 1): [6, 7, 8, 9, 10]}
    else:
        d = {(name, 0): []}
	# 最后创建Bag,这里先不跟进细看了,我们先看完其他的构建方法。
    return Bag(d, name, len(d))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

从文本文件创建(read_text)

上面的从客户端(往往是在笔记本上)内存创建的bag,受限于客户端的硬件配置,不适合大数据的加载。所以dask提供了read_text更为常见的数据加载方式。这里还是用了教程给的例子:

# 构建Demo数据:
%run prep.py -d accounts
  • 1
  • 2

这个时候在教程文件夹的data文件夹下,多了很多accounts.*.json.gz的数据:
在这里插入图片描述
总共50个吧。每个都是经过GZIP压缩的。所以扩展名是gz。每个大小在500KB左右。我们开始加载数据:

import os
b = db.read_text(os.path.join('data', 'accounts.*.json.gz'))
b.take(1)
  • 1
  • 2
  • 3

通过debug,我们看下read_text内容:

# dask/bag/text.py
def read_text(
    urlpath,
    blocksize=None,
    compression="infer",
    encoding=system_encoding,
    errors="strict",
    linedelimiter=os.linesep,
    collection=True,
    storage_options=None,
    files_per_partition=None,
    include_path=False,
):
    """ 从文本文件中加载
    参数列表:
    ----------
    urlpath : string 或 list 类型
        可以支持:绝对或相对路径、带有协议前缀的url,比如``s3://``。其他类型我们过会看代码吧。
    blocksize: None, int, or str
        这个比较有意思,把文件切割成多大的块(按bytes为单位)。None的话会根据流(如http的报文流)大小来切。
        也可以传个整数类型,或者"128MiB"这样的字符串
    compression: string
        文件的压缩格式,默认是根据文件自适应。
    encoding: string
    errors: string
    linedelimiter: string
    collection: bool, optional
        如果是True则返回dask.bag , 否则返回 delayed 数组
    storage_options: dict
        这块对于适配各种大数据平台比较有用,比如hdfs或s3的一些密码、host、port等等
    files_per_partition: None or int
        不设的话,一个文件一个分片(partition ),设了就按输入文件group 后再分片。这个参数和blocksize互斥。
    include_path: bool
        是否在Bag里包含path,是的话按元组 (line, path)构建bag,默认是不带path的

    例子
    --------
    >>> b = read_text('myfiles.1.txt')  # doctest: +SKIP
    >>> b = read_text('myfiles.*.txt')  # doctest: +SKIP
    >>> b = read_text('myfiles.*.txt.gz')  # doctest: +SKIP
    >>> b = read_text('s3://bucket/myfiles.*.txt')  # doctest: +SKIP
    >>> b = read_text('s3://key:secret@bucket/myfiles.*.txt')  # doctest: +SKIP
    >>> b = read_text('hdfs://namenode.example.com/myfiles.*.txt')  # doctest: +SKIP

    将未压缩字节数(blocksize)加载到分片:
    >>> b = read_text('largefile.txt', blocksize='10MB')  # doctest: +SKIP

    include_path=True的情况:
    >>> b = read_text('myfiles.*.txt', include_path=True) # doctest: +SKIP
    >>> b.take(1) # doctest: +SKIP
    (('first line of the first file', '/home/dask/myfiles.0.txt'),)
    
    返回值
    -------
    dask.bag.Bag 或 list
        dask.bag.Bag 或 Delayed 列表。取决于collection传True还是False
    """
    # 这两个参数互斥,注释里提到过
    if blocksize is not None and files_per_partition is not None:
        raise ValueError("Only one of blocksize or files_per_partition can be set")
    if isinstance(blocksize, str):
        blocksize = parse_bytes(blocksize)
    # 这里用了fsspec三方库,通过官方文档可以知道,这是一个key支持多种存储后端的文件操作库
    # 查了下fsspec的官方文档:https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations
    # 大概支持hdfs、gcs、s3、ftp等协议,当然也支持本地绝对路径与相对路径。
    files = open_files(
        urlpath,
        mode="rt",
        encoding=encoding,
        errors=errors,
        compression=compression,
        **(storage_options or {})
    )
    # 下面的逻辑是按照blocksize 或files_per_partition 进行分片。
    if blocksize is None:
        if files_per_partition is None:
            blocks = [
                delayed(list)(delayed(partial(file_to_blocks, include_path))(fil))
                for fil in files
            ]
        else:
            blocks = []
            for start in range(0, len(files), files_per_partition):
                block_files = files[start : (start + files_per_partition)]
                block_lines = delayed(concat)(
                    delayed(map)(partial(file_to_blocks, include_path), block_files,)
                )
                blocks.append(block_lines)
    else:
        o = read_bytes(
            urlpath,
            delimiter=linedelimiter.encode(),
            blocksize=blocksize,
            sample=False,
            compression=compression,
            include_path=include_path,
            **(storage_options or {})
        )
        raw_blocks = o[1]
        blocks = [delayed(decode)(b, encoding, errors) for b in concat(raw_blocks)]
        if include_path:
            paths = list(
                concat([[path] * len(raw_blocks[i]) for i, path in enumerate(o[2])])
            )
            blocks = [
                delayed(attach_path)(entry, path) for entry, path in zip(blocks, paths)
            ]

    if not blocks:
        raise ValueError("No files found", urlpath)
    # 是否把block的delayed转成Bag对象。
    if collection:
        blocks = from_delayed(blocks)

    return block
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115

按照上述的例子加载50个json.gz,read_text我们实际上会按照一个文件一个block,最后把delayed构建成Bag对象中。用图表示Graph(DAG)如下:
在这里插入图片描述
回顾文本文件创建Bag的过程:

  1. 即从文件中逐行读取的操作,先从urlpath解析出文件fsspec对象(注意如果是分布式,其实是需要client和worker节点都可以访问到这些文件的路径的,client负责生成任务,取不到取不全这些本地路径是会有问题的,worker更不用说,加载的时候如果找不到文件也会有问题)
  2. 后续流程是转成了delayed
  3. 通过blocksize或files_per_partition对路径数组进行分片
  4. 通过file_to_blocks(生成器)按行读取文件
  5. 通过list获取生成器的值,转成list
  6. 将上述的delayed list作为Bag的Graph,对象
def file_to_blocks(include_path, lazy_file):
    with lazy_file as f:
        for line in f:
            yield (line, lazy_file.path) if include_path else line
  • 1
  • 2
  • 3
  • 4

Bag的构成

通过上述两种构建方法,Bag对象实际存储的是一个graph、它的name,还有就是它的分片(partition)数。而本身的数据,要么是内存中的元素数组,要么是获取元素的Delayed对象。

Bag的一些行为(Manipulation)

.take(k,npartitions)

上面例子中,最先被使用的,便是take方法了:

import dask.bag as db
b = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], npartitions=2)
# 获取3个元素
b.take(3)
import os
b = db.read_text(os.path.join('data', 'accounts.*.json.gz'))
# 获取1个元素
b.take(1)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

下面我们看一下take,源码在dask/bag/core.py,这里就补贴源码了。
take接受4个参数:

  • k:获取元素的个数
  • npartitions:int型,可选,从多少个分片中获取元素(注意不是第几个),如果传-1就从所有分片中获取。
  • compute:bool型,可选,默认True,即计算返回结果
  • warn:bool型,如果传入的比实际Bag中有的元素个数多,发出告警

通过源码走读可以看到,take内部会创建一个新的Bag对象,而新的Bag对象(子Bag)是依赖于总的Bag的。如果获取元素的个数>1,会将结果通过toolz的concat函数拼接起来。子Bag的分片固定为1(毕竟take是想要获取结果了)。

那么问题来了,既然take创建的子Bag依赖于父Bag,那么是不是会把所有的元素(例如第二个例子里的50个json.gz)都加载到dask后,才能take呢?

如果从子Bag构建的graph看,是需要的,毕竟take依赖于父bag的bag-from-delayed-abd0…,而父Bag是会读取所有partitions的。但如果Dask真要这么搞,对大数据处理来说绝对是一场灾难,后面我也不会写下去了。通过跟源码我们发现,dask还是很智能的为我们解决了这个问题,其秘密就在take最后进行compute的时候,有optimize_graph这个选项。会对graph进行优化,以下截图自debug跟进到compute的时候:
在这里插入图片描述
优化后的dsk对象(要计算的graph)只会加载1个分片的文件。collections_to_dsk这一步是对其进行优化的关键,跟进去:

# dask/base.py
def collections_to_dsk(collections, optimize_graph=True, **kwargs):
    """
    Convert many collections into a single dask graph, after optimization
    """
    optimizations = kwargs.pop("optimizations", None) or config.get("optimizations", [])
    # 对graph进行优化
    # 假如optimize_graph改成False,那么是会对所有的Bag分片进行加载的。
    if optimize_graph:
        # 这里的optimization_function代码我粘贴到本函数后面.
        # 可以看到,真实的优化函数,会定义在__dask_optimize__属性里。也就说,
        # Bag模块有自己的优化函数
        groups = groupby(optimization_function, collections)

        _opt_list = []
        for opt, val in groups.items():
            _graph_and_keys = _extract_graph_and_keys(val)
            groups[opt] = _graph_and_keys
            # 调用优化函数,进行graph优化
            _opt_list.append(opt(_graph_and_keys[0], _graph_and_keys[1], **kwargs))

        for opt in optimizations:
            _opt_list = []
            group = {}
            for k, (dsk, keys) in groups.items():
                group[k] = (opt(dsk, keys), keys)
                _opt_list.append(opt(dsk, keys, **kwargs))
            groups = group

        dsk = merge(*map(ensure_dict, _opt_list,))
    else:
        dsk, _ = _extract_graph_and_keys(collections)

    return dsk

def optimization_function(x):
    return getattr(x, "__dask_optimize__", dont_optimize)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

Bag的优化函数:

# dask/bag/core.py
def optimize(dsk, keys, fuse_keys=None, rename_fused_keys=None, **kwargs):
    """ Optimize a dask from a dask Bag. """
    dsk = ensure_dict(dsk)
    # 调用的公用的优化方法cull,就是在这里,Bag移除掉了不必要的加载。
    dsk2, dependencies = cull(dsk, keys)
    kwargs = {}
    if rename_fused_keys is not None:
        kwargs["rename_keys"] = rename_fused_keys
    dsk3, dependencies = fuse(dsk2, keys + (fuse_keys or []), dependencies, **kwargs)
    dsk4 = inline_singleton_lists(dsk3, keys, dependencies)
    dsk5 = lazify(dsk4)
    return dsk5
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

dask公用的优化方法:

# dask/optimization.py
def cull(dsk, keys):
    """ Return new dask with only the tasks required to calculate keys.

    In other words, remove unnecessary tasks from dask.
    ``keys`` may be a single key or list of keys.

    Examples
    --------
    >>> d = {'x': 1, 'y': (inc, 'x'), 'out': (add, 'x', 10)}
    >>> dsk, dependencies = cull(d, 'out')  # doctest: +SKIP
    >>> dsk  # doctest: +SKIP
    {'x': 1, 'out': (add, 'x', 10)}
    >>> dependencies  # doctest: +SKIP
    {'x': set(), 'out': set(['x'])}

    Returns
    -------
    dsk: culled dask graph
    dependencies: Dict mapping {key: [deps]}.  Useful side effect to accelerate
        other optimizations, notably fuse.
    """
    if not isinstance(keys, (list, set)):
        keys = [keys]

    seen = set()
    dependencies = dict()
    out = {}
    work = list(set(flatten(keys)))

    while work:
        new_work = []
        for k in work:
            # 根据key找到真实依赖的任务
            # 由于take-xxxx实际依赖的并不是bag-from-delayed-xxx,而是
            # (bag-from-delayed-xxx,0),也就是说是可以定位到具体一个分片的
            # 所以可以理论上是可以对dask的graph进行裁剪的,裁剪方法大致逻辑就是用
            # (bag-from-delayed-xxx,0)作为key,在dsk字典里索引对应的子任务
            # 例如list、再索引到file-to-block
            dependencies_k = get_dependencies(dsk, k, as_list=True)  # fuse needs lists
            out[k] = dsk[k]
            dependencies[k] = dependencies_k
            for d in dependencies_k:
                if d not in seen:
                    seen.add(d)
                    new_work.append(d)

        work = new_work

    return out, dependencie
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

如果我们对optimize_graph改成False,那么dask是会把所有分片都加载到内存的。感兴趣的可以修改take的源码试一下:

# dask/bag/core.py
# def take(...):
        graph = HighLevelGraph.from_collections(name, dsk, dependencies=[self])
        b = Bag(graph, name, 1)
		# 改为False试一下
        if compute:
            return tuple(b.compute(optimize_graph=False))
        else:
            return b
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

.filter(predicate)

filter比较好理解,按照predicate(谓词)进行过滤,谓词函数会返回True或False,满足的结果会返回。

    def filter(self, predicate):
        """ Filter elements in collection by a predicate function.

        >>> def iseven(x):
        ...     return x % 2 == 0

        >>> import dask.bag as db
        >>> b = db.from_sequence(range(5))
        >>> list(b.filter(iseven))  # doctest: +SKIP
        [0, 2, 4]
        """
        # 可以看到,又是组件graph的三步,name、dsk、HighLevelGraph
        name = "filter-{0}-{1}".format(funcname(predicate), tokenize(self, predicate))
        dsk = dict(
            ((name, i), (reify, (filter, predicate, (self.name, i))))
            for i in range(self.npartitions)
        )
        graph = HighLevelGraph.from_collections(name, dsk, dependencies=[self])
        # 最后返回的仍是Bag对象,名字改为filter-xxx-xxx,分片数不变
        return type(self)(graph, name, self.npartitions)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

.map(func, *args, **kwargs)

map其实是更通用一点的filter,如果说filter是调用了系统的filter结合传入的谓词函数实现的过滤,那么map也可以做类似的事情。
原理上也是根据传入的函数以及参数,构成graph。具体代码不再展开。

总结

  • Bag对象实际存储的是一个graph、它的name,还有就是它的分片(npartition)数。而本身的数据,要么是内存中的元素数组,要么是获取元素的Delayed对象。分片是把一个总的任务切分成多个子任务进行并行/分布式计算的根本。
  • Bag的行为(处理方法)是基于已有的graph,再进一步增加一些处理过程的graph。不会立即进行计算。
  • 真正的计算是graph触发compute时,进行一些优化(可选,默认是进行优化),选择调度器后才开始的。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/526100
推荐阅读
相关标签
  

闽ICP备14008679号