- ---------------------------------Installing JDK and MySQL on CentOS---------------------------------------
-
- rpm -qa | grep java                                  --check whether a JDK is already installed
-
- rpm -e --nodeps <java package>                       --force-remove the bundled JDK
- tar -zxf jdk-7u79-linux-x64.tar.gz -C /opt/modules/  --unpack the JDK archive
- vi /etc/profile                                      --edit the profile file
-
-
- export JAVA_HOME=/opt/modules/jdk1.7.0_79            --append these two lines to the end of profile
- export PATH=$PATH:$JAVA_HOME/bin
-
- source /etc/profile                                  --reload the profile configuration
-
-
-
- create table student(id int, name string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
- load data local inpath '/opt/datas/student.txt' into table student ;
-
-
-
- rpm -qa | grep mysql                                 --check whether MySQL is already installed
-
-
- rpm -e --nodeps <mysql package>                      --force-remove the bundled MySQL
-
-
- rpm -ivh MySQL-server-5.6.24-1.el6.x86_64.rpm
-
- ---------------------------------Installing JDK and MySQL on CentOS---------------------------------------
-
-
-
-
-
-
-
- ---------------------------------Creating tables in Hive---------------------------------------
-
-
- create table if not exists default.log_1017
- (
- ip string comment 'remote ip address',
- user string,
- req_url string comment 'user request url'
- )
- comment 'Web Access Logs'
- row format delimited fields terminated by ' '
- stored as textfile ;
-
-
-
- create table if not exists default.log_1017_2
- as select * from default.log_1017
-
- create table if not exists default.log_1017_2
- like default.log_1017
-
-
- create table if not exists student(id int, name string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
- load data local inpath '/opt/datas/log-1017.txt' into table default.log_1017 ;
-
-
-
-
- create table if not exists default.emp
- (
- empno int ,
- ename string,
- job string,
- mgr int,
- hiredate string,
- sal double,
- comm double,
- deptno int
- )
- row format delimited fields terminated by '\t';
-
-
-
-
- create table if not exists default.dept
- (
- deptno int ,
- dname string,
- loc string
- )
- row format delimited fields terminated by '\t';
-
-
-
- load data local inpath '/opt/datas/emp.txt' overwrite into table default.emp ;
- load data local inpath '/opt/datas/dept.txt' into table default.dept ;
-
-
-
- ---- Create an external table
- create external table if not exists default.emp_ext
- (
- empno int ,
- ename string,
- job string,
- mgr int,
- hiredate string,
- sal double,
- comm double,
- deptno int
- )
- row format delimited fields terminated by '\t';
-
-
- ---- Create a partitioned table
- create table if not exists default.dept_part
- (
- deptno int ,
- dname string,
- loc string
- )
- partitioned by (month string,day string)
- row format delimited fields terminated by '\t';
-
-
- load data local inpath '/opt/datas/dept.txt' overwrite into table default.dept_part partition (month='201810');
-
- ---- Method 1: put the data in place, then repair the metastore
- dfs -mkdir -p /user/hive/warehouse/dept_part/day=20181020;
- dfs -put /opt/datas/dept.txt /user/hive/warehouse/dept_part/day=20181020;
-
- msck repair table dept_part ;
-
- ---- Method 2: put the data in place, then add the partition explicitly
- dfs -mkdir -p /user/hive/warehouse/dept_part/day=20181021;
- dfs -put /opt/datas/dept.txt /user/hive/warehouse/dept_part/day=20181021;
-
- alter table dept_part add partition (day='20181021') ;
-
-
- insert overwrite local directory '/opt/datas/hive_exp_emp'
- row format delimited fields terminated by '\t' collection items terminated by '\n'
- select * from default.emp;
-
- insert overwrite directory '/user/root//hive_exp_emp'
- select * from default.emp;
-
-
-
- insert overwrite local directory '/opt/datas/hive_emp_res'
- select * from emp order by empno asc;
-
-
-
- set hive.exec.reducers.max=3
- insert overwrite local directory '/opt/datas/hive_emp_res'
- select * from emp sort by empno asc;
-
-
- insert overwrite local directory '/opt/datas/hive_emp_res'
- select * from emp distribute by deptno sort by empno asc;
-
-
-
- ---------------------------UDF development---------------------------
-
-
- <!-- Hive Client-->
- <dependency>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-jdbc</artifactId>
- <version>${hive.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-exec</artifactId>
- <version>${hive.version}</version>
- </dependency>
-
-
- UDF development steps:
- 1--> Write the UDF Java class (a minimal sketch is shown below)
- 2--> Package it as a jar and upload it to the server.
- 3--> Use the UDF class:
- 3-1--> add jar /opt/datas/hive-udf.jar
- 3-2--> create temporary function my_lower as "com.root.senior.hive.udf.LowerUDF"
- 3-3--> select ename ,my_lower(ename) lowername from emp limit 5 ;
-
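- A minimal sketch of the LowerUDF class referenced in step 1 (an assumption of what it looks like, based on the old-style Hive UDF API shipped with Hive 0.13; package and class name are taken from the create temporary function statement):
-
- package com.root.senior.hive.udf;
-
- import org.apache.hadoop.hive.ql.exec.UDF;
- import org.apache.hadoop.io.Text;
-
- // Lower-cases a string column; returns null for null input.
- public class LowerUDF extends UDF {
-     public Text evaluate(Text input) {
-         if (input == null) {
-             return null;
-         }
-         return new Text(input.toString().toLowerCase());
-     }
- }
-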
- CREATE FUNCTION self_lower AS 'com.root.senior.hive.udf.LowerUDF' USING JAR 'hdfs://hadoop.zxk.com:8020/user/root/hive/jars/hiveudf.jar';
- select ename, self_lower(ename) lowername from emp limit 5 ;
-
-
-
- ------------------------------------测试Snappy-----------------------------------
-
- bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.0.jar wordcount /user/root/mapreduce/wordcount/input /user/root/mapreduce/wordcount/output4
-
-
-
- bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.0.jar wordcount
- -Dmapreduce.map.output.compress=true
- -Dmapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec
- /user/root/mapreduce/wordcount/input
- /user/root/mapreduce/wordcount/output2
-
- -------------------------------------测试ORC与PARQUET数据格式-------------------------------------------
-
- create table page_views(
- track_time string,
- url string,
- session_id string,
- referer string,
- ip string,
- end_user_id string,
- city_id string
- )
- ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
- STORED AS TEXTFILE ;
-
-
- load data local inpath '/opt/datas/page_views.data' into table page_views;
- select session_id,count(*) cnt from page_views group by session_id order by cnt desc limit 30 ;
- dfs -du -h /user/hive/warehouse/page_views
-
-
-
- create table page_views_orc(
- track_time string,
- url string,
- session_id string,
- referer string,
- ip string,
- end_user_id string,
- city_id string
- )
- ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
- STORED AS ORC ;
-
- insert into table page_views_orc select * from page_views ;
- select session_id,count(*) cnt from page_views_orc group by session_id order by cnt desc limit 30 ;
- dfs -du -h /user/hive/warehouse/page_views_orc
-
-
- create table page_views_parquet(
- track_time string,
- url string,
- session_id string,
- referer string,
- ip string,
- end_user_id string,
- city_id string
- )
- ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
- STORED AS PARQUET ;
-
- insert into table page_views_parquet select * from page_views ;
- select session_id,count(*) cnt from page_views_parquet group by session_id order by cnt desc limit 30 ;
-
- dfs -du -h /user/hive/warehouse/page_views_parquet
-
- -------------------------------------测试ORC与PARQUET数据格式-------------------------------------------
-
-
-
- -------------------------------------测试ORC与Snappy压缩结合使用----------------------------------------
-
- create table page_views_orc_snappy(
- track_time string,
- url string,
- session_id string,
- referer string,
- ip string,
- end_user_id string,
- city_id string
- )
- ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
- STORED AS orc tblproperties ("orc.compress"="SNAPPY");
-
- insert into table page_views_orc_snappy select * from page_views ;
- dfs -du -h /user/hive/warehouse/page_views_orc_snappy/ ;
-
-
- create table page_views_orc_none(
- track_time string,
- url string,
- session_id string,
- referer string,
- ip string,
- end_user_id string,
- city_id string
- )
- ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
- STORED AS orc tblproperties ("orc.compress"="NONE");
-
- insert into table page_views_orc_none select * from page_views ;
- dfs -du -h /user/hive/warehouse/page_views_orc_none/ ;
-
-
- set parquet.compression=SNAPPY ;
- create table page_views_parquet_snappy(
- track_time string,
- url string,
- session_id string,
- referer string,
- ip string,
- end_user_id string,
- city_id string
- )
- ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
- STORED AS parquet;
- insert into table page_views_parquet_snappy select * from page_views ;
- dfs -du -h /user/hive/warehouse/page_views_parquet_snappy/ ;
-
- Summary:
- In real projects, Hive table data is usually stored with
- * storage format
- orcfile / parquet
- * compression
- snappy
-
- -------------------------------------测试ORC与Snappy压缩结合使用----------------------------------------
-
-
-
- -------------------------------------Creating a table with the RegexSerDe----------------------------------------
- drop table if exists default.bf_log_src ;
- create table IF NOT EXISTS default.bf_log_src (
- remote_addr string,
- remote_user string,
- time_local string,
- request string,
- status string,
- body_bytes_sent string,
- request_body string,
- http_referer string,
- http_user_agent string,
- http_x_forwarded_for string,
- host string
- )
- ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
- WITH SERDEPROPERTIES (
- "input.regex" = "(\"[^ ]*\") (\"-|[^ ]*\") (\"[^\]]*\") (\"[^\"]*\") (\"[0-9]*\") (\"[0-9]*\") (-|[^ ]*) (\"[^ ]*\") (\"[^\"]*\") (-|[^ ]*) (\"[^ ]*\")"
- )
- STORED AS TEXTFILE;
-
- load data local inpath '/opt/datas/moodle.iroot.access.log' into table default.bf_log_src ;
-
-
- ---------------------------------------Creating an ORC table with Snappy compression----------------------------------------
- drop table if exists default.bf_log_comm ;
- create table IF NOT EXISTS default.bf_log_comm (
- remote_addr string,
- time_local string,
- request string,
- http_referer string
- )
- ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
- STORED AS orc tblproperties ("orc.compress"="SNAPPY");
-
- insert into table default.bf_log_comm select remote_addr, time_local, request,http_referer from default.bf_log_src ;
-
- select * from bf_log_comm limit 5 ;
-
-
-
-
-
- ---------------------------------------Defining UDFs to clean the raw table----------------------------------------
-
-
- First UDF:
- strip the surrounding double quotes from each field (a sketch of the class follows this block)
- add jar /opt/datas/hiveudf2.jar ;
- create temporary function my_removequotes as "com.root.senior.hive.udf.RemoveQuotesUDF" ;
-
- insert overwrite table default.bf_log_comm select my_removequotes(remote_addr), my_removequotes(time_local), my_removequotes(request), my_removequotes(http_referer) from default.bf_log_src ;
-
- select * from bf_log_comm limit 5 ;
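-
- A possible implementation of the RemoveQuotesUDF used above (a sketch only, assuming the same old-style Hive UDF API; the class in the actual hiveudf2.jar may differ):
-
- package com.root.senior.hive.udf;
-
- import org.apache.hadoop.hive.ql.exec.UDF;
- import org.apache.hadoop.io.Text;
-
- // Strips double quotes from a field, e.g. "\"31/Aug/2015:00:04:37 +0800\"" -> "31/Aug/2015:00:04:37 +0800".
- public class RemoveQuotesUDF extends UDF {
-     public Text evaluate(Text input) {
-         if (input == null) {
-             return null;
-         }
-         return new Text(input.toString().replaceAll("\"", ""));
-     }
- }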
-
-
- -----------------------------------------------Sqoop-----------------------------------------
-
-
- Using MySQL as the example RDBMS: copy the JDBC driver jar into $SQOOP_HOME/lib
-
- cp /opt/softwares/mysql-libs/mysql-connector-java-5.1.27/mysql-connector-java-5.1.27-bin.jar \
- /opt/modules/sqoop-1.4.5-cdh5.3.6/lib/
-
- bin/sqoop list-databases \
- --connect jdbc:mysql://hadoop.zxk.com:3306 \
- --username root \
- --password 123456
-
-
- -----------------------------------------------Basic Sqoop import-----------------------------------------
-
- bin/sqoop import \
- --connect jdbc:mysql://hadoop.zxk.com:3306/test \
- --username root \
- --password 123456 \
- --table my_user
-
- ------------------------------------------Sqoop import: setting the target directory and number of mappers-----------------------------
- bin/sqoop import \
- --connect jdbc:mysql://hadoop.zxk.com:3306/test \
- --username root \
- --password 123456 \
- --table my_user \
- --target-dir /user/root/sqoop/imp_my_user \
- --num-mappers 1
-
- -----------------------------------Import into HDFS: storing the data as parquetfile-------------------
- Sqoop is implemented on top of MapReduce; an import runs map tasks only.
-
- bin/sqoop import \
- --connect jdbc:mysql://hadoop.zxk.com:3306/test \
- --username root \
- --password 123456 \
- --table my_user \
- --target-dir /user/root/sqoop/imp_my_user_parquet \
- --fields-terminated-by ',' \
- --num-mappers 1 \
- --as-parquetfile
-
-
- ------------------------------------Import into HDFS: importing a subset of columns----------------------------
-
- bin/sqoop import \
- --connect jdbc:mysql://hadoop.zxk.com:3306/test \
- --username root \
- --password 123456 \
- --table my_user \
- --target-dir /user/root/sqoop/imp_my_user_column \
- --num-mappers 1 \
- --columns id,account
-
- -----------------------------------Import into HDFS: using a free-form query----------------------------------
- bin/sqoop import \
- --connect jdbc:mysql://hadoop.zxk.com:3306/test \
- --username root \
- --password 123456 \
- --query 'select id, account from my_user where $CONDITIONS' \
- --target-dir /user/root/sqoop/imp_my_user_query \
- --num-mappers 1
-
- -----------------------------------Import into HDFS: using Snappy compression-------------------------------------
-
- bin/sqoop import \
- --connect jdbc:mysql://hadoop.zxk.com:3306/test \
- --username root \
- --password 123456 \
- --table my_user \
- --target-dir /user/root/sqoop/imp_my_sannpy \
- --delete-target-dir \
- --num-mappers 1 \
- --compress \
- --compression-codec org.apache.hadoop.io.compress.SnappyCodec \
- --fields-terminated-by ','
-
-
-
- drop table if exists default.hive_user_snappy ;
- create table default.hive_user_snappy(
- id int,
- username string,
- password string
- )
- ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ;
-
- load data inpath '/user/root/sqoop/imp_my_sannpy' overwrite into table default.hive_user_snappy ;
-
-
-
-
- ----------------------------Import into HDFS: incremental imports--------------------------
-
- Incremental imports rely on a unique, monotonically increasing column; typically the table has an insert-time field such as createtime
- * query
- where createtime >= 20150924000000000 and createtime < 20150925000000000
- * sqoop
- Incremental import arguments:
- --check-column <column> Source column to check for incremental
- change
- --incremental <import-type> Define an incremental import of type
- 'append' or 'lastmodified'
- --last-value <value> Last imported value in the incremental
- check column
- bin/sqoop import \
- --connect jdbc:mysql://hadoop.zxk.com:3306/test \
- --username root \
- --password 123456 \
- --table my_user \
- --target-dir /user/root/sqoop/imp_my_incr \
- --num-mappers 1 \
- --incremental append \
- --check-column id \
- --last-value 4
-
- ----------------------------Import into HDFS: using direct mode (--direct)--------------------------
- bin/sqoop import \
- --connect jdbc:mysql://hadoop.zxk.com:3306/test \
- --username root \
- --password 123456 \
- --table my_user \
- --target-dir /user/root/sqoop/imp_my_incr \
- --num-mappers 1 \
- --delete-target-dir \
- --direct
-
-
-
- ----------------------------Exporting files from HDFS into a MySQL table--------------------------
- touch /opt/datas/user.txt
- vi /opt/datas/user.txt
- 12,root,root
- 13,xuanyun,xuanyu
-
- bin/hdfs dfs -mkdir -p /user/root/sqoop/exp/user/
- bin/hdfs dfs -put /opt/datas/user.txt /user/root/sqoop/exp/user/
-
-
- bin/sqoop export \
- --connect jdbc:mysql://hadoop.zxk.com:3306/test \
- --username root \
- --password 123456 \
- --table my_user \
- --export-dir /user/root/sqoop/exp/user/ \
- --num-mappers 1
-
-
-
- ----------------------------Importing a MySQL table into Hive with Sqoop---------------------------
-
- use default ;
- drop table if exists user_hive ;
- create table user_hive(
- id int,
- account string,
- password string
- )
- ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ;
-
- bin/sqoop import \
- --connect jdbc:mysql://hadoop.zxk.com:3306/test \
- --username root \
- --password 123456 \
- --table my_user \
- --fields-terminated-by ',' \
- --delete-target-dir \
- --num-mappers 1 \
- --hive-import \
- --hive-database default \
- --hive-table user_hive
-
-
- ----------------------------Exporting a Hive table (its files on HDFS) into MySQL with Sqoop---------------------------
- CREATE TABLE `my_usert` (
- `id` tinyint(4) NOT NULL AUTO_INCREMENT,
- `account` varchar(255) DEFAULT NULL,
- `passwd` varchar(255) DEFAULT NULL,
- PRIMARY KEY (`id`)
- );
-
-
- bin/sqoop export \
- --connect jdbc:mysql://hadoop.zxk.com:3306/test \
- --username root \
- --password 123456 \
- --table my_usert \
- --export-dir /user/hive/warehouse/user_hive \
- --num-mappers 1 \
- --input-fields-terminated-by ','
-
-
-
-
- ---------------------------------------------Basic Flume usage--------------------------------------------
-
- bin/flume-ng
- Usage: bin/flume-ng <command> [options]...
-
- commands:
- agent run a Flume agent
-
- global options:
- --conf,-c <conf> use configs in <conf> directory
- -Dproperty=value sets a Java system property value
-
- agent options:
- --name,-n <name> the name of this agent (required)
- --conf-file,-f <file> specify a config file (required if -z missing)
-
-
- bin/flume-ng agent --conf conf --name agent-test --conf-file test.conf -Dflume.root.logger=DEBUG,console
- bin/flume-ng agent -c conf -n agent-test -f test.conf
-
- bin/flume-ng agent --conf conf --name a1 --conf-file conf/a1.conf -Dflume.root.logger=DEBUG,console
-
- bin/flume-ng agent --conf conf --name a2 --conf-file conf/flume-tail.conf -Dflume.root.logger=DEBUG,console
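-
- The a1.conf / flume-tail.conf files referenced above are not included in these notes. A minimal sketch of what an agent file such as a1.conf might look like (assuming a netcat source on port 44444, a memory channel and a logger sink; adjust to the actual setup):
-
- # a1: source r1 -> channel c1 -> sink k1   (hypothetical example config)
- a1.sources = r1
- a1.channels = c1
- a1.sinks = k1
-
- # netcat source listening on localhost:44444
- a1.sources.r1.type = netcat
- a1.sources.r1.bind = localhost
- a1.sources.r1.port = 44444
-
- # in-memory channel
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
-
- # logger sink prints events to the console (works with -Dflume.root.logger=DEBUG,console)
- a1.sinks.k1.type = logger
-
- # wire them together
- a1.sources.r1.channels = c1
- a1.sinks.k1.channel = c1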
-
-
-
- ---------------------------------------------Basic Oozie commands--------------------------------------------
-
-
-
- oozie job -oozie http://hadoop.zxk.com:11000/oozie -config examples/apps/map-reduce/job.properties -run
-
- export OOZIE_URL=http://hadoop.zxk.com:11000/oozie
- oozie job -config oozie-apps/mr-wordcount-wf/job.properties -run
-
-
- ---------------------------------------------Oozie workflow basics-------------------------------------
-
-
- ---------------------------------------------Oozie workflow: MapReduce action----------------------------
-
- 如何定义一个WorkFlow
- 第一步:
- * 生成job.properties文件 -->指向 workflow.xml文件所在的HDFS位置。
- 第二步:
- * 编写 workflow.xml文件
- * 编写步骤:
- * 编写流程控制节点
- * 第一步:定义XML文件
- * 第二步:编写节点
- * start 节点
- * action 节点(包含 MapReduce Hive Sqoop Shell)
- * 当action节点执行成功时跳转
- (1):--->>> action 节点
- (2):--->>> end 节点
- * 当action节点执行失败时跳转
- (1):--->>> fail 节点
- (2):--->>> kill 节点(杀死线程)
- (3):--->>> end 节点
- * end 节点
- * 编写Action节点
- * 关键点: 如何使用Oozie调用MapReduce程序
- * 将以前Java MapReduce 程序中的【Driver】部分的配置 转换为 workflow.xml 文件中的 configuration 部分配置
- 【注意】 目前的Workflow.XML文件支持的是旧MapReduce API 需要加上配置
-
- <property>
- <name>mapred.mapper.new-api</name>
- <value>true</value>
- </property>
- <property>
- <name>mapred.reducer.new-api</name>
- <value>true</value>
- </property>
-
- 第三步:
- * 将生成的jar包上传到lib目录下
-
- 第四步:
- * 使用Hadoop将文件夹上传到HDFS文件系统中
- bin/hdfs dfs -put /opt/modules/oozie-4.0.0-cdh5.3.6/oozie-apps/*文件夹名称/ oozie-apps/
- 第五步:
- * run the following from the Oozie installation directory
- export OOZIE_URL=http://hadoop.zxk.com:11000/oozie
- oozie job -config oozie-apps/*文件夹名称/job.properties -run
-
- ---------------------------------------------Oozie workflow.xml 样例与job.properties 样例-------------------------------------------
-
- job.properties
-
- nameNode=hdfs://hadoop.zxk.com:8020
- jobTracker=hadoop.zxk.com:8032
- queueName=default
- oozieAppsRoot=user/root/oozie-apps
- oozieDataRoot=user/root/oozie/datas
-
- oozie.wf.application.path=${nameNode}/${oozieAppsRoot}/mr-wordcount-wf/workflow.xml
- inputDir=mr-wordcount-wf/input
- outputDir=mr-wordcount-wf/output
-
-
- workflow.xml
-
- <workflow-app xmlns="uri:oozie:workflow:0.5" name="mr-wordcount-wf">
- <start to="mr-node-wordcount"/>
- <action name="mr-node-wordcount">
- <map-reduce>
- <job-tracker>${jobTracker}</job-tracker>
- <name-node>${nameNode}</name-node>
- <prepare>
- <delete path="${nameNode}/${oozieDataRoot}/${outputDir}"/>
- </prepare>
- <configuration>
- <property>
- <name>mapred.mapper.new-api</name>
- <value>true</value>
- </property>
- <property>
- <name>mapred.reducer.new-api</name>
- <value>true</value>
- </property>
- <property>
- <name>mapreduce.job.queuename</name>
- <value>${queueName}</value>
- </property>
- <property>
- <name>mapreduce.job.map.class</name>
- <value>com.iroot.hadoop.senior.mapreduce.WordCount$WordCountMapper</value>
- </property>
- <property>
- <name>mapreduce.job.reduce.class</name>
- <value>com.iroot.hadoop.senior.mapreduce.WordCount$WordCountReducer</value>
- </property>
-
- <property>
- <name>mapreduce.map.output.key.class</name>
- <value>org.apache.hadoop.io.Text</value>
- </property>
- <property>
- <name>mapreduce.map.output.value.class</name>
- <value>org.apache.hadoop.io.IntWritable</value>
- </property>
- <property>
- <name>mapreduce.job.output.key.class</name>
- <value>org.apache.hadoop.io.Text</value>
- </property>
- <property>
- <name>mapreduce.job.output.value.class</name>
- <value>org.apache.hadoop.io.IntWritable</value>
- </property>
- <property>
- <name>mapreduce.input.fileinputformat.inputdir</name>
- <value>${nameNode}/${oozieDataRoot}/${inputDir}</value>
- </property>
- <property>
- <name>mapreduce.output.fileoutputformat.outputdir</name>
- <value>${nameNode}/${oozieDataRoot}/${outputDir}</value>
- </property>
- </configuration>
- </map-reduce>
- <ok to="end"/>
- <error to="fail"/>
- </action>
- <kill name="fail">
- <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
- </kill>
- <end name="end"/>
- </workflow-app>
-
-
-
- ---------------------------------------------Oozie workflow: Hive action----------------------------
-
-
- 如何定义一个WorkFlow
- 第一步:
- * 生成job.properties文件 -->指向 workflow.xml文件所在的HDFS位置。
- 第二步:
- * 编写 workflow.xml文件
- * 编写步骤:
- * 编写流程控制节点
- * 第一步:定义XML文件
- * 第二步:编写节点
- * start 节点
- * action 节点(包含 MapReduce Hive Sqoop Shell)
- * 当action节点执行成功时跳转
- (1):--->>> action 节点
- (2):--->>> end 节点
- * 当action节点执行失败时跳转
- (1):--->>> fail 节点
- (2):--->>> kill 节点(杀死线程)
- (3):--->>> end 节点
- * end 节点
- * 编写Action节点
- 【注意】 目前的Workflow.XML文件支持旧MapReduce API 不需要配置
-
- 第三步:
- * 将生成的mysql的jar包上传到lib目录下
- 第四步:
- * upload Hive's configuration file hive-site.xml into the application directory
- 第五步:
- * 使用Hadoop将文件夹上传到HDFS文件系统中
- bin/hdfs dfs -put /opt/modules/oozie-4.0.0-cdh5.3.6/oozie-apps/*文件夹名称/ oozie-apps/
- 第六步:
- * run the following from the Oozie installation directory
- export OOZIE_URL=http://hadoop.zxk.com:11000/oozie
- oozie job -config oozie-apps/*文件夹名称/job.properties -run
-
- job.properties样例
-
- nameNode=hdfs://hadoop.zxk.com:8020
- jobTracker=hadoop.zxk.com:8032
- queueName=default
- oozieAppsRoot=user/root/oozie-apps
- oozieDataRoot=user/root/oozie/datas
- oozie.use.system.libpath=true
- oozie.wf.application.path=${nameNode}/${oozieAppsRoot}/hive-select/
- outputDir=hive-select/output
-
-
-
- workflow.xml样例
-
- <workflow-app xmlns="uri:oozie:workflow:0.5" name="wf-hive-select">
- <start to="hive-node"/>
-
- <action name="hive-node">
- <hive xmlns="uri:oozie:hive-action:0.2">
- <job-tracker>${jobTracker}</job-tracker>
- <name-node>${nameNode}</name-node>
- <prepare>
- <delete path="${nameNode}/${oozieDataRoot}/${outputDir}"/>
- </prepare>
- <job-xml>${nameNode}/${oozieAppsRoot}/hive-select/hive-site.xml</job-xml>
- <configuration>
- <property>
- <name>mapred.job.queue.name</name>
- <value>${queueName}</value>
- </property>
- </configuration>
- <script>select-dept.sql</script>
- <param>OUTPUT=${nameNode}/${oozieDataRoot}/${outputDir}</param>
- </hive>
- <ok to="end"/>
- <error to="fail"/>
- </action>
-
- <kill name="fail">
- <message>Hive failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
- </kill>
- <end name="end"/>
- </workflow-app>
-
-
-
-
-
- ---------------------------------------------Oozie workflow: Sqoop action----------------------------
-
- 如何定义一个WorkFlow
- 第一步:
- * 生成job.properties文件 -->指向 workflow.xml文件所在的HDFS位置。
- 第二步:
- * 编写 workflow.xml文件
- * 编写步骤:
- * 编写流程控制节点
- * 第一步:定义XML文件
- * 第二步:编写节点
- * start 节点
- * action 节点(包含 MapReduce Hive Sqoop Shell)
- * 当action节点执行成功时跳转
- (1):--->>> action 节点
- (2):--->>> end 节点
- * 当action节点执行失败时跳转
- (1):--->>> fail 节点
- (2):--->>> kill 节点(杀死线程)
- (3):--->>> end 节点
- * end 节点
- * 编写Action节点
- 【注意】 目前的Workflow.XML文件支持旧MapReduce API 不需要配置
- [Note] The statement inside <command></command> follows Sqoop's own command syntax
- 第三步:
- * 将生成的mysql的jar包上传到lib目录下
- 第四步:
- * 使用Hadoop将文件夹上传到HDFS文件系统中
- bin/hdfs dfs -put /opt/modules/oozie-4.0.0-cdh5.3.6/oozie-apps/*文件夹名称/ oozie-apps/
- 第五步:
- * run the following from the Oozie installation directory
- export OOZIE_URL=http://hadoop.zxk.com:11000/oozie
- oozie job -config oozie-apps/*文件夹名称/job.properties -run
-
- job.properties样例
-
- nameNode=hdfs://hadoop.zxk.com:8020
- jobTracker=hadoop.zxk.com:8032
- queueName=default
- oozieAppsRoot=user/root/oozie-apps
- oozieDataRoot=user/root/oozie/datas
- oozie.use.system.libpath=true
- oozie.wf.application.path=${nameNode}/${oozieAppsRoot}/sqoop-import-user
- outputDir=sqoop-import-user/output
-
-
-
- workflow.xml样例
-
- <workflow-app xmlns="uri:oozie:workflow:0.5" name="sqoop-wf">
- <start to="sqoop-node"/>
-
- <action name="sqoop-node">
- <sqoop xmlns="uri:oozie:sqoop-action:0.3">
- <job-tracker>${jobTracker}</job-tracker>
- <name-node>${nameNode}</name-node>
- <prepare>
- <delete path="${nameNode}/${oozieDataRoot}/${outputDir}"/>
- </prepare>
- <configuration>
- <property>
- <name>mapred.job.queue.name</name>
- <value>${queueName}</value>
- </property>
- </configuration>
- <command>import --connect jdbc:mysql://hadoop.zxk.com:3306/test --username root --password 123456 --table my_user --target-dir /user/root/oozie/datas/sqoop-import-user/output --fields-terminated-by "," --num-mappers 1</command>
- </sqoop>
- <ok to="end"/>
- <error to="fail"/>
- </action>
-
- <kill name="fail">
- <message>Sqoop failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
- </kill>
- <end name="end"/>
- </workflow-app>
-
-
-
-
-
- ---------------------------------------------Oozie workflow: Shell action----------------------------
-
- 如何定义一个WorkFlow
- 第一步:
- * 生成job.properties文件 -->指向 workflow.xml文件所在的HDFS位置。
-
- * exec=dept-select.sh (定义脚本的位置与名称)
- * script=dept-select.sql (定义SQL语句的位置与名称)
- 第二步:
- * 编写 workflow.xml文件
- * 编写步骤:
- * 编写流程控制节点
- * 第一步:定义XML文件
- * 第二步:编写节点
- * start 节点
- * action 节点(包含 MapReduce Hive Sqoop Shell)
- * 当action节点执行成功时跳转
- (1):--->>> action 节点
- (2):--->>> end 节点
- * 当action节点执行失败时跳转
- (1):--->>> fail 节点
- (2):--->>> kill 节点(杀死线程)
- (3):--->>> end 节点
- * end 节点
- * 编写Action节点
- 【注意】 目前的Workflow.XML文件支持旧MapReduce API 不需要配置
- 【注意】 <file></file>中的路径是指HDFS文件系统中的目录
- <exec>${exec}</exec>
- <file>${nameNode}/${oozieAppsRoot}/shell-hive-select/${exec}#${exec}</file>
- <file>${nameNode}/${oozieAppsRoot}/shell-hive-select/${script}#${script}</file>
- 第三步:
- * 编写脚本文件
-
- #!/usr/bin/env bash
-
- /opt/modules/hive-0.13.1/bin/hive -f dept-select.sql
- 第四步(可选):
- * 编写SQL语句
- insert overwrite directory '/user/root/oozie/datas/shell-hive-select/output'
- select
- deptno, dname
- from default.dept ;
- 第五步:
- * 使用Hadoop将文件夹上传到HDFS文件系统中
- bin/hdfs dfs -put /opt/modules/oozie-4.0.0-cdh5.3.6/oozie-apps/*文件夹名称/ oozie-apps/
- 第六步:
- * run the following from the Oozie installation directory
- export OOZIE_URL=http://hadoop.zxk.com:11000/oozie
- oozie job -config oozie-apps/*文件夹名称/job.properties -run
-
- job.properties样例
-
- nameNode=hdfs://hadoop.zxk.com:8020
- jobTracker=hadoop.zxk.com:8032
- queueName=default
- oozieAppsRoot=user/root/oozie-apps
- oozieDataRoot=user/root/oozie/datas
- oozie.wf.application.path=${nameNode}/${oozieAppsRoot}/shell-hive-select
- exec=dept-select.sh
- script=dept-select.sql
-
-
-
- workflow.xml样例
-
- <workflow-app xmlns="uri:oozie:workflow:0.5" name="shell-wf">
- <start to="shell-node"/>
- <action name="shell-node">
- <shell xmlns="uri:oozie:shell-action:0.2">
- <job-tracker>${jobTracker}</job-tracker>
- <name-node>${nameNode}</name-node>
- <configuration>
- <property>
- <name>mapred.job.queue.name</name>
- <value>${queueName}</value>
- </property>
- </configuration>
- <exec>${exec}</exec>
- <file>${nameNode}/${oozieAppsRoot}/shell-hive-select/${exec}#${exec}</file>
- <file>${nameNode}/${oozieAppsRoot}/shell-hive-select/${script}#${script}</file>
- <capture-output/>
- </shell>
- <ok to="end"/>
- <error to="fail"/>
- </action>
- <kill name="fail">
- <message>Shell action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
- </kill>
- <end name="end"/>
- </workflow-app>
-
-
-
-
-
-
- ----------------------------Oozie workflow with two or more actions----------------------------
-
-
- Depending on the case, combine Hadoop, Hive, Sqoop and Shell actions (and a coordinator file), and decide whether hive-site.xml and a lib directory need to be uploaded
-
-
- job.properties文件
-
-
- nameNode=hdfs://hadoop.zxk.com:8020
- jobTracker=hadoop.zxk.com:8032
- queueName=default
- oozieAppsRoot=user/root/oozie-apps
- oozieDataRoot=user/root/oozie/datas
- oozie.use.system.libpath=true
- oozie.wf.application.path=${nameNode}/${oozieAppsRoot}/wf-user-select/
- #oozie.coord.application.path=${nameNode}/${oozieAppsRoot}/wf-user-select
- #start=2015-10-15T00:00+0800
- #end=2015-10-26T00:00+0800
- #workflowAppUri=${nameNode}/${oozieAppsRoot}/wf-user-select
- outputDir=wf-user-select/output
-
-
- Workflow文件
-
- <workflow-app xmlns="uri:oozie:workflow:0.5" name="wf-user-select">
- <start to="hive-node"/>
-
- <action name="hive-node">
- <hive xmlns="uri:oozie:hive-action:0.5">
- <job-tracker>${jobTracker}</job-tracker>
- <name-node>${nameNode}</name-node>
- <prepare>
- <delete path="${nameNode}/${oozieDataRoot}/${outputDir}"/>
- </prepare>
- <job-xml>${nameNode}/${oozieAppsRoot}/hive-select/hive-site.xml</job-xml>
- <configuration>
- <property>
- <name>mapred.job.queue.name</name>
- <value>${queueName}</value>
- </property>
- </configuration>
- <script>select-user.sql</script>
- <param>OUTPUT=${nameNode}/${oozieDataRoot}/${outputDir}</param>
- </hive>
- <ok to="sqoop-node"/>
- <error to="fail"/>
- </action>
-
- <action name="sqoop-node">
- <sqoop xmlns="uri:oozie:sqoop-action:0.3">
- <job-tracker>${jobTracker}</job-tracker>
- <name-node>${nameNode}</name-node>
- <configuration>
- <property>
- <name>mapred.job.queue.name</name>
- <value>${queueName}</value>
- </property>
- </configuration>
- <command>export --connect jdbc:mysql://hadoop.zxk.com:3306/test --username root --password 123456 --table my_user --num-mappers 1 --fields-terminated-by "," --export-dir /user/root/oozie/datas/wf-user-select/output</command>
- </sqoop>
- <ok to="end"/>
- <error to="fail"/>
- </action>
-
- <kill name="fail">
- <message>Hive failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
- </kill>
- <end name="end"/>
- </workflow-app>
-
-
-
- ------------------------------------------Hue integration: HDFS and YARN configuration---------------------------------------------------
- core-site.xml settings
-
- <property>
- <name>hadoop.proxyuser.hue.hosts</name>
- <value>*</value>
- </property>
- <property>
- <name>hadoop.proxyuser.hue.groups</name>
- <value>*</value>
- </property>
-
-
- hue.ini
-
- secret_key=jFE93j;2[290-eiw.KEiwN2s3['d;/.q[eIW^y#e=+Iei*@Mn<qW5o
- http_host=hadoop.zxk.com
- http_port=8888
- time_zone=Asia/Shanghai
- django_debug_mode=false
- http_500_debug_mode=false
- [[hdfs_clusters]]
- # HA support by using HttpFs
- [[[default]]]
- fs_defaultfs=hdfs://hadoop.zxk.com:8020
- webhdfs_url=http://hadoop.zxk.com:50070/webhdfs/v1
- hadoop_conf_dir=/opt/modules/hadoop-2.5.0/etc/hadoop
- # Configuration for YARN (MR2)
- [[yarn_clusters]]
- [[[default]]]
- resourcemanager_host=hadoop.zxk.com
- resourcemanager_port=8032
- submit_to=True
- resourcemanager_api_url=http://hadoop.zxk.com:8088
- proxy_api_url=http://hadoop.zxk.com:8088
- history_server_api_url=http://hadoop.zxk.com:19888
- [filebrowser]
- archive_upload_tempdir=/tmp
- ------------------------------------------Hue integration: Hive configuration------------------------------------------------
- Start HiveServer2:     bin/hiveserver2
- Start the metastore:   bin/hive --service metastore
- hive-site.xml
- <!--HiveServer2 -->
- <property>
- <name>hive.server2.thrift.port</name>
- <value>10000</value>
- </property>
- <property>
- <name>hive.server2.thrift.bind.host</name>
- <value>hadoop.zxk.com</value>
- </property>
-
- <!--Remote Metastore -->
-
- <property>
- <name>hive.metastore.uris</name>
- <value>thrift://hadoop.zxk.com:9083</value>
- </property>
-
-
- hue.ini
- [beeswax]
- hive_server_host=hadoop.zxk.com
- hive_server_port=10000
- hive_conf_dir=/opt/modules/hive-0.13.1/conf
- server_conn_timeout=120
-
- [librdbms]
- [[databases]]
- [[[sqlite]]]
- nice_name=SQLite
- name=/opt/app/hue-3.7.0-cdh5.3.6/desktop/desktop.db
- engine=sqlite
- [[[mysql]]]
- nice_name="My SQL DB"
- name=test
- engine=mysql
- host=hadoop.zxk.com
- port=3306
- user=root
- password=123456
-
-
- ------------------------------------------Hue integration: Oozie configuration------------------------------------------------
- Start Oozie: bin/oozied.sh start
- -----------------------------------------Creating the Oozie sharelib used by Hue-----------------------------------------
- bin/oozie-setup.sh sharelib create \
- -fs hdfs://hadoop.zxk.com:8020 \
- -locallib oozie-sharelib-4.0.0-cdh5.3.6-yarn.tar.gz
- ----------------------------------------Using the HBase shell----------------------------------------------------
- Problem: inside hbase shell started from Xshell, mistyped input cannot be erased:
- Backspace and Delete do not work in the hbase shell.
- Fix: in Xshell open File --> Properties --> Terminal --> Keyboard
- and set both the DELETE key sequence and the BACKSPACE key sequence to ASCII 127.
- Table management
- 1) List tables
- hbase(main):002:0> list
- 2) Create a table
- # Syntax: create <table>, {NAME => <family>, VERSIONS => <VERSIONS>}
- # Example: create table t1 with two column families f1 and f2, each keeping 2 versions
- hbase(main):002:0> create 't1',{NAME => 'f1', VERSIONS => 2},{NAME => 'f2', VERSIONS => 2}
- hbase(main):002:0> create 'user','info'
- 2-1) Describe a table
-
- hbase(main):002:0> describe '<table name>'
- 2-2) Insert data
-
- put 'user','10001','info:name','zx'
- put 'user','10001','info:age','22'
- put 'user','10001','info:sex','female'
- put 'user','10001','info:address','henan'
-
- put 'user','10002','info:name','cl'
- put 'user','10002','info:age','22'
- put 'user','10002','info:address','henan'
-
- put 'user','10003','info:name','zxk'
- put 'user','10003','info:age','22'
- 2-3) Query data
-
- get 'user','10001'
- get 'user','10001','info:address'
- scan 'user'
- scan 'user', {COLUMNS => ['info:name', 'info:age'], STARTROW => '10002'}
- 3) Drop a table
-
- Two steps: disable first, then drop
- Example: drop table t1
- hbase(main):002:0> disable 't1'
- hbase(main):002:0> drop 't1'
- 4) Describe a table
- # Syntax: describe <table>
- # Example: show the structure of table t1
- hbase(main):002:0> describe 't1'
- 5) Alter a table
- The table must be disabled before it can be altered
- # Syntax: alter 't1', {NAME => 'f1'}, {NAME => 'f2', METHOD => 'delete'}
- # Example: set the TTL of test1's column families to 180 days (15552000 seconds)
- hbase(main):002:0> disable 'test1'
- hbase(main):002:0> alter 'test1',{NAME=>'body',TTL=>'15552000'},{NAME=>'meta', TTL=>'15552000'}
- hbase(main):002:0> enable 'test1'
-
-
-
- ------------------------------------------HBase data storage----------------------------------------------------------
- I. HBase data retrieval flow
- (1) A client that reads or writes a table first connects to ZooKeeper, because the location of the data has to be
-     looked up there. A table is managed as regions; each region is served by a RegionServer and has a startkey and an endkey.
- (2) HBase tables are divided into User Tables and Catalog Tables (the system tables).
- (3) For each user table, the region information (startkey/endkey and which RegionServer serves each region, e.g.
-     region-01 of table user lives on regionserver-03) is recorded in the meta table.
- (4) Newer HBase versions have a namespace concept similar to databases in an RDBMS. All tables live under the data
-     directory, which contains a default directory and an hbase directory; these directories are the namespaces.
- (5) User-defined tables go into the default namespace by default, while the system metadata tables live in the hbase namespace.
- (6) The meta table has only one region, which is itself served by a RegionServer; ZooKeeper's meta-region-server znode
-     records which one. A client first locates the meta-region-server, then reads the meta table; scanning it shows which server manages each region.
- (7) To sum up: a user table consists of many regions, and the region information is stored in hbase:meta. The client
-     first reads ZooKeeper; meta-region-server points it at the region of the meta table. It then scans the meta table
-     to find the region that holds the requested key, and only after that does it read or write the actual data.
- II. HBase data storage
- 2.1 HBase structure in detail
- HBase can store and access data at high speed because of the way it stores data.
- 1. Connect to ZooKeeper to find the data: we need to know which region the RowKey falls into.
- 2. The client then talks to the HRegionServer; an HRegionServer manages many regions.
- 3. The HMaster also connects to ZooKeeper, so that it knows which HRegionServers are alive and where they are,
-    and can manage them.
- 4. Internally, HBase writes its data to HDFS through the HDFS client.
- 5. A region contains an HLog and Stores; a table gets one Store per column family. A Store holds a MemStore
-    and many StoreFiles. A StoreFile is a wrapper around an HFile, and HFiles are what is actually stored on HDFS.
- 6. On a write, the data first goes to the HLog and then to the MemStore. When the MemStore reaches a certain size
-    it is flushed into a StoreFile. If MemStore data is lost, it is recovered from the HLog.
- 7. A read first checks the MemStore, then the StoreFiles, and the results are merged.
- 2.2 Storage details
- 1. All HBase data files are stored on HDFS, in two main file types:
-    1) HFile: the storage format of HBase KeyValue data; an HFile is a Hadoop binary file format,
-       and a StoreFile is a thin wrapper around an HFile that handles the actual data storage.
-    2) HLog File: the storage format of HBase's WAL (Write Ahead Log); physically it is a Hadoop SequenceFile.
- 2. An HRegionServer manages a set of HRegion objects; each HRegion corresponds to one region of a table
-    and is composed of several HStores. Each HStore stores one column family, so a column family is effectively
-    a single storage unit; columns with similar I/O characteristics should therefore be placed in the same
-    column family for the best efficiency.
- 3. The HStore is the core of HBase storage and has two parts: the MemStore and the StoreFiles.
- 4. The MemStore is a sorted memory buffer. Writes go into the MemStore first; when it fills up it is
-    flushed into a StoreFile (an HFile underneath).
- 5. HLog file structure: the WAL (write-ahead log), similar to MySQL's binlog, is used for disaster recovery.
-    The HLog records every change, so data can be recovered from the log after a failure.
- 6. Each HRegionServer keeps a single HLog rather than one per HRegion, so log entries from different regions
-    (and different tables) are interleaved. Appending to one file reduces disk seeks compared with writing many
-    files at once, which improves write performance. The downside is that if an HRegionServer goes down,
-    its log has to be split and distributed to other HRegionServers before its regions can be recovered.
- 2.3 Write path
- 1. The client writes -> data goes into the MemStore -> when the MemStore is full it is flushed into a StoreFile ->
-    when the number of StoreFiles passes a threshold, a Compact operation merges several StoreFiles into one.
- 2. Compaction also merges versions and removes deleted cells -> StoreFiles gradually grow larger -> once a single
-    StoreFile exceeds a threshold, a Split is triggered: the current region is split into two, the old region goes
-    offline, and the HMaster assigns the two child regions to HRegionServers, so the load of the original region
-    is spread across two regions.
-
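-
- A minimal Java client sketch of the read path described above: the client only needs the ZooKeeper quorum, region locations are resolved via hbase:meta, then the row is read (quorum host and the 'user' table are the ones used elsewhere in these notes; API as in HBase 0.98):
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.hbase.HBaseConfiguration;
- import org.apache.hadoop.hbase.client.Get;
- import org.apache.hadoop.hbase.client.HTable;
- import org.apache.hadoop.hbase.client.Result;
- import org.apache.hadoop.hbase.util.Bytes;
-
- public class HBaseGetExample {
-     public static void main(String[] args) throws Exception {
-         Configuration conf = HBaseConfiguration.create();
-         // only ZooKeeper is configured; region locations come from hbase:meta
-         conf.set("hbase.zookeeper.quorum", "hadoop.zxk.com");
-         HTable table = new HTable(conf, "user");
-         Get get = new Get(Bytes.toBytes("10001"));
-         get.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
-         Result result = table.get(get);
-         byte[] name = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"));
-         System.out.println("info:name = " + Bytes.toString(name));
-         table.close();
-     }
- }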
-
-
- export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
- export HADOOP_HOME=/opt/modules/hadoop-2.5.0
- HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar $HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar
- CellCounter: Count cells in HBase table
- completebulkload: Complete a bulk data load.
- copytable: Export a table from local cluster to peer cluster
- export: Write table data to HDFS.
- import: Import data written by Export.
- importtsv: Import data in TSV format.
- rowcounter: Count rows in HBase table
- verifyrep: Compare the data from tables in two different clusters. WARNING: It doesn't work for incrementColumnValues'd cells since the timestamp is changed after being appended to the log.
-
-
- ------------------------------------------Integrating HBase with MapReduce------------------------------------------
- (1) Upload the job jar (a sketch of such a job follows below)
- (2) export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
- (3) export HADOOP_HOME=/opt/modules/hadoop-2.5.0
- (4) HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar $HADOOP_HOME/jars/hbase-mr-user2basic.jar
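-
- The hbase-mr-user2basic.jar above is not included in these notes; the following is only a sketch of what such a job typically looks like (assumed behaviour: copy the info:name column of the 'user' table into a 'basic' table; class, package and table names are illustrative, API as in HBase 0.98):
-
- package com.example.hbase.mr;   // hypothetical package
-
- import java.io.IOException;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.hbase.HBaseConfiguration;
- import org.apache.hadoop.hbase.client.Put;
- import org.apache.hadoop.hbase.client.Result;
- import org.apache.hadoop.hbase.client.Scan;
- import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
- import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
- import org.apache.hadoop.hbase.mapreduce.TableMapper;
- import org.apache.hadoop.hbase.util.Bytes;
- import org.apache.hadoop.mapreduce.Job;
-
- public class User2BasicMapReduce {
-
-     // Mapper: for every row of the source table, emit a Put carrying the info:name cell.
-     public static class UserMapper extends TableMapper<ImmutableBytesWritable, Put> {
-         @Override
-         protected void map(ImmutableBytesWritable rowKey, Result value, Context context)
-                 throws IOException, InterruptedException {
-             byte[] name = value.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"));
-             if (name != null) {
-                 Put put = new Put(rowKey.get());
-                 put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), name);
-                 context.write(rowKey, put);
-             }
-         }
-     }
-
-     public static void main(String[] args) throws Exception {
-         Configuration conf = HBaseConfiguration.create();
-         Job job = Job.getInstance(conf, "user2basic");
-         job.setJarByClass(User2BasicMapReduce.class);
-
-         Scan scan = new Scan();
-         scan.setCaching(500);          // larger scanner cache for MapReduce scans
-         scan.setCacheBlocks(false);    // do not pollute the block cache
-
-         // read from 'user', write the Puts into 'basic' via TableOutputFormat (map-only job)
-         TableMapReduceUtil.initTableMapperJob("user", scan, UserMapper.class,
-                 ImmutableBytesWritable.class, Put.class, job);
-         TableMapReduceUtil.initTableReducerJob("basic", null, job);
-         job.setNumReduceTasks(0);
-         System.exit(job.waitForCompletion(true) ? 0 : 1);
-     }
- }
-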
- ------------------------------------------Using HBase importtsv----------------------------------------
- 10001 21 male henan 13243169133
- 10002 22 male yunnan 13243169133
- 10003 23 male dali 13243169133
- 10004 24 male henan 13243169133
- 10005 25 male henan 13243169133
- export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
- export HADOOP_HOME=/opt/modules/hadoop-2.5.0
- HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar \
- $HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar importtsv \
- -Dimporttsv.columns=HBASE_ROW_KEY,\
- info:name,info:age,info:sex,info:address,info:phone \
- student \
- hdfs://hadoop.zxk.com:8020/user/root/hbase/importtsv
- export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
- export HADOOP_HOME=/opt/modules/hadoop-2.5.0
- HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar \
- $HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar importtsv \
- -Dimporttsv.columns=HBASE_ROW_KEY,\
- info:name,info:age,info:sex,info:address,info:phone \
- -Dimporttsv.bulk.output=hdfs://hadoop.zxk.com:8020/user/root/hbase/hfileoutput \
- student \
- hdfs://hadoop.zxk.com:8020/user/root/hbase/importtsv
- export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
- export HADOOP_HOME=/opt/modules/hadoop-2.5.0
- HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar \
- $HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar \
- completebulkload \
- hdfs://hadoop.zxk.com:8020/user/root/hbase/hfileoutput \
- student
- ------------------------------------------HBase tables and namespaces----------------------------------------
- By default, new tables are created in the 【default】 namespace. HBase's own system namespace (hbase) holds
- the system tables:
- meta
- namespace
- Namespace-related shell commands
- Group name: namespace
- Commands: alter_namespace, create_namespace, describe_namespace, drop_namespace, list_namespace, list_namespace_tables
- create 'ns1:t1', {NAME => 'f1', VERSIONS => 5}
- create 'ns1:t1', 'f1' is equivalent to create 'ns1:t1', {NAME => 'f1'}
- create 'ns1:t2', {NAME => 'f1'}, {NAME => 'f2'}, {NAME => 'f3'} is equivalent to create 'ns1:t2', 'f1', 'f2', 'f3'
-
-
- ------------------------------------------HBase: pre-splitting regions when creating a table----------------------------------------
- Every region covers a rowkey range [startkey, endkey), i.e. startkey up to endkey-1.
- By default a newly created HBase table is assigned a single region.
- In practice, in both test and production environments, we usually need to bulk-import a large amount of data
- right after creating a table: the source files are converted into HFiles and loaded with bulk load. Each region
- is served by one RegionServer, so when the data grows and that single region keeps splitting in two, the hosting
- RegionServer becomes a bottleneck.
- Solution:
- create several regions up front (design the split points from the table's rowkeys, together with the business)
- for example: five regions
- managed by several RegionServers
- Key point:
- inserts are then spread across the five regions, which balances the write load
- Creating a pre-split table
-
- The region boundaries depend on the rowkey, so some rowkeys have to be estimated in advance.
- Examples:
- hbase> create 'ns1:t1', 'f1', SPLITS => ['10', '20', '30', '40']
- hbase> create 't1', 'f1', SPLITS => ['10', '20', '30', '40']
- hbase> create 't1', 'f1', SPLITS_FILE => 'splits.txt', OWNER => 'johndoe'
- hbase> create 't1', {NAME => 'f1', VERSIONS => 5}, METADATA => { 'mykey' => 'myvalue' }
- hbase> # Optionally pre-split the table into NUMREGIONS, using
- hbase> # SPLITALGO ("HexStringSplit", "UniformSplit" or classname)
- hbase> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}
- hbase> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit', CONFIGURATION => {'hbase.hregion.scan.loadColumnFamiliesOnDemand' => 'true'}}
- Example:
- create 'bflogs', 'info', SPLITS => ['20151001000000000', '20151011000000000', '20151021000000000']
- the estimated rowkeys given as split points are timestamps
- (year month day hour minute second millisecond):
- '20151001000000000'
- '20151011000000000'
- '20151021000000000'
- create 'bflogs2', 'info', SPLITS_FILE => '/opt/datas/bflogs-split.txt'
- A third approach (rarely used): let HBase compute the split points
- create 't11', 'f11', {NUMREGIONS => 2, SPLITALGO => 'HexStringSplit'}
- create 't12', 'f12', {NUMREGIONS => 4, SPLITALGO => 'UniformSplit'}
- -----------------------------------------HBase: table and rowkey design----------------------------------------
-
- How do we retrieve exactly the data we need out of a massive table?
- When designing the table's rowkey:
- core ideas:
- a lookup by full rowkey is the fastest access path
- range queries over the rowkey (see the Java sketch right after this list)
- prefix matching
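-
- A short Java sketch of the range/prefix style of rowkey query mentioned above (table name and key format borrowed from the bflogs pre-split example above; API as in HBase 0.98):
-
- import org.apache.hadoop.hbase.HBaseConfiguration;
- import org.apache.hadoop.hbase.client.HTable;
- import org.apache.hadoop.hbase.client.Result;
- import org.apache.hadoop.hbase.client.ResultScanner;
- import org.apache.hadoop.hbase.client.Scan;
- import org.apache.hadoop.hbase.util.Bytes;
-
- public class RowKeyRangeScan {
-     public static void main(String[] args) throws Exception {
-         HTable table = new HTable(HBaseConfiguration.create(), "bflogs");
-         Scan scan = new Scan();
-         // range scan: rows with 20151001... <= rowkey < 20151011...
-         scan.setStartRow(Bytes.toBytes("20151001000000000"));
-         scan.setStopRow(Bytes.toBytes("20151011000000000"));
-         ResultScanner scanner = table.getScanner(scan);
-         for (Result result : scanner) {
-             System.out.println(Bytes.toString(result.getRow()));
-         }
-         scanner.close();
-         table.close();
-     }
- }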
-
-
- Index table / auxiliary table (alongside the main table): creation and design
- column family: info
- column: rowkey
- How to keep the main table and the index table in sync:
- >>> application code with transactions
- >>> phoenix
- >>> only when all access goes through its JDBC interface do the main table and the index table stay in sync;
- it creates the index table for you
- >>> solr
- lily
- cloudera search
-
-
-
-
- -----------------------------------------HBase: Snappy compression for tables-----------------------------------------
- 1) Configure Hadoop compression
- >>> bin/hadoop checknative
- 2) Configure HBase
- >>> put the hadoop-snappy jar into HBase's lib directory
- >>> create a native directory under HBase's lib and symlink (ln -s) the native libraries from Hadoop's lib/native into it
- >>> configure hbase-site.xml:
- <property>
- <name>hbase.regionserver.codecs</name>
- <value>snappy</value>
- </property>
-
-
- -----------------------------------------HBase Memstore-----------------------------------------
- 当RegionServer(RS)收到写请求的时候(write request),RS会将请求转至相应的Region。
- 每一个Region都存储着一些列(a set of rows)。根据其列族的不同,将这些列数据存储在相应的列族中
- (Column Family,简写CF)。不同的CFs中的数据存储在各自的HStore中,HStore由一个Memstore及一系列
- HFile组成。
- Memstore位于RS的主内存中,而HFiles被写入到HDFS中。当RS处理写请求的时候,数据首先写入到
- Memstore,然后当到达一定的阀值的时候,Memstore中的数据会被刷到HFile中。
- 用到Memstore最主要的原因是:存储在HDFS上的数据需要按照row key 排序。而HDFS本身被设计为
- 顺序读写(sequential reads/writes),不允许修改。这样的话,HBase就不能够高效的写数据,因为要
- 写入到HBase的数据不会被排序,这也就意味着没有为将来的检索优化。为了解决这个问题,HBase将最
- 近接收到的数据缓存在内存中(in Memstore),在持久化到HDFS之前完成排序,然后再快速的顺序写入HDFS。
- 需要注意的一点是实际的HFile中,不仅仅只是简单地排序的列数据的列表,详见Apache HBase I/O – HFile。
- 除了解决“无序”问题外,Memstore还有一些其他的好处,例如:
- 作为一个内存级缓存,缓存最近增加数据。一种显而易见的场合是,新插入数据总是比老数据频繁使用。
- 在持久化写入之前,在内存中对Rows/Cells可以做某些优化。比如,当数据的version被设为1的时候,对于
- 某些CF的一些数据,Memstore缓存了数个对该Cell的更新,在写入HFile的时候,仅需要保存一个最新的版本就好了,
- 其他的都可以直接抛弃。
- 有一点需要特别注意:每一次Memstore的flush,会为每一个CF创建一个新的HFile。在读方面相对来说就会
- 简单一些:HBase首先检查请求的数据是否在Memstore,不在的话就到HFile中查找,最终返回merged的一个结果
- 给用户。
- HBase Memstore关注要点
- 迫于以下几个原因,HBase用户或者管理员需要关注Memstore并且要熟悉它是如何被使用的:
- Memstore有许多配置可以调整以取得好的性能和避免一些问题。HBase不会根据用户自己的使用模式来调整这些
- 配置,你需要自己来调整。频繁的Memstore flush会严重影响HBase集群读性能,并有可能带来一些额外的负载。
-
- Memstore flush的方式有可能影响你的HBase schema设计
- 接下来详细讨论一下这些要点:
- Configuring Memstore Flushes
- 对Memstore Flush来说,主要有两组配置项:
- 决定Flush触发时机
- 决定Flush何时触发并且在Flush时候更新被阻断(block)
- 第一组是关于触发“普通”flush,这类flush发生时,并不影响并行的写请求。该类型flush的配置项有:
- hbase.hregion.memstore.flush.size
- <property>
- <name>hbase.hregion.memstore.flush.size</name>
- <value>134217728</value>
- </property>
-
- hbase.regionserver.global.memstore.lowerLimit
- <property>
- <name>hbase.regionserver.global.memstore.lowerLimit</name>
- <value>0.35</value>
- </property>
-
- 需要注意的是第一个设置是每个Memstore的大小,当你设置该配置项时,你需要考虑一下每台RS承载的
- region总量。可能一开始你设置的该值比较小,后来随着region增多,那么就有可能因为第二个设置原因Memstore
- 的flush触发会变早许多。
- 第二组设置主要是出于安全考虑:有时候集群的“写负载”非常高,写入量一直超过flush的量,这时,
- 我们就希望memstore不要超过一定的安全设置。在这种情况下,写操作就要被阻止(blocked)一直到memstore
- 恢复到一个“可管理”(manageable)的大小。该类型flush配置项有:
- hbase.regionserver.global.memstore.upperLimit
- <property>
- <name>hbase.regionserver.global.memstore.upperLimit</name>
- <value>0.4</value>
- </property>
- hbase.hregion.memstore.block.multiplier
- <property>
- <name>hbase.hregion.memstore.block.multiplier</name>
- <value>2</value>
- </property>
- 某个节点“写阻塞”对该节点来说影响很大,但是对于整个集群的影响更大。HBase设计为:
- 每个Region仅属于一个RS但是“写负载”是均匀分布于整个集群(所有Region上)。
- 有一个如此“慢”的节点,将会使得整个集群都会变慢(最明显的是反映在速度上)。
- 提示:
- 严重关切Memstore的大小和Memstore Flush Queue的大小。理想情况下,
- Memstore的大小不应该达到hbase.regionserver.global.memstore.upperLimit的设置,
- Memstore Flush Queue 的size不能持续增长。
- 频繁的Memstore Flushes
- 要避免“写阻塞”,貌似让Flush操作尽量的早于达到触发“写操作”的阈值为宜。
- 但是,这将导致频繁的Flush操作,而由此带来的后果便是读性能下降以及额外的负载。
- 每次的Memstore Flush都会为每个CF创建一个HFile。频繁的Flush就会创建大量的HFile。
- 这样HBase在检索的时候,就不得不读取大量的HFile,读性能会受很大影响。
- 为预防打开过多HFile及避免读性能恶化,HBase有专门的HFile合并处理(HFile Compaction Process)。
- HBase会周期性的合并数个小HFile为一个大的HFile。明显的,有Memstore Flush产生的HFile越多,
- 集群系统就要做更多的合并操作(额外负载)。更糟糕的是:Compaction处理是跟集群上的其他请求并行进行的。
- 当HBase不能够跟上Compaction的时候(同样有阈值设置项),会在RS上出现“写阻塞”。像上面说到的,这是最最不希望的。
- 提示:
- 严重关切RS上Compaction Queue 的size。要在其引起问题前,阻止其持续增大。
- 理想情况下,在不超过hbase.regionserver.global.memstore.upperLimit的情况下,Memstore应该尽可能多
- 的使用内存(配置给Memstore部分的,而不是真个Heap的)。
- ----------------------------------------------Integrating HBase with Hive----------------------------------------------
- >> the data itself is stored in HBase
- >> the table description (metadata) is stored in Hive
- hive-table <-- maps to --> hbase-table
- hive-column <-- maps to --> hbase-rowkey, hbase-cf-column
- via a storage handler
- There are two ways to integrate HBase with Hive:
- >> managed table
- when creating the Hive table, specify that its data is stored in an HBase table.
-
- For example:
- CREATE TABLE hbase_hive_table(key int, value string)
- STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
- WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:val")
- TBLPROPERTIES ("hbase.table.name" = "xyz");
- >> external table
- an HBase table already exists and we want to analyze its data in Hive.
-
- For example:
- CREATE EXTERNAL TABLE hbase_user(id int, name string,age int)
- STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
- WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:name,info:age")
- TBLPROPERTIES ("hbase.table.name" = "user");
- In essence:
- Hive acts as an HBase client here.
- It needs some configuration plus the HBase client jars on its classpath:
- export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
- export HIVE_HOME=/opt/modules/hive-0.13.1/lib
- ln -s $HBASE_HOME/lib/hbase-common-0.98.6-hadoop2.jar $HIVE_HOME/hbase-common-0.98.6-hadoop2.jar
- ln -s $HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar $HIVE_HOME/hbase-server-0.98.6-hadoop2.jar
- ln -s $HBASE_HOME/lib/hbase-client-0.98.6-hadoop2.jar $HIVE_HOME/hbase-client-0.98.6-hadoop2.jar
- ln -s $HBASE_HOME/lib/hbase-protocol-0.98.6-hadoop2.jar $HIVE_HOME/hbase-protocol-0.98.6-hadoop2.jar
- ln -s $HBASE_HOME/lib/hbase-it-0.98.6-hadoop2.jar $HIVE_HOME/hbase-it-0.98.6-hadoop2.jar
- ln -s $HBASE_HOME/lib/htrace-core-2.04.jar $HIVE_HOME/htrace-core-2.04.jar
- ln -s $HBASE_HOME/lib/hbase-hadoop2-compat-0.98.6-hadoop2.jar $HIVE_HOME/hbase-hadoop2-compat-0.98.6-hadoop2.jar
- ln -s $HBASE_HOME/lib/hbase-hadoop-compat-0.98.6-hadoop2.jar $HIVE_HOME/hbase-hadoop-compat-0.98.6-hadoop2.jar
- ln -s $HBASE_HOME/lib/high-scale-lib-1.1.1.jar $HIVE_HOME/high-scale-lib-1.1.1.jar
- ----------------------------------------------Integrating HBase with Hue----------------------------------------------
- Configure hue.ini
- [hbase]
- hbase_clusters=(Cluster|hadoop.zxk.com:9090)
- hbase_conf_dir=/opt/modules/hbase-0.98.6-hadoop2/conf
- Start the Thrift server: bin/hbase-daemon.sh start thrift
- -----------------------------------------WordCount in spark-shell (Scala)----------------------------------------
- val lineadd = sc.textFile("hdfs://hadoop.zxk.com:8020/user/root/mapreduce/wordcount/input/wc.input")
- val wordsAdd=lineadd.map(line =>line.split(" "))
- val wordsAdd=lineadd.flatMap(line =>line.split(" "))
- val keyvalRdds =wordsAdd.map(word=>(word,1))
- val count= keyvalRdds.reduceByKey((a,b)=>(a+b))
- sc.textFile("hdfs://hadoop.zxk.com:8020/user/root/mapreduce/wordcount/input/wc.input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
- -----------------------------------------Scala sortByKey----------------------------------------
- ## WordCount
- val rdd = sc.textFile("hdfs://hadoop.zxk.com:8020/user/root/mapreduce/wordcount/input/wc.input")
- val wordcount = rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)
- wordcount.saveAsTextFile("hdfs://hadoop.zxk.com:8020/user/root/mapreduce/wordcount/sparkOutput89")
- wordcount.sortByKey().collect # ascending by default
- wordcount.sortByKey(true).collect # true = ascending
- wordcount.sortByKey(false).collect # false = descending
- Sorting by value, descending
- # Value Sort
- wordcount.map(x => (x._2,x._1)).sortByKey(false).collect
- wordcount.map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)).collect
-
- # Top N
- wordcount.map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)).take(3)
- Group Top Key
- a WordCount-style job that keeps the top values per key
- Requirement:
- similar to a secondary sort in MapReduce
- 1) group by the first field
- 2) sort the second field within each group (descending)
- 3) take the Top N of each group, e.g. the first three values
- aa 78
- bb 98
- aa 80
- cc 98
- aa 69
- cc 87
- bb 97
- cc 86
- aa 97
- bb 78
- bb 34
- cc 85
- bb 92
- cc 72
- bb 32
- bb 23
-
- 功能分析:
- (aa,list(78,80,69,97)) -> (aa,list(69,78,80,97)) -> (aa,list(69,78,80))
-
- val rdd = sc.textFile("hdfs://hadoop.zxk.com:8020/user/root/spark/grouptop/input/score.input")
-
- rdd.map(_.split(" ")).collect
- Array(aa, 78)
-
- rdd.map(_.split(" ")).map(x => (x(0),x(1))).collect
- (aa,78)
-
- rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.collect
- (aa,CompactBuffer(78, 80, 69, 97))
- rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map(
- x => {
- val xx = x._1
- val yy = x._2
- yy
- }
- ).collect
- Iterable[String]
-
- Iterable 方法:
- def toList: List[A]
- 返回包含此遍历的迭代器的所有元素的列表
-
- rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map(
- x => {
- val xx = x._1
- val yy = x._2
- yy.toList
- }
- ).collect
- List[String]
- List(78, 80, 69, 97)
-
- List 方法:
- def sorted[B >: A]: List[A]
- 根据排序对列表进行排序
- rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map(
- x => {
- val xx = x._1
- val yy = x._2
- yy.toList.sorted
- }
- ).collect
- List[String]
- List(69, 78, 80, 97)
-
- List 方法:
- def reverse: List[A]
- 返回新列表,在相反的顺序元素
- rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map(
- x => {
- val xx = x._1
- val yy = x._2
- yy.toList.sorted.reverse
- }
- ).collect
- List[String]
- List(97, 80, 78, 69)
- List 方法:
- def take(n: Int): List[A]
- 返回前n个元素
-
- def takeRight(n: Int): List[A]
- 返回最后n个元素
- rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map(
- x => {
- val xx = x._1
- val yy = x._2
- yy.toList.sorted.reverse.take(3)
- }
- ).collect
- 要求返回的是一个元组对
- rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map(
- x => {
- val xx = x._1
- val yy = x._2
- (xx,yy.toList.sorted.reverse.take(3))
- }
- ).collect
- val groupTopKeyRdd = rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map(
- x => {
- val xx = x._1
- val yy = x._2
- (xx,yy.toList.sorted.reverse.take(3))
- }
- )
- groupTopKeyRdd.saveAsTextFile("hdfs://hadoop.zxk.com:8020/user/root/spark/grouptop/output")
- -----------------------------------------Submitting a jar with spark-submit----------------------------------------
- bin/spark-submit \
- --master spark://hadoop.zxk.com:7077 \
- jars/sparkApp.jar
- -----------------------------------------Spark Streaming Demo-----------------------------------------
- Read data from a socket and process it in real time
- # rpm -ivh nc-1.84-22.el6.x86_64.rpm
- ## run nc listening on port 9999
- $ nc -lk 9999
- ## run the demo
- bin/run-example streaming.NetworkWordCount hadoop.zxk.com 9999
- -----------------------------------------Initializing StreamingContext-----------------------------------------
- import org.apache.spark._
- import org.apache.spark.streaming._
- import org.apache.spark.streaming.StreamingContext._
- val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
- val ssc = new StreamingContext(conf, Seconds(5))
- // read data
- val lines = ssc.socketTextStream("localhost", 9999)
- // process
- val words = lines.flatMap(_.split(" "))
- val pairs = words.map(word => (word, 1))
- val wordCounts = pairs.reduceByKey(_ + _)
- wordCounts.print()
-
- ssc.start() // Start the computation
- ssc.awaitTermination() // Wait for the computation to terminate
- -----------------------------------------HDFS -----------------------------------------
- import org.apache.spark._
- import org.apache.spark.streaming._
- import org.apache.spark.streaming.StreamingContext._
- val ssc = new StreamingContext(sc, Seconds(5))
- // read data
- val lines = ssc.textFileStream("hdfs://hadoop.zxk.com:8020/user/root/streaming/input/hdfs/")
- // process
- val words = lines.flatMap(_.split("\t"))
- val pairs = words.map(word => (word, 1))
- val wordCounts = pairs.reduceByKey(_ + _)
- wordCounts.print()
-
- ssc.start() // Start the computation
- ssc.awaitTermination() // Wait for the computation to terminate
- -----------------------------------Running a Scala script inside spark-shell-----------------------------------
- :load /opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/HDFSWordCount.scala
- -----------------------------------Saving the processed stream to HDFS-----------------------------------
- import org.apache.spark._
- import org.apache.spark.streaming._
- import org.apache.spark.streaming.StreamingContext._
- val ssc = new StreamingContext(sc, Seconds(5))
- // read data
- val lines = ssc.textFileStream("hdfs://hadoop.zxk.com:8020/user/root/streaming/input/hdfs/")
- // process
- val words = lines.flatMap(_.split("\t"))
- val pairs = words.map(word => (word, 1))
- val wordCounts = pairs.reduceByKey(_ + _)
- wordCounts.saveAsTextFiles("hdfs://hadoop.zxk.com:8020/user/root/streaming/output/")
-
- ssc.start() // Start the computation
- ssc.awaitTermination() // Wait for the computation to terminate
- -----------------------------------Spark Streaming + Flume Integration-----------------------------------
- Flume has three components:
- Source ---> Channel ---> Sink (here the sink pushes events to Spark Streaming)
- import org.apache.spark._
- import org.apache.spark.streaming._
- import org.apache.spark.streaming.StreamingContext._
- import org.apache.spark.streaming.flume._
- import org.apache.spark.storage.StorageLevel
- val ssc = new StreamingContext(sc, Seconds(5))
- // read data
- val stream = FlumeUtils.createStream(ssc, "hadoop.zxk.com", 9999, StorageLevel.MEMORY_ONLY_SER_2)
- stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
- ssc.start() // Start the computation
- ssc.awaitTermination() // Wait for the computation to terminate
- ------------------
- bin/spark-shell --jars \
- /opt/modules/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/spark-streaming-flume_2.10-1.3.0.jar,/opt/modules/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/flume-avro-source-1.5.0-cdh5.3.6.jar,/opt/modules/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/flume-ng-sdk-1.5.0-cdh5.3.6.jar
- ------
- run command:
- bin/flume-ng agent -c conf -n a2 -f conf/flume-spark-push.sh -Dflume.root.logger=DEBUG,console
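-
- The conf/flume-spark-push.sh file referenced above is not shown in these notes; a sketch of what it presumably contains (an exec source tailing a file, a memory channel, and an Avro sink pushing to the host/port that FlumeUtils.createStream listens on, hadoop.zxk.com:9999; the tailed file path is a placeholder):
-
- a2.sources = r2
- a2.channels = c2
- a2.sinks = k2
-
- # tail a log file (placeholder path)
- a2.sources.r2.type = exec
- a2.sources.r2.command = tail -F /opt/datas/spark_flume.log
- a2.sources.r2.shell = /bin/bash -c
-
- a2.channels.c2.type = memory
- a2.channels.c2.capacity = 1000
-
- # Avro sink: push events to the Spark Streaming Flume receiver
- a2.sinks.k2.type = avro
- a2.sinks.k2.hostname = hadoop.zxk.com
- a2.sinks.k2.port = 9999
-
- a2.sources.r2.channels = c2
- a2.sinks.k2.channel = c2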
-
-
-
-
-
- -----------------------------------Starting a Kafka cluster-----------------------------------
- Start the Kafka broker
- nohup bin/kafka-server-start.sh config/server.properties &
- Create a topic
- bin/kafka-topics.sh --create --zookeeper hadoop.zxk.com:2181 --replication-factor 1 --partitions 1 --topic test
- List existing topics
- bin/kafka-topics.sh --list --zookeeper hadoop.zxk.com:2181
- Produce data
- bin/kafka-console-producer.sh --broker-list hadoop.zxk.com:9092 --topic test
- Consume data
- bin/kafka-console-consumer.sh --zookeeper hadoop.zxk.com:2181 --topic test --from-beginning
-
-
-
-
- -----------------------------------Spark Streaming + Kafka Integration-----------------------------------
- import java.util.HashMap
- import org.apache.spark._
- import org.apache.spark.streaming._
- import org.apache.spark.streaming.StreamingContext._
- import org.apache.spark.streaming.kafka._
- val ssc = new StreamingContext(sc, Seconds(5))
- val topicMap = Map("test" -> 1)
- // read data
- val lines = KafkaUtils.createStream(ssc, "hadoop-senior.ibeifeng.com:2181", "testWordCountGroup", topicMap).map(_._2)
- val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
- wordCounts.print()
- ssc.start() // Start the computation
- ssc.awaitTermination() // Wait for the computation to terminate
- ------------------
- bin/spark-shell --jars \
- /opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/spark-streaming-kafka_2.10-1.3.0.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/kafka_2.10-0.8.2.1.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/kafka-clients-0.8.2.1.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/zkclient-0.3.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/metrics-core-2.2.0.jar
- The second approach: direct stream (no receiver)
- -----------------------------------Spark Streaming + Kafka Integration-----------------------------------
- import kafka.serializer.StringDecoder
- import org.apache.spark._
- import org.apache.spark.streaming._
- import org.apache.spark.streaming.StreamingContext._
- import org.apache.spark.streaming.kafka._
- val ssc = new StreamingContext(sc, Seconds(5))
- val kafkaParams = Map[String, String]("metadata.broker.list" -> "hadoop-senior.ibeifeng.com:9092")
- val topicsSet = Set("test")
- // read data
- val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
- val lines = messages.map(_._2)
- val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
- wordCounts.print()
- ssc.start() // Start the computation
- ssc.awaitTermination() // Wait for the computation to terminate
- ---------------------------------------------------------------------------------------------------------
- bin/spark-shell --jars \
- /opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/spark-streaming-kafka_2.10-1.3.0.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/kafka_2.10-0.8.2.1.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/kafka-clients-0.8.2.1.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/zkclient-0.3.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/metrics-core-2.2.0.jar
- -----------------------------------updateStateByKey-----------------------------------
- import kafka.serializer.StringDecoder
- import org.apache.spark._
- import org.apache.spark.streaming._
- import org.apache.spark.streaming.StreamingContext._
- import org.apache.spark.streaming.kafka._
- val ssc = new StreamingContext(sc, Seconds(5))
- ssc.checkpoint(".")
- val kafkaParams = Map[String, String]("metadata.broker.list" -> "hadoop-senior.ibeifeng.com:9092")
- val topicsSet = Set("test")
- val updateFunc = (values: Seq[Int], state: Option[Int]) => {
- val currentCount = values.sum
- val previousCount = state.getOrElse(0)
- Some(currentCount + previousCount)
- }
- // read data
- val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
- val lines = messages.map(_._2)
- val words = lines.flatMap(_.split(" "))
- val wordDstream = words.map(x => (x, 1))
- val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
- stateDstream.print()
- ssc.start() // Start the computation
- ssc.awaitTermination() // Wait for the computation to terminate
- -----------------------------------Implicit conversion--------------------------------------------------
- Scala has an important feature called implicit conversion:
- for example,
- an object of class A can be converted into an object of class B through an implicit function that the compiler applies automatically
- /**
- * Return a new "state" DStream where the state for each key is updated by applying
- * the given function on the previous state of the key and the new values of each key.
- * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
- * @param updateFunc State update function. If `this` function returns None, then
- * corresponding state key-value pair will be eliminated.
- * @tparam S State type
- */
- def updateStateByKey[S: ClassTag](
- updateFunc: (Seq[V], Option[S]) => Option[S]
- ): DStream[(K, S)] = ssc.withScope {
- updateStateByKey(updateFunc, defaultPartitioner())
- }