赞
踩
---------------------------------CentOS安装JDK与MySQL---------------------------------------
rpm -qa | grep java --查看系统中是否安装jdk
rpm -e --nodeps java包 --强制卸载系统中的jdk
tar -zxf jdk-7u79-linux-x64.tar.gz -C /opt/modules/ --解压jdk包
vi ../etc/profile --编辑profile文件
export JAVA_HOME=/opt/modules/jdk1.7.0_79 -- 在profile 文件结尾加上这两句话
export PATH=$PATH:$JAVA_HOME/bin
source /etc/profile 重启profile配置文件
create table student(id int, name string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
load data local inpath '/opt/datas/student.txt' into table student ;
rpm -qa | grep mysql
rpm -e --nodeps mysql包 --强制卸载系统中的mysql
rpm -ivh MySQL-server-5.6.24-1.el6.x86_64.rpm
---------------------------------CentOS安装JDK与MySQL---------------------------------------
---------------------------------Hive创建表---------------------------------------
create table if not exists default.log_1017
(
ip string comment 'remote ip address',
user string,
req_url string comment 'user request url'
)
comment 'Web Acsess Logs'
row format delimited fields terminated by ' '
stored as textfile ;
create table if not exists default.log_1017_2
as select * from default.log_1017
create table if not exists default.log_1017_2
like default.log_1017
create table if not exists student(id int, name string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
load data local inpath '/opt/datas/log-1017.txt' into table default.log_1017 ;
create table if not exists default.emp
(
empno int ,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int
)
row format delimited fields terminated by '\t';
create table if not exists default.dept
(
deptno int ,
dname string,
loc string
)
row format delimited fields terminated by '\t';
load data local inpath '/opt/datas/emp.txt' overwrite into table default.emp ;
load data local inpath '/opt/datas/dept.txt' into table default.dept ;
----创建外部表
create external table if not exists default.emp_ext
(
empno int ,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int
)
row format delimited fields terminated by '\t';
----创建分区表
create table if not exists default.dept_part
(
deptno int ,
dname string,
loc string
)
partitioned by (month string,day string)
row format delimited fields terminated by '\t';
load data local inpath '/opt/datas/dept.txt' overwrite into table default.dept_part partition (month='201810');
----第一种方式
dfs -mkdir -p /user/hive/warehouse/dept_part/day=20181020;
dfs -put /opt/datas/dept.txt /user/hive/warehouse/dept_part/day=20181020;
msck repair table dept_part
----第二种方式
dfs -mkdir -p /user/hive/warehouse/dept_part/day=20181021;
dfs -put /opt/datas/dept.txt /user/hive/warehouse/dept_part/day=20181021;
alert table dept_part add partition (day='20181021')
insert overwrite local directory '/opt/datas/hive_exp_emp'
row format delimited fields terminated by '\t' collection items terminated by '\n'
select * from default.emp;
insert overwrite directory '/user/root//hive_exp_emp'
select * from default.emp;
insert overwrite local directory '/opt/datas/hive_emp_res'
select * from emp order by empno asc;
set hive.exec.reducers.max=3
insert overwrite local directory '/opt/datas/hive_emp_res'
select * from emp sort by empno asc;
insert overwrite local directory '/opt/datas/hive_emp_res'
select * from emp distribute by deptno sort by empno asc;
---------------------------UDF编程---------------------------
<!-- Hive Client-->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
</dependency>
UDF编程步骤:
1-->Create UDF Java
2-->打包成Jar包,并上传。
3-->Use UDF Class
2-1--> add jar /opt/datas/hive-udf.jar
2-2--> create temporary function my_lower as "com.root.senior.hive.udf.LowerUDF"
2-3--> select ename ,my_lower(ename) lowername from emp limit 5 ;
CREATE FUNCTION self_lower AS 'com.root.senior.hive.udf.LowerUDF' USING JAR 'hdfs://hadoop.zxk.com:8020/user/root/hive/jars/hiveudf.jar';
select ename, self_lower(ename) lowername from emp limit 5 ;
------------------------------------测试Snappy-----------------------------------
bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.0.jar wordcount /user/root/mapreduce/wordcount/input /user/root/mapreduce/wordcount/output4
bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.0.jar wordcount
-Dmapreduce.map.output.compress=true
-Dmapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec
/user/root/mapreduce/wordcount/input
/user/root/mapreduce/wordcount/output2
-------------------------------------测试ORC与PARQUET数据格式-------------------------------------------
create table page_views(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE ;
load data local inpath '/opt/datas/page_views.data' into table page_views;
select session_id,count(*) cnt from page_views group by session_id order by cnt desc limit 30 ;
dfs -du -h /user/hive/warehouse/page_views
create table page_views_orc(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS ORC ;
insert into table page_views_orc select * from page_views ;
select session_id,count(*) cnt from page_views_orc group by session_id order by cnt desc limit 30 ;
dfs -du -h /user/hive/warehouse/page_views_orc
create table page_views_parquet(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS PARQUET ;
insert into table page_views_parquet select * from page_views ;
select session_id,count(*) cnt from page_views_parquet group by session_id order by cnt desc limit 30 ;
dfs -du -h /user/hive/warehouse/page_views_parquet
-------------------------------------测试ORC与PARQUET数据格式-------------------------------------------
-------------------------------------测试ORC与Snappy压缩结合使用----------------------------------------
create table page_views_orc_snappy(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS orc tblproperties ("orc.compress"="SNAPPY");
insert into table page_views_orc_snappy select * from page_views ;
dfs -du -h /user/hive/warehouse/page_views_orc_snappy/ ;
create table page_views_orc_none(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS orc tblproperties ("orc.compress"="NONE");
insert into table page_views_orc_none select * from page_views ;
dfs -du -h /user/hive/warehouse/page_views_orc_none/ ;
set parquet.compression=SNAPPY ;
create table page_views_parquet_snappy(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS parquet;
insert into table page_views_parquet_snappy select * from page_views ;
dfs -du -h /user/hive/warehouse/page_views_parquet_snappy/ ;
总结:
在实际的项目开发当中,hive表的数据
* 存储格式
orcfile / parquet
* 数据压缩
snappy
-------------------------------------测试ORC与Snappy压缩结合使用----------------------------------------
-------------------------------------使用正则表达式创建表----------------------------------------
drop table if exists default.bf_log_src ;
create table IF NOT EXISTS default.bf_log_src (
remote_addr string,
remote_user string,
time_local string,
request string,
status string,
body_bytes_sent string,
request_body string,
http_referer string,
http_user_agent string,
http_x_forwarded_for string,
host string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
"input.regex" = "(\"[^ ]*\") (\"-|[^ ]*\") (\"[^\]]*\") (\"[^\"]*\") (\"[0-9]*\") (\"[0-9]*\") (-|[^ ]*) (\"[^ ]*\") (\"[^\"]*\") (-|[^ ]*) (\"[^ ]*\")"
)
STORED AS TEXTFILE;
load data local inpath '/opt/datas/moodle.iroot.access.log' into table default.bf_log_src ;
---------------------------------------创建orc格式并且使用Snappy压缩的表格----------------------------------------
drop table if exists default.bf_log_comm ;
create table IF NOT EXISTS default.bf_log_comm (
remote_addr string,
time_local string,
request string,
http_referer string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS orc tblproperties ("orc.compress"="SNAPPY");
insert into table default.bf_log_comm select remote_addr, time_local, request,http_referer from default.bf_log_src ;
select * from bf_log_comm limit 5 ;
---------------------------------------定义UDF,对原表数据进行清洗----------------------------------------
第一个udf
去除引号
add jar /opt/datas/hiveudf2.jar ;
create temporary function my_removequotes as "com.root.senior.hive.udf.RemoveQuotesUDF" ;
insert overwrite table default.bf_log_comm select my_removequotes(remote_addr), my_removequotes(time_local), my_removequotes(request), my_removequotes(http_referer) from default.bf_log_src ;
select * from bf_log_comm limit 5 ;
-----------------------------------------------Sqoop-----------------------------------------
RDBMS以Mysql数据库为例讲解,拷贝jdbc驱动包到$SQOOP_HOME/lib目录下
cp /opt/softwares/mysql-libs/mysql-connector-java-5.1.27/mysql-connector-java-5.1.27-bin.jar
/opt/modules/sqoop-1.4.5-cdh5.3.6/lib/
bin/sqoop list-databases \
--connect jdbc:mysql://hadoop.zxk.com:3306 \
--username root \
--password 123456
-----------------------------------------------Sqoop Import使用-----------------------------------------
bin/sqoop import \
--connect jdbc:mysql://hadoop.zxk.com:3306/test \
--username root \
--password 123456 \
--table my_user
------------------------------------------Sqoop Import使用设置路径与mappers-----------------------------
bin/sqoop import \
--connect jdbc:mysql://hadoop.zxk.com:3306/test \
--username root \
--password 123456 \
--table my_user \
--target-dir /user/root/sqoop/imp_my_user \
--num-mappers 1
-----------------------------------Import Hdfs文件系统 Sqoop设置文件存储格式为parquetfile-------------------
sqoop 底层的实现就是MapReduce,import来说,仅仅运行Map Task
bin/sqoop import \
--connect jdbc:mysql://hadoop.zxk.com:3306/test \
--username root \
--password 123456 \
--table my_user \
--target-dir /user/root/sqoop/imp_my_user_parquet \
--fields-terminated-by ',' \
--num-mappers 1 \
--as-parquetfile
------------------------------------Import Hdfs文件系统 Sqoop设置数据的一部分列----------------------------
bin/sqoop import \
--connect jdbc:mysql://hadoop.zxk.com:3306/test \
--username root \
--password 123456 \
--table my_user \
--target-dir /user/root/sqoop/imp_my_user_column \
--num-mappers 1 \
--columns id,account
----------------------------------Import Hdfs文件系统 -Sqoop设置数据的一部分列--------------------------------
bin/sqoop import \
--connect jdbc:mysql://hadoop.zxk.com:3306/test \
--username root \
--password 123456 \
--table my_user \
--target-dir /user/root/sqoop/imp_my_user_column \
--num-mappers 1 \
--columns id,account
-----------------------------------Import Hdfs文件系统 Sqoop使用查询语句Query----------------------------------
bin/sqoop import \
--connect jdbc:mysql://hadoop.zxk.com:3306/test \
--username root \
--password 123456 \
--query 'select id, account from my_user where $CONDITIONS' \
--target-dir /user/root/sqoop/imp_my_user_query \
--num-mappers 1
-----------------------------------Import Hdfs文件系统 Sqoop使用Snappy压缩-------------------------------------
bin/sqoop import \
--connect jdbc:mysql://hadoop.zxk.com:3306/test \
--username root \
--password 123456 \
--table my_user \
--target-dir /user/root/sqoop/imp_my_sannpy \
--delete-target-dir \
--num-mappers 1 \
--compress \
--compression-codec org.apache.hadoop.io.compress.SnappyCodec \
--fields-terminated-by ','
drop table if exists default.hive_user_snappy ;
create table default.hive_user_snappy(
id int,
username string,
password string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ;
load data inpath '/user/root/sqoop/imp_my_sannpy' overwrite into table default.hive_user_snappy ;
----------------------------Import Hdfs文件系统 增量数据的导入(incremental)--------------------------
有一个唯一标识符,通常这个表都有一个字段,类似于插入时间createtime
* query
where createtime => 20150924000000000 and createtime < 20150925000000000
* sqoop
Incremental import arguments:
--check-column <column> Source column to check for incremental
change
--incremental <import-type> Define an incremental import of type
'append' or 'lastmodified'
--last-value <value> Last imported value in the incremental
check column
bin/sqoop import \
--connect jdbc:mysql://hadoop.zxk.com:3306/test \
--username root \
--password 123456 \
--table my_user \
--target-dir /user/root/sqoop/imp_my_incr \
--num-mappers 1 \
--incremental append \
--check-column id \
--last-value 4
----------------------------Import Hdfs文件系统 增量数据的导入(direct)--------------------------
bin/sqoop import \
--connect jdbc:mysql://hadoop.zxk.com:3306/test \
--username root \
--password 123456 \
--table my_user \
--target-dir /user/root/sqoop/imp_my_incr \
--num-mappers 1 \
--delete-target-dir \
--direct
----------------------------将Hdfs文件系统文件数据导入MySQL的表中--------------------------
touch /opt/datas/user.txt
vi /opt/datas/user.txt
12,root,root
13,xuanyun,xuanyu
bin/hdfs dfs -mkdir -p /user/root/sqoop/exp/user/
bin/hdfs dfs -put /opt/datas/user.txt /user/root/sqoop/exp/user/
bin/sqoop export \
--connect jdbc:mysql://hadoop.zxk.com:3306/test \
--username root \
--password 123456 \
--table my_user \
--export-dir /user/root/sqoop/exp/user/ \
--num-mappers 1
----------------------------使用Sqoop将MySQL表中的数据导入Hive中---------------------------
use default ;
drop table if exists user_hive ;
create table user_hive(
id int,
account string,
password string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ;
bin/sqoop import \
--connect jdbc:mysql://hadoop.zxk.com:3306/test \
--username root \
--password 123456 \
--table my_user \
--fields-terminated-by ',' \
--delete-target-dir \
--num-mappers 1 \
--hive-import \
--hive-database default \
--hive-table user_hive
----------------------------使用Sqoop将Hive表中数据(HDFS文件系统)中的数据导入MySQL中---------------------------
CREATE TABLE `my_usert` (
`id` tinyint(4) NOT NULL AUTO_INCREMENT,
`account` varchar(255) DEFAULT NULL,
`passwd` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
);
bin/sqoop export \
--connect jdbc:mysql://hadoop.zxk.com:3306/test \
--username root \
--password 123456 \
--table my_usert \
--export-dir /user/hive/warehouse/user_hive \
--num-mappers 1 \
--input-fields-terminated-by ','
---------------------------------------------Flume的基本配置--------------------------------------------
bin/flume-ng
Usage: bin/flume-ng <command> [options]...
commands:
agent run a Flume agent
global options:
--conf,-c <conf> use configs in <conf> directory
-Dproperty=value sets a Java system property value
agent options:
--name,-n <name> the name of this agent (required)
--conf-file,-f <file> specify a config file (required if -z missing)
bin/flume-ng agent --conf conf --name agent-test --conf-file test.conf -Dflume.root.logger=DEBUG,console
bin/flume-ng agent -c conf -n agent-test -f test.conf
bin/flume-ng agent --conf conf --name a1 --conf-file conf/a1.conf -Dflume.root.logger=DEBUG,console
bin/flume-ng agent --conf conf --name a2 --conf-file conf/flume-tail.conf -Dflume.root.logger=DEBUG,console
---------------------------------------------Oozie的基本命令--------------------------------------------
oozie job -oozie http://hadoop.zxk.com:11000/oozie -config examples/apps/map-reduce/job.properties -run
export OOZIE_URL=http://hadoop.zxk.com:11000/oozie
oozie job -config oozie-apps/mr-wordcount-wf/job.properties -run
---------------------------------------------Oozie WorkFlow的基本使用-------------------------------------
---------------------------------------------Oozie WorkFlow的MapReduce基本使用----------------------------
如何定义一个WorkFlow
第一步:
* 生成job.properties文件 -->指向 workflow.xml文件所在的HDFS位置。
第二步:
* 编写 workflow.xml文件
* 编写步骤:
* 编写流程控制节点
* 第一步:定义XML文件
* 第二步:编写节点
* start 节点
* action 节点(包含 MapReduce Hive Sqoop Shell)
* 当action节点执行成功时跳转
(1):--->>> action 节点
(2):--->>> end 节点
* 当action节点执行失败时跳转
(1):--->>> fail 节点
(2):--->>> kill 节点(杀死线程)
(3):--->>> end 节点
* end 节点
* 编写Action节点
* 关键点: 如何使用Oozie调用MapReduce程序
* 将以前Java MapReduce 程序中的【Driver】部分的配置 转换为 workflow.xml 文件中的 configuration 部分配置
【注意】 目前的Workflow.XML文件支持的是旧MapReduce API 需要加上配置
<property>
<name>mapred.mapper.new-api</name>
<value>true</value>
</property>
<property>
<name>mapred.reducer.new-api</name>
<value>true</value>
</property>
第三步:
* 将生成的jar包上传到lib目录下
第四步:
* 使用Hadoop将文件夹上传到HDFS文件系统中
bin/hdfs dfs -put /opt/modules/oozie-4.0.0-cdh5.3.6/oozie-apps/*文件夹名称/ oozie-apps/
第五步:
* 在Ooize安装目录下运行命令
export OOZIE_URL=http://hadoop.zxk.com:11000/oozie
oozie job -config oozie-apps/*文件夹名称/job.properties -run
---------------------------------------------Oozie workflow.xml 样例与job.properties 样例-------------------------------------------
job.properties
nameNode=hdfs://hadoop.zxk.com:8020
jobTracker=hadoop.zxk.com:8032
queueName=default
oozieAppsRoot=user/root/oozie-apps
oozieDataRoot=user/root/oozie/datas
oozie.wf.application.path=${nameNode}/${oozieAppsRoot}/mr-wordcount-wf/workflow.xml
inputDir=mr-wordcount-wf/input
outputDir=mr-wordcount-wf/output
workflow.xml
<workflow-app xmlns="uri:oozie:workflow:0.5" name="mr-wordcount-wf">
<start to="mr-node-wordcount"/>
<action name="mr-node-wordcount">
<map-reduce>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${nameNode}/${oozieDataRoot}/${outputDir}"/>
</prepare>
<configuration>
<property>
<name>mapred.mapper.new-api</name>
<value>true</value>
</property>
<property>
<name>mapred.reducer.new-api</name>
<value>true</value>
</property>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>mapreduce.job.map.class</name>
<value>com.iroot.hadoop.senior.mapreduce.WordCount$WordCountMapper</value>
</property>
<property>
<name>mapreduce.job.reduce.class</name>
<value>com.iroot.hadoop.senior.mapreduce.WordCount$WordCountReducer</value>
</property>
<property>
<name>mapreduce.map.output.key.class</name>
<value>org.apache.hadoop.io.Text</value>
</property>
<property>
<name>mapreduce.map.output.value.class</name>
<value>org.apache.hadoop.io.IntWritable</value>
</property>
<property>
<name>mapreduce.job.output.key.class</name>
<value>org.apache.hadoop.io.Text</value>
</property>
<property>
<name>mapreduce.job.output.value.class</name>
<value>org.apache.hadoop.io.IntWritable</value>
</property>
<property>
<name>mapreduce.input.fileinputformat.inputdir</name>
<value>${nameNode}/${oozieDataRoot}/${inputDir}</value>
</property>
<property>
<name>mapreduce.output.fileoutputformat.outputdir</name>
<value>${nameNode}/${oozieDataRoot}/${outputDir}</value>
</property>
</configuration>
</map-reduce>
<ok to="end"/>
<error to="fail"/>
</action>
<kill name="fail">
<message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end"/>
</workflow-app>
---------------------------------------------Oozie WorkFlow的Hive基本使用----------------------------
如何定义一个WorkFlow
第一步:
* 生成job.properties文件 -->指向 workflow.xml文件所在的HDFS位置。
第二步:
* 编写 workflow.xml文件
* 编写步骤:
* 编写流程控制节点
* 第一步:定义XML文件
* 第二步:编写节点
* start 节点
* action 节点(包含 MapReduce Hive Sqoop Shell)
* 当action节点执行成功时跳转
(1):--->>> action 节点
(2):--->>> end 节点
* 当action节点执行失败时跳转
(1):--->>> fail 节点
(2):--->>> kill 节点(杀死线程)
(3):--->>> end 节点
* end 节点
* 编写Action节点
【注意】 目前的Workflow.XML文件支持旧MapReduce API 不需要配置
第三步:
* 将生成的mysql的jar包上传到lib目录下
第四步:
* 将Hive的配置文件hive.site.xml 上传到目录下
第五步:
* 使用Hadoop将文件夹上传到HDFS文件系统中
bin/hdfs dfs -put /opt/modules/oozie-4.0.0-cdh5.3.6/oozie-apps/*文件夹名称/ oozie-apps/
第六步:
* 在Ooize安装目录下运行命令
export OOZIE_URL=http://hadoop.zxk.com:11000/oozie
oozie job -config oozie-apps/*文件夹名称/job.properties -run
job.properties样例
nameNode=hdfs://hadoop.zxk.com:8020
jobTracker=hadoop.zxk.com:8032
queueName=default
oozieAppsRoot=user/root/oozie-apps
oozieDataRoot=user/root/oozie/datas
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/${oozieAppsRoot}/hive-select/
outputDir=hive-select/output
workflow.xml样例
<workflow-app xmlns="uri:oozie:workflow:0.5" name="wf-hive-select">
<start to="hive-node"/>
<action name="hive-node">
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${nameNode}/${oozieDataRoot}/${outputDir}"/>
</prepare>
<job-xml>${nameNode}/${oozieAppsRoot}/hive-select/hive-site.xml</job-xml>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<script>select-dept.sql</script>
<param>OUTPUT=${nameNode}/${oozieDataRoot}/${outputDir}</param>
</hive>
<ok to="end"/>
<error to="fail"/>
</action>
<kill name="fail">
<message>Hive failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end"/>
</workflow-app>
---------------------------------------------Oozie WorkFlow的Sqoop基本使用----------------------------
如何定义一个WorkFlow
第一步:
* 生成job.properties文件 -->指向 workflow.xml文件所在的HDFS位置。
第二步:
* 编写 workflow.xml文件
* 编写步骤:
* 编写流程控制节点
* 第一步:定义XML文件
* 第二步:编写节点
* start 节点
* action 节点(包含 MapReduce Hive Sqoop Shell)
* 当action节点执行成功时跳转
(1):--->>> action 节点
(2):--->>> end 节点
* 当action节点执行失败时跳转
(1):--->>> fail 节点
(2):--->>> kill 节点(杀死线程)
(3):--->>> end 节点
* end 节点
* 编写Action节点
【注意】 目前的Workflow.XML文件支持旧MapReduce API 不需要配置
【注意】 <command></command>中的语句的编写依据Sqoopde 规则
第三步:
* 将生成的mysql的jar包上传到lib目录下
第四步:
* 使用Hadoop将文件夹上传到HDFS文件系统中
bin/hdfs dfs -put /opt/modules/oozie-4.0.0-cdh5.3.6/oozie-apps/*文件夹名称/ oozie-apps/
第五步:
* 在Ooize安装目录下运行命令
export OOZIE_URL=http://hadoop.zxk.com:11000/oozie
oozie job -config oozie-apps/*文件夹名称/job.properties -run
job.properties样例
nameNode=hdfs://hadoop.zxk.com:8020
jobTracker=hadoop.zxk.com:8032
queueName=default
oozieAppsRoot=user/root/oozie-apps
oozieDataRoot=user/root/oozie/datas
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/${oozieAppsRoot}/sqoop-import-user
outputDir=sqoop-import-user/output
workflow.xml样例
<workflow-app xmlns="uri:oozie:workflow:0.5" name="sqoop-wf">
<start to="sqoop-node"/>
<action name="sqoop-node">
<sqoop xmlns="uri:oozie:sqoop-action:0.3">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${nameNode}/${oozieDataRoot}/${outputDir}"/>
</prepare>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<command>import --connect jdbc:mysql://hadoop.zxk.com:3306/test --username root --password 123456 --table my_user --target-dir /user/root/oozie/datas/sqoop-import-user/output --fields-terminated-by "," --num-mappers 1</command>
</sqoop>
<ok to="end"/>
<error to="fail"/>
</action>
<kill name="fail">
<message>Sqoop failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end"/>
</workflow-app>
---------------------------------------------Oozie WorkFlow的Shell脚本基本使用----------------------------
如何定义一个WorkFlow
第一步:
* 生成job.properties文件 -->指向 workflow.xml文件所在的HDFS位置。
* exec=dept-select.sh (定义脚本的位置与名称)
* script=dept-select.sql (定义SQL语句的位置与名称)
第二步:
* 编写 workflow.xml文件
* 编写步骤:
* 编写流程控制节点
* 第一步:定义XML文件
* 第二步:编写节点
* start 节点
* action 节点(包含 MapReduce Hive Sqoop Shell)
* 当action节点执行成功时跳转
(1):--->>> action 节点
(2):--->>> end 节点
* 当action节点执行失败时跳转
(1):--->>> fail 节点
(2):--->>> kill 节点(杀死线程)
(3):--->>> end 节点
* end 节点
* 编写Action节点
【注意】 目前的Workflow.XML文件支持旧MapReduce API 不需要配置
【注意】 <file></file>中的路径是指HDFS文件系统中的目录
<exec>${exec}</exec>
<file>${nameNode}/${oozieAppsRoot}/shell-hive-select/${exec}#${exec}</file>
<file>${nameNode}/${oozieAppsRoot}/shell-hive-select/${script}#${script}</file>
第三步:
* 编写脚本文件
#!/usr/bin/env bash
/opt/modules/hive-0.13.1/bin/hive -f dept-select.sql
第四步(可选):
* 编写SQL语句
insert overwrite directory '/user/root/oozie/datas/shell-hive-select/output'
select
deptno, dname
from default.dept ;
第五步:
* 使用Hadoop将文件夹上传到HDFS文件系统中
bin/hdfs dfs -put /opt/modules/oozie-4.0.0-cdh5.3.6/oozie-apps/*文件夹名称/ oozie-apps/
第六步:
* 在Ooize安装目录下运行命令
export OOZIE_URL=http://hadoop.zxk.com:11000/oozie
oozie job -config oozie-apps/*文件夹名称/job.properties -run
job.properties样例
nameNode=hdfs://hadoop.zxk.com:8020
jobTracker=hadoop.zxk.com:8032
queueName=default
oozieAppsRoot=user/root/oozie-apps
oozieDataRoot=user/root/oozie/datas
oozie.wf.application.path=${nameNode}/${oozieAppsRoot}/shell-hive-select
exec=dept-select.sh
script=dept-select.sql
workflow.xml样例
<workflow-app xmlns="uri:oozie:workflow:0.5" name="shell-wf">
<start to="shell-node"/>
<action name="shell-node">
<shell xmlns="uri:oozie:shell-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<exec>${exec}</exec>
<file>${nameNode}/${oozieAppsRoot}/shell-hive-select/${exec}#${exec}</file>
<file>${nameNode}/${oozieAppsRoot}/shell-hive-select/${script}#${script}</file>
<capture-output/>
</shell>
<ok to="end"/>
<error to="fail"/>
</action>
<kill name="fail">
<message>Shell action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end"/>
</workflow-app>
----------------------------Oozie WorkFlow的两个及以上Action编写以及运行----------------------------
依据不同的情况:组合Hadoop,Hive,Sqoop,Shell以及coordinator文件看是否上传hive-site.xml文件以及lib目录
job.properties文件
nameNode=hdfs://hadoop.zxk.com:8020
jobTracker=hadoop.zxk.com:8032
queueName=default
oozieAppsRoot=user/root/oozie-apps
oozieDataRoot=user/root/oozie/datas
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/${oozieAppsRoot}/wf-user-select/
#oozie.coord.application.path=${nameNode}/${oozieAppsRoot}/wf-user-select
#start=2015-10-15T00:00+0800
#end=2015-10-26T00:00+0800
#workflowAppUri=${nameNode}/${oozieAppsRoot}/wf-user-select
outputDir=wf-user-select/output
Workflow文件
workflow-app xmlns="uri:oozie:workflow:0.5" name="wf-user-select">
<start to="hive-node"/>
<action name="hive-node">
<hive xmlns="uri:oozie:hive-action:0.5">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${nameNode}/${oozieDataRoot}/${outputDir}"/>
</prepare>
<job-xml>${nameNode}/${oozieAppsRoot}/hive-select/hive-site.xml</job-xml>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<script>select-user.sql</script>
<param>OUTPUT=${nameNode}/${oozieDataRoot}/${outputDir}</param>
</hive>
<ok to="sqoop-node"/>
<error to="fail"/>
</action>
<action name="sqoop-node">
<sqoop xmlns="uri:oozie:sqoop-action:0.3">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<command>export --connect jdbc:mysql://hadoop.zxk.com:3306/test --username root --password 123456 --table my_user --num-mappers 1 --fields-terminated-by "," --export-dir /user/root/oozie/datas/wf-user-select/output</command>
</sqoop>
<ok to="end"/>
<error to="fail"/>
</action>
<kill name="fail">
<message>Hive failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end"/>
</workflow-app>
------------------------------------------Hue 集成时HDFS与YARN的配置---------------------------------------------------
core-site.xml配置
<property>
<name>hadoop.proxyuser.hue.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hue.groups</name>
<value>*</value>
</property>
hue.ini
secret_key=jFE93j;2[290-eiw.KEiwN2s3['d;/.q[eIW^y#e=+Iei*@Mn<qW5o
http_host=hadoop.zxk.com
http_port=8888
time_zone=Asia/Shanghai
django_debug_mode=false
http_500_debug_mode=false
[[hdfs_clusters]]
# HA support by using HttpFs
[[[default]]]
fs_defaultfs=hdfs://hadoop.zxk.com:8020
webhdfs_url=http://hadoop.zxk.com:50070/webhdfs/v1
hadoop_conf_dir=/opt/modules/hadoop-2.5.0/etc/hadoop
# Configuration for YARN (MR2)
[[yarn_clusters]]
[[[default]]]
resourcemanager_host=hadoop.zxk.com
resourcemanager_port=8032
submit_to=True
resourcemanager_api_url=http://hadoop.zxk.com:8088
proxy_api_url=http://hadoop.zxk.com:8088
history_server_api_url=http://hadoop.zxk.com:19888
[filebrowser]
archive_upload_tempdir=/tmp
------------------------------------------Hue 集成时Hive的配置------------------------------------------------
启动 HiveServer2 bin/bin/hiveserver2
启动 Metastore bin/hive --service metastore
hive-site.xml
<!--HiveServer2 -->
<property>
<name>hive.server2.thrift.port</name>
<value>10000</value>
</property>
<property>
<name>hive.server2.thrift.bind.host</name>
<value>hadoop.zxk.com</value>
</property>
<!--Remote Metastore -->
<property>
<name>hive.metastore.uris</name>
<value>thrift://hadoop.zxk.com:9083</value>
</property>
hue.ini
[beeswax]
hive_server_host=hadoop.zxk.com
hive_server_port=10000
hive_conf_dir=/opt/modules/hive-0.13.1/conf
server_conn_timeout=120
[librdbms]
[[databases]]
[[[sqlite]]]
nice_name=SQLite
name=/opt/app/hue-3.7.0-cdh5.3.6/desktop/desktop.db
engine=sqlite
[[[mysql]]]
nice_name="My SQL DB"
name=test
engine=mysql
host=hadoop.zxk.com
port=3306
user=root
password=123456
------------------------------------------Hue 集成时Oozie的配置------------------------------------------------
启动Oozie bin/oozied.sh start
-----------------------------------------Hue 集成 Oozie时的share目录-----------------------------------------
bin/oozie-setup.sh sharelib create \
-fs hdfs://hadoop.zxk.com:8020 \
-locallib oozie-sharelib-4.0.0-cdh5.3.6-yarn.tar.gz
----------------------------------------HBase Shell 的使用----------------------------------------------------
在 Xshell 中 使用 hbase shell 进入后 无法删除 问题:
在hbase shell下,误输入的指令不能使用backspace和delete删除
进入到XShell 文件 --> 属性 --> 终端 --> 键盘
在 DELETE键序列 和 BACKSPACE键序列 中都选择 ASCII 127
表的管理
1)查看有哪些表
hbase(main):002:0> list
2)创建表
# 语法:create <table>, {NAME => <family>, VERSIONS => <VERSIONS>}
# 例如:创建表t1,有两个family name:f1,f2,且版本数均为2
hbase(main):002:0> create 't1',{NAME => 'f1', VERSIONS => 2},{NAME => 'f2', VERSIONS => 2}
hbase(main):002:0> create 'user','info'
2-1)查看表的结构
hbase(main):002:0> describe 表名
2-2)插入数据
put 'user','10001','info:name','zx'
put 'user','10001','info:age','22'
put 'user','10001','info:sex','female'
put 'user','10001','info:address','henan'
put 'user','10002','info:name','cl'
put 'user','10002','info:age','22'
put 'user','10002','info:address','henan'
put 'user','10003','info:name','zxk'
put 'user','10003','info:age','22'
2-3)查询数据
get 'user','10001'
get 'user','10001','info:address'
scan 'user'
scan 'user', {COLUMNS => ['info:name', 'info:age'], STARTROW => '10002'}
3)删除表
分两步:首先disable,然后drop
例如:删除表t1
hbase(main):002:0> disable 't1'
hbase(main):002:0> drop 't1'
4)查看表的结构
# 语法:describe <table>
# 例如:查看表t1的结构
hbase(main):002:0> describe 't1'
5)修改表结构
修改表结构必须先disable
# 语法:alter 't1', {NAME => 'f1'}, {NAME => 'f2', METHOD => 'delete'}
# 例如:修改表test1的cf的TTL为180天
hbase(main):002:0> disable 'test1'
hbase(main):002:0> alter 'test1',{NAME=>'body',TTL=>'15552000'},{NAME=>'meta', TTL=>'15552000'}
hbase(main):002:0> enable 'test1'
------------------------------------------HBase 数据存储----------------------------------------------------------
一、HBase数据检索流程
(1)客户端读或写一个表的数据,首先链接Zookeeper,因为需要到Zookeeper中找读的数据,表是
通过Region来管理,每个Region由RegionServer管理,每个Region都有startkey及endkey。
(2)HBase的表格分为User Tables(用户表)和Catalog Tables(系统自带表)。
(3)User Tables(用户表)包含user信息、region信息(startkey和endkey)。例:user表的
region-01存在regionserver-03中。该信息是保存在meta-table中。
(4)在HBase新版本中,有类似于RDBMS(关系数据库管理系统)中DataBase的命名空间的概念。
HBase的所有表都在data目录下,data下包含default目录和hbase目录,这里的目录就是命名空间的概念。
(5)用户自定义的表默认情况下命名空间为default,而系统自带的元数据表的命名空间为hbase。
(6)meta表只有一个Region,它的Region也需要RegionServer管理,即为meta-region-server的功能。
用户首先找到meta-region-server,然后找到meta表,scan命令可以看到表格中column被什么server管理。
(7)综上所述,用户表由很多region组成,region信息存储在hbase:meta中。用户表的每一个region
都有key。Client需要先读zookeeper,其实通过meta-region-server找到的是meta表的region,找到后扫描
meta表的数据,然后再找到数据再操作。
二、HBase数据存储
2.1 HBase结构详解
HBase能高速实现数据存储和访问源于HBase数据存储。
1. 连接Zookeeper,从Zookeeper中找到要读的数据。我们需要知道表中RowKey在region的位置。
2. 客户端查找HRegionServer,HRegionServer管理众多Region。
3. HMaster也需要连接Zookeeper,连接的作用是:HMaster需要知道哪些HRegionServer是活动
的及HRegionServer所在的位置,然后管理HRegionServer。
4. HBase内部是把数据写到HDFS上的,DFS有客户端。
5. Region中包含HLog、Store。一张表有几个列簇,就有几个Store。Store中有很多memStore
及StoreFile。StoreFile是对HFile的封装。StoreFile真正存储在HDFS上。
6. 写数据时,先往HLog上写一份,再往memStore上写一份。当memStore达到一定大小则往StoreFile上写。
若memStore数据有丢失,则从HLog上恢复。
7. 读数据先到memStore上读,再到StoreFile上读,之后合并。
2.2 HBase数据存储详解
1. HBase中的所有数据文件都存储在Hadoop HDFS文件系统上,主要包括两种文件类型:
1)HFile:HBase中KeyValue数据的存储格式,HFile是Hadoop的二进制格式文件,
实际上StoreFIle就是对HFile做了轻量级的包装,进行数据的存储。
2)HLog File:HBase中WAL(Write Ahead Log)的存储格式,物理上是Hadoop的Sequence File。
2. HRegionServer内部管理了一系列HRegion对象,每个HRegion对应了table中的一个region,
HRegion中由多个HStore组成。每个HStore对应了Table中的一个column family的存储,可以看出
每个columnfamily其实就是一个集中的存储单元,因此最好将具备共同IO特性的column放在一个column family中,
这样最高效。
3. HStore存储是HBase存储的核心,由两部分组成,一部分是MemStore,一部分是StoreFile。
4. MemStore是 Sorted Memory Buffer,用户写入的数据首先会放入MemStore,当MemStore满了以后
会Flush成一个StoreFile(底层实现是HFile)。
5. HLog 文件结构:WAL意为Write ahead log,类似Mysql中的binlog,用来做灾难恢复。
Hlog记录数据的所有变更,一旦数据修改,就可以从log中进行恢复。
6. 每个HRegionServer维护一个HLog,而不是每个HRegion一个。这样不同region(来自不同table)
的日志会混在一起,这样做的目的是不断追加单个文件,相对于同时写多个文件而言,可以减少
磁盘寻址次数,因此可以提高对table的写性能。带来的麻烦是,如果一台HRegionServer下线,
为了恢复其上的region,需要将HRegionServer上的log进行拆分,然后分发到其它HRegionServer
上进行恢复。
2.3 用户写入数据流程
1. Client客户端写入数据后 -> 数据存入MemStore,一直到MemStore满之后 Flush成一个StoreFile,
直至增长到一定阈值 -> 触发Compact合并操作 -> 多个StoreFile合并成一个StoreFile。
2. 同时进行版本合并和数据删除 -> 当StoreFiles Compact后,
逐步形成越来越大的StoreFile ->单个StoreFile大小超过一定阈值后,
触发Split操作,把当前Region分成2个Region,Region会下线,新分出的2个孩子Region会被HMaster分配到
相应的HRegionServer上,使得原先1个Region的压力得以分流到2个Region上。
export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
export HADOOP_HOME=/opt/modules/hadoop-2.5.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar $HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar
CellCounter: Count cells in HBase table
completebulkload: Complete a bulk data load.
copytable: Export a table from local cluster to peer cluster
export: Write table data to HDFS.
import: Import data written by Export.
importtsv: Import data in TSV format.
rowcounter: Count rows in HBase table
verifyrep: Compare the data from tables in two different clusters. WARNING: It doesn't work for incrementColumnValues'd cells since the timestamp is changed after being appended to the log.
------------------------------------------HBase 与MapReduce集成------------------------------------------
(1)上传Jar包
(2)export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
(3)export HADOOP_HOME=/opt/modules/hadoop-2.5.0
(4)HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar $HADOOP_HOME/jars/hbase-mr-user2basic.jar
------------------------------------------HBase Importtsv的使用----------------------------------------
10001 21 male henan 13243169133
10002 22 male yunnan 13243169133
10003 23 male dali 13243169133
10004 24 male henan 13243169133
10005 25 male henan 13243169133
export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
export HADOOP_HOME=/opt/modules/hadoop-2.5.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar \
$HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,\
info:name,info:age,info:sex,info:address,info:phone \
student \
hdfs://hadoop.zxk.com:8020/user/root/hbase/importtsv
export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
export HADOOP_HOME=/opt/modules/hadoop-2.5.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar \
$HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,\
info:name,info:age,info:sex,info:address,info:phone \
-Dimporttsv.bulk.output=hdfs://hadoop.zxk.com:8020/user/root/hbase/hfileoutput \
student \
hdfs://hadoop.zxk.com:8020/user/root/hbase/importtsv
export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
export HADOOP_HOME=/opt/modules/hadoop-2.5.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar \
$HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar \
completebulkload \
hdfs://hadoop.zxk.com:8020/user/root/hbase/hfileoutput \
student
------------------------------------------HBase 表与命名空间NameSpace的使用----------------------------------------
在HBase中,默认情况创建的表,都在【default】命名空间下,在Hbase中
系统的命名空间
meta
namespace
命名空间NameSpace中的命令
Group name: namespace
Commands: alter_namespace, create_namespace, describe_namespace, drop_namespace, list_namespace, list_namespace_tables
create 'ns1:t1', {NAME => 'f1', VERSIONS => 5}
create 'ns1:t1', 'cf' 等同于 create 'ns1:t1', {NAME => 'f1'}
create 'ns1:t2', {NAME => 'f1'}, {NAME => 'f2'}, {NAME => 'f3'}等同于create 'ns1:t3', 'f1', 'f2', 'f3'
------------------------------------------HBase 表的预分区的创建----------------------------------------
每一个Region会自动创建rowkeyd的范围[startkey,endkey):包括数值为startkey-->endkey-1
默认情况的情况下,创建一个HBase表,自动为表分配一个Region。
结合实际使用来看,无论是在测试环境还生产环境,我们创建好HBase一张表以后,需要往表中导入大量的数据。
我们会将【文件中的数据】数据转化为 【hfile文件】,通过【bulk load】 加载到hbase 的表中去。每一个【Region】
会被一个【RegionServer】管理。当数据过大,此时的Region分割为两个【Region】时,【RegionServer】会出问题
解决方案:
创建表时,多创建一些Region(依据表的数据rowkey进行设计,结合业务)
例如:五个Region
被多个RegionServer进行管理
要点:
在插入数据时,会向五个Region中分别插入对应的数据,均衡数据的插入
HBase 表的预分区创建
Region划分,依赖于rowkey,我们需要预先预估一些rowkey
案例:
hbase> create 'ns1:t1', 'f1', SPLITS => ['10', '20', '30', '40']
hbase> create 't1', 'f1', SPLITS => ['10', '20', '30', '40']
hbase> create 't1', 'f1', SPLITS_FILE => 'splits.txt', OWNER => 'johndoe'
hbase> create 't1', {NAME => 'f1', VERSIONS => 5}, METADATA => { 'mykey' => 'myvalue' }
hbase> # Optionally pre-split the table into NUMREGIONS, using
hbase> # SPLITALGO ("HexStringSplit", "UniformSplit" or classname)
hbase> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}
hbase> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit', CONFIGURATION => {'hbase.hregion.scan.loadColumnFamiliesOnDemand' => 'true'}}
案例:
create 'bflogs', 'info', SPLITS => ['20151001000000000', '20151011000000000', '20151021000000000']
指定预估rowkey
年月日时分秒毫秒
'20151001000000000'
'20151011000000000'
'20151021000000000'
create 'bflogs2', 'info', SPLITS_FILE => '/opt/datas/bflogs-split.txt'
第三方式(很少使用)
create 't11', 'f11', {NUMREGIONS => 2, SPLITALGO => 'HexStringSplit'}
create 't12', 'f12', {NUMREGIONS => 4, SPLITALGO => 'UniformSplit'}
-----------------------------------------HBase 表的的创建思路----------------------------------------
如何在海量数据中,获取需要的数据(查询的数据)。
表的rowkey设计中:
核心思想:
依据rowkey查询最快
对rowkey进行范围查询range
前缀匹配
索引表/辅助表(主表) 创建与设计
列簇:info
列:rowkey
主表和索引表的数据 如何同步
>>> 程序,事务
>>> phoenix
>>> 只能使用JDBC方式,主表与索引表的数据才能同步
创建索引表
>>> solr
lily
cloudera search
-----------------------------------------HBase 创建表时Sanppy压缩-----------------------------------------
1)配置Haodop压缩
>>> bin/hadoop checknative
2)配置HBase
>>> hadoop-snappy jar -> 放入到HBase的lib目录下
>>> 在HBase的lib目录下,创建native ,需要将本地库native ln -s 到 Hadoop 的lib目录的native下
>>> 配置HBase-site.xml
<property>
<name>hbase.regionserver.codecs</name>
<value>snappy</value>
</property>
-----------------------------------------HBase Memstore-----------------------------------------
当RegionServer(RS)收到写请求的时候(write request),RS会将请求转至相应的Region。
每一个Region都存储着一些列(a set of rows)。根据其列族的不同,将这些列数据存储在相应的列族中
(Column Family,简写CF)。不同的CFs中的数据存储在各自的HStore中,HStore由一个Memstore及一系列
HFile组成。
Memstore位于RS的主内存中,而HFiles被写入到HDFS中。当RS处理写请求的时候,数据首先写入到
Memstore,然后当到达一定的阀值的时候,Memstore中的数据会被刷到HFile中。
用到Memstore最主要的原因是:存储在HDFS上的数据需要按照row key 排序。而HDFS本身被设计为
顺序读写(sequential reads/writes),不允许修改。这样的话,HBase就不能够高效的写数据,因为要
写入到HBase的数据不会被排序,这也就意味着没有为将来的检索优化。为了解决这个问题,HBase将最
近接收到的数据缓存在内存中(in Memstore),在持久化到HDFS之前完成排序,然后再快速的顺序写入HDFS。
需要注意的一点是实际的HFile中,不仅仅只是简单地排序的列数据的列表,详见Apache HBase I/O – HFile。
除了解决“无序”问题外,Memstore还有一些其他的好处,例如:
作为一个内存级缓存,缓存最近增加数据。一种显而易见的场合是,新插入数据总是比老数据频繁使用。
在持久化写入之前,在内存中对Rows/Cells可以做某些优化。比如,当数据的version被设为1的时候,对于
某些CF的一些数据,Memstore缓存了数个对该Cell的更新,在写入HFile的时候,仅需要保存一个最新的版本就好了,
其他的都可以直接抛弃。
有一点需要特别注意:每一次Memstore的flush,会为每一个CF创建一个新的HFile。在读方面相对来说就会
简单一些:HBase首先检查请求的数据是否在Memstore,不在的话就到HFile中查找,最终返回merged的一个结果
给用户。
HBase Memstore关注要点
迫于以下几个原因,HBase用户或者管理员需要关注Memstore并且要熟悉它是如何被使用的:
Memstore有许多配置可以调整以取得好的性能和避免一些问题。HBase不会根据用户自己的使用模式来调整这些
配置,你需要自己来调整。频繁的Memstore flush会严重影响HBase集群读性能,并有可能带来一些额外的负载。
Memstore flush的方式有可能影响你的HBase schema设计
接下来详细讨论一下这些要点:
Configuring Memstore Flushes
对Memstore Flush来说,主要有两组配置项:
决定Flush触发时机
决定Flush何时触发并且在Flush时候更新被阻断(block)
第一组是关于触发“普通”flush,这类flush发生时,并不影响并行的写请求。该类型flush的配置项有:
hbase.hregion.memstore.flush.size
<property>
<name>hbase.hregion.memstore.flush.size</name>
<value>134217728</value>
</property>
base.regionserver.global.memstore.lowerLimit
<property>
<name>hbase.regionserver.global.memstore.lowerLimit</name>
<value>0.35</value>
</property>
需要注意的是第一个设置是每个Memstore的大小,当你设置该配置项时,你需要考虑一下每台RS承载的
region总量。可能一开始你设置的该值比较小,后来随着region增多,那么就有可能因为第二个设置原因Memstore
的flush触发会变早许多。
第二组设置主要是出于安全考虑:有时候集群的“写负载”非常高,写入量一直超过flush的量,这时,
我们就希望memstore不要超过一定的安全设置。在这种情况下,写操作就要被阻止(blocked)一直到memstore
恢复到一个“可管理”(manageable)的大小。该类型flush配置项有:
hbase.regionserver.global.memstore.upperLimit
<property>
<name>hbase.regionserver.global.memstore.upperLimit</name>
<value>0.4</value>
</property>
hbase.hregion.memstore.block.multiplier
<property>
<name>hbase.hregion.memstore.block.multiplier</name>
<value>2</value>
</property>
某个节点“写阻塞”对该节点来说影响很大,但是对于整个集群的影响更大。HBase设计为:
每个Region仅属于一个RS但是“写负载”是均匀分布于整个集群(所有Region上)。
有一个如此“慢”的节点,将会使得整个集群都会变慢(最明显的是反映在速度上)。
提示:
严重关切Memstore的大小和Memstore Flush Queue的大小。理想情况下,
Memstore的大小不应该达到hbase.regionserver.global.memstore.upperLimit的设置,
Memstore Flush Queue 的size不能持续增长。
频繁的Memstore Flushes
要避免“写阻塞”,貌似让Flush操作尽量的早于达到触发“写操作”的阈值为宜。
但是,这将导致频繁的Flush操作,而由此带来的后果便是读性能下降以及额外的负载。
每次的Memstore Flush都会为每个CF创建一个HFile。频繁的Flush就会创建大量的HFile。
这样HBase在检索的时候,就不得不读取大量的HFile,读性能会受很大影响。
为预防打开过多HFile及避免读性能恶化,HBase有专门的HFile合并处理(HFile Compaction Process)。
HBase会周期性的合并数个小HFile为一个大的HFile。明显的,有Memstore Flush产生的HFile越多,
集群系统就要做更多的合并操作(额外负载)。更糟糕的是:Compaction处理是跟集群上的其他请求并行进行的。
当HBase不能够跟上Compaction的时候(同样有阈值设置项),会在RS上出现“写阻塞”。像上面说到的,这是最最不希望的。
提示:
严重关切RS上Compaction Queue 的size。要在其引起问题前,阻止其持续增大。
理想情况下,在不超过hbase.regionserver.global.memstore.upperLimit的情况下,Memstore应该尽可能多
的使用内存(配置给Memstore部分的,而不是真个Heap的)。
----------------------------------------------HBsse与Hive的集成----------------------------------------------
>> 数据存储在HBase中
>> hive 表的描述信息存储在hive中
hive-table <--映射--> hbase-table
hive-column<--映射-->hbase-rowkey,hbase-cf-column
handler
HBsse与Hive的集成两种方式
>> 管理表
创建hive表的时候,指定数据存储在hbase表中。
例如:
CREATE TABLE hbase_hive_table(key int, value string)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:val")
TBLPROPERTIES ("hbase.table.name" = "xyz");
>> 外部表
现在已经存在一个HBase表,需要对表中数据进行分析。
例如:
CREATE EXTERNAL TABLE hbase_user(id int, name string,age int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:name,info:age")
TBLPROPERTIES ("hbase.table.name" = "user");
本质:
Hive就是HBase客户端。
需要一些配置,jar包
export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
export HIVE_HOME=/opt/modules/hive-0.13.1/lib
ln -s $HBASE_HOME/lib/hbase-common-0.98.6-hadoop2.jar $HIVE_HOME/hbase-common-0.98.6-hadoop2.jar
ln -s $HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar $HIVE_HOME/hbase-server-0.98.6-hadoop2.jar
ln -s $HBASE_HOME/lib/hbase-client-0.98.6-hadoop2.jar $HIVE_HOME/hbase-client-0.98.6-hadoop2.jar
ln -s $HBASE_HOME/lib/hbase-protocol-0.98.6-hadoop2.jar $HIVE_HOME/hbase-protocol-0.98.6-hadoop2.jar
ln -s $HBASE_HOME/lib/hbase-it-0.98.6-hadoop2.jar $HIVE_HOME/hbase-it-0.98.6-hadoop2.jar
ln -s $HBASE_HOME/lib/htrace-core-2.04.jar $HIVE_HOME/htrace-core-2.04.jar
ln -s $HBASE_HOME/lib/hbase-hadoop2-compat-0.98.6-hadoop2.jar $HIVE_HOME/hbase-hadoop2-compat-0.98.6-hadoop2.jar
ln -s $HBASE_HOME/lib/hbase-hadoop-compat-0.98.6-hadoop2.jar $HIVE_HOME/hbase-hadoop-compat-0.98.6-hadoop2.jar
ln -s $HBASE_HOME/lib/high-scale-lib-1.1.1.jar $HBASE_HOME/high-scale-lib-1.1.1.jar
----------------------------------------------HBase与Hue的集成----------------------------------------------
配置hue.ini
[hbase]
hbase_clusters=(Cluster|hadoop.zxk.com:9090)
hbase_conf_dir=/opt/modules/hbase-0.98.6-hadoop2/conf
启动ThriftServer bin/hbase-daemon.sh start thrift
-----------------------------------------Scala运行MapReduce----------------------------------------
val lineadd = sc.textFile("hdfs://hadoop.zxk.com:8020/user/root/mapreduce/wordcount/input/wc.input")
val wordsAdd=lineadd.map(line =>line.split(" "))
val wordsAdd=lineadd.flatMap(line =>line.split(" "))
val keyvalRdds =wordsAdd.map(word=>(word,1))
val count= keyvalRdds.reduceByKey((a,b)=>(a+b))
sc.textFile("hdfs://hadoop.zxk.com:8020/user/root/mapreduce/wordcount/input/wc.input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
-----------------------------------------Scala sortByKey----------------------------------------
## WordCount
val rdd = sc.textFile("hdfs://hadoop.zxk.com:8020/user/root/mapreduce/wordcount/input/wc.input")
val wordcount = rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)
wordcount.saveAsTextFile("hdfs://hadoop.zxk.com:8020/user/root/mapreduce/wordcount/sparkOutput89")
wordcount.sortByKey().collect # 默认情况是 升序
wordcount.sortByKey(true).collect # true情况是 升序
wordcount.sortByKey(false).collect # false情况是 升序
按照value值进行降序
# Value Sort
wordcount.map(x => (x._2,x._1)).sortByKey(false).collect
wordcount.map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)).collect
# Top N
wordcount.map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)).take(3)
Group Top Key
WordCount 程序,前KEY值
需求:
类似MapReduce中的二次排序
1) 按照第一个字段进行分组
2) 对分组中的第二字段进行排序(降序)
3) 获取每个分组Top N,比如获取前三个值
aa 78
bb 98
aa 80
cc 98
aa 69
cc 87
bb 97
cc 86
aa 97
bb 78
bb 34
cc 85
bb 92
cc 72
bb 32
bb 23
功能分析:
(aa,list(78,80,69,97)) -> (aa,list(69,78,80,97)) -> (aa,list(69,78,80))
val rdd = sc.textFile("hdfs://hadoop.zxk.com:8020/user/root/spark/grouptop/input/score.input")
rdd.map(_.split(" ")).collect
Array(aa, 78)
rdd.map(_.split(" ")).map(x => (x(0),x(1))).collect
(aa,78)
rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.collect
(aa,CompactBuffer(78, 80, 69, 97))
rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map(
x => {
val xx = x._1
val yy = x._2
yy
}
).collect
Iterable[String]
Iterable 方法:
def toList: List[A]
返回包含此遍历的迭代器的所有元素的列表
rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map(
x => {
val xx = x._1
val yy = x._2
yy.toList
}
).collect
List[String]
List(78, 80, 69, 97)
List 方法:
def sorted[B >: A]: List[A]
根据排序对列表进行排序
rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map(
x => {
val xx = x._1
val yy = x._2
yy.toList.sorted
}
).collect
List[String]
List(69, 78, 80, 97)
List 方法:
def reverse: List[A]
返回新列表,在相反的顺序元素
rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map(
x => {
val xx = x._1
val yy = x._2
yy.toList.sorted.reverse
}
).collect
List[String]
List(97, 80, 78, 69)
List 方法:
def take(n: Int): List[A]
返回前n个元素
def takeRight(n: Int): List[A]
返回最后n个元素
rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map(
x => {
val xx = x._1
val yy = x._2
yy.toList.sorted.reverse.take(3)
}
).collect
要求返回的是一个元组对
rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map(
x => {
val xx = x._1
val yy = x._2
(xx,yy.toList.sorted.reverse.take(3))
}
).collect
val groupTopKeyRdd = rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map(
x => {
val xx = x._1
val yy = x._2
(xx,yy.toList.sorted.reverse.take(3))
}
)
groupTopKeyRdd.saveAsTextFile("hdfs://hadoop.zxk.com:8020/user/root/spark/grouptop/output")
-----------------------------------------Scala 提交jar包----------------------------------------
bin/spark-submit \
--master spark://hadoop.zxk.com:7077 \
jars/sparkApp.jar
Spark-----------------------------------------Streaming Demo-----------------------------------------
从Socket实时读取数据,进行实时处理
# rpm -ivh nc-1.84-22.el6.x86_64.rpm
## 运行nc针对于端口号9999
$ nc -lk 9999
## 运行Demo
bin/run-example streaming.NetworkWordCount hadoop.zxk.com 9999
-----------------------------------------Initializing StreamingContext-----------------------------------------
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(5))
// read data
val lines = ssc.socketTextStream("localhost", 9999)
// process
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
-----------------------------------------HDFS -----------------------------------------
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val ssc = new StreamingContext(sc, Seconds(5))
// read data
val lines = ssc.textFileStream("hdfs://hadoop.zxk.com:8020/user/root/streaming/input/hdfs/")
// process
val words = lines.flatMap(_.split("\t"))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
-----------------------------------如何在Spark-shell中执行某个scala代码-----------------------------------
:load /opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/HDFSWordCount.scala
-----------------------------------将处理数据保存到HDFS-----------------------------------
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val ssc = new StreamingContext(sc, Seconds(5))
// read data
val lines = ssc.textFileStream("hdfs://hadoop.zxk.com:8020/user/root/streaming/input/hdfs/")
// process
val words = lines.flatMap(_.split("\t"))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.saveAsTextFiles("hdfs://hadoop.zxk.com:8020/user/root/streaming/output/")
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
-----------------------------------Spark Streaming + Flume Integration-----------------------------------
Flume有三个组件
Source ---> Channel ---> Sink(Spark Streaming)
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.flume._
import org.apache.spark.storage.StorageLevel
val ssc = new StreamingContext(sc, Seconds(5))
// read data
val stream = FlumeUtils.createStream(ssc, "hadoop.zxk.com", 9999, StorageLevel.MEMORY_ONLY_SER_2)
stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
------------------
bin/spark-shell --jars \
/opt/modules/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/spark-streaming-flume_2.10-1.3.0.jar,/opt/modules/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/flume-avro-source-1.5.0-cdh5.3.6.jar,/opt/modules/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/flume-ng-sdk-1.5.0-cdh5.3.6.jar
------
run command:
bin/flume-ng agent -c conf -n a2 -f conf/flume-spark-push.sh -Dflume.root.logger=DEBUG,console
-----------------------------------启动Kafka集群-----------------------------------
启动Kafka集群
nohup bin/kafka-server-start.sh config/server.properties &
创建topic命令
bin/kafka-topics.sh --create --zookeeper hadoop.zxk.com:2181 --replication-factor 1 --partitions 1 --topic test
查看已用topic
bin/kafka-topics.sh --list --zookeeper hadoop.zxk.com:2181
生产数据
bin/kafka-console-producer.sh --broker-list hadoop.zxk.com:9092 --topic test
消费数据
bin/kafka-console-consumer.sh --zookeeper hadoop.zxk.com:2181 --topic test --from-beginning
-----------------------------------Spark Streaming + Kafka Integration-----------------------------------
import java.util.HashMap
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
val ssc = new StreamingContext(sc, Seconds(5))
val topicMap = Map("test" -> 1)
// read data
val lines = KafkaUtils.createStream(ssc, "hadoop-senior.ibeifeng.com:2181", "testWordCountGroup", topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
------------------
bin/spark-shell --jars \
/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/spark-streaming-kafka_2.10-1.3.0.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/kafka_2.10-0.8.2.1.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/kafka-clients-0.8.2.1.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/zkclient-0.3.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/metrics-core-2.2.0.jar
第两种方式
-----------------------------------Spark Streaming + Kafka Integration-----------------------------------
import kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
val ssc = new StreamingContext(sc, Seconds(5))
val kafkaParams = Map[String, String]("metadata.broker.list" -> "hadoop-senior.ibeifeng.com:9092")
val topicsSet = Set("test")
// read data
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
---------------------------------------------------------------------------------------------------------
bin/spark-shell --jars \
/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/spark-streaming-kafka_2.10-1.3.0.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/kafka_2.10-0.8.2.1.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/kafka-clients-0.8.2.1.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/zkclient-0.3.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/metrics-core-2.2.0.jar
-----------------------------------UpdataStateByKey-----------------------------------
import kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
val ssc = new StreamingContext(sc, Seconds(5))
ssc.checkpoint(".")
val kafkaParams = Map[String, String]("metadata.broker.list" -> "hadoop-senior.ibeifeng.com:9092")
val topicsSet = Set("test")
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
val currentCount = values.sum
val previousCount = state.getOrElse(0)
Some(currentCount + previousCount)
}
// read data
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(" "))
val wordDstream = words.map(x => (x, 1))
val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
stateDstream.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
-----------------------------------隐式转换--------------------------------------------------
在scala中,有一个很重要的功能,就是隐式转换
比如
A类对象,->通过一个函数将对象转换成另外一个对象 B类对象
/**
* Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of each key.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
* @param updateFunc State update function. If `this` function returns None, then
* corresponding state key-value pair will be eliminated.
* @tparam S State type
*/
def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S]
): DStream[(K, S)] = ssc.withScope {
updateStateByKey(updateFunc, defaultPartitioner())
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。