赞
踩
Dask 包通常用于对非结构化或半结构化数据(如文本数据、日志文件、JSON 记录或用户定义的 Python 对象)的简单计算进行并行化。
Bag的优点:
Parallel:数据被拆分,允许多个内核或机器并行执行;
迭代:数据处理延迟,允许大于内存的数据平滑执行,即使在单个分区内的单个机器上;
默认情况下,dask.bag使用dask.multiprocessing的计算。作为一个好处,Dask 绕过GIL并在纯 Python 对象上使用多个内核。作为一个缺点,Dask Bag 在包含大量工作间通信的计算中表现不佳。对于常见的操作,这很少成为问题,因为大多数 Dask Bag 工作流程都是令人尴尬的并行,或者导致在工作人员之间几乎没有数据移动的情况下减少。
某些操作,例如groupby,需要大量的工作间通信。在单台机器上,Dask 使用partd执行高效、并行、溢出到磁盘的洗牌。在集群中工作时,Dask 使用基于任务的 shuffle。
这些 shuffle 操作代价高昂,并且由 dask.dataframe. 最好使用dask.bag清理和处理数据,然后将其转换为数组或 DataFrame,然后再开始需要 shuffle 步骤的更复杂的操作。
Bags 提供了非常通用的计算(任何 Python 函数)。这种普遍性是有代价的。袋子具有以下已知限制:
Bag是允许重复的无序集合的数学名称。它是multiset的友好同义词。包或多重集是集合概念的概括,与集合不同,它允许多重集元素的多个实例:
所以,bag 就像一个列表,但它不能保证元素之间的排序。可以有重复的元素,但你不能要求第 i 个元素。
读取json数据:
- import dask
- import json
- import os
-
- # 创建测试数据
-
- data=dask.datasets.make_people()
- # 这里可能需要再安装mimesis包
- # ! pip install mimesis
-
- #data.take(2)
-
- # 保存为json数据
- data.map(json.dumps).to_textfiles("data/test_*.json")
-
- # 读取json数据
-
- import dask.bag as db
-
- b=db.read_text("data/test_*.json").map(json.loads)
- b.take(1)
常用方法
- b.filter(lambda data:data["age"]<40).take(2)
- # 过滤年龄小于40的
-
- b.count().compute()
- # 统计记录数
-
- b.map(lambda data:data["occupation"]).take(1)
-
- # 链式计算
-
- # 获取最大年龄和最小的年龄
- b.map(lambda data:data["age"]).max().compute()
- b.map(lambda data:data["age"]).min().compute()
- # 获取年龄出现频次最多的5个
- b.map(lambda data:data["age"]).frequencies(sort=True).topk(5).compute()
-
- b=b.persist() # 保存到内存中
-
- # 转换和存储
- b.filter(lambda x:x["age"]>40).map(json.dumps).to_textfiles("data/processed.*.json")
-
-
- # 转换为dataframe
- def flatten(record):
- return {
- 'age': record['age'],
- 'occupation': record['occupation'],
- 'telephone': record['telephone'],
- 'credit-card-number': record['credit-card']['number'],
- 'credit-card-expiration': record['credit-card']['expiration-date'],
- 'name': ' '.join(record['name']),
- 'street-address': record['address']['address'],
- 'city': record['address']['city']
- }
-
- b.map(flatten).take(1)
-
- df = b.map(flatten).to_dataframe()
- df.head()
bag支持链式计算,在一个管道中执行多个操作。与所有惰性 Dask 集合一样,我们需要调用
compute
来实际评估我们的结果。(take也是拉起计算的一种方式)
API文档:
API — Dask documentationhttps://docs.dask.org/en/latest/bag-api.html#
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。