当前位置:   article > 正文

《Python开发 - Python库》Dask安装与使用(轻量级并行计算库)_离线安装dask库

离线安装dask库

Dask手册

1 Dask简介

Dask是一个并行计算库,能在集群中进行分布式计算,能以一种更方便简洁的方式处理大数据量,与Spark这些大数据处理框架相比较,Dask更轻。Dask更侧重与其他框架,如:Numpy,Pandas,Scikit-learning相结合,从而使其能更加方便进行分布式并行计算。

Dask存在三种最基本的数据结构,分别是:Arrays、Dataframes以及Bags
在这里插入图片描述

Dask中的Arrays(位于包dask.arrays下),其实就是对Numpy中的ndarray的部分接口进行了改进,从而方便处理大数据量。对于大数据集,特别是其大小大于内存时,如果我们要对其计算,按照传统的方式,,我们会将其全部塞进内存里,那么这就会报Out-Of-Memory错误,当然,我们也可以一次读取一部分数据,那么我们是否可以提前将大数据集进行分块处理了,我们只需要控制每块数据集不超过内存,从而满足In-Memory计算。

Dataframe是基于Pandas Dataframe改进的一个可以并行处理大数据量的数据结构,即使对大于内存的数据也是能够处理的(注意:dask.array并不能直接处理大于内存的处理,从其源码中可以看出从Numpy Array转为Dask Array时,首先需要将Numpy Array放入内存)。

对于Bags,其最主要的是用于半结构化的大数据集,比如日志或者博客等等。

Dask之所以能够高效的处理大数据量,在于其可进行分布式计算,这才是Dask的核心所在,Dask支持多种调度器,从单线程、多线程、多进程到本地分布式和集群分布式,各种调度器在不同情况下有不同的作用。

所有大型的Dask集合变量(例如Dask Array,Dask DataFrame和Dask Bag)以及细粒度的API(例如Delay和Future)都会生成任务图,其中图中的每个节点都是常规的Python函数,而节点之间的边缘是常规的Python对象,由一个任务创建为输出,并在另一任务中用作输入。 在Dask生成这些任务图之后,它需要在并行硬件上执行它们。这就是任务调度。Dask存在不同的任务调度,每个调度程序将使用一个任务图并计算得到相同的结果,但是它们的性能差别很大。

在这里插入图片描述

如何使用将Dask任务细分和调度,请自行到Dask官网了解吧。

2 Dask安装

地址:https://pypi.org/project/dask/

Dask最新的版本是2021.1.1,Anaconda集成开发环境默认已经包含该库,如果没有该安装包,使用以下命令即可安装:

#pip install dask
  • 1

还可使用conda安装:

#conda install dask
  • 1

3 Dask使用

Dask DataFrame mimics Pandas – documentation

import pandas as pd                     import dask.dataframe as dd
df = pd.read_csv('2015-01-01.csv')      df = dd.read_csv('2015-*-*.csv')
df.groupby(df.user_id).value.mean()     df.groupby(df.user_id).value.mean().compute()
  • 1
  • 2
  • 3

Dask Array mimics NumPy – documentation

import numpy as np                       import dask.array as da
f = h5py.File('myfile.hdf5')             f = h5py.File('myfile.hdf5')
x = np.array(f['/small-data'])           x = da.from_array(f['/big-data'],
                                                           chunks=(1000, 1000))
x - x.mean(axis=1)                       x - x.mean(axis=1).compute()
  • 1
  • 2
  • 3
  • 4
  • 5

Dask Bag mimics iterators, Toolz, and PySpark – documentation

import dask.bag as db
b = db.read_text('2015-*-*.json.gz').map(json.loads)
b.pluck('name').frequencies().topk(10, lambda pair: pair[1]).compute()
  • 1
  • 2
  • 3

以上三个实例分别是Arrays、Dataframes以及Bags三大基础数据结构的基本使用。

Dask Delayed mimics for loops and wraps custom code – documentation

from dask import delayed
L = []
for fn in filenames:                  # Use for loops to build up computation
    data = delayed(load)(fn)          # Delay execution of function
    L.append(delayed(process)(data))  # Build connections between variables

result = delayed(summarize)(L)
result.compute()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

Delayed则是任务调度的核心模块。

The concurrent.futures interface provides general submission of custom tasks: - documentation

from dask.distributed import Client
client = Client('scheduler:port')

futures = []
for fn in filenames:
    future = client.submit(load, fn)
    futures.append(future)

summary = client.submit(summarize, futures)
summary.result()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

concurrent.futures则是分布式处理的模块。

详细内容请参看dask官网。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/526108
推荐阅读
相关标签
  

闽ICP备14008679号