当前位置:   article > 正文

分布式大数据计算系统Dask分布式部署及使用_分布式部署 dask

分布式部署 dask
  1. """
  2. dask: 多任务并行编程与任务调度
  3. 参考资料:
  4. https://docs.dask.org/en/latest/deploying-cli.html
  5. https://www.cnblogs.com/traditional/p/13759712.html
  6. 官网中文翻译版:
  7. https://www.heywhale.com/home/column/610a486caca2460017a1d410?from=zhihudsk
  8. """
  9. # 一、安装(已安装python3 直接跳入第3步,pip安装python库,如无法安装建议重装openssl,python3)
  10. # 1.安装OpenSSL
  11. yum install bzip2-devel
  12. yum install libffi-devel -y
  13. yum install -y zlib zlib-dev openssl-devel sqlite-devel bzip2-devel libffi libffi-devel gcc gcc-c++
  14. # cd到一个下载目录,这里为/opt,openssl压缩包会被下载到/opt目录下,可任意设置已存在的文件夹
  15. cd /opt
  16. wget http://www.openssl.org/source/openssl-1.1.1.tar.gz --no-check-certificate
  17. tar -zxvf openssl-1.1.1.tar.gz
  18. cd openssl-1.1.1
  19. # 编译安装,安装路径为/usr/local/openssl
  20. ./config --prefix=/usr/local/openssl shared zlib
  21. make && make install
  22. # 添加环境变量
  23. echo "export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/openssl/lib" >> /etc/profile
  24. # 刷新环境变量
  25. source /etc/profile
  26. # 增加软连接
  27. ln -s /usr/local/openssl/lib/libssl.so.1.1 /usr/lib64/libssl.so.1.1
  28. ln -s /usr/local/openssl/lib/libcrypto.so.1.1 /usr/lib64/libcrypto.so.1.1
  29. # 2.安装 python3
  30. # python官网下载python包传至/opt目录,并解压文件
  31. tar -zxvf Python-3.8.3.tgz
  32. cd Python-3.8.3
  33. # 编译python3,安装目录为/usr/local/python3
  34. ./configure --prefix=/usr/local/python3 --with-openssl=/usr/local/openssl --enable-shared
  35. make && make install
  36. # 添加环境变量
  37. echo "export PYTHON_PATH=$PYTHON_PATH:/usr/local/python3/bin" >> /etc/profile
  38. # 刷新环境变量
  39. source /etc/profile
  40. # 3.安装dask库
  41. pip install "dask[array]" # Install requirements for dask array
  42. pip install "dask[dataframe]" # Install requirements for dask dataframe
  43. pip install "dask[diagnostics]" # Install requirements for dask diagnostics
  44. pip install "dask[distributed]" # Install requirements for distributed dask
  45. # 也可以直接安装complete完整版
  46. pip install "dask[complete]"
  47. # 安装分布式库
  48. pip3 install distributed
  49. # 安装pyarrow库
  50. pip3 install pyarrow
  51. # 分布式dask的启动
  52. # 使用前提: 分布式的几台机器都安装了dask库,且以下运行无报错
  53. # Master主节点启动服务:IP替换为自己机器的IP,需要与SlavesIP在一个网段,启动后会出现一个IP地址
  54. # --host 指定IP, --port 指定端口
  55. dask-scheduler --host 192.168.0.100 --port 8786
  56. dask-scheduler --host 192.168.110.11 --port 8786
  57. dask-scheduler --host 133.160.190.142 --port 8786
  58. # 支节点启动服务,IP为上面启动后给出的IP
  59. dask-worker 192.168.0.100:8786
  60. dask-worker 192.168.110.11:8786
  61. # 可视化界面访问,浏览器访问一下地址,IP为主节点IP,端口为默认的8787
  62. http://192.168.0.100:8787
  63. http://192.168.110.11:8787
  64. # 5.dask的使用
  65. """
  66. 说明:
  67. 1.client IP为主节点的IP
  68. 2.csv来源为hdfs(Master为主节点映射的名称),由于是分布式执行,所以每个worker机器都需要获取到该文件
  69. 3.只有执行了compute操作之后,dask才会开始计算
  70. 计算的文件足够大,机器配置足够高才能体现出dask的优势
  71. 5.计算时,可以在web页面实时检测每个worker的执行情况
  72. """
  73. import datetime
  74. from dask import dataframe as dd
  75. from dask.distributed import Client
  76. if __name__ == '__main__':
  77. time1 =datetime.datetime.now()
  78. print("time1: %s" %time1)
  79. # 与dask主节点建立连接
  80. client = Client('192.168.0.100:8786', asynchronous=False)
  81. # 从hdfs读取文件
  82. df = dd.read_csv("hdfs://Master:9000/dask_data/123.csv")
  83. print("success: read csv success.")
  84. df = df.groupby(["id", "name"]).agg({
  85. "age": "sum", "year": "sum"
  86. }).reset_index()
  87. # 只有执行了compute操作后dask才会进行计算
  88. df.compute()
  89. print(df)
  90. client.close()
  91. time2 = datetime.datetime.now()
  92. print("time2: %s" %time2)
  93. print("use time:", time2 - time1)
  94. # 6.如何让dask的to_csv生成的是csv而不是.part结尾的文件
  95. 可以使用dd.to_csv()函数,该函数有一个名为compression的参数,默认为None。将其设置为“infer”,即可生成CSV文件而不是.part文件

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/526102
推荐阅读
相关标签
  

闽ICP备14008679号