- spark_Document: http://spark.apache.org/docs/latest/index.html
- spark_API: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.package
- 测试数据集movielens https://grouplens.org/datasets/movielens/
- hadoop集群搭建 http://hadoop.apache.org/docs/r1.0.4/cn/cluster_setup.html
- JARs http://www.java2s.com
- ubuntu更新源 https://www.linuxidc.com/Linux/2017-11/148627.html
- unknown host: ---> sudo gedit /etc/resolv.conf ---> nameserver 8.8.8.8
- --------------------------------------------------D1 linux_environment-----------------------------------------
- Linux查看进程 ps -ef / ps -aux -> ps -ef|grep java -> kill -9 1827
- 查看第3-10行: sed -n '3,10p' filename (或: tail -n +3 filename | head -n 8)
-
- #其他操作
- sudo apt-get update 更新源
- sudo apt-get install package 安装包
- sudo apt-get remove package 删除包
- sudo apt-cache search package 搜索软件包
- sudo apt-cache show package 获取包的相关信息,如说明、大小、版本等
- sudo apt-get install package --reinstall 重新安装包
- sudo apt-get -f install 修复安装
- sudo apt-get remove package --purge 删除包,包括配置文件等
- sudo apt-get build-dep package 安装相关的编译环境
- sudo apt-get upgrade 更新已安装的包
- sudo apt-get dist-upgrade 升级系统
- sudo apt-cache depends package 了解使用该包依赖那些包
- sudo apt-cache rdepends package 查看该包被哪些包依赖
- sudo apt-get source package 下载该包的源代码
- sudo apt-get clean && sudo apt-get autoclean 清理无用的包
- sudo apt-get check 检查是否有损坏的依赖
- --------------------------------------------------------------------------------------------------------------
-
-
- -----------------------------------------THE BOOK OF <HADOOP WITH SPARK>-----------------------------------------
- 书中范例下载:
- http://pan.baidu.com.cn/s/1qYMtjNQ
- http://pan.baidu.com.cn/hadoopsparkbook
- ----------------------------------------------------------CP1 Info of big data&ML p1-8
- ----------------------------------------------------------CP2 VirtualBox p11-18
- 1.virtualbox5.2.34下载 https://www.virtualbox.org/wiki/Download_Old_Builds_5_2
- ---> Next ~> ~> ~> Yes ~> Install ~> Finish
- ---> Set languages (File ~> preferences ~> language
- ---> Set Restore_File ( 管理 ~> 全局设定 ~> VRDP认证库 <其他> ~> 默认虚拟电脑位置:)
- ---> Build a Vm
- ~> next ~> 4096M ~> ~> VDI ~> D ~> 80G ~>
- 2.ubuntu18.04 https://ubuntu.com/download/desktop
- ubuntu18.04很卡解决方案 sudo apt install gnome-session-flashback
- https://jingyan.baidu.com/article/37bce2bea3c07f1002f3a22a.html
- #更新linux sudo apt-get update
- 3.安装:mysql http://www.cnblogs.com/jpfss/p/7944622.html
- --安装mysql: sudo apt-get install mysql-server
- ->获取mysql用户名密码文件: sudo gedit /etc/mysql/debian.cnf
- ->登录mysql: mysql -u用户名 -p密码
- ->修改mysql密码:
- ->配置快捷命令
- sudo gedit ~/.bashrc
- alias mysql='mysql -u debian-sys-maint -pAQeZFkTb0y5EECNU'
- source ~/.bashrc
- ->修改mysql在不支持中文 https://www.cnblogs.com/guodao/p/9702465.html
- #启动、关闭服务和查看运行状态
- sudo service mysql start
- sudo service mysql stop
- sudo service mysql status
- 删除 mysql https://blog.csdn.net/iehadoop/article/details/82961264
- 查看MySQL的依赖项: dpkg --list|grep mysql
- sudo apt-get remove mysql-common
- sudo apt-get autoremove --purge mysql-server-5.7
- 清除残留数据: dpkg -l|grep ^rc|awk '{print$2}'|sudo xargs dpkg -P
- dpkg --list|grep mysql
- 继续删除剩余依赖项,如:sudo apt-get autoremove --purge mysql-apt-config
- 4.挂载文件夹
- vbox设置 -> 共享文件夹+ -> OK
- sudo mount -t vboxsf BZ /home/zieox/Desktop/BZ
-
- 5.安装anaconda https://blog.csdn.net/ksws0292756/article/details/79143460
- 启动anaconda.SH: sudo bash Anaconda3-2018.12-Linux-x86_64.sh
- 启动anaconda.navigator: anaconda-navigator
- 6.安装java8/step12INSTEAD https://blog.csdn.net/claire017/article/details/80953632
- 7.安装IDEA https://www.cnblogs.com/gzu-link-pyu/p/8263312.html
- 解压: sudo tar -zxvf ideaIC-2018.3.5-no-jdk.tar.gz -C /opt
- 8.安装PyCharm
-
-
-
- 9.安装scala-plugs https://www.cnblogs.com/starwater/p/6766831.html
- scala_SDK:
- 1. 右键项目名称找到Open Module Settings
- 2. 左侧Project Settings目录中点击Libraries
- 3. 点击+new Project Library选择Scala SDK
- 4. 添加下载好的jar文件夹
-
- 10.安装scala_shell https://blog.csdn.net/wangkai_123456/article/details/53669094
- 升级高版本scala: sudo apt-get remove scala
- 解压: tar zxvf scala-2.12.8.tgz
- 移动: sudo mv scala-2.12.8 /usr/local/scala
- Source /etc/profile https://blog.csdn.net/qq_35571554/article/details/82850563
- 卸载: sudo apt-get --purge remove polipo
- 11.maven https://baijiahao.baidu.com/s?id=1612907927393262341&wfr=spider&for=pc
- tar -xvf apache-maven-3.6.0-bin.tar.gz
- sudo mv apache-maven-3.6.0 /usr/local
- 下载地址: http://maven.apache.org/download.cgi
- 12.关于maven.pom.xml库 https://mvnrepository.com/
- 13.linux下安装gradle https://blog.csdn.net/yzpbright/article/details/53359855
- 14.卸载open-jdk sudo apt-get remove openjdk*
- 15.安装sunJDK12 https://blog.csdn.net/smile_from_2015/article/details/80056297
- tar -zxvf jdk-12_linux-x64_bin.tar.gz
- cd /usr/lib
- sudo mkdir jdk
- sudo mv jdk-12 /usr/lib/jdk
- sudo gedit /etc/profile
- ----------------------------------------------------
- export JAVA_HOME=/usr/lib/jdk/jdk-12
- export JRE_HOME=${JAVA_HOME}/jre
- export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
- export PATH=${JAVA_HOME}/bin:$PATH
- ----------------------------------------------------
- source /etc/profile
- sudo update-alternatives --install /usr/bin/java java /usr/lib/jdk/jdk-12/bin/java 300
- sudo update-alternatives --install /usr/bin/javac javac /usr/lib/jdk/jdk-12/bin/javac 300
-
- 16.下载maven wget http://mirror.bit.edu.cn/apache/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz
- tar -zxvf apache-maven-3.6.3-bin.tar.gz
- rm apache-maven-3.6.3-bin.tar.gz
- sudo gedit /etc/profile
- ------------------------
- export M2_HOME=/usr/local/apache-maven-3.6.3
- export PATH=${M2_HOME}/bin:$PATH
- ------------------------
- source /etc/profile
- 17.创建scala 项目
- new project -> maven (create from more archetype) ->scala-archetype-simple
- 配置maven镜像: https://blog.csdn.net/qq_39929929/article/details/103753905
-
- ----------------------------------------------------------CP3 Ubuntu Linux P23-43
- //Set Virtual_File setting ~> restore ~> Controler:IDE (no cd) ~> choose (open) ~>
- //Install Ubuntu -> launch ~~~> (clear CD & install Ubuntu) Install --->Launch Ubuntu
- //Install Plug-in P34
- //Set Default Typewriting system setting -> "text imput" -> "+" ->
- //Set terminal : (win)->ter-> drag->
- //Set terminal color : terminal (Configuration file preferences)->
- //share clipboard (equipment) ~> share_clipboard
-
-
- ----------------------------------------------------------CP4 The Installing Of Hadoop Single Node Cluster
- //4.1 install JDK
- 1.ctrl+alt+t
- 2.java -version
- 3.sudo apt-get update
- 4.sudo apt-get install default-jdk
- 5.update-alternatives --display java
-
- //4.2 Set SSH logging without code
- 1. install SSH : sudo apt-get install ssh
- 2. install rsync sudo apt-get install rsync
- 3.//产生秘匙 produce SSH Key: ssh-keygen -t dsa -P '' -f ~/.ssh/id.dsa
- 4.//查看秘匙 see the file of SSH Key: ll ~/.ssh
- 5.//将产生的key放入许可证文件 : cat ~/.ssh/id.dsa.pub >> ~/.ssh/authorized_keys
-
- //4.3 Download Hadoop 下载hadoop
- ---> https://archive.apache.org/dist/hadoop/common/
- ---> Index of /dist/hadoop/common/hadoop-2.6.0
-
- --->Copy Link Location
- ===> wget "~~~copied~~~"
- //使用wget下载hadoop2.6 wget https://archive.apache.org/dist/hadoop/common/hadoop-2.6.0/hadoop-2.6.0.tar.gz
- //解压 sudo tar -zxvf hadoop-2.6.0.tar.gz
- //移动文件 sudo mv hadoop-2.6.0 /usr/local/hadoop
- --->see & confirm ll /usr/local/hadoop
-
- ============================================================================================================
- bin/    各项运行文件
- sbin/   各项shell运行文件
- etc/    etc/hadoop 子目录包含Hadoop配置文件,例如 hadoop-env.sh、core-site.xml、yarn-site.xml、mapred-site.xml、hdfs-site.xml
- lib/    Hadoop函数库
- logs/   系统日志,可以查看运行状况;运行有问题时可从日志中找出错误原因
- ============================================================================================================
- //4.4 Set Hadoop variance 设置hadoop环境变量
- --->Edit bash sudo gedit ~/.bashrc
- ---path of jdk which java -> ls -l /usr/bin/java -> ls -l /etc/alternatives/java ->
- --->/usr/lib/jvm/java-8-oracle
- --->Set JDK
- --------------------------------------------------------------------
- export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
- export HADOOP_HOME=/usr/local/hadoop
- export PATH=$PATH:$HADOOP_HOME/bin
- export PATH=$PATH:$HADOOP_HOME/sbin
- export HADOOP_MAPRED_HOME=$HADOOP_HOME
- export HADOOP_COMMON_HOME=$HADOOP_HOME
- export HADOOP_HDFS_HOME=$HADOOP_HOME
- export YARN_HOME=$HADOOP_HOME
- export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
- export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib"
- export JAVA_LIBRARY_PATH=$HADOOP_HOME/lib/native:$JAVA_LIBRARY_PATH
- --------------------------------------------------------------------
- --->Set HADOOP_HOME hadoop usr/local/hadoop export HADOOP_HOME=/usr/local/hadoop
- --->Set PATH
- export PATH=$PATH:$HADOOP_HOME/bin
- export PATH=$PATH:$HADOOP_HOME/sbin
- --->set Hadoop-env
- export HADOOP_MAPRED_HOME=$HADOOP_HOME
- export HADOOP_COMMON_HOME=$HADOOP_HOME
- export HADOOP_HDFS_HOME=$HADOOP_HOME
- export YARN_HOME=$HADOOP_HOME
- --->lib_link
- export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
- export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib"
- export JAVA_LIBRARY_PATH=$HADOOP_HOME/lib/native:$JAVA_LIBRARY_PATH
- --->Let bash work. source ~/.bashrc
-
- //4.5 modify Hadoop Setting_files
- --->修改hadoop-env.sh配置文件: sudo gedit /usr/local/hadoop/etc/hadoop/hadoop-env.sh
- //change 'export JAVA_HOME=${JAVA_HOME}' to export JAVA_HOME=/usr/lib/jvm/java-8-oracle
- --->修改core-site.xml配置文件 sudo gedit /usr/local/hadoop/etc/hadoop/core-site.xml
- #set Default-name of HDFS
- -------------------------------------
- <property>
- <name>fs.default.name</name>
- <value>hdfs://localhost:9000</value>
- </property>
- -------------------------------------
- --->修改 yarn-site.xml配置文件 sudo gedit /usr/local/hadoop/etc/hadoop/yarn-site.xml
- -------------------------------------------------------------------
- <property>
- <name>yarn.nodemanager.aux-services</name>
- <value>mapreduce_shuffle</value>
- </property>
- <property>
- <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
- <value>org.apache.hadoop.mapred.ShuffleHandler</value>
- </property>
- -------------------------------------------------------------------
- --->修改 mapred-site.xml配置文件
- #copy template-file (复制模板文件)
- sudo cp /usr/local/hadoop/etc/hadoop/mapred-site.xml.template /usr/local/hadoop/etc/hadoop/mapred-site.xml
- --->编辑 mapred-site.xml配置文件 sudo gedit /usr/local/hadoop/etc/hadoop/mapred-site.xml
- --------------------------------------
- <property>
- <name>mapreduce.framework.name</name>
- <value>yarn</value>
- </property>
- --------------------------------------
- --->编辑 hdfs-site.xml配置文件 sudo gedit /usr/local/hadoop/etc/hadoop/hdfs-site.xml
- 10.0.2.15
- ----------------------------------------------------------------------------
- <property>
- <name>dfs.replication</name>
- <value>3</value>
- </property>
- <property>
- <name>dfs.namenode.name.dir</name>
- <value>file:/usr/local/hadoop/hadoop_data/hdfs/namenode</value>
- </property>
- <property>
- <name>dfs.datanode.data.dir</name>
- <value>file:/usr/local/hadoop/hadoop_data/hdfs/datanode</value>
- </property>
- ----------------------------------------------------------------------------
- //launch without namenode https://www.cnblogs.com/lishpei/p/6136043.html
- //4.6 Creating & Formatting HDFS_dir 创建并格式化hdfs目录
- ---> 创建NameNode数据存储目录 sudo mkdir -p /usr/local/hadoop/hadoop_data/hdfs/namenode
- ---> 创建DataNode数据存储目录 sudo mkdir -p /usr/local/hadoop/hadoop_data/hdfs/datanode
- ---> 将Hadoop目录所有者更改为 hduser sudo chown zieox:zieox -R /usr/local/hadoop
- --->格式化hdfs hadoop namenode -format
- ---# 启动 YARN sudo ./start-yarn.sh
- ---# 启动 HDFS sudo ./start-dfs.sh
- --->显示错误日志 cd /usr/local/hadoop/logs/
- --->单独启动 namenode ./hadoop-daemon.sh start namenode
- //查看本机ip: ifconfig ; 查看50070端口占用: cd /usr/local/hadoop/ ; lsof -i :50070
- //打开hadoop resource manager web 界面: http://localhost:8088
- //打开NameNode HDFS Web界面 http://172.28.30.12:50070
-
- -------------------------------r-c-------------------------------
- 1469 gedit hadoop-lufax-datanode-lufax.log
- 1470 mkdir -r /home/user/hadoop_tmp/dfs/data
- 1471 mkdir -p /home/user/hadoop_tmp/dfs/data
- 1472 sudo mkdir -p /home/user/hadoop_tmp/dfs/data
- 1473 sudo chown lufax:lufax /home/user/hadoop_tmp/dfs/data
- 1474 ./hadoop-daemon.sh start datanode
- 1475 cd ../
- 1476 ./sbin/hadoop-daemon.sh start datanode
- 1477 jps
- 1478 hadoop fs -ls /
- 1479 hadoop fs -put README.txt /
- 1480 history 20
-
- -----------------------------------------------------------------
-
- ----------------------------------------------------------CP5 Installing:Hadoop Multi Node Cluster
- //1. 复制虚拟机
-
- //设置data1虚拟机
- //编辑interfaces网络配置文件: sudo gedit /etc/network/interfaces
- ------------------------------------------
- #NAT interface //网卡1
- auto eth0
- iface eth0 inet dhcp
-
- #host only interface //网卡2
- auto eth1
- iface eth1 inet static
- address 192.168.56.101
- netmask 255.255.255.0
- network 192.168.56.0
- broadcast 192.168.56.255
- ------------------------------------------
-
- //设置hostname sudo gedit /etc/hostname ---> data1
- //设置hosts文件 sudo gedit /etc/hosts
- //加入主机ip地址
- ------------------------------------------
- 192.168.56.100 master
- 192.168.56.101 data1
- 192.168.56.102 data2
- 192.168.56.103 data3
- ------------------------------------------
- //修改localhost为master sudo gedit /usr/local/hadoop/etc/hadoop/core-site.xml
- sudo gedit /usr/local/hadoop/etc/hadoop/yarn-site.xml
-
- //设置resourcemanager主机与nodemanager的连接地址
- <property>
- <name>yarn.resourcemanager.resource-tracker.address</name>
- <value>master:8025</value>
- </property>
-
- //设置resourcemanager与applicationmaster的连接地址
- <property>
- <name>yarn.resourcemanager.scheduler.address</name>
- <value>master:8030</value>
- </property>
-
- //设置resourcemanager与客户端的连接地址
- <property>
- <name>yarn.resourcemanager.address</name>
- <value>master:8050</value>
- </property>
-
- //查看YARN架构图 http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html
-
- //设置mapred-site.xml sudo gedit /usr/local/hadoop/etc/hadoop/mapred-site.xml
- ------------------------------------------
- <property>
- <name>mapred.job.tracker</name>
- <value>master:54311</value>
- </property>
- ------------------------------------------
-
- //设置hdfs-site.xml sudo gedit /usr/local/hadoop/etc/hadoop/hdfs-site.xml
- ------------------------------------------
- <property>
- <name>dfs.datanode.data.dir</name>
- <value> file:/usr/local/hadoop/hadoop_data/hdfs/datanode</value>
- </property>
- ------------------------------------------
- restart
- //确认网络设置 ifconfig
-
- //设置master节点
- sudo gedit /usr/local/hadoop/etc/hadoop/hdfs-site.xml
- -----------------------------------------------------------------
- <property>
- <name>dfs.namenode.name.dir</name>
- <value> file:/usr/local/hadoop/hadoop_data/hdfs/namenode</value>
- </property>
- -----------------------------------------------------------------
-
- //编辑masters文件 sudo gedit /usr/local/hadoop/etc/hadoop/masters (内容: master)
- //编辑slaves文件 sudo gedit /usr/local/hadoop/etc/hadoop/slaves (内容: data1 data2 data3,每行一个)
-
- //master 连接到data1虚拟机 ssh data1
- //连接到data1创建HDFS相关目录
- --->删除hdfs所有目录 sudo rm -rf /usr/local/hadoop/hadoop_data/hdfs
- --->创建DataNode存储目录 mkdir -p /usr/local/hadoop/hadoop_data/hdfs/datanode
- --->将目录所有者改成zieox sudo chown -R zieox:zieox /usr/local/hadoop
- //中断data1连接回到master exit
-
- //master 连接到data2虚拟机 ssh data2
- --->删除hdfs所有目录 sudo rm -rf /usr/local/hadoop/hadoop_data/hdfs
- --->创建DataNode存储目录 mkdir -p /usr/local/hadoop/hadoop_data/hdfs/datanode
- --->将目录所有者改成zieox sudo chown -R zieox:zieox /usr/local/hadoop
- --->中断data2连接回到master exit
-
- //master 连接到data3虚拟机 ssh data3
- --->删除hdfs所有目录 sudo rm -rf /usr/local/hadoop/hadoop_data/hdfs
- --->创建DataNode存储目录 mkdir -p /usr/local/hadoop/hadoop_data/hdfs/datanode
- --->将目录所有者改成zieox sudo chown -R zieox:zieox /usr/local/hadoop
- --->中断data3连接回到master exit
-
- //创建并格式化NameNode HDFS目录
- --->删除之前的hdfs目录 sudo rm -rf /usr/local/hadoop/hadoop_data/hdfs
- --->创建NameNode存储目录 mkdir -p /usr/local/hadoop/hadoop_data/hdfs/namenode
- --->将目录所有者改成zieox sudo chown -R zieox:zieox /usr/local/hadoop
- --->格式化NameNode HDFS目录 hadoop namenode -format
-
- //启动Hadoop Multi Node Cluster
- --->分别启动HDFS & YARN start-dfs.sh start-yarn.sh start-all.sh
-
- ssh免密操作
- https://www.cnblogs.com/robert-blue/p/4133467.html
- //编辑固定IP地址
- scp id_rsa.pub zieox@master:/home/zieox
- cat id.dsa.pub >>~/.ssh/authorized_keys
- service sshd restart
- cd .ssh
- scp authorized_keys zieox@data1:/home/zieox
- scp authorized_keys zieox@data2:/home/zieox
- scp authorized_keys zieox@data3:/home/zieox
- http://master:8088/cluster
- sudo chown zieox:zieox *
- //Datanode启动不了: datanode的clusterID 和 namenode的clusterID 不匹配 (修改其中一个VERSION文件,使两个clusterID一致)
- https://www.cnblogs.com/artistdata/p/8410429.html
- sudo gedit /usr/local/hadoop/hadoop_data/hdfs/datanode/current/VERSION
- clusterID=CID-834686bf-02bf-4933-b2fb-0a2288e97cc9
- sudo gedit /usr/local/hadoop/hadoop_data/hdfs/namenode/current/VERSION
-
-
- ----------------------------------------------------------CP6 Hadoop HDFS Order
- //hdfs 常用命令
- http://hadoop.apache.org/docs/r1.0.4/cn/hdfs_shell.html
- -----------------------------------------HDFS基本命令-----------------------------------------
- hadoop fs -mkdir          创建hdfs目录
- hadoop fs -ls             列出hdfs目录
- hadoop fs -copyFromLocal  使用(copyFromLocal)复制本地文件到hdfs
- hadoop fs -put            使用(put)复制本地文件到hdfs
- hadoop fs -cat            列出hdfs目录下的文件内容
- hadoop fs -copyToLocal    使用(-copyToLocal)将hdfs上的文件复制到本地
- hadoop fs -get            使用(-get)将hdfs上的文件复制到本地
- hadoop fs -cp             复制hdfs文件
- hadoop fs -rm             删除hdfs文件
- ----------------------------------------------------------------------------------------------
- //创建hadoop目录
- hadoop fs -mkdir /user
- hadoop fs -mkdir /user/hduser
- hadoop fs -mkdir /user/hduser/test
- //hadoop: ls (list)
- hadoop fs -ls
- hadoop fs -ls /
- hadoop fs -ls /user
- hadoop fs -ls /user/hduser
- hadoop fs -ls -R /
- //创建多级目录 hadoop fs -mkdir -p /dir1/dir2/dir3
- //查看全部文件 hadoop fs -ls -R /
-
- ---------------------------------------------Output---------------------------------------------
- -rw-r--r-- 3 lufax supergroup 1366 2018-09-12 23:57 /README.txt
- drwxr-xr-x - lufax supergroup 0 2018-09-20 03:31 /dir1
- drwxr-xr-x - lufax supergroup 0 2018-09-20 03:31 /dir1/dir2
- drwxr-xr-x - lufax supergroup 0 2018-09-20 03:31 /dir1/dir2/dir3
- drwxr-xr-x - lufax supergroup 0 2018-09-20 03:23 /user
- drwxr-xr-x - lufax supergroup 0 2018-09-20 03:23 /user/hduser
- drwxr-xr-x - lufax supergroup 0 2018-09-20 03:23 /user/hduser/test
- -----------------------------------------------------------------------------------------------
- //上传文件至HDFS (copyFromLocal)
- hadoop fs -copyFromLocal /home/zieox/桌面/test1.txt /user/hduser/test
- hadoop fs -copyFromLocal /usr/local/hadoop/README.txt /user/hduser/test
- hadoop fs -copyFromLocal /usr/local/hadoop/README.txt /user/hduser/test/test1.txt
-
- //打开查看文件 hadoop fs -cat /user/hduser/test/test1.txt |more
- //强制复制 hadoop fs -copyFromLocal -f /usr/local/hadoop/README.txt /user/hduser/test
- //查看目录 hadoop fs -ls /user/hduser/test
- // put:使用-put会直接覆盖文件 hadoop fs -put /usr/local/hadoop/README.txt /user/hduser/test/test1.txt
- //将屏幕上内容存储到HDFS文件 echo abc| hadoop fs -put - /user/hduser/test/echoin.txt
- //查看文件 hadoop fs -cat /user/hduser/test/echoin.txt
-
- //将本地目录的列表,存储到HDFS文件 ls /usr/local/hadoop | hadoop fs -put - /user/hduser/test/hadooplist.txt
-
- //将hdfs文件拷贝至本地 (copyToLocal)
- mkdir /home/zieox/桌面/test ~> cd /home/zieox/桌面/test ~> hadoop fs -copyToLocal /user/hduser/test/hadooplist.txt
-
- //将整个HDFS上的目录复制到本地计算机: hadoop fs -copyToLocal /user/hduser/test/ect
- //将HDFS上的文件复制到本地计算机 (get): hadoop fs -get /user/hduser/test/README.txt localREADME.txt
- ///复制与删除HDFS文件
- //在HDFS上创建测试目录 hadoop fs -mkdir /user/hduser/test/tmp
- //复制HDFS文件到HDFS测试目录 hadoop fs -cp /user/hduser/test/README.txt /user/hduser/test/tmp
- //查看测试目录 hadoop fs -ls /user/hduser/test/tmp
- //删除文件 hadoop fs -rm /user/hduser/test/test2.txt
- //删除文件目录 hadoop fs -rm -R /user/hduser/test
- ###///在Hadoop HDFS Web - Browse Directory http://master:50070
-
-
- ----------------------------------------------------------CP7 Hadoop MapReduce
- http://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html
- //创建wordcount目录 mkdir -p ~/wordcount/input
- //进入wordcount文件夹 cd ~/wordcount
- //编辑java脚本 sudo gedit WordCount.java
- //设置路径 sudo gedit ~/.bashrc
- --------------------------------------------------
- export PATH=${JAVA_HOME}/bin:${PATH}
- export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar
- --------------------------------------------------
- //生效bashrc source ~/.bashrc
- //编译并打包WordCount
- hadoop com.sun.tools.javac.Main WordCount.java
- jar cf wc.jar WordCount*.class
-
-
- //创建测试文本文件 cp /usr/local/hadoop/LICENSE.txt ~/wordcount/input
- ll ~/wordcount/input
- //hdfs操作
- hadoop fs -mkdir -p /user/hduser/wordcount/input
- hadoop fs -put ~/wordcount/input/LICENSE.txt /user/hduser/wordcount/input
- hadoop fs -ls /user/hduser/wordcount/input
- cd ~/wordcount
- //运行WordCount程序
- hadoop jar wc.jar WordCount /user/hduser/wordcount/input/LICENSE.txt /user/hduser/wordcount/output
-
- //查看运行结果
- hadoop fs -ls /user/hduser/wordcount/output
- hadoop fs -cat /user/hduser/wordcount/output/part-r-00000|more
-
-
- ----------------------------------------------------------CP8 Installing Spark
-
- //启动hadoop服务
- 启动服务的脚本全部在 ./sbin 目录下
- start-all.sh 可以启动全部服务(不建议这样做!)
- start-dfs.sh 启动DFS中的namenode 和 datanode(不建议这样做!)
- //启动namenode:/usr/local/hadoop/sbin/hadoop-daemon.sh start namenode
- //启动datanode: /usr/local/hadoop/sbin/hadoop-daemon.sh start datanode
- //查看是否有datanode 的进程jps 然后就可以通过Web浏览器查看dfs了!http://localhost:50070
- //启动Yarn /usr/local/hadoop/sbin/start-yarn.sh
- -------------------------------------------------------------------------------------------------------------------
- 测试yarn
- /usr/local/hadoop/bin/hadoop jar /usr/local/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar pi 2 100
- //启动hdfs: /usr/local/hadoop/sbin/start-dfs.sh
- bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar pi 2 100
- ssh-copy-id -i /root/.ssh/id_rsa.pub root@<lufax>
- -------------------------------------------------------------------------------------------------------------------
- //Install spark wget https://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.6.tgz
- cd /home/lufax/下载
- tar -zxvf spark-2.3.1-bin-hadoop2.6.tgz -C /opt/spark/
- tar xzvf spark-2.3.1-bin-hadoop2.6.tgz
- sudo mv spark-2.3.1-bin-hadoop2.6 /usr/local/spark/
- gedit ~/.bashrc
- -----------------------------------
- export SPARK_HOME=/usr/local/spark
- export PATH=$PATH:$SPARK_HOME/bin
- -----------------------------------
- source ~/.bashrc
- spark-shell
- http://master:50070
-
- //设置spark-shell
- cd /usr/local/spark/conf
- cp log4j.properties.template log4j.properties
- sudo gedit log4j.properties
- -----------------set-----------------
- log4j.rootCategory=WARN, console
- -------------------------------------
- //本地运行spark
- //本地启动(3线程) spark-shell --master local[3]
- //读取hdfs文件 val tf = sc.textFile("hdfs://master:9000/user/hduser/test/tes2.txt")
- //Spark读取与写入文件 https://blog.csdn.net/a294233897/article/details/80904305
- //在yarn上运行spark-shell
- SPARK_JAR=` /usr/local/spark/lib/jars/*.jar `
- HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
- MASTER=yarn-client
- /usr/local/spark/bin/spark-shell
- //用spark读取HDFS文件
- val tf = sc.textFile("hdfs://master:9000/user/hduser/test/README.txt")
- scala> tf.count --->res0: Long = 31
- http://172.22.230.18/
- //8.9 构建spark standalone cluster环境
- //复制模板文件来创建spark-env.sh
- cp /usr/local/spark/conf/spark-env.sh.template spark-env.sh
- sudo gedit spark-env.sh
- ---------------------------------------------------------------------------------------------
- export SPARK_MASTER_IP=master 设置master的IP或服务器名称
- export SPARK_WORKER_CORES=1 设置每个Worker使用的CPU核心
- export SPARK_WORKER_MEMORY=800m 设置每个Worker使用内存
- export SPARK_WORKER_INSTANCES=2 设置每个Worker实例
- ---------------------------------------------------------------------------------------------
- ssh data1
- sudo mkdir /usr/local/spark
- sudo chown -R zieox:zieox /usr/local/spark
- sudo scp -r /usr/local/spark zieox@data1:/usr/local
- ssh data3
- ...
- sudo gedit /usr/local/spark/conf/slaves
- ------------
- data1
- data3
- ------------
- //启动:spark standalone cluster(启动workers) /usr/local/spark/sbin/start-all.sh
- //datanode=3 × SPARK_WORKER_INSTANCES=2 => worker=6
- //分别启动master&slaves
- //启动spark_master /usr/local/spark/sbin/start-master.sh
- //启动spark_slaves /usr/local/spark/sbin/start-slaves.sh
- //在Spark Standalone运行spark-shell程序 spark-shell --master spark://master:7077
- //查看Spark Standalone 的 Web_UI界面 http://master:8080/
- //读取本地文件
- val tf = spark.read.textFile("file:/home/zieox/桌面/tes2.txt")
- val tf = sc.textFile("file:/user/hduser/test/README.txt")
- //读取hdfs文件(确保works完全启动) val tf = spark.read.textFile("hdfs://master:9000/user/hduser/test/README.txt")
- --->scala> tf.count res0: Long = 31
- //停止Spark Stand alone cluster /usr/local/spark/sbin/stop-all.sh
- ----------------------------------------------------------CP9 Spark RDD
- spark-shell
- scala> val intRDD = sc.parallelize(List(1,2,3,1,3,2,3,4))
- --->intRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
- scala> intRDD.collect() res0: Array[Int] = Array(1, 2, 3, 1, 3, 2, 3, 4)
- scala> val stringRDD=sc.parallelize(List("zieox","luccy","fucker"))
- --->stringRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:24
- scala> stringRDD.collect() res1: Array[String] = Array(zieox, luccy, fucker)
- //map
- def addone(x:Int):Int = {return (x+1)}
- intRDD.map(addone).collect()
- intRDD.map(x=>x+1).collect()
- intRDD.map(_+1).collect()
- scala> stringRDD.map(x=>"name: "+x).collect --->res9: Array[String] = Array(name: zieox, name: luccy, name: fucker)
- //filter
- intRDD.filter(_<3).collect()
- scala> stringRDD.filter(x=>x.contains("eo")).collect() --->res10: Array[String] = Array(zieox)
- //distinct
- scala> intRDD.distinct().collect --->res11: Array[Int] = Array(4, 1, 2, 3)
- //randomSplit
- //以随机数的方式按照4:6的比例分割为两个RDD val sRDD=intRDD.randomSplit(Array(0.4,0.6))
- //groupBy
- val gDD = intRDD.groupBy(x=>{if (x%2==0) "even" else "odd"}).collect
- //9.3多个RDD“转换”运算
- val r1=sc.parallelize(List(1,2))
- val r2=sc.parallelize(List(3,4,5))
- val r3=sc.parallelize(List(1,6,7))
- //union (并集运算) r1.union(r2).union(r3).collect() & (r1++r2++r3).collect()
- //intersection (交集运算) r1.intersection(r3).collect
- //subtract (差集运算) r1.subtract(r2).collect() //在r1,不在r2
- //cartesian (笛卡尔积运算) r1.cartesian(r2).collect()
- //9.4 基本动作运算
- val r=sc.parallelize(List(1,2,3,4,5))
- scala> r.first --->res30: Int = 1
- scala> r.take(2) --->res32: Array[Int] = Array(1, 2)
- scala> r.takeOrdered(3) --->res33: Array[Int] = Array(1, 2, 3)
- scala> r.takeOrdered(3)(Ordering[Int].reverse) --->res34: Array[Int] = Array(5, 4, 3)
- --------基本统计--------
- r.stats 统计
- r.min 最小值
- r.max 最大值
- r.stdev 标准差
- r.count 计数
- r.sum 求和
- r.mean 平均
- ------------------------
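- // Quick check of the table above in spark-shell (my own sketch, not from the book; assumes sc is provided by spark-shell, expected values worked out by hand for List(1,2,3,4,5)):
- ------------------------------------------------------------
- val r = sc.parallelize(List(1, 2, 3, 4, 5))
- val st = r.stats()                 // StatCounter(count, mean, stdev, max, min)
- println(st.count)                  // 5
- println(st.mean)                   // 3.0
- println(st.sum)                    // 15.0
- println(st.stdev)                  // population stdev = sqrt(2) ~ 1.4142
- println(r.min() + " " + r.max())   // 1 5
- ------------------------------------------------------------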
- //9.5 RDD Key-Value 基本“转换”运算
- val kv=sc.parallelize(List((1,2),(1,3),(3,2),(3,4),(4,5)))
- kv.keys.collect()
- kv.filter{case (key,value)=>key<5}.collect
- //mapValues运算 kv.mapValues(x=>x*x).collect()
- //sortByKey (按key,从小到大排列) kv.sortByKey(true).collect()
- //reduceByKey kv.reduceByKey((x,y)=>x+y).collect //根据key进行reduce并对value执行func
- kv.reduceByKey(_+_).collect
- //9.6 多个RDD Key-Value “转换”运算
- val r1 = sc.parallelize(List((1,2),(1,3),(3,2),(3,4),(4,5)))
- val r2 = sc.parallelize(List((3,8)))
- scala> r1.join(r2).foreach(println) //join(by key) return(key with key's value)
- --->(3,(2,8))
- (3,(4,8))
- scala> r1.leftOuterJoin(r2).foreach(println) //join(by key-left_all) return (key's left_value-all)
- --->(1,(2,None))
- (1,(3,None))
- (3,(2,Some(8)))
- (4,(5,None))
- (3,(4,Some(8)))
- scala> r1.rightOuterJoin(r2).foreach(println) //join (by key-right_all) return (key's right_value-all)
- --->(3,(Some(2),8))
- (3,(Some(4),8))
- scala> r1.subtractByKey(r2).collect //join (exclude common_keys) return (keys) (join的逆反)
- --->res56: Array[(Int, Int)] = Array((4,5), (1,2), (1,3))
- //9.7 Key-Value “动作”运算
- val r1 = sc.parallelize(List((1,2),(1,3),(3,2),(3,4),(4,5)))
- scala> r1.first --->res0: (Int, Int) = (1,2)
- scala> r1.take(2) --->res2: Array[(Int, Int)] = Array((1,2), (1,3))
- scala> r1.first._1 --->res5: Int = 1
- //countByKey 根据key进行count
- scala> r1.countByKey --->res8: scala.collection.Map[Int,Long] = Map(4 -> 1, 1 -> 2, 3 -> 2)
- //collectAsMap 创建Key-Value对照表
- scala> r1.collectAsMap --->res11: scala.collection.Map[Int,Int] = Map(4 -> 5, 1 -> 3, 3 -> 4)
- //lookup运算(by_key)
- scala> r1.collect --->res16: Array[(Int, Int)] = Array((1,2), (1,3), (3,2), (3,4), (4,5))
- scala> r1.lookup(3) //返回key=3对应的所有value: Seq(2, 4)
- //9.8 Broadcast 广播变量
- //不使用广播变量的映射对照
- val kf = sc.parallelize(List((1,"apple"),(2,"banana"),(3,"orange")))
- scala> val kfm=kf.collectAsMap --->kfm: scala.collection.Map[Int,String] = Map(2 -> banana, 1 -> apple, 3 -> orange)
- scala> val id = sc.parallelize(List(2,1,3,1))
- scala> val fn = id.map(x=>kfm(x)).collect --->fn: Array[String] = Array(banana, apple, orange, apple)
- //使用广播变量
- val kf = sc.parallelize(List((1,"apple"),(2,"banana"),(3,"orange")))
- scala> val kfm=kf.collectAsMap --->kfm: scala.collection.Map[Int,String] = Map(2 -> banana, 1 -> apple, 3 -> orange)
- val bfm = sc.broadcast(kfm) //使用广播以节省内存与传送时间
- scala> val id = sc.parallelize(List(2,1,3,1))
- val fn = id.map(x=>bfm.value(x)).collect
- //9.9 accumulator 累加器
- val acc = sc.parallelize(List(2,1,3,1,5,1))
- val total = sc.accumulator(0.0)
- val num = sc.accumulator(0)
- acc.foreach(i=>{total+=i ; num+=1})
- println("total=" + total.value + ",num=" + num.value) //total=13.0,num=6
- val avg = total.value/num.value
- //9.10 RDD Persistence持久化
- Spark RDD 持久化机制:可以用于将需要重复运算的RDD存储在内存中,以便大幅提升运算效率。
- ------------------------------------------------------------------------------------------------------------------------------------
- MEMORY_ONLY spark会将RDD对象以Java对象反串行化(序列化)在JVM的堆空间中,而不经过序列化处理。
- 如果RDD太大无法完全存储在内存中,多余的RDD partitions不会缓存在内存中,而是需要重新计算
- MEMORY_AND_DISK 尽量将RDD以Java对象反串行化在JVM的在内存中,如果内存缓存不下了,就将剩余分区缓存在磁盘中
- MEMORY_ONLY_SER 将RDD进行序列化处理(每个分区序列化为一个字节数组)然后缓存在内存中。
- 因为需要再进行反序列化,会多使用CPU计算资源,但是比较省内存的存储空间
- 多余的RDD partitions不会缓存在内存中,而是需要重新计算
- MEMORY_AND_DISK_SER 和MEMORY_ONLY_SER类似,多余的RDD partitions缓存在磁盘中
- DISK_ONLY 仅仅使用磁盘存储RDD的数据(未经序列化)
- MEMORY_ONLY_2,MEMORY_AND_DISK_2 与上述对应级别相同,但会将每个分区复制到两个集群节点上
- ------------------------------------------------------------------------------------------------------------------------------------
- RDD.persist
- RDD.unpersist
- val a = sc.parallelize(List(2,1,3,1,5,1))
- a.persist()
- //设置RDD.persist存储等级范例
- import org.apache.spark.storage.StorageLevel
- val intRDDMemoryAndDisk = sc.parallelize(List(2,1,3,1,5,1))
- intRDDMemoryAndDisk.persist(StorageLevel.MEMORY_AND_DISK)
- intRDDMemoryAndDisk.unpersist()
- //9.11 使用spark创建WordCount
- sudo gedit /home/zieox/桌面/tes.txt
- val tf = sc.textFile("file:/home/zieox/桌面/tes.txt") //读取
- val string = tf.flatMap(l=>l.split(" ")) //处理 (这里使用flatMap读取并创建stringRDD)
- val wordscount = string.map(w => (w,1)).reduceByKey(_+_) //计算每个单词出现的次数
- val wordscount = tf.flatMap(l=>l.split(" ")).map(w => (w,1)).reduceByKey(_+_)
- wordscount.saveAsTextFile("file:/home/zieox/桌面/output") //保存计算结果
- //9.12 Spark WordCount
- scala> tf.flatMap(i=>i.split(" ")).map((_,1)).collect
- --->res59: Array[(String, Int)] = Array((28,1), (march,1), (,,1), (its,1), (a,1), (raining,1), (day,1), (!,1))
- //WordsCount
- val wordscount=sc.textFile("file:/home/zieox/桌面/tes.txt").flatMap(l=>l.split(" ")).map((_,1)).reduceByKey(_+_).foreach(println)
- ----------------------------------------------------------CP10 Spark的集成开发环境
- //10.1 下载与安装eclipse Scala IDE
- 下载: http://scala-ide.org/
- 解压: tar -xzvf scala-SDK-4.7.0-vfinal-2.12-linux.gtk.x86_64.tar.gz
- 移动: mv eclipse /home
- 创建link:
- //10.2 下载项目所需的Library
- spark-assembly
- joda-Time
- jfreeChart
- jcommon
- //创建lib目录存放连接库 mkdir -p ~/workspace/Lib
- //复制spark.jar sudo cp /usr/local/spark/jars/*.jar ~/workspace/Lib
- //进入jar下载网站 www.Java2s.com
- //下载joda-time
- cd ~/workspace/Lib ---> wget http://www.java2s.com/Code/JarDownload/joda/joda-time-2.1.jar.zip ---> unzip -j joda-time-2.1.jar.zip
- //下载jfreechart
- wget http://www.java2s.com/Code/JarDownload/jfreechart/jfreechart.jar.zip ---> unzip -j jfreechart.jar.zip
- //下载spark-core
- wget http://www.java2s.com/Code/JarDownload/spark/spark-core_2.9.2-0.6.1.jar.zip unzip -j spark-core_2.9.2-0.6.1.jar.zip
- wget http://www.java2s.com/Code/JarDownload/spark/spark-core_2.9.2-0.6.1-sources.jar.zip unzip -j spark-core_2.9.2-0.6.1-sources.jar.zip
- //下载jcommon
- wget http://www.java2s.com/Code/JarDownload/jcommon/jcommon-1.0.14.jar.zip
- wget http://www.java2s.com/Code/JarDownload/jcommon/jcommon-1.0.14-sources.jar.zip
- unzip -j jcommon-1.0.14.jar.zip
- unzip -j jcommon-1.0.14-sources.jar.zip
- //删除zip文件以节省空间 rm *.zip
- //launch Eclipse
- //10.4 创建Spark项目 file -> new -> scala project -> addexternalJARs -> changeScalaVERSION
- //10.5 设置项目链接库(referenced Libraries --> build path --> configure build path...)
- //10.6 新建scala程序 new scala_object
- //10.7 创建WordCount测试文件
- mkdir -p ~/workspace/wordcount2/data
- cd ~/workspace/wordcount2/data
- cp /usr/local/hadoop/LICENSE.txt LICENSE.txt
- //10.8 创建WordCount.scala
- import org.apache.log4j.Logger
- import org.apache.log4j.Level
- import org.apache.spark.{SparkConf,SparkContext}
- import org.apache.spark.rdd.RDD
- import org.apache.hadoop.io.IOUtils;
- object wordcount {
- def main(args: Array[String]): Unit = {
- // 以這兩行設定不顯示 spark 內部的訊息
- Logger.getLogger("org").setLevel(Level.OFF)
- System.setProperty("spark.ui.showConsoleProgress", "false")
- // 清除 output folder
- println("執行RunWordCount")
- // 設定 application 提交到 MASTER 指向的 cluster 或是 local 執行的模式
- // local[4] 代表是在本地以 四核心的 CPU 執行
- val sc = new SparkContext(new SparkConf().setAppName("wordCount").setMaster("local[4]"))
- println("讀取文字檔...")
- val textFile = sc.textFile("hdfs://master:9000/user/hduser/test/README.txt")
- println("開始建立RDD...") // flapMap 是取出文字檔的每一行資料,並以 " " 進行 split,分成一個一個的 word
- // map 是將每一個 word 轉換成 (word, 1) 的 tuple
- // reduceByKey 會根據 word 這個 key,將後面的 1 加總起來,就會得到 (word, 數量) 的結果
- val countsRDD = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
- println("儲存結果至文字檔...")
- try {countsRDD.saveAsTextFile("data/output") ; println("存檔成功")}
- catch {case e: Exception => println("輸出目錄已經存在,請先刪除原有目錄");}
- println("hello")
- }
- }
- 使用IntelliJ IDEA创建scalaProject https://www.cnblogs.com/luguoyuanf/p/19c1e4d88a094c07331e912f40ed46c7.html
- scalaProject maven配置
- ------------------------------------------------------------------------------------------------------------------------------
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>zieox</groupId>
- <artifactId>zieox</artifactId>
- <version>1.0-SNAPSHOT</version>
- <inceptionYear>2008</inceptionYear>
- <properties> <scala.version>2.11.4</scala.version> </properties>
- <repositories>
- <repository>
- <id>scala-tools.org</id>
- <name>Scala-Tools Maven2 Repository</name>
- <url>http://central.maven.org/maven2/</url>
- </repository>
- </repositories>
- <pluginRepositories>
- <pluginRepository>
- <id>scala-tools.org</id>
- <name>Scala-Tools Maven2 Repository</name>
- <url>https://mvnrepository.com/artifact</url>
- </pluginRepository>
- </pluginRepositories>
- <dependencies>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>${scala.version}</version>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.11</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-reflect</artifactId>
- <version>${scala.version}</version>
- </dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-compiler</artifactId>
- <version>${scala.version}</version>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.11</artifactId>
- <version>2.4.0</version>
- </dependency>
- <!-- https://mvnrepository.com/artifact/joda-time/joda-time -->
- <dependency>
- <groupId>joda-time</groupId>
- <artifactId>joda-time</artifactId>
- <version>2.10.1</version>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.jfree/jcommon -->
- <dependency>
- <groupId>org.jfree</groupId>
- <artifactId>jcommon</artifactId>
- <version>1.0.23</version>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.jfree/jfreechart -->
- <dependency>
- <groupId>org.jfree</groupId>
- <artifactId>jfreechart</artifactId>
- <version>1.0.19</version>
- </dependency>
- </dependencies>
- <build>
- <sourceDirectory>src/main/scala</sourceDirectory>
- <testSourceDirectory>src/test/scala</testSourceDirectory>
- <plugins>
- <plugin>
- <groupId>org.scala-tools</groupId>
- <artifactId>maven-scala-plugin</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>compile</goal>
- <goal>testCompile</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <scalaVersion>${scala.version}</scalaVersion>
- <args>
- <arg>-target:jvm-1.5</arg>
- </args>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-eclipse-plugin</artifactId>
- <configuration>
- <downloadSources>true</downloadSources>
- <buildcommands>
- <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
- </buildcommands>
- <additionalProjectnatures>
- <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
- </additionalProjectnatures>
- <classpathContainers>
- <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
- <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
- </classpathContainers>
- </configuration>
- </plugin>
- </plugins>
- </build>
- <reporting>
- <plugins>
- <plugin>
- <groupId>org.scala-tools</groupId>
- <artifactId>maven-scala-plugin</artifactId>
- <configuration>
- <scalaVersion>${scala.version}</scalaVersion>
- </configuration>
- </plugin>
- </plugins>
- </reporting>
- </project>
- ------------------------------------------------------------------------------------------------------------------------------
- 用IDEA打jar包 https://www.cnblogs.com/blog5277/p/5920560.html
- //10.12 spark-submit 的详细介绍
- ------------------------------------------------------常用选项--------------------------------------------------------
- --master MASTER_URL 设置spark运行环境
- --driver-memory MEM Driver程序所使用的内存
- --executor-memory MEM executor程序所使用的内存
- --jars JARS 要运行的application会引用到的外部链接库
- --name NAME 要运行的application名称
- --class CLASS_NAME 要运行的application主要类名称
- ----------------------------------------------------------------------------------------------------------------------
- --master MASTER_URL设置选项
- local 在本地运行:只是用一个线程
- local[K] 在本地运行:使用K个线程
- local[*] 在本地运行:spark会尽量利用计算机上的多核CPU
- spark://HOST:PORT 在Spark Standalone Cluster 上运行,spark://master:7077
- mesos://HOST:PORT 在mesos cluster上运行(default_port:5050)
- yarn-client             在yarn-client上运行,必须设置HADOOP_CONF_DIR or YARN_CONF_DIR环境变量
- yarn-cluster            在yarn-cluster上运行,必须设置HADOOP_CONF_DIR or YARN_CONF_DIR环境变量
- ----------------------------------------------------------------------------------------------------------------------
- //10.13 在本地local模式运行WordCount程序
- 1.打jar包 https://blog.csdn.net/zrc199021/article/details/53999293
- 2.将jar放到 /usr/local/hadoop/bin 下
- 3.spark_submit
- spark-submit --driver-memory 2g --master local[4] --class WordCount /usr/local/hadoop/bin/zieoxscala.jar
- spark-submit
- --driver-memory 2g 设置dirver程序使用2G内存
- --master local[4] 本地运行使用4个线程
- --class WordCount 设置main类
- /usr/local/hadoop/bin/zieoxscala.jar jar路径
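- // Side note (my own sketch, not from the book): the app name and master can also be set in code via SparkConf, which is what the WordCount source above does; driver memory should still be given to spark-submit, since the driver JVM is already running by the time user code executes.
- ------------------------------------------------------------
- import org.apache.spark.{SparkConf, SparkContext}
- val conf = new SparkConf()
-   .setAppName("WordCount")   // same role as --name
-   .setMaster("local[4]")     // same role as --master local[4]
- val sc = new SparkContext(conf)
- ------------------------------------------------------------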
- //10.14 在hadoop yarn-client 运行wordcount程序
- //1.上传LICENSE.txt至HDFS
- hadoop fs -mkdir /data
- hadoop fs -copyFromLocal /home/zieox/桌面/LICENSE.txt /data
- //2.修改错误标签
- sudo gedit ~/.bashrc
- 添加: export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
- 生效bashrc: source ~/.bashrc
- zieox@master:~/IdeaProjects/zieoxscala/out/artifacts/zieoxscala_jar$ cp zieoxscala.jar /usr/local/hadoop/bin
- spark-submit --driver-memory 2g --master yarn-client --class wordcount ~/IdeaProjects/zieoxscala/out/artifacts/zieoxscala_jar/zieoxscala.jar
- //查看执行结果 hadoop fs -cat /user/hduser/test/output/part-00000
- //IDEA项目远程调试hadoop入门(maven项目) https://www.cnblogs.com/zh-yp/p/7884084.html
- //10.15 在Spark Standalone Cluster 上运行wordcount程序
- spark-submit --driver-memory 2g --master spark://master:7077 --class wordcount ~/IdeaProjects/zieoxscala/out/artifacts/zieoxscala_jar/zieoxscala.jar
- ----------------------------------------------------------CP11 创建推荐引擎
- https://www.jianshu.com/p/b909d78f7d72
- mvn dependency:resolve -Dclassifier=sources
- https://www.cnblogs.com/flymercurial/p/7859595.html
- //mvlens数据集下载 https://grouplens.org/datasets/movielens/
- mkdir -p ~/workspace/recommend/data
- cd ~/workspace/recommend/data
- unzip -j ml-100k.zip
- u.data 评分数据
- u.item 电影数据
- //11.5 使用spark-shell导入ml-100k
- //读取数据
- val rawData = sc.textFile("file:/home/zieox/workspace/recommend/data/u.data")
- rawData.first()
- rawData.take(5).foreach(println) //打印前5行
- rawData.map(_.split('\t')(1).toDouble).stats() //查看第二列统计信息
- //使用ALS.train进行训练
- import org.apache.spark.mllib.recommendation.ALS
- import org.apache.spark.mllib.recommendation.Rating
- val ratings = rawData.map(_.split('\t').take(3))
- val ratingRDD = ratings.map{case Array(user,movie,rating)=>Rating(user.toInt,movie.toInt,rating.toDouble)}
- //显示评分训练 p249
- ALS.train(ratings:RDD[Rating],rank:Int,iterations:Int,lambda:Double):MatrixFactorizationModel
- //Rating 数据源RDD,rank 原矩阵m*n 分解为 m*rank和rank*n矩阵,iterations 计算次数,lambda 建议值0.01,返回数据MatrixFactorizationModel
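- // Background note (standard ALS matrix-factorization formulation, my own addition, not from the book): the m x n rating matrix R is approximated by two rank-k factors, and ALS alternately minimizes the regularized squared error
- ------------------------------------------------------------
- \min_{U,V} \sum_{(u,i) \in K} \left( r_{ui} - u_u^{\top} v_i \right)^2 + \lambda \left( \lVert u_u \rVert^2 + \lVert v_i \rVert^2 \right), \qquad U \in \mathbb{R}^{m \times k},\ V \in \mathbb{R}^{n \times k}
- ------------------------------------------------------------
- // where K is the set of observed (user, item) ratings; k and lambda correspond to the rank and lambda arguments passed to ALS.train above.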
- //隐式训练
- ALS.trainImplicit(ratings:RDD[Rating],rank:Int,iterations:Int,lambda:Double,alpha:Double):MatrixFactorizationModel
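- // A minimal sketch of calling the implicit-feedback variant (my own example, not from the book; alpha is the confidence weight for implicit data, and the parameter values here are just placeholders):
- ------------------------------------------------------------
- import org.apache.spark.mllib.recommendation.{ALS, Rating, MatrixFactorizationModel}
- // assumes ratingRDD: RDD[Rating] was built above from u.data
- val implicitModel: MatrixFactorizationModel = ALS.trainImplicit(ratingRDD, 10, 10, 0.01, 0.01)
- ------------------------------------------------------------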
- //进行显式训练
- val model = ALS.train(ratingRDD,10,10,0.01)
- //11.8 使用模型进行推荐
- MatrixFactorizationModel.model.recommendProducts(user:Int,num:Int):Array[Rating] //使用模型推荐
- model.recommendProducts(196,5).mkString("\n") //针对用户推荐
- model.predict(196,464) //查看针对用户推荐评分 (查看向user:196 推荐464号产的推荐评分)
- MatrixFactorizationModel.model.recommendUsers(product,num) //针对产品推荐给用户
- model.recommendUsers(464,5).mkString("\n") //实际执行(针对电影推荐的用户)
- //11.9 显示推荐电影的名称
- val itemRDD = sc.textFile("file:/home/zieox/workspace/recommend/data/u.item") //导入文件item
- //创建ID_Movie对照表
- val movieTitle =
- itemRDD.map(line=>line.split("\\|") //处理 按照 ‘\\|’分列
- .take(2)) //取出前2个字段
- .map(array=>(array(0).toInt,array(1))) //对取出的array做处理,转化id为数字
- .collectAsMap() //创建ID_Movie对照表
- movieTitle.take(5).foreach(println) //查看前5条
- movieTitle(146) //查看某ID电影名称
- //显示推荐电影
- model.recommendProducts(196,5).map(rating => (rating.product,movieTitle(rating.product),rating.rating)).foreach(println)
- //11.10 创建Recommend项目P261
- https://www.cnblogs.com/flymercurial/p/7868606.html
- https://www.jianshu.com/p/61fac2245f1d
- import java.io.File
- import scala.io.Source
- import org.apache.log4j.Logger
- import org.apache.log4j.Level
- import org.apache.spark.SparkConf
- import org.apache.spark.SparkContext
- import org.apache.spark.SparkContext._
- import org.apache.spark.rdd.RDD
- import org.apache.spark.mllib.recommendation.{ALS,Rating,MatrixFactorizationModel}
- import scala.collection.immutable.Map
- object recommend {
- def PrepareData(): (RDD[Rating], Map[Int, String]) = {
- val sc = new SparkContext(new SparkConf().setAppName("Recommend").setMaster("local[4]"))
- print("开始读取用户评分数据中...")
- val rawUserData = sc.textFile("file:/home/zieox/workspace/recommend/data/u.data")
- val rawRatings = rawUserData.map(_.split("\t").take(3))
- val ratingsRDD = rawRatings.map{ case Array(user,movie,rating) => Rating(user.toInt,movie.toInt,rating.toDouble) }
- println("共计: "+ratingsRDD.count.toString()+"条 ratings")
- print("开始读取电影数据中...")
- val itemRDD = sc.textFile("file:/home/zieox/workspace/recommend/data/u.item")
- val movieTitle = itemRDD.map(line =>line.split("\\|").take(2)).map(array => (array(0).toInt,array(1))).collect().toMap
- val numRatings = ratingsRDD.count()
- val numUsers = ratingsRDD.map(_.user).distinct().count()
- val numMovies = ratingsRDD.map(_.product).distinct().count()
- println("共计: ratings:"+numRatings+" User "+numUsers+" Movie "+numMovies)
- return (ratingsRDD,movieTitle)
- }
- def RecommendMovies(model:MatrixFactorizationModel,movieTitle:Map[Int,String],inputUserID:Int) = {
- val RecommendMovie = model.recommendProducts(inputUserID,10)
- var i = 1
- println("针对用户id"+inputUserID+"推荐下列电影:")
- RecommendMovie.foreach{
- r => println(i.toString() + "." + movieTitle(r.product) + "评分: " + r.rating.toString())
- i += 1
- }
- }
- def RecommendUsers(model:MatrixFactorizationModel,movieTitle:Map[Int,String],inputMovieID:Int) = {
- val RecommendUser = model.recommendUsers(inputMovieID, 10)
- var i = 1
- println("针对电影 id" + inputMovieID + "电影名: " + movieTitle(inputMovieID.toInt) + "推荐下列用户id:" )
- RecommendUser.foreach{
- r => println(i.toString + "用户id:" + r.user + "评分:" + r.rating)
- i = i + 1
- }
- }
- def recommend(model:MatrixFactorizationModel,movieTitle:Map[Int,String]) = {
- var choose = ""
- while (choose != "3") {
- print("请选择要推荐的类型 1.针对用户推荐电影 2.针对电影推荐感兴趣的用户 3.离开?")
- choose = readLine()
- if (choose == "1") {
- print("请输入用户id?")
- val inputUserID = readLine()
- RecommendMovies(model,movieTitle,inputUserID.toInt)
- }
- else if (choose == "2") {
- print("请输入电影的id?")
- val inputMovieID = readLine()
- RecommendUsers(model,movieTitle,inputMovieID.toInt)
- }
- }
- }
- def main(args:Array[String]) {
- val (ratings,movieTitle) = PrepareData()
- val model = ALS.train(ratings,5,20,0.1)
- recommend(model,movieTitle)
- }
- }
- //11.15 创建AlsEvaluation.scala 调校推荐引擎参数 http://blog.sina.com.cn/s/blog_1823e4e0f0102x0ov.html
- import java.io.File
- import scala.io.Source
- import org.apache.log4j.{Level, Logger}
- import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
- import org.joda.time.{DateTime, Duration}
- import org.joda.time._
- import org.joda.time.format._
- import org.jfree.data.category.DefaultCategoryDataset
- import org.apache.spark.mllib.regression.LabeledPoint
- /* Created by Weipengfei on 2017/5/3 0003. ALS过滤算法调校参数 */
- object AlsEvaluation {
- /*设置日志及乱七八糟的配置*/
- def SetLogger: Unit ={
- System.setProperty("hadoop.home.dir", "/usr/local/hadoop")
- Logger.getLogger("org").setLevel(Level.OFF)
- Logger.getLogger("com").setLevel(Level.OFF)
- System.setProperty("spark.ui.showConsoleProgress","false")
- Logger.getRootLogger.setLevel(Level.OFF)
- }
-
- /*数据准备 @return 返回(训练数据,评估数据,测试数据)*/
- def PrepareData():(RDD[Rating],RDD[Rating],RDD[Rating])={
- val sc=new SparkContext(new SparkConf().setAppName("Recommend").setMaster("local[2]").set("spark.testing.memory","21474800000"))
- //创建用户评分数据
- print("开始读取用户评分数据中...")
- val rawUserData=sc.textFile("file:/home/zieox/workspace/recommend/data/u.data")
- val rawRatings=rawUserData.map(_.split("\t").take(3))
- val ratingsRDD=rawRatings.map{case Array(user,movie,rating) => Rating( user.toInt ,movie.toInt,rating.toFloat)}
- println("共计:"+ratingsRDD.count().toString+"条评分")
- //创建电影ID和名称对应表
- print("开始读取电影数据中...")
- val itemRDD=sc.textFile("file:/home/zieox/workspace/recommend/data/u.item")
- val moiveTitle=itemRDD.map(_.split("\\|").take(2)).map(array=>(array(0).toInt,array(1))).collect().toMap
- //显示数据记录数
- val numRatings=ratingsRDD.count()
- val numUser=ratingsRDD.map(_.user).distinct().count()
- val numMoive=ratingsRDD.map(_.product).distinct().count()
- println("共计:评分"+numRatings+"条 用户"+numUser+"个 电影"+numMoive+"个")
- //将数据分为三个部分并且返回
- print("将数据分为:")
- val Array(trainData,validationData,testData)=ratingsRDD.randomSplit(Array(0.8,0.1,0.1))
- println("训练数据:"+trainData.count()+"条 评估数据:"+validationData.count()+"条 测试数据:"+testData.count()+"条")
- (trainData,validationData,testData)
- }
-
- /*计算RMSE值 *@param model 训练模型 *@param validationData 评估数据 *@return RMSE值 */
- def computeRmse(model: MatrixFactorizationModel, validationData: RDD[Rating]):(Double) ={
- val num=validationData.count();
- val predictedRDD=model.predict(validationData.map(r=>(r.user,r.product)))
- val predictedAndVali=predictedRDD.map(p=>((p.user,p.product),p.rating)).join(validationData.map(r=>((r.user,r.product),r.rating))).values
- math.sqrt(predictedAndVali.map(x=>(x._1-x._2)*(x._1-x._2)).reduce(_+_)/num)
- }
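- /* For reference (my own note, matching the code above): computeRmse returns
-    RMSE = sqrt( (1/N) * sum_i (predicted_i - actual_i)^2 )
-    where predicted_i comes from model.predict and actual_i from validationData. */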
-
- /** 训练模型
- * @param trainData 训练数据
- * @param validationData 评估数据
- * @param rank 训练模型参数
- * @param numIterations 训练模型参数
- * @param lambda 训练模型参数
- * @return 模型返回的RMSE(该值越小,误差越小)值,训练模型所需要的时间
- */
- def trainModel(trainData: RDD[Rating], validationData: RDD[Rating], rank: Int, numIterations: Int, lambda: Double):(Double,Double)={
- val startTime=new DateTime()
- val model=ALS.train(trainData,rank,numIterations,lambda)
- val endTime=new DateTime()
- val Rmse=computeRmse(model,validationData)
- val duration=new Duration(startTime,endTime)
- println(f"训练参数:rank:$rank= 迭代次数:$numIterations%.2f lambda:$lambda%.2f 结果 Rmse $Rmse%.2f"+" 训练需要时间:"+duration.getMillis+"毫秒")
- (Rmse,duration.getStandardSeconds)
- }
-
- /** 使用jfree.char评估单个参数,这里没有实现
- * @param trainData 训练数据
- * @param validationData 评估数据
- * @param evaluateParameter 评估参数名称
- * @param rankArray rank参数数组
- * @param numIterationsArray 迭代次数参数数组
- * @param lambdaArray lambda参数数组
- */
-
- def evaluateParameter(trainData:RDD[Rating],validationData:RDD[Rating],evaluateParameter:String,rankArray:Array[Int],numIterationsArray:Array[Int],lambdaArray:Array[Double])={
- val dataBarChart = new DefaultCategoryDataset()
- val dataLineChart = new DefaultCategoryDataset()
- for(rank <- rankArray;numIterations <- numIterationsArray;lambda <- lambdaArray){
- val (rmse,time) = trainModel(trainData,validationData,rank,numIterations,lambda)
- val parameterData = evaluateParameter match{
- case "rank" => rank;
- case "numIterations" => numIterations;
- case "lambda" => lambda
- }
- dataBarChart.addValue(rmse,evaluateParameter,parameterData.toString())
- dataLineChart.addValue(time,"Time",parameterData.toString())
- }
- Chart.plotBarLineChart("ALS evaluations " + evaluateParameter,evaluateParameter,"RMSE",0.58,5,"Time",dataBarChart,dataLineChart)
- }
-
- /*
- * 三个参数交叉评估,找出最好的参数组合
- * @param trainData 训练数据
- * @param validationData 评估数据
- * @param rankArray rank参数数组
- * @param numIterationsArray 迭代次数参数数组
- * @param lambdaArray lambda参数数组
- * @return 返回由最好参数组合训练出的模型
- */
-
- def evaluateAllParameter(trainData:RDD[Rating],validationData:RDD[Rating],rankArray:Array[Int],numIterationsArray:Array[Int],lambdaArray:Array[Double]): MatrixFactorizationModel = {
- val evaluations=for(rank <- rankArray;numIterations <- numIterationsArray;lambda <- lambdaArray) yield {
- val (rmse,time)=trainModel(trainData,validationData,rank,numIterations,lambda)
- (rank,numIterations,lambda,rmse)
- }
- val Eval=(evaluations.sortBy(_._4))
- val bestEval=Eval(0)
- println("最佳模型参数:rank:"+bestEval._1+" 迭代次数:"+bestEval._2+" lambda:"+bestEval._3+" 结果rmse:"+bestEval._4)
- val bestModel=ALS.train(trainData,bestEval._1,bestEval._2,bestEval._3)
- (bestModel)
- }
-
- /*训练评估 *@param trainData 训练数据 *@param validationData 评估数据 *@return 返回一个最理想的模型 */
- def trainValidation(trainData:RDD[Rating], validationData:RDD[Rating]):MatrixFactorizationModel={
- println("------评估rank参数使用------")
- evaluateParameter(trainData,validationData,"rank",Array(5,10,15,20,50,100),Array(10),Array(0.1))
- println("------评估numIterations------")
- evaluateParameter(trainData,validationData,"numIterations",Array(10),Array(5,10,15,2,25),Array(0.1))
- println("------评估lambda------")
- evaluateParameter(trainData,validationData,"lambda",Array(10),Array(10),Array(0.05,0.1,1,5,10.0))
- println("------所有参数交叉评估找出最好的参数组合------")
- val bestmodel=evaluateAllParameter(trainData,validationData,Array(5,10,15,20,50,100),Array(5,10,15,20,25),Array(0.01,0.05,0.1,1,5,10.0))
- bestmodel
- }
-
- def main(args: Array[String]) {
- SetLogger
- println("========================数据准备阶段========================")
- val (trainData,validationData,testData)=PrepareData()
- trainData.persist();validationData.persist();testData.persist()
- println("========================训练验证阶段========================")
- val bestModel=trainValidation(trainData,validationData)
- println("======================测试阶段===========================")
- val testRmse=computeRmse(bestModel,testData)
- println("使用bestModel测试testData,结果rmse="+testRmse)
- }
- }
-
-
-
- import org.jfree.chart._
- import org.jfree.data.xy._
- import org.jfree.data.category.DefaultCategoryDataset
- import org.jfree.chart.axis.NumberAxis
- import org.jfree.chart.axis._
- import java.awt.Color
- import org.jfree.chart.renderer.category.LineAndShapeRenderer;
- import org.jfree.chart.plot.DatasetRenderingOrder;
- import org.jfree.chart.labels.StandardCategoryToolTipGenerator;
- import java.awt.BasicStroke
-
- object Chart {
- def plotBarLineChart(Title: String, xLabel: String, yBarLabel: String, yBarMin: Double, yBarMax: Double, yLineLabel: String, dataBarChart : DefaultCategoryDataset, dataLineChart: DefaultCategoryDataset): Unit = {
-
- //画出Bar Chart
- val chart = ChartFactory
- .createBarChart(
- "", // Bar Chart 标题
- xLabel, // X轴标题
- yBarLabel, // Bar Chart 标题 y轴标题l
- dataBarChart , // Bar Chart数据
- org.jfree.chart.plot.PlotOrientation.VERTICAL,//画图方向垂直
- true, // 包含 legend
- true, // 显示tooltips
- false // 不要URL generator
- );
- //取得plot
- val plot = chart.getCategoryPlot();
- plot.setBackgroundPaint(new Color(0xEE, 0xEE, 0xFF));
- plot.setDomainAxisLocation(AxisLocation.BOTTOM_OR_RIGHT);
- plot.setDataset(1, dataLineChart); plot.mapDatasetToRangeAxis(1, 1)
- //画直方图y轴
- val vn = plot.getRangeAxis(); vn.setRange(yBarMin, yBarMax); vn.setAutoTickUnitSelection(true)
- //画折线图y轴
- val axis2 = new NumberAxis(yLineLabel); plot.setRangeAxis(1, axis2);
- val renderer2 = new LineAndShapeRenderer()
- renderer2.setToolTipGenerator(new StandardCategoryToolTipGenerator());
- //设置先画直方图,再画折线图以免折线图被盖掉
- plot.setRenderer(1, renderer2);plot.setDatasetRenderingOrder(DatasetRenderingOrder.FORWARD);
- //创建画框
- val frame = new ChartFrame(Title,chart); frame.setSize(500, 500);
- frame.pack(); frame.setVisible(true)
- }
- }
- --------------------------------------------------------------------------
- 最佳模型参数:rank:5 迭代次数:15 lambda:0.1 结果rmse:0.9178637704570859
- ======================测试阶段===========================
- 使用bestModel测试testData,结果rmse=0.9177717267204295
- --------------------------------------------------------------------------
-
-
- ----------------------------------------------------------CP12 StumbleUpon数据集
- https://www.kaggle.com/c/stumbleupon/data
-
- ----------------------------------------------------------CP13 决策树二元分类
- import org.apache.log4j.{Level, Logger}
- import org.apache.spark.{SparkConf, SparkContext}
- import org.apache.spark.rdd.RDD
- import org.apache.spark.mllib.regression.LabeledPoint
- import org.apache.spark.mllib.linalg.Vectors
- import org.apache.spark.mllib.tree.DecisionTree
- import org.apache.spark.mllib.tree.model.DecisionTreeModel
- import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
- import org.joda.time.{DateTime, Duration}
- object RunDecisionTreeBinary{
- def main(args:Array[String]):Unit={
- SetLogger()
- val sc = new SparkContext(new SparkConf().setAppName("DecisionTreeBinary").setMaster("local[4]"))
- println("==============preparing===============")
- val (trainData,validationData,testData,categoriesMap)=PrepareData(sc)
- trainData.persist();validationData.persist();testData.persist();
- println("==============evaluating==============")
- val model=trainEvaluate(trainData,validationData)
- println("===============testing================")
- val auc = evaluateModel(model,testData)
- println("使用testdata测试的最佳模型,结果 AUC:"+auc)
- println("============predicted data=============")
- PredictData(sc,model,categoriesMap)
- trainData.unpersist();validationData.unpersist();testData.unpersist();
- }
- }
-
- def PrepareData(sc:SparkContext):(RDD[LabeledPoint],RDD[LabeledPoint],RDD[LabeledPoint],Map[String,Int])={
- //1.导入并转换数据
- print("开始导入数据...")
- val rawDataWithHeader=sc.textFile("data/train.tsv")
- val rawData=rawDataWithHeader.mapPartitionsWithIndex{(idx,iter)=> if (idx==0) iter.drop(1) else iter} //删除第一行表头
- val lines=rawData.map(_.split("\t")) //读取每一行的数据字段
- println("共计:"+lines.count.toString()+"条")
- //2.创建训练评估所需数据的RDD[LabeledPoint]
- val categoriesMap = lines.map(fields => fields(3)).distinct.collect.zipWithIndex.toMap //创建网页分类对照表
- val LabeledPointRDD=lines.map{fields =>
- val trFields=fields.map(_.replaceAll("\"",""))
- val categoryFeaturesArray=Array.ofDim[Double](categoriesMap.size)
- val categoryIdx=categoriesMap(fields(3))
- categoryFeaturesArray(categoryIdx)=1
- val numericalFeatures=trFields.slice(4,fields.size-1).map(d=>if (d=="?") 0.0 else d.toDouble)
- val label = trFields(fields.size-1).toInt
- LabeledPoint(label,Vectors.dense(categoryFeaturesArray++numericalFeatures))
- }
- //3.用随机方式将数据分为3个部分并返回
- val Array(trainData,validationData,testData) = LabeledPointRDD.randomSplit(Array(0.8,0.1,0.1))
- println("将数据分trainData: "+trainData.count() + " validationData:"+validationData.count()+" testData:"+testData.count())
- return (trainData, validationData, testData, categoriesMap)
- }
- //training and evaluation stage
- def trainEvaluate(trainData:RDD[LabeledPoint],validationData:RDD[LabeledPoint]):DecisionTreeModel = {
- print("开始训练...")
- val (model,time) = trainModel(trainData,"entropy",10,10)
- println("训练完成,所需时间:"+time+" 毫秒")
- val AUC = evaluateModel(model,validationData)
- println("评估结果 AUC = "+AUC)
- return (model)
- }
- //训练DecisionTree模型Model
- def trainModel(trainData:RDD[LabeledPoint],impurity:String,maxDepth:Int,maxBins:Int):(DecisionTreeModel,Double)={
- val startTime = new DateTime()
- val model = DecisionTree.trainClassifier(trainData,2,Map[Int,Int](),impurity,maxDepth,maxBins)
- val endTime=new DateTime()
- val duration=new Duration(startTime,endTime)
- (model,duration.getMillis())
- }
- //评估模型
- def evaluateModel(model:DecisionTreeModel,validationData:RDD[LabeledPoint]):(Double)={
- val scoreAndLabels = validationData.map{ data =>
- var predict=model.predict(data.features)
- (predict,data.label)
- }
- val Metrics = new BinaryClassificationMetrics(scoreAndLabels)
- val AUC = Metrics.areaUnderROC
- (AUC)
- }
- //预测阶段
- def PredictData(sc:SparkContext,model:DecisionTreeModel,categoriesMap:Map[String,Int]):Unit = {
- //1.导入并转换数据
- val rawDataWithHeader=sc.textFile("data/test.tsv")
- val rawData = rawDataWithHeader.mapPartitionsWithIndex{ (idx,iter)=>if (idx==0) iter.drop(1) else iter}
- val lines = rawData.map(_.split("\t"))
- println("共计: "+lines.count.toString()+" 条")
- //2.创建训练评估所需数据RDD[LabeledPoint]
- val dataRDD=lines.take(20).map{fields =>
- val trFields = fields.map(_.replaceAll("\"",""))
- val categoryFeaturesArray=Array.ofDim[Double](categoriesMap.size)
- val categoryIdx=categoriesMap(fields(3))
- categoryFeaturesArray(categoryIdx)=1
- val numericalFeatures = trFields.slice(4, fields.size).map(d => if (d == "?") 0.0 else d.toDouble) //test.tsv has no label column, so every field from index 4 on is a feature
- val label = 0
- //3.进行预测
- val url = trFields(0)
- val Features = Vectors.dense(categoryFeaturesArray ++ numericalFeatures)
- val predict = model.predict(Features).toInt
- val predictDesc = { predict match { case 0 => "暂时性网页"; case 1 => "常青网页" } }
- println("网址: "+url+"==>预测:"+predictDesc)
- }
- }
- //parametersTunning 参数调校函数
- def parametersTunning(trainData:RDD[LabeledPoint],validationData:RDD[LabeledPoint]):DecisionTreeModel={
- println("-------评估Impurity参数使用gini,entropy-------")
- evaluateParameter(trainData, validationData, "impurity", Array("gini","entropy"), Array(10), Array(10))
- println("-------评估MaxDepth参数使用(3,5,10,15,20)-------")
- evaluateParameter(trainData, validationData, "maxDepth", Array("gini"), Array(3,5,10,15,20), Array(10))
- println("-------所有参数交叉评估找出最好的参数组合-------")
- val bestModel = evaluateAllParameter(trainData, validationData, Array("gini","entropy"), Array(3,5,10,15,20), Array(10)) //maxBins held at 10, as in the calls above
- return (bestModel)
- }
-
- //evaluateParameter 函数
- def evaluateParameter(trainData:RDD[LabeledPoint],validationData:RDD[LabeledPoint],evaluateParameter:String,impurityArray:Array[String],maxdepthArray:Array[Int],maxBinsArray:Array[Int])={
- var dataBarChart = new DefaultCategoryDataset()
- var dataLineChart = new DefaultCategoryDataset()
- for (impurity <- impurityArray; maxDepth <- maxdepthArray; maxBins <- maxBinsArray){
- val (model,time)=trainModel(trainData,impurity,maxDepth,maxBins)
- val auc = evaluateModel(model,validationData)
- val parameterData = evaluateParameter match{
- case "impurity"=>impurity;
- case "maxDepth"=>maxDepth;
- case "maxBins"=>maxBins
- }
- dataBarChart.addValue(auc, evaluateParameter, parameterData.toString()) //DefaultCategoryDataset.addValue needs (value, rowKey, columnKey)
- dataLineChart.addValue(time,"Time",parameterData.toString())
- }
- Chart.plotBarLineChart("DecisionTree evaluations " + evaluateParameter, evaluateParameter, "AUC", 0.58, 0.7, "Time", dataBarChart, dataLineChart)
- }
-
- def evaluateAllParameter(trainData:RDD[LabeledPoint],validationData:RDD[LabeledPoint],impurityArray:Array[String],maxdepthArray:Array[Int],maxBinsArray:Array[Int]): DecisionTreeModel = {
- val evaluationsArray =
- for (impurity<-impurityArray;maxdepth<-maxdepthArray;maxBins<-maxBinsArray) yield {
- val (model, time) = trainModel(trainData, impurity, maxdepth, maxBins)
- val auc = evaluateModel(model,validationData)
- (impurity,maxdepth,maxBins,auc)
- }
- val BestEval = (evaluationsArray.sortBy(_._4).reverse)(0)
- println("调校后最佳参数:impurity:"+BestEval._1+" ,maxDepth:"+BestEval._2+" ,maxBins:"+BestEval._3+" ,AUC:"+BestEval._4)
- val (bestModel, _) = trainModel(trainData.union(validationData), BestEval._1, BestEval._2, BestEval._3) //retrain with the best parameter combination before returning it
- return bestModel
- }
- } //end of object RunDecisionTreeBinary
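- SetLogger() is called in main() above but its definition isn't included in these notes; a minimal sketch in the style the book uses (assumes log4j, which ships with Spark) — it belongs inside the object:
- import org.apache.log4j.{Level, Logger}
- def SetLogger(): Unit = {
-   //silence Spark/Hadoop logging and the console progress bar so the println output stays readable
-   Logger.getLogger("org").setLevel(Level.OFF)
-   Logger.getLogger("com").setLevel(Level.OFF)
-   System.setProperty("spark.ui.showConsoleProgress", "false")
-   Logger.getRootLogger().setLevel(Level.OFF)
- }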
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- ----------------------------------------------------------CP18 决策树回归分析
- mkdir -p ~/workspace/Classfication/data
- cd ~/workspace/
- //Bike_Sharing数据
- wget https://archive.ics.uci.edu/ml/machine-learning-databases/00275/Bike-Sharing-Dataset.zip
- unzip -j Bike-Sharing-Dataset.zip
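- (A minimal sketch of the regression-tree training this chapter covers, under the assumption that hour.csv from the zip above is used, with the last column cnt as the label and the season..windspeed columns as features — not the book's exact code:)
- import org.apache.spark.mllib.regression.LabeledPoint
- import org.apache.spark.mllib.linalg.Vectors
- import org.apache.spark.mllib.tree.DecisionTree
- val rawWithHeader = sc.textFile("file:/home/zieox/workspace/hour.csv") //path assumed: where the zip was unpacked above
- val records = rawWithHeader.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }.map(_.split(","))
- val data = records.map { fields =>
-   val label = fields.last.toDouble                                 //cnt: total rentals in that hour
-   val features = fields.slice(2, fields.size - 3).map(_.toDouble)  //season..windspeed, skipping instant/dteday and casual/registered/cnt
-   LabeledPoint(label, Vectors.dense(features))
- }
- val Array(trainData, testData) = data.randomSplit(Array(0.8, 0.2))
- val model = DecisionTree.trainRegressor(trainData, Map[Int, Int](), "variance", 10, 100) //"variance" is MLlib's impurity for regression trees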
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- ----------------------------------------------------------CP19 使用 Apache Zeppelin 数据可视化
- http://zeppelin.incubator.apache.org/
- http://zeppelin.apache.org/
- https://www.cnblogs.com/purstar/p/6294412.html
-
- cd ~/下载
- tar zxvf zeppelin-0.8.1-bin-all.tgz
- sudo mv zeppelin-0.8.1-bin-all /usr/local/zeppelin
- sudo chown -R zieox:zieox /usr/local/zeppelin
- cd /usr/local/zeppelin/conf/
- cp zeppelin-env.sh.template zeppelin-env.sh
- cp zeppelin-site.xml.template zeppelin-site.xml
- sudo gedit /usr/local/zeppelin/conf/zeppelin-env.sh
- //配置zeppelin
- ----------------------------------------------------------------------------
- export JAVA_HOME=/usr/lib/jdk/jdk-8
- export SPARK_HOME=/usr/local/spark
- export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
- export ZEPPELIN_INTP_JAVA_OPTS="-XX:PermSize=512M -XX:MaxPermSize=1024M"
- ----------------------------------------------------------------------------
- export SPARK_MASTER_IP=192.168.56.100
- export SPARK_LOCAL_IP=192.168.56.100
- ----------------------------------------------------------------------------
- //download the following Jackson 2.4.4 jars from the Maven repository:
- jackson-annotations-2.4.4.jar
- jackson-core-2.4.4.jar
- jackson-databind-2.4.4.jar
- //and use them to replace the 2.5.x versions that Zeppelin bundles:
- ./lib/jackson-annotations-2.5.0.jar
- ./lib/jackson-core-2.5.3.jar
- ./lib/jackson-databind-2.5.3.jar
-
- //move the three downloaded jars into /usr/local/zeppelin/lib
- mv jackson-annotations-2.4.4.jar jackson-core-2.4.4.jar jackson-databind-2.4.4.jar -t /usr/local/zeppelin/lib
-
-
-
-
- //启动zeppelin /usr/local/zeppelin/bin/zeppelin-daemon.sh start
- //登录zeppelin Web_UI界面 http://master:8181/#/
-
- sudo mkdir -p /workspace/zeppelin/data
- sudo mv ~/workspace/recommend/data/ml-100k.zip /workspace/zeppelin/data/
- cd /workspace/zeppelin/data && sudo unzip -j ml-100k.zip
-
- //zeppelin 命令集
- -------------------------------
- %sh
- ls -l /workspace/zeppelin/data
- %sh
- sudo head /workspace/zeppelin/data/u.user
- -------------------------------
- val usertext = sc.textFile("file:/home/zieox/workspace/recommend/data/u.user")
- case class usertable(id:String,age:String,gender:String,occupation:String,zipcode:String)
- val userRDD = usertext.map(s=>s.split("\\|")).map(s=>usertable(s(0),s(1),s(2),s(3),s(4)))
- userRDD.toDF().registerTempTable("usertable")
- println("input: "+userRDD.count+" s")
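- Once usertable is registered, it can be queried from the same note; a small example (assumes the sqlContext that Zeppelin's %spark interpreter provides, matching the registerTempTable call above):
- val genderCount = sqlContext.sql("select gender, count(*) as cnt from usertable group by gender")
- genderCount.show()
- //the same query in a %sql paragraph gives Zeppelin's built-in table and chart views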
-
- //search subdirectories for files with "jackson" in the name:  find . | grep jackson
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++CP10构建Hadoop集群 P280
-
- //安装hadoop
-
- 1. Install Java
- 2. Create dedicated Linux user accounts
- 3. Install Hadoop
- 4. Configure passwordless SSH
- 5. Configure Hadoop
- 6. 格式化HDFS:hdfs namenode -format
- 7. 启动和停止守护进程:
- 启动守护进程:start-dfs.sh
- 找到namenode主机名:hdfs getconf -namenodes / hdfs getconf -secondarynamenodes
- 启动YARN守护进程:start-yarn.sh/stop-yarn.sh
- 启动MapReduce守护进程:mr-jobhistory-daemon.sh start historyserver
- 8. 创建用户目录:
- hadoop fs -mkdir /user/username
- hadoop fs -chown username:username /user/username
- 设置容量空间:hdfs dfsadmin -setSpaceQuota 1T /user/us
- hadoop配置:https://www.cnblogs.com/yinghun/p/6230436.html
-
- //10.3.3 Hadoop守护进程的关键属性
- 1.HDFS
- dfs.datanode.data.dir        list of directories where a datanode stores its blocks
- dfs.namenode.name.dir        directories for the namenode's edit log and filesystem image (list more than one for redundancy)
- dfs.namenode.checkpoint.dir  checkpoint directories used by the secondary namenode
- p295 HDFS守护进程关键属性
- p297 Yarn守护进程关键属性
- 2. Yarn
- yarn.nodemanager.resource.memory-mb 设置内存分配量
- hadoop fs -expunge //permanently remove files that have been in the trash longer than the minimum retention period
- //10.5 hadoop 基准测评程序 测试hadoop程序
- p312 //使用TeraSort测评HDFS (基准测评)
- //other benchmark programs
- TestDFSIO: tests HDFS I/O performance
- MRBench: checks whether small jobs get quick turnaround
- NNBench: load-tests the namenode hardware
- Gridmix: a suite of benchmark programs
- SWIM: generates representative test workloads for the system under test
- TPCx-HS: a standardized benchmark based on TeraSort
-
- +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++CP11 管理hadoop
-
- //进入和离开安全模式
- hdfs dfsadmin [-safemode enter | leave | get | wait]
- get:查看是否处于安全模式
- wait:执行某条命令之前先退出安全模式
- enter:进入安全模式
- leave:离开安全模式
- //启动审计日志
- gedit hadoop-env.sh
- export HDFS_AUDIT_LOGGER="INFO,RFAAUDIT"
- //tools
- //1.dfsadmin 命令
- //Hadoop常用命令:
- https://blog.csdn.net/suixinlun/article/details/81630902
- https://blog.csdn.net/m0_38003171/article/details/79086780
-
- 2. filesystem check (fsck tool): hadoop fsck /
- 3. datanode block scanner: each datanode runs a block scanner that periodically verifies all blocks on the node; the interval is set by dfs.datanode.scan.period.hours
- 4. balancer
- start the balancer: start-balancer.sh
-
- //设置日志级别
- hadoop daemonlog -setlevel resource-manager-host:8088 org.apache.hadoop.yarn.server.resourcemanager DEBUG
-
- //通过修改hadoop-env.sh配置 HADOOP_NAMENODE_OPTS="-Dcom.sun.management.jmxremote.port=8804" 来使远程监控得以访问
- //维护hadoop
- 1.元数据备份:hdfs dfsadmin -fetchImage fsimage.backup
- 2.数据备份:distcp
- 3.文件系统检查:fsck
- 4.文件系统均衡器
- p332
- //关于委任和解除节点
- //1.委任新节点
- point hdfs-site.xml at the namenode -> point yarn-site.xml at the resource manager -> start the datanode and node manager daemons
-
- //添加新节点步骤
- 1. 将新节点IP地址添加到include文件中
- 2. 运行 hdfs dfsadmin -refreshNodes,将审核过的一系列datanode集合更新至namenode信息
- 3. 运行 yarn rmadmin -refreshNodes,将审核过的一系列节点管理器信息更新至资源管理器
- 4. 以新节点更新slaves文件
- 5. 启动新的datanode和节点管理器
- 6. 检查新的datanode和节点管理器是否都出现在网页界面中
-
- //2. 解除旧节点
- 解除节点由exclude文件控制
-
- //steps to remove nodes from the cluster
- 1. Add the network addresses of the nodes to be decommissioned to the exclude file; do not update the include file
- 2. Run hdfs dfsadmin -refreshNodes to update the namenode with the new set of permitted datanodes
- 3. Run yarn rmadmin -refreshNodes to update the resource manager with the new set of permitted node managers
- 4. In the web UI, check that the admin state of each datanode being decommissioned changes to "Decommission In Progress"
- 5. When the state of all these datanodes becomes "Decommissioned", all their blocks have been replicated elsewhere
- 6. Remove the nodes from the include file, then run:
- hdfs dfsadmin -refreshNodes
- yarn rmadmin -refreshNodes
-
- 7. Remove the nodes from the slaves file
- //hadoop的升级
- steps P334
- start-dfs.sh -upgrade ...
-
- //为python安装avro
- easy_install avro
- python3 ch-12-avro/src/main/py/write_pairs.py pairs.avro
-
- //Pig macros
- DEFINE max_by_group(X, group_key, max_field) RETURNS Y {
-   A = GROUP $X BY $group_key;
-   $Y = FOREACH A GENERATE group, MAX($X.$max_field);
- };
-
- records = LOAD 'lr/text.txt' AS (year:chararray, temperature:int, quality:int);
- filtered_records = FILTER records BY temperature != 9999 AND quality IN (1,2,3);
- -- expanded form of the macro call max_tmp = max_by_group(filtered_records, year, temperature);
- macro_max_by_group_A = GROUP filtered_records BY (year);
- max_tmp = FOREACH macro_max_by_group_A GENERATE group, MAX(filtered_records.temperature);
- DUMP max_tmp;
- -- a macro kept in its own file is brought in with:
- IMPORT './max_tmp.macro';
-
- ----------------------------------------------------------------------
- linux系统进程 :gnome-system-monitor
- //Parquet 是什么:
- http://www.zhimengzhe.com/shujuku/other/21763.html
- ---------------------------------------------------------------------- with hive
- 查看linux版本:lsb_release -a
- ----------------------------------------------------------------------hadoop with spark
-
- sql中提取表
- https://gitlab.com/matteo.redaelli/sqlu/-/blob/master/sqlu/__init__.py
- gsp
- http://www.sqlparser.com/download.php
-
-
-
- //20200924
- //IDEA git 添加项目
- --ubuntu-
- --ssh
- ssh-keygen -t rsa => cd ~/.ssh => cat ~/.ssh/id_rsa.pub => <copytoGitLab>
- idea - vcs - clone - >
-
- //git发布流程
- --dev
- git拉取(pull)
- =>文件放至git上的指定文件夹
- =>git再次拉取
- =>git提交(commit)
- =>git同步(merge into current)
- master文件夹同步:git拉取=>git合并=>git同步
-
- git pull
- new file
- pull
- commit
- git-push