当前位置:   article > 正文

Dask Bag 应用_dask包

dask包
  • Dask Bag包提供了如map\filter\groupby和python对象的集合聚集。
  • 类似于pythonnic版本的pyspark RDD。
  • Dask 包通常用于对非结构化或半结构化数据(如文本数据、日志文件、JSON 记录或用户定义的 Python 对象)的简单计算进行并行化。

    Bag的优点:

    • Parallel:数据被拆分,允许多个内核或机器并行执行;

    • 迭代:数据处理延迟,允许大于内存的数据平滑执行,即使在单个分区内的单个机器上;


    默认情况下,dask.bag使用dask.multiprocessing的计算。作为一个好处,Dask 绕过GIL并在纯 Python 对象上使用多个内核。作为一个缺点,Dask Bag 在包含大量工作间通信的计算中表现不佳。对于常见的操作,这很少成为问题,因为大多数 Dask Bag 工作流程都是令人尴尬的并行,或者导致在工作人员之间几乎没有数据移动的情况下减少。

    某些操作,例如groupby,需要大量的工作间通信。在单台机器上,Dask 使用partd执行高效、并行、溢出到磁盘的洗牌。在集群中工作时,Dask 使用基于任务的 shuffle。

    这些 shuffle 操作代价高昂,并且由 dask.dataframe. 最好使用dask.bag清理和处理数据,然后将其转换为数组或 DataFrame,然后再开始需要 shuffle 步骤的更复杂的操作。

    Bags 提供了非常通用的计算(任何 Python 函数)。这种普遍性是有代价的。袋子具有以下已知限制:

    • 默认情况下,它们依赖于多处理调度程序,它有自己的一组已知限制
    • Bag 是不可变的,所以你不能改变单个元素
    • Bag 操作往往比数组/DataFrame 计算慢,就像标准 Python 容器比 NumPy 数组和 Pandas DataFrames 慢一样
    • 包的groupby慢。如果可能可以尝试使用 foldby。

    Bag是允许重复的无序集合的数学名称。它是multiset的友好同义词。包或多重集是集合概念的概括,与集合不同,它允许多重集元素的多个实例:

    • list:有重复的有序集合,[1, 2, 3, 2]
    • set:无重复的无序集合, {1, 2, 3}
    • bag:重复的无序集合,{1, 2, 2, 3}

    所以,bag 就像一个列表,但它不能保证元素之间的排序。可以有重复的元素,但你不能要求第 i 个元素。

读取json数据:

  1. import dask
  2. import json
  3. import os
  4. # 创建测试数据
  5. data=dask.datasets.make_people()
  6. # 这里可能需要再安装mimesis包
  7. # ! pip install mimesis
  8. #data.take(2)
  9. # 保存为json数据
  10. data.map(json.dumps).to_textfiles("data/test_*.json")
  11. # 读取json数据
  12. import dask.bag as db
  13. b=db.read_text("data/test_*.json").map(json.loads)
  14. b.take(1)

常用方法

  1. b.filter(lambda data:data["age"]<40).take(2)
  2. # 过滤年龄小于40的
  3. b.count().compute()
  4. # 统计记录数
  5. b.map(lambda data:data["occupation"]).take(1)
  6. # 链式计算
  7. # 获取最大年龄和最小的年龄
  8. b.map(lambda data:data["age"]).max().compute()
  9. b.map(lambda data:data["age"]).min().compute()
  10. # 获取年龄出现频次最多的5个
  11. b.map(lambda data:data["age"]).frequencies(sort=True).topk(5).compute()
  12. b=b.persist() # 保存到内存中
  13. # 转换和存储
  14. b.filter(lambda x:x["age"]>40).map(json.dumps).to_textfiles("data/processed.*.json")
  15. # 转换为dataframe
  16. def flatten(record):
  17. return {
  18. 'age': record['age'],
  19. 'occupation': record['occupation'],
  20. 'telephone': record['telephone'],
  21. 'credit-card-number': record['credit-card']['number'],
  22. 'credit-card-expiration': record['credit-card']['expiration-date'],
  23. 'name': ' '.join(record['name']),
  24. 'street-address': record['address']['address'],
  25. 'city': record['address']['city']
  26. }
  27. b.map(flatten).take(1)
  28. df = b.map(flatten).to_dataframe()
  29. df.head()

bag支持链式计算,在一个管道中执行多个操作。与所有惰性 Dask 集合一样,我们需要调用compute来实际评估我们的结果。(take也是拉起计算的一种方式)

API文档:
API — Dask documentationicon-default.png?t=LBL2https://docs.dask.org/en/latest/bag-api.html# 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/526084
推荐阅读
相关标签
  

闽ICP备14008679号