"""
dask: multi-task parallel programming and task scheduling
References:
https://docs.dask.org/en/latest/deploying-cli.html
https://www.cnblogs.com/traditional/p/13759712.html
Chinese translation of the official docs:
https://www.heywhale.com/home/column/610a486caca2460017a1d410?from=zhihudsk
"""

# Installation (if Python 3 is already installed, skip to step 3 and pip-install the libraries; if pip fails, reinstalling OpenSSL and Python 3 is recommended)
# 1. Install OpenSSL
yum install -y bzip2-devel
yum install -y libffi-devel
yum install -y zlib zlib-devel openssl-devel sqlite-devel bzip2-devel libffi libffi-devel gcc gcc-c++

# cd into a download directory, here /opt; the OpenSSL tarball will be downloaded there (any existing directory works)
cd /opt
wget http://www.openssl.org/source/openssl-1.1.1.tar.gz --no-check-certificate

tar -zxvf openssl-1.1.1.tar.gz

cd openssl-1.1.1

# Configure and build; the install prefix is /usr/local/openssl
./config --prefix=/usr/local/openssl shared zlib

make && make install

# Add the OpenSSL libraries to the library search path
echo "export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/openssl/lib" >> /etc/profile
# Reload environment variables
source /etc/profile

# Create symlinks
ln -s /usr/local/openssl/lib/libssl.so.1.1 /usr/lib64/libssl.so.1.1
ln -s /usr/local/openssl/lib/libcrypto.so.1.1 /usr/lib64/libcrypto.so.1.1

# 2. Install Python 3
# Download the Python tarball from python.org into /opt, then extract it
tar -zxvf Python-3.8.3.tgz

cd Python-3.8.3

# Configure and build Python 3; the install prefix is /usr/local/python3
./configure --prefix=/usr/local/python3 --with-openssl=/usr/local/openssl --enable-shared

make && make install

# Add python3/pip3 to PATH
echo "export PATH=$PATH:/usr/local/python3/bin" >> /etc/profile
# Reload environment variables
source /etc/profile


# 3. Install the dask libraries
pip install "dask[array]"        # Install requirements for dask array
pip install "dask[dataframe]"    # Install requirements for dask dataframe
pip install "dask[diagnostics]"  # Install requirements for dask diagnostics
pip install "dask[distributed]"  # Install requirements for distributed dask

# Or install everything at once with the complete extra
pip install "dask[complete]"

# Install the distributed library
pip3 install distributed

# Install the pyarrow library (needed later for reading from HDFS)
pip3 install pyarrow

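Not part of the original walkthrough, but before touching the cluster it is worth a quick local smoke test that the install worked. This sketch only uses core dask (no scheduler processes) and illustrates the laziness that the notes in section 5 rely on:

```python
import dask
import dask.array as da

# dask.delayed builds a lazy task graph; nothing runs until .compute()
total = dask.delayed(sum)([1, 2, 3]).compute()
print(total)  # 6

# dask.array mirrors the NumPy API, with the data split into chunks
x = da.arange(10, chunks=5)
print(x.sum().compute())  # 45
```

If both lines print without errors, the `dask[array]` extra is installed correctly.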
# 4. Starting distributed dask
# Prerequisite: every machine in the cluster has the dask libraries installed and the commands below run without errors
# Start the scheduler on the master node; replace the IP with your own machine's IP,
# which must be on the same subnet as the workers. The scheduler prints its address on startup.
# --host sets the IP, --port sets the port
dask-scheduler --host 192.168.0.100 --port 8786
dask-scheduler --host 192.168.110.11 --port 8786
dask-scheduler --host 133.160.190.142 --port 8786

# Start a worker on each slave node; the address is the one printed by the scheduler above
dask-worker 192.168.0.100:8786
dask-worker 192.168.110.11:8786

# Web dashboard: open the address below in a browser; the IP is the master node's, the port is the default 8787
http://192.168.0.100:8787
http://192.168.110.11:8787

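For experimenting on a single machine without separate `dask-scheduler`/`dask-worker` processes, the same topology can be spun up in-process with `LocalCluster`. A minimal sketch (the worker counts here are arbitrary choices, not values from the original):

```python
from dask.distributed import Client, LocalCluster

# In-process stand-in for the dask-scheduler + dask-worker processes above
cluster = LocalCluster(n_workers=2, threads_per_worker=1, processes=False)
client = Client(cluster)

# Submit a task to the cluster and fetch its result
future = client.submit(lambda a, b: a + b, 10, 32)
print(future.result())  # 42

client.close()
cluster.close()
```

`cluster.dashboard_link` gives the URL of the same web dashboard described above, served locally.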
# 5. Using dask
"""
Notes:
1. The Client IP is the master node's IP.
2. The CSV comes from HDFS ("Master" is the hostname mapped to the master node); since execution is distributed, every worker machine must be able to reach the file.
3. dask only starts computing once compute() is called.
4. dask's advantage only shows when the input file is large enough and the machines are powerful enough.
5. While computing, each worker's progress can be monitored live on the web dashboard.
"""
import datetime

from dask import dataframe as dd
from dask.distributed import Client

if __name__ == '__main__':
    time1 = datetime.datetime.now()
    print("time1: %s" % time1)
    # Connect to the dask scheduler on the master node
    client = Client('192.168.0.100:8786', asynchronous=False)
    # Read the file from HDFS
    df = dd.read_csv("hdfs://Master:9000/dask_data/123.csv")
    print("success: read csv success.")
    df = df.groupby(["id", "name"]).agg({
        "age": "sum", "year": "sum"
    }).reset_index()
    # dask only computes once compute() is called; it returns the result as a pandas DataFrame
    result = df.compute()
    print(result)
    client.close()
    time2 = datetime.datetime.now()
    print("time2: %s" % time2)
    print("use time:", time2 - time1)

# 6. How to make dask's to_csv produce a .csv file instead of .part files
# dd.DataFrame.to_csv() writes one output file per partition by default, which is where
# the .part files come from. Pass single_file=True to merge the output into a single CSV file.