赞
踩
Dask是一个并行计算库,能在集群中进行分布式计算,能以一种更方便简洁的方式处理大数据量,与Spark这些大数据处理框架相比较,Dask更轻。Dask更侧重与其他框架,如:Numpy,Pandas,Scikit-learning相结合,从而使其能更加方便进行分布式并行计算。
Dask存在三种最基本的数据结构,分别是:Arrays、Dataframes以及Bags。
Dask中的Arrays(位于包dask.arrays下),其实就是对Numpy中的ndarray的部分接口进行了改进,从而方便处理大数据量。对于大数据集,特别是其大小大于内存时,如果我们要对其计算,按照传统的方式,,我们会将其全部塞进内存里,那么这就会报Out-Of-Memory错误,当然,我们也可以一次读取一部分数据,那么我们是否可以提前将大数据集进行分块处理了,我们只需要控制每块数据集不超过内存,从而满足In-Memory计算。
Dataframe是基于Pandas Dataframe改进的一个可以并行处理大数据量的数据结构,即使对大于内存的数据也是能够处理的(注意:dask.array并不能直接处理大于内存的处理,从其源码中可以看出从Numpy Array转为Dask Array时,首先需要将Numpy Array放入内存)。
对于Bags,其最主要的是用于半结构化的大数据集,比如日志或者博客等等。
Dask之所以能够高效的处理大数据量,在于其可进行分布式计算,这才是Dask的核心所在,Dask支持多种调度器,从单线程、多线程、多进程到本地分布式和集群分布式,各种调度器在不同情况下有不同的作用。
所有大型的Dask集合变量(例如Dask Array,Dask DataFrame和Dask Bag)以及细粒度的API(例如Delay和Future)都会生成任务图,其中图中的每个节点都是常规的Python函数,而节点之间的边缘是常规的Python对象,由一个任务创建为输出,并在另一任务中用作输入。 在Dask生成这些任务图之后,它需要在并行硬件上执行它们。这就是任务调度。Dask存在不同的任务调度,每个调度程序将使用一个任务图并计算得到相同的结果,但是它们的性能差别很大。
如何使用将Dask任务细分和调度,请自行到Dask官网了解吧。
地址:https://pypi.org/project/dask/
Dask最新的版本是2021.1.1,Anaconda集成开发环境默认已经包含该库,如果没有该安装包,使用以下命令即可安装:
#pip install dask
还可使用conda安装:
#conda install dask
Dask DataFrame mimics Pandas – documentation
import pandas as pd import dask.dataframe as dd
df = pd.read_csv('2015-01-01.csv') df = dd.read_csv('2015-*-*.csv')
df.groupby(df.user_id).value.mean() df.groupby(df.user_id).value.mean().compute()
Dask Array mimics NumPy – documentation
import numpy as np import dask.array as da
f = h5py.File('myfile.hdf5') f = h5py.File('myfile.hdf5')
x = np.array(f['/small-data']) x = da.from_array(f['/big-data'],
chunks=(1000, 1000))
x - x.mean(axis=1) x - x.mean(axis=1).compute()
Dask Bag mimics iterators, Toolz, and PySpark – documentation
import dask.bag as db
b = db.read_text('2015-*-*.json.gz').map(json.loads)
b.pluck('name').frequencies().topk(10, lambda pair: pair[1]).compute()
以上三个实例分别是Arrays、Dataframes以及Bags三大基础数据结构的基本使用。
Dask Delayed mimics for loops and wraps custom code – documentation
from dask import delayed
L = []
for fn in filenames: # Use for loops to build up computation
data = delayed(load)(fn) # Delay execution of function
L.append(delayed(process)(data)) # Build connections between variables
result = delayed(summarize)(L)
result.compute()
Delayed则是任务调度的核心模块。
The concurrent.futures interface provides general submission of custom tasks: - documentation
from dask.distributed import Client
client = Client('scheduler:port')
futures = []
for fn in filenames:
future = client.submit(load, fn)
futures.append(future)
summary = client.submit(summarize, futures)
summary.result()
concurrent.futures则是分布式处理的模块。
详细内容请参看dask官网。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。