当前位置:   article > 正文

大数据学习笔记_create table if not exists tablea ( deptno int, dn

create table if not exists tablea ( deptno int, dname string ) partitioned b
  1. ---------------------------------CentOS安装JDK与MySQL---------------------------------------
  2. rpm -qa | grep java --查看系统中是否安装jdk
  3. rpm -e --nodeps java包 --强制卸载系统中的jdk
  4. tar -zxf jdk-7u79-linux-x64.tar.gz -C /opt/modules/ --解压jdk包
  5. vi ../etc/profile --编辑profile文件
  6. export JAVA_HOME=/opt/modules/jdk1.7.0_79 -- 在profile 文件结尾加上这两句话
  7. export PATH=$PATH:$JAVA_HOME/bin
  8. source /etc/profile 重启profile配置文件
  9. create table student(id int, name string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
  10. load data local inpath '/opt/datas/student.txt' into table student ;
  11. rpm -qa | grep mysql
  12. rpm -e --nodeps mysql包 --强制卸载系统中的mysql
  13. rpm -ivh MySQL-server-5.6.24-1.el6.x86_64.rpm
  14. ---------------------------------CentOS安装JDK与MySQL---------------------------------------
  15. ---------------------------------Hive创建表---------------------------------------
  16. create table if not exists default.log_1017
  17. (
  18. ip string comment 'remote ip address',
  19. user string,
  20. req_url string comment 'user request url'
  21. )
  22. comment 'Web Acsess Logs'
  23. row format delimited fields terminated by ' '
  24. stored as textfile ;
  25. create table if not exists default.log_1017_2
  26. as select * from default.log_1017
  27. create table if not exists default.log_1017_2
  28. like default.log_1017
  29. create table if not exists student(id int, name string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
  30. load data local inpath '/opt/datas/log-1017.txt' into table default.log_1017 ;
  31. create table if not exists default.emp
  32. (
  33. empno int ,
  34. ename string,
  35. job string,
  36. mgr int,
  37. hiredate string,
  38. sal double,
  39. comm double,
  40. deptno int
  41. )
  42. row format delimited fields terminated by '\t';
  43. create table if not exists default.dept
  44. (
  45. deptno int ,
  46. dname string,
  47. loc string
  48. )
  49. row format delimited fields terminated by '\t';
  50. load data local inpath '/opt/datas/emp.txt' overwrite into table default.emp ;
  51. load data local inpath '/opt/datas/dept.txt' into table default.dept ;
  52. ----创建外部表
  53. create external table if not exists default.emp_ext
  54. (
  55. empno int ,
  56. ename string,
  57. job string,
  58. mgr int,
  59. hiredate string,
  60. sal double,
  61. comm double,
  62. deptno int
  63. )
  64. row format delimited fields terminated by '\t';
  65. ----创建分区表
  66. create table if not exists default.dept_part
  67. (
  68. deptno int ,
  69. dname string,
  70. loc string
  71. )
  72. partitioned by (month string,day string)
  73. row format delimited fields terminated by '\t';
  74. load data local inpath '/opt/datas/dept.txt' overwrite into table default.dept_part partition (month='201810');
  75. ----第一种方式
  76. dfs -mkdir -p /user/hive/warehouse/dept_part/day=20181020;
  77. dfs -put /opt/datas/dept.txt /user/hive/warehouse/dept_part/day=20181020;
  78. msck repair table dept_part
  79. ----第二种方式
  80. dfs -mkdir -p /user/hive/warehouse/dept_part/day=20181021;
  81. dfs -put /opt/datas/dept.txt /user/hive/warehouse/dept_part/day=20181021;
  82. alert table dept_part add partition (day='20181021')
  83. insert overwrite local directory '/opt/datas/hive_exp_emp'
  84. row format delimited fields terminated by '\t' collection items terminated by '\n'
  85. select * from default.emp;
  86. insert overwrite directory '/user/root//hive_exp_emp'
  87. select * from default.emp;
  88. insert overwrite local directory '/opt/datas/hive_emp_res'
  89. select * from emp order by empno asc;
  90. set hive.exec.reducers.max=3
  91. insert overwrite local directory '/opt/datas/hive_emp_res'
  92. select * from emp sort by empno asc;
  93. insert overwrite local directory '/opt/datas/hive_emp_res'
  94. select * from emp distribute by deptno sort by empno asc;
  95. ---------------------------UDF编程---------------------------
  96. <!-- Hive Client-->
  97. <dependency>
  98. <groupId>org.apache.hive</groupId>
  99. <artifactId>hive-jdbc</artifactId>
  100. <version>${hive.version}</version>
  101. </dependency>
  102. <dependency>
  103. <groupId>org.apache.hive</groupId>
  104. <artifactId>hive-exec</artifactId>
  105. <version>${hive.version}</version>
  106. </dependency>
  107. UDF编程步骤:
  108. 1-->Create UDF Java
  109. 2-->打包成Jar包,并上传。
  110. 3-->Use UDF Class
  111. 2-1--> add jar /opt/datas/hive-udf.jar
  112. 2-2--> create temporary function my_lower as "com.root.senior.hive.udf.LowerUDF"
  113. 2-3--> select ename ,my_lower(ename) lowername from emp limit 5 ;
  114. CREATE FUNCTION self_lower AS 'com.root.senior.hive.udf.LowerUDF' USING JAR 'hdfs://hadoop.zxk.com:8020/user/root/hive/jars/hiveudf.jar';
  115. select ename, self_lower(ename) lowername from emp limit 5 ;
  116. ------------------------------------测试Snappy-----------------------------------
  117. bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.0.jar wordcount /user/root/mapreduce/wordcount/input /user/root/mapreduce/wordcount/output4
  118. bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.0.jar wordcount
  119. -Dmapreduce.map.output.compress=true
  120. -Dmapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec
  121. /user/root/mapreduce/wordcount/input
  122. /user/root/mapreduce/wordcount/output2
  123. -------------------------------------测试ORC与PARQUET数据格式-------------------------------------------
  124. create table page_views(
  125. track_time string,
  126. url string,
  127. session_id string,
  128. referer string,
  129. ip string,
  130. end_user_id string,
  131. city_id string
  132. )
  133. ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
  134. STORED AS TEXTFILE ;
  135. load data local inpath '/opt/datas/page_views.data' into table page_views;
  136. select session_id,count(*) cnt from page_views group by session_id order by cnt desc limit 30 ;
  137. dfs -du -h /user/hive/warehouse/page_views
  138. create table page_views_orc(
  139. track_time string,
  140. url string,
  141. session_id string,
  142. referer string,
  143. ip string,
  144. end_user_id string,
  145. city_id string
  146. )
  147. ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
  148. STORED AS ORC ;
  149. insert into table page_views_orc select * from page_views ;
  150. select session_id,count(*) cnt from page_views_orc group by session_id order by cnt desc limit 30 ;
  151. dfs -du -h /user/hive/warehouse/page_views_orc
  152. create table page_views_parquet(
  153. track_time string,
  154. url string,
  155. session_id string,
  156. referer string,
  157. ip string,
  158. end_user_id string,
  159. city_id string
  160. )
  161. ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
  162. STORED AS PARQUET ;
  163. insert into table page_views_parquet select * from page_views ;
  164. select session_id,count(*) cnt from page_views_parquet group by session_id order by cnt desc limit 30 ;
  165. dfs -du -h /user/hive/warehouse/page_views_parquet
  166. -------------------------------------测试ORC与PARQUET数据格式-------------------------------------------
  167. -------------------------------------测试ORC与Snappy压缩结合使用----------------------------------------
  168. create table page_views_orc_snappy(
  169. track_time string,
  170. url string,
  171. session_id string,
  172. referer string,
  173. ip string,
  174. end_user_id string,
  175. city_id string
  176. )
  177. ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
  178. STORED AS orc tblproperties ("orc.compress"="SNAPPY");
  179. insert into table page_views_orc_snappy select * from page_views ;
  180. dfs -du -h /user/hive/warehouse/page_views_orc_snappy/ ;
  181. create table page_views_orc_none(
  182. track_time string,
  183. url string,
  184. session_id string,
  185. referer string,
  186. ip string,
  187. end_user_id string,
  188. city_id string
  189. )
  190. ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
  191. STORED AS orc tblproperties ("orc.compress"="NONE");
  192. insert into table page_views_orc_none select * from page_views ;
  193. dfs -du -h /user/hive/warehouse/page_views_orc_none/ ;
  194. set parquet.compression=SNAPPY ;
  195. create table page_views_parquet_snappy(
  196. track_time string,
  197. url string,
  198. session_id string,
  199. referer string,
  200. ip string,
  201. end_user_id string,
  202. city_id string
  203. )
  204. ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
  205. STORED AS parquet;
  206. insert into table page_views_parquet_snappy select * from page_views ;
  207. dfs -du -h /user/hive/warehouse/page_views_parquet_snappy/ ;
  208. 总结:
  209. 在实际的项目开发当中,hive表的数据
  210. * 存储格式
  211. orcfile / parquet
  212. * 数据压缩
  213. snappy
  214. -------------------------------------测试ORC与Snappy压缩结合使用----------------------------------------
  215. -------------------------------------使用正则表达式创建表----------------------------------------
  216. drop table if exists default.bf_log_src ;
  217. create table IF NOT EXISTS default.bf_log_src (
  218. remote_addr string,
  219. remote_user string,
  220. time_local string,
  221. request string,
  222. status string,
  223. body_bytes_sent string,
  224. request_body string,
  225. http_referer string,
  226. http_user_agent string,
  227. http_x_forwarded_for string,
  228. host string
  229. )
  230. ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
  231. WITH SERDEPROPERTIES (
  232. "input.regex" = "(\"[^ ]*\") (\"-|[^ ]*\") (\"[^\]]*\") (\"[^\"]*\") (\"[0-9]*\") (\"[0-9]*\") (-|[^ ]*) (\"[^ ]*\") (\"[^\"]*\") (-|[^ ]*) (\"[^ ]*\")"
  233. )
  234. STORED AS TEXTFILE;
  235. load data local inpath '/opt/datas/moodle.iroot.access.log' into table default.bf_log_src ;
  236. ---------------------------------------创建orc格式并且使用Snappy压缩的表格----------------------------------------
  237. drop table if exists default.bf_log_comm ;
  238. create table IF NOT EXISTS default.bf_log_comm (
  239. remote_addr string,
  240. time_local string,
  241. request string,
  242. http_referer string
  243. )
  244. ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
  245. STORED AS orc tblproperties ("orc.compress"="SNAPPY");
  246. insert into table default.bf_log_comm select remote_addr, time_local, request,http_referer from default.bf_log_src ;
  247. select * from bf_log_comm limit 5 ;
  248. ---------------------------------------定义UDF,对原表数据进行清洗----------------------------------------
  249. 第一个udf
  250. 去除引号
  251. add jar /opt/datas/hiveudf2.jar ;
  252. create temporary function my_removequotes as "com.root.senior.hive.udf.RemoveQuotesUDF" ;
  253. insert overwrite table default.bf_log_comm select my_removequotes(remote_addr), my_removequotes(time_local), my_removequotes(request), my_removequotes(http_referer) from default.bf_log_src ;
  254. select * from bf_log_comm limit 5 ;
  255. -----------------------------------------------Sqoop-----------------------------------------
  256. RDBMS以Mysql数据库为例讲解,拷贝jdbc驱动包到$SQOOP_HOME/lib目录下
  257. cp /opt/softwares/mysql-libs/mysql-connector-java-5.1.27/mysql-connector-java-5.1.27-bin.jar
  258. /opt/modules/sqoop-1.4.5-cdh5.3.6/lib/
  259. bin/sqoop list-databases \
  260. --connect jdbc:mysql://hadoop.zxk.com:3306 \
  261. --username root \
  262. --password 123456
  263. -----------------------------------------------Sqoop Import使用-----------------------------------------
  264. bin/sqoop import \
  265. --connect jdbc:mysql://hadoop.zxk.com:3306/test \
  266. --username root \
  267. --password 123456 \
  268. --table my_user
  269. ------------------------------------------Sqoop Import使用设置路径与mappers-----------------------------
  270. bin/sqoop import \
  271. --connect jdbc:mysql://hadoop.zxk.com:3306/test \
  272. --username root \
  273. --password 123456 \
  274. --table my_user \
  275. --target-dir /user/root/sqoop/imp_my_user \
  276. --num-mappers 1
  277. -----------------------------------Import Hdfs文件系统 Sqoop设置文件存储格式为parquetfile-------------------
  278. sqoop 底层的实现就是MapReduce,import来说,仅仅运行Map Task
  279. bin/sqoop import \
  280. --connect jdbc:mysql://hadoop.zxk.com:3306/test \
  281. --username root \
  282. --password 123456 \
  283. --table my_user \
  284. --target-dir /user/root/sqoop/imp_my_user_parquet \
  285. --fields-terminated-by ',' \
  286. --num-mappers 1 \
  287. --as-parquetfile
  288. ------------------------------------Import Hdfs文件系统 Sqoop设置数据的一部分列----------------------------
  289. bin/sqoop import \
  290. --connect jdbc:mysql://hadoop.zxk.com:3306/test \
  291. --username root \
  292. --password 123456 \
  293. --table my_user \
  294. --target-dir /user/root/sqoop/imp_my_user_column \
  295. --num-mappers 1 \
  296. --columns id,account
  297. ----------------------------------Import Hdfs文件系统 -Sqoop设置数据的一部分列--------------------------------
  298. bin/sqoop import \
  299. --connect jdbc:mysql://hadoop.zxk.com:3306/test \
  300. --username root \
  301. --password 123456 \
  302. --table my_user \
  303. --target-dir /user/root/sqoop/imp_my_user_column \
  304. --num-mappers 1 \
  305. --columns id,account
  306. -----------------------------------Import Hdfs文件系统 Sqoop使用查询语句Query----------------------------------
  307. bin/sqoop import \
  308. --connect jdbc:mysql://hadoop.zxk.com:3306/test \
  309. --username root \
  310. --password 123456 \
  311. --query 'select id, account from my_user where $CONDITIONS' \
  312. --target-dir /user/root/sqoop/imp_my_user_query \
  313. --num-mappers 1
  314. -----------------------------------Import Hdfs文件系统 Sqoop使用Snappy压缩-------------------------------------
  315. bin/sqoop import \
  316. --connect jdbc:mysql://hadoop.zxk.com:3306/test \
  317. --username root \
  318. --password 123456 \
  319. --table my_user \
  320. --target-dir /user/root/sqoop/imp_my_sannpy \
  321. --delete-target-dir \
  322. --num-mappers 1 \
  323. --compress \
  324. --compression-codec org.apache.hadoop.io.compress.SnappyCodec \
  325. --fields-terminated-by ','
  326. drop table if exists default.hive_user_snappy ;
  327. create table default.hive_user_snappy(
  328. id int,
  329. username string,
  330. password string
  331. )
  332. ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ;
  333. load data inpath '/user/root/sqoop/imp_my_sannpy' overwrite into table default.hive_user_snappy ;
  334. ----------------------------Import Hdfs文件系统 增量数据的导入(incremental)--------------------------
  335. 有一个唯一标识符,通常这个表都有一个字段,类似于插入时间createtime
  336. * query
  337. where createtime => 20150924000000000 and createtime < 20150925000000000
  338. * sqoop
  339. Incremental import arguments:
  340. --check-column <column> Source column to check for incremental
  341. change
  342. --incremental <import-type> Define an incremental import of type
  343. 'append' or 'lastmodified'
  344. --last-value <value> Last imported value in the incremental
  345. check column
  346. bin/sqoop import \
  347. --connect jdbc:mysql://hadoop.zxk.com:3306/test \
  348. --username root \
  349. --password 123456 \
  350. --table my_user \
  351. --target-dir /user/root/sqoop/imp_my_incr \
  352. --num-mappers 1 \
  353. --incremental append \
  354. --check-column id \
  355. --last-value 4
  356. ----------------------------Import Hdfs文件系统 增量数据的导入(direct)--------------------------
  357. bin/sqoop import \
  358. --connect jdbc:mysql://hadoop.zxk.com:3306/test \
  359. --username root \
  360. --password 123456 \
  361. --table my_user \
  362. --target-dir /user/root/sqoop/imp_my_incr \
  363. --num-mappers 1 \
  364. --delete-target-dir \
  365. --direct
  366. ----------------------------将Hdfs文件系统文件数据导入MySQL的表中--------------------------
  367. touch /opt/datas/user.txt
  368. vi /opt/datas/user.txt
  369. 12,root,root
  370. 13,xuanyun,xuanyu
  371. bin/hdfs dfs -mkdir -p /user/root/sqoop/exp/user/
  372. bin/hdfs dfs -put /opt/datas/user.txt /user/root/sqoop/exp/user/
  373. bin/sqoop export \
  374. --connect jdbc:mysql://hadoop.zxk.com:3306/test \
  375. --username root \
  376. --password 123456 \
  377. --table my_user \
  378. --export-dir /user/root/sqoop/exp/user/ \
  379. --num-mappers 1
  380. ----------------------------使用Sqoop将MySQL表中的数据导入Hive中---------------------------
  381. use default ;
  382. drop table if exists user_hive ;
  383. create table user_hive(
  384. id int,
  385. account string,
  386. password string
  387. )
  388. ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ;
  389. bin/sqoop import \
  390. --connect jdbc:mysql://hadoop.zxk.com:3306/test \
  391. --username root \
  392. --password 123456 \
  393. --table my_user \
  394. --fields-terminated-by ',' \
  395. --delete-target-dir \
  396. --num-mappers 1 \
  397. --hive-import \
  398. --hive-database default \
  399. --hive-table user_hive
  400. ----------------------------使用Sqoop将Hive表中数据(HDFS文件系统)中的数据导入MySQL中---------------------------
  401. CREATE TABLE `my_usert` (
  402. `id` tinyint(4) NOT NULL AUTO_INCREMENT,
  403. `account` varchar(255) DEFAULT NULL,
  404. `passwd` varchar(255) DEFAULT NULL,
  405. PRIMARY KEY (`id`)
  406. );
  407. bin/sqoop export \
  408. --connect jdbc:mysql://hadoop.zxk.com:3306/test \
  409. --username root \
  410. --password 123456 \
  411. --table my_usert \
  412. --export-dir /user/hive/warehouse/user_hive \
  413. --num-mappers 1 \
  414. --input-fields-terminated-by ','
  415. ---------------------------------------------Flume的基本配置--------------------------------------------
  416. bin/flume-ng
  417. Usage: bin/flume-ng <command> [options]...
  418. commands:
  419. agent run a Flume agent
  420. global options:
  421. --conf,-c <conf> use configs in <conf> directory
  422. -Dproperty=value sets a Java system property value
  423. agent options:
  424. --name,-n <name> the name of this agent (required)
  425. --conf-file,-f <file> specify a config file (required if -z missing)
  426. bin/flume-ng agent --conf conf --name agent-test --conf-file test.conf -Dflume.root.logger=DEBUG,console
  427. bin/flume-ng agent -c conf -n agent-test -f test.conf
  428. bin/flume-ng agent --conf conf --name a1 --conf-file conf/a1.conf -Dflume.root.logger=DEBUG,console
  429. bin/flume-ng agent --conf conf --name a2 --conf-file conf/flume-tail.conf -Dflume.root.logger=DEBUG,console
  430. ---------------------------------------------Oozie的基本命令--------------------------------------------
  431. oozie job -oozie http://hadoop.zxk.com:11000/oozie -config examples/apps/map-reduce/job.properties -run
  432. export OOZIE_URL=http://hadoop.zxk.com:11000/oozie
  433. oozie job -config oozie-apps/mr-wordcount-wf/job.properties -run
  434. ---------------------------------------------Oozie WorkFlow的基本使用-------------------------------------
  435. ---------------------------------------------Oozie WorkFlow的MapReduce基本使用----------------------------
  436. 如何定义一个WorkFlow
  437. 第一步:
  438. * 生成job.properties文件 -->指向 workflow.xml文件所在的HDFS位置。
  439. 第二步:
  440. * 编写 workflow.xml文件
  441. * 编写步骤:
  442. * 编写流程控制节点
  443. * 第一步:定义XML文件
  444. * 第二步:编写节点
  445. * start 节点
  446. * action 节点(包含 MapReduce Hive Sqoop Shell)
  447. * 当action节点执行成功时跳转
  448. 1):--->>> action 节点
  449. 2):--->>> end 节点
  450. * 当action节点执行失败时跳转
  451. 1):--->>> fail 节点
  452. 2):--->>> kill 节点(杀死线程)
  453. 3):--->>> end 节点
  454. * end 节点
  455. * 编写Action节点
  456. * 关键点: 如何使用Oozie调用MapReduce程序
  457. * 将以前Java MapReduce 程序中的【Driver】部分的配置 转换为 workflow.xml 文件中的 configuration 部分配置
  458. 【注意】 目前的Workflow.XML文件支持的是旧MapReduce API 需要加上配置
  459. <property>
  460. <name>mapred.mapper.new-api</name>
  461. <value>true</value>
  462. </property>
  463. <property>
  464. <name>mapred.reducer.new-api</name>
  465. <value>true</value>
  466. </property>
  467. 第三步:
  468. * 将生成的jar包上传到lib目录下
  469. 第四步:
  470. * 使用Hadoop将文件夹上传到HDFS文件系统中
  471. bin/hdfs dfs -put /opt/modules/oozie-4.0.0-cdh5.3.6/oozie-apps/*文件夹名称/ oozie-apps/
  472. 第五步:
  473. * 在Ooize安装目录下运行命令
  474. export OOZIE_URL=http://hadoop.zxk.com:11000/oozie
  475. oozie job -config oozie-apps/*文件夹名称/job.properties -run
  476. ---------------------------------------------Oozie workflow.xml 样例与job.properties 样例-------------------------------------------
  477. job.properties
  478. nameNode=hdfs://hadoop.zxk.com:8020
  479. jobTracker=hadoop.zxk.com:8032
  480. queueName=default
  481. oozieAppsRoot=user/root/oozie-apps
  482. oozieDataRoot=user/root/oozie/datas
  483. oozie.wf.application.path=${nameNode}/${oozieAppsRoot}/mr-wordcount-wf/workflow.xml
  484. inputDir=mr-wordcount-wf/input
  485. outputDir=mr-wordcount-wf/output
  486. workflow.xml
  487. <workflow-app xmlns="uri:oozie:workflow:0.5" name="mr-wordcount-wf">
  488. <start to="mr-node-wordcount"/>
  489. <action name="mr-node-wordcount">
  490. <map-reduce>
  491. <job-tracker>${jobTracker}</job-tracker>
  492. <name-node>${nameNode}</name-node>
  493. <prepare>
  494. <delete path="${nameNode}/${oozieDataRoot}/${outputDir}"/>
  495. </prepare>
  496. <configuration>
  497. <property>
  498. <name>mapred.mapper.new-api</name>
  499. <value>true</value>
  500. </property>
  501. <property>
  502. <name>mapred.reducer.new-api</name>
  503. <value>true</value>
  504. </property>
  505. <property>
  506. <name>mapreduce.job.queuename</name>
  507. <value>${queueName}</value>
  508. </property>
  509. <property>
  510. <name>mapreduce.job.map.class</name>
  511. <value>com.iroot.hadoop.senior.mapreduce.WordCount$WordCountMapper</value>
  512. </property>
  513. <property>
  514. <name>mapreduce.job.reduce.class</name>
  515. <value>com.iroot.hadoop.senior.mapreduce.WordCount$WordCountReducer</value>
  516. </property>
  517. <property>
  518. <name>mapreduce.map.output.key.class</name>
  519. <value>org.apache.hadoop.io.Text</value>
  520. </property>
  521. <property>
  522. <name>mapreduce.map.output.value.class</name>
  523. <value>org.apache.hadoop.io.IntWritable</value>
  524. </property>
  525. <property>
  526. <name>mapreduce.job.output.key.class</name>
  527. <value>org.apache.hadoop.io.Text</value>
  528. </property>
  529. <property>
  530. <name>mapreduce.job.output.value.class</name>
  531. <value>org.apache.hadoop.io.IntWritable</value>
  532. </property>
  533. <property>
  534. <name>mapreduce.input.fileinputformat.inputdir</name>
  535. <value>${nameNode}/${oozieDataRoot}/${inputDir}</value>
  536. </property>
  537. <property>
  538. <name>mapreduce.output.fileoutputformat.outputdir</name>
  539. <value>${nameNode}/${oozieDataRoot}/${outputDir}</value>
  540. </property>
  541. </configuration>
  542. </map-reduce>
  543. <ok to="end"/>
  544. <error to="fail"/>
  545. </action>
  546. <kill name="fail">
  547. <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
  548. </kill>
  549. <end name="end"/>
  550. </workflow-app>
  551. ---------------------------------------------Oozie WorkFlow的Hive基本使用----------------------------
  552. 如何定义一个WorkFlow
  553. 第一步:
  554. * 生成job.properties文件 -->指向 workflow.xml文件所在的HDFS位置。
  555. 第二步:
  556. * 编写 workflow.xml文件
  557. * 编写步骤:
  558. * 编写流程控制节点
  559. * 第一步:定义XML文件
  560. * 第二步:编写节点
  561. * start 节点
  562. * action 节点(包含 MapReduce Hive Sqoop Shell)
  563. * 当action节点执行成功时跳转
  564. 1):--->>> action 节点
  565. 2):--->>> end 节点
  566. * 当action节点执行失败时跳转
  567. 1):--->>> fail 节点
  568. 2):--->>> kill 节点(杀死线程)
  569. 3):--->>> end 节点
  570. * end 节点
  571. * 编写Action节点
  572. 【注意】 目前的Workflow.XML文件支持旧MapReduce API 不需要配置
  573. 第三步:
  574. * 将生成的mysql的jar包上传到lib目录下
  575. 第四步:
  576. * 将Hive的配置文件hive.site.xml 上传到目录下
  577. 第五步:
  578. * 使用Hadoop将文件夹上传到HDFS文件系统中
  579. bin/hdfs dfs -put /opt/modules/oozie-4.0.0-cdh5.3.6/oozie-apps/*文件夹名称/ oozie-apps/
  580. 第六步:
  581. * 在Ooize安装目录下运行命令
  582. export OOZIE_URL=http://hadoop.zxk.com:11000/oozie
  583. oozie job -config oozie-apps/*文件夹名称/job.properties -run
  584. job.properties样例
  585. nameNode=hdfs://hadoop.zxk.com:8020
  586. jobTracker=hadoop.zxk.com:8032
  587. queueName=default
  588. oozieAppsRoot=user/root/oozie-apps
  589. oozieDataRoot=user/root/oozie/datas
  590. oozie.use.system.libpath=true
  591. oozie.wf.application.path=${nameNode}/${oozieAppsRoot}/hive-select/
  592. outputDir=hive-select/output
  593. workflow.xml样例
  594. <workflow-app xmlns="uri:oozie:workflow:0.5" name="wf-hive-select">
  595. <start to="hive-node"/>
  596. <action name="hive-node">
  597. <hive xmlns="uri:oozie:hive-action:0.2">
  598. <job-tracker>${jobTracker}</job-tracker>
  599. <name-node>${nameNode}</name-node>
  600. <prepare>
  601. <delete path="${nameNode}/${oozieDataRoot}/${outputDir}"/>
  602. </prepare>
  603. <job-xml>${nameNode}/${oozieAppsRoot}/hive-select/hive-site.xml</job-xml>
  604. <configuration>
  605. <property>
  606. <name>mapred.job.queue.name</name>
  607. <value>${queueName}</value>
  608. </property>
  609. </configuration>
  610. <script>select-dept.sql</script>
  611. <param>OUTPUT=${nameNode}/${oozieDataRoot}/${outputDir}</param>
  612. </hive>
  613. <ok to="end"/>
  614. <error to="fail"/>
  615. </action>
  616. <kill name="fail">
  617. <message>Hive failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
  618. </kill>
  619. <end name="end"/>
  620. </workflow-app>
  621. ---------------------------------------------Oozie WorkFlow的Sqoop基本使用----------------------------
  622. 如何定义一个WorkFlow
  623. 第一步:
  624. * 生成job.properties文件 -->指向 workflow.xml文件所在的HDFS位置。
  625. 第二步:
  626. * 编写 workflow.xml文件
  627. * 编写步骤:
  628. * 编写流程控制节点
  629. * 第一步:定义XML文件
  630. * 第二步:编写节点
  631. * start 节点
  632. * action 节点(包含 MapReduce Hive Sqoop Shell)
  633. * 当action节点执行成功时跳转
  634. 1):--->>> action 节点
  635. 2):--->>> end 节点
  636. * 当action节点执行失败时跳转
  637. 1):--->>> fail 节点
  638. 2):--->>> kill 节点(杀死线程)
  639. 3):--->>> end 节点
  640. * end 节点
  641. * 编写Action节点
  642. 【注意】 目前的Workflow.XML文件支持旧MapReduce API 不需要配置
  643. 【注意】 <command></command>中的语句的编写依据Sqoopde 规则
  644. 第三步:
  645. * 将生成的mysql的jar包上传到lib目录下
  646. 第四步:
  647. * 使用Hadoop将文件夹上传到HDFS文件系统中
  648. bin/hdfs dfs -put /opt/modules/oozie-4.0.0-cdh5.3.6/oozie-apps/*文件夹名称/ oozie-apps/
  649. 第五步:
  650. * 在Ooize安装目录下运行命令
  651. export OOZIE_URL=http://hadoop.zxk.com:11000/oozie
  652. oozie job -config oozie-apps/*文件夹名称/job.properties -run
  653. job.properties样例
  654. nameNode=hdfs://hadoop.zxk.com:8020
  655. jobTracker=hadoop.zxk.com:8032
  656. queueName=default
  657. oozieAppsRoot=user/root/oozie-apps
  658. oozieDataRoot=user/root/oozie/datas
  659. oozie.use.system.libpath=true
  660. oozie.wf.application.path=${nameNode}/${oozieAppsRoot}/sqoop-import-user
  661. outputDir=sqoop-import-user/output
  662. workflow.xml样例
  663. <workflow-app xmlns="uri:oozie:workflow:0.5" name="sqoop-wf">
  664. <start to="sqoop-node"/>
  665. <action name="sqoop-node">
  666. <sqoop xmlns="uri:oozie:sqoop-action:0.3">
  667. <job-tracker>${jobTracker}</job-tracker>
  668. <name-node>${nameNode}</name-node>
  669. <prepare>
  670. <delete path="${nameNode}/${oozieDataRoot}/${outputDir}"/>
  671. </prepare>
  672. <configuration>
  673. <property>
  674. <name>mapred.job.queue.name</name>
  675. <value>${queueName}</value>
  676. </property>
  677. </configuration>
  678. <command>import --connect jdbc:mysql://hadoop.zxk.com:3306/test --username root --password 123456 --table my_user --target-dir /user/root/oozie/datas/sqoop-import-user/output --fields-terminated-by "," --num-mappers 1</command>
  679. </sqoop>
  680. <ok to="end"/>
  681. <error to="fail"/>
  682. </action>
  683. <kill name="fail">
  684. <message>Sqoop failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
  685. </kill>
  686. <end name="end"/>
  687. </workflow-app>
  688. ---------------------------------------------Oozie WorkFlow的Shell脚本基本使用----------------------------
  689. 如何定义一个WorkFlow
  690. 第一步:
  691. * 生成job.properties文件 -->指向 workflow.xml文件所在的HDFS位置。
  692. * exec=dept-select.sh (定义脚本的位置与名称)
  693. * script=dept-select.sql (定义SQL语句的位置与名称)
  694. 第二步:
  695. * 编写 workflow.xml文件
  696. * 编写步骤:
  697. * 编写流程控制节点
  698. * 第一步:定义XML文件
  699. * 第二步:编写节点
  700. * start 节点
  701. * action 节点(包含 MapReduce Hive Sqoop Shell)
  702. * 当action节点执行成功时跳转
  703. 1):--->>> action 节点
  704. 2):--->>> end 节点
  705. * 当action节点执行失败时跳转
  706. 1):--->>> fail 节点
  707. 2):--->>> kill 节点(杀死线程)
  708. 3):--->>> end 节点
  709. * end 节点
  710. * 编写Action节点
  711. 【注意】 目前的Workflow.XML文件支持旧MapReduce API 不需要配置
  712. 【注意】 <file></file>中的路径是指HDFS文件系统中的目录
  713. <exec>${exec}</exec>
  714. <file>${nameNode}/${oozieAppsRoot}/shell-hive-select/${exec}#${exec}</file>
  715. <file>${nameNode}/${oozieAppsRoot}/shell-hive-select/${script}#${script}</file>
  716. 第三步:
  717. * 编写脚本文件
  718. #!/usr/bin/env bash
  719. /opt/modules/hive-0.13.1/bin/hive -f dept-select.sql
  720. 第四步(可选):
  721. * 编写SQL语句
  722. insert overwrite directory '/user/root/oozie/datas/shell-hive-select/output'
  723. select
  724. deptno, dname
  725. from default.dept ;
  726. 第五步:
  727. * 使用Hadoop将文件夹上传到HDFS文件系统中
  728. bin/hdfs dfs -put /opt/modules/oozie-4.0.0-cdh5.3.6/oozie-apps/*文件夹名称/ oozie-apps/
  729. 第六步:
  730. * 在Ooize安装目录下运行命令
  731. export OOZIE_URL=http://hadoop.zxk.com:11000/oozie
  732. oozie job -config oozie-apps/*文件夹名称/job.properties -run
  733. job.properties样例
  734. nameNode=hdfs://hadoop.zxk.com:8020
  735. jobTracker=hadoop.zxk.com:8032
  736. queueName=default
  737. oozieAppsRoot=user/root/oozie-apps
  738. oozieDataRoot=user/root/oozie/datas
  739. oozie.wf.application.path=${nameNode}/${oozieAppsRoot}/shell-hive-select
  740. exec=dept-select.sh
  741. script=dept-select.sql
  742. workflow.xml样例
  743. <workflow-app xmlns="uri:oozie:workflow:0.5" name="shell-wf">
  744. <start to="shell-node"/>
  745. <action name="shell-node">
  746. <shell xmlns="uri:oozie:shell-action:0.2">
  747. <job-tracker>${jobTracker}</job-tracker>
  748. <name-node>${nameNode}</name-node>
  749. <configuration>
  750. <property>
  751. <name>mapred.job.queue.name</name>
  752. <value>${queueName}</value>
  753. </property>
  754. </configuration>
  755. <exec>${exec}</exec>
  756. <file>${nameNode}/${oozieAppsRoot}/shell-hive-select/${exec}#${exec}</file>
  757. <file>${nameNode}/${oozieAppsRoot}/shell-hive-select/${script}#${script}</file>
  758. <capture-output/>
  759. </shell>
  760. <ok to="end"/>
  761. <error to="fail"/>
  762. </action>
  763. <kill name="fail">
  764. <message>Shell action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
  765. </kill>
  766. <end name="end"/>
  767. </workflow-app>
  768. ----------------------------Oozie WorkFlow的两个及以上Action编写以及运行----------------------------
  769. 依据不同的情况:组合Hadoop,Hive,Sqoop,Shell以及coordinator文件看是否上传hive-site.xml文件以及lib目录
  770. job.properties文件
  771. nameNode=hdfs://hadoop.zxk.com:8020
  772. jobTracker=hadoop.zxk.com:8032
  773. queueName=default
  774. oozieAppsRoot=user/root/oozie-apps
  775. oozieDataRoot=user/root/oozie/datas
  776. oozie.use.system.libpath=true
  777. oozie.wf.application.path=${nameNode}/${oozieAppsRoot}/wf-user-select/
  778. #oozie.coord.application.path=${nameNode}/${oozieAppsRoot}/wf-user-select
  779. #start=2015-10-15T00:00+0800
  780. #end=2015-10-26T00:00+0800
  781. #workflowAppUri=${nameNode}/${oozieAppsRoot}/wf-user-select
  782. outputDir=wf-user-select/output
  783. Workflow文件
  784. workflow-app xmlns="uri:oozie:workflow:0.5" name="wf-user-select">
  785. <start to="hive-node"/>
  786. <action name="hive-node">
  787. <hive xmlns="uri:oozie:hive-action:0.5">
  788. <job-tracker>${jobTracker}</job-tracker>
  789. <name-node>${nameNode}</name-node>
  790. <prepare>
  791. <delete path="${nameNode}/${oozieDataRoot}/${outputDir}"/>
  792. </prepare>
  793. <job-xml>${nameNode}/${oozieAppsRoot}/hive-select/hive-site.xml</job-xml>
  794. <configuration>
  795. <property>
  796. <name>mapred.job.queue.name</name>
  797. <value>${queueName}</value>
  798. </property>
  799. </configuration>
  800. <script>select-user.sql</script>
  801. <param>OUTPUT=${nameNode}/${oozieDataRoot}/${outputDir}</param>
  802. </hive>
  803. <ok to="sqoop-node"/>
  804. <error to="fail"/>
  805. </action>
  806. <action name="sqoop-node">
  807. <sqoop xmlns="uri:oozie:sqoop-action:0.3">
  808. <job-tracker>${jobTracker}</job-tracker>
  809. <name-node>${nameNode}</name-node>
  810. <configuration>
  811. <property>
  812. <name>mapred.job.queue.name</name>
  813. <value>${queueName}</value>
  814. </property>
  815. </configuration>
  816. <command>export --connect jdbc:mysql://hadoop.zxk.com:3306/test --username root --password 123456 --table my_user --num-mappers 1 --fields-terminated-by "," --export-dir /user/root/oozie/datas/wf-user-select/output</command>
  817. </sqoop>
  818. <ok to="end"/>
  819. <error to="fail"/>
  820. </action>
  821. <kill name="fail">
  822. <message>Hive failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
  823. </kill>
  824. <end name="end"/>
  825. </workflow-app>
  826. ------------------------------------------Hue 集成时HDFS与YARN的配置---------------------------------------------------
  827. core-site.xml配置
  828. <property>
  829. <name>hadoop.proxyuser.hue.hosts</name>
  830. <value>*</value>
  831. </property>
  832. <property>
  833. <name>hadoop.proxyuser.hue.groups</name>
  834. <value>*</value>
  835. </property>
  836. hue.ini
  837. secret_key=jFE93j;2[290-eiw.KEiwN2s3['d;/.q[eIW^y#e=+Iei*@Mn<qW5o
  838. http_host=hadoop.zxk.com
  839. http_port=8888
  840. time_zone=Asia/Shanghai
  841. django_debug_mode=false
  842. http_500_debug_mode=false
  843. [[hdfs_clusters]]
  844. # HA support by using HttpFs
  845. [[[default]]]
  846. fs_defaultfs=hdfs://hadoop.zxk.com:8020
  847. webhdfs_url=http://hadoop.zxk.com:50070/webhdfs/v1
  848. hadoop_conf_dir=/opt/modules/hadoop-2.5.0/etc/hadoop
  849. # Configuration for YARN (MR2)
  850. [[yarn_clusters]]
  851. [[[default]]]
  852. resourcemanager_host=hadoop.zxk.com
  853. resourcemanager_port=8032
  854. submit_to=True
  855. resourcemanager_api_url=http://hadoop.zxk.com:8088
  856. proxy_api_url=http://hadoop.zxk.com:8088
  857. history_server_api_url=http://hadoop.zxk.com:19888
  858. [filebrowser]
  859. archive_upload_tempdir=/tmp
  860. ------------------------------------------Hue 集成时Hive的配置------------------------------------------------
  861. 启动 HiveServer2 bin/bin/hiveserver2
  862. 启动 Metastore bin/hive --service metastore
  863. hive-site.xml
  864. <!--HiveServer2 -->
  865. <property>
  866. <name>hive.server2.thrift.port</name>
  867. <value>10000</value>
  868. </property>
  869. <property>
  870. <name>hive.server2.thrift.bind.host</name>
  871. <value>hadoop.zxk.com</value>
  872. </property>
  873. <!--Remote Metastore -->
  874. <property>
  875. <name>hive.metastore.uris</name>
  876. <value>thrift://hadoop.zxk.com:9083</value>
  877. </property>
  878. hue.ini
  879. [beeswax]
  880. hive_server_host=hadoop.zxk.com
  881. hive_server_port=10000
  882. hive_conf_dir=/opt/modules/hive-0.13.1/conf
  883. server_conn_timeout=120
  884. [librdbms]
  885. [[databases]]
  886. [[[sqlite]]]
  887. nice_name=SQLite
  888. name=/opt/app/hue-3.7.0-cdh5.3.6/desktop/desktop.db
  889. engine=sqlite
  890. [[[mysql]]]
  891. nice_name="My SQL DB"
  892. name=test
  893. engine=mysql
  894. host=hadoop.zxk.com
  895. port=3306
  896. user=root
  897. password=123456
  898. ------------------------------------------Hue 集成时Oozie的配置------------------------------------------------
  899. 启动Oozie bin/oozied.sh start
  900. -----------------------------------------Hue 集成 Oozie时的share目录-----------------------------------------
  901. bin/oozie-setup.sh sharelib create \
  902. -fs hdfs://hadoop.zxk.com:8020 \
  903. -locallib oozie-sharelib-4.0.0-cdh5.3.6-yarn.tar.gz
  904. ----------------------------------------HBase Shell 的使用----------------------------------------------------
  905. 在 Xshell 中 使用 hbase shell 进入后 无法删除 问题:
  906. 在hbase shell下,误输入的指令不能使用backspace和delete删除
  907. 进入到XShell   文件 --> 属性   -->  终端  -->  键盘 
  908. 在 DELETE键序列  和 BACKSPACE键序列 中都选择  ASCII 127
  909. 表的管理
  910. 1)查看有哪些表
  911. hbase(main):002:0> list
  912. 2)创建表
  913. # 语法:create <table>, {NAME => <family>, VERSIONS => <VERSIONS>}
  914. # 例如:创建表t1,有两个family name:f1,f2,且版本数均为2
  915. hbase(main):002:0> create 't1',{NAME => 'f1', VERSIONS => 2},{NAME => 'f2', VERSIONS => 2}
  916. hbase(main):002:0> create 'user','info'
  917. 2-1)查看表的结构
  918. hbase(main):002:0> describe 表名
  919. 2-2)插入数据
  920. put 'user','10001','info:name','zx'
  921. put 'user','10001','info:age','22'
  922. put 'user','10001','info:sex','female'
  923. put 'user','10001','info:address','henan'
  924. put 'user','10002','info:name','cl'
  925. put 'user','10002','info:age','22'
  926. put 'user','10002','info:address','henan'
  927. put 'user','10003','info:name','zxk'
  928. put 'user','10003','info:age','22'
  929. 2-3)查询数据
  930. get 'user','10001'
  931. get 'user','10001','info:address'
  932. scan 'user'
  933. scan 'user', {COLUMNS => ['info:name', 'info:age'], STARTROW => '10002'}
  934. 3)删除表
  935. 分两步:首先disable,然后drop
  936. 例如:删除表t1
  937. hbase(main):002:0> disable 't1'
  938. hbase(main):002:0> drop 't1'
  939. 4)查看表的结构
  940. # 语法:describe <table>
  941. # 例如:查看表t1的结构
  942. hbase(main):002:0> describe 't1'
  943. 5)修改表结构
  944. 修改表结构必须先disable
  945. # 语法:alter 't1', {NAME => 'f1'}, {NAME => 'f2', METHOD => 'delete'}
  946. # 例如:修改表test1的cf的TTL为180天
  947. hbase(main):002:0> disable 'test1'
  948. hbase(main):002:0> alter 'test1',{NAME=>'body',TTL=>'15552000'},{NAME=>'meta', TTL=>'15552000'}
  949. hbase(main):002:0> enable 'test1'
  950. ------------------------------------------HBase 数据存储----------------------------------------------------------
  951. 一、HBase数据检索流程
  952. (1)客户端读或写一个表的数据,首先链接Zookeeper,因为需要到Zookeeper中找读的数据,表是
  953. 通过Region来管理,每个Region由RegionServer管理,每个Region都有startkey及endkey。
  954. (2)HBase的表格分为User Tables(用户表)和Catalog Tables(系统自带表)。
  955. (3)User Tables(用户表)包含user信息、region信息(startkey和endkey)。例:user表的
  956. region-01存在regionserver-03中。该信息是保存在meta-table中。
  957. (4)在HBase新版本中,有类似于RDBMS(关系数据库管理系统)中DataBase的命名空间的概念。
  958. HBase的所有表都在data目录下,data下包含default目录和hbase目录,这里的目录就是命名空间的概念。
  959. (5)用户自定义的表默认情况下命名空间为default,而系统自带的元数据表的命名空间为hbase。
  960. (6)meta表只有一个Region,它的Region也需要RegionServer管理,即为meta-region-server的功能。
  961. 用户首先找到meta-region-server,然后找到meta表,scan命令可以看到表格中column被什么server管理。
  962. (7)综上所述,用户表由很多region组成,region信息存储在hbase:meta中。用户表的每一个region
  963. 都有key。Client需要先读zookeeper,其实通过meta-region-server找到的是meta表的region,找到后扫描
  964. meta表的数据,然后再找到数据再操作。
  965. 二、HBase数据存储
  966. 2.1 HBase结构详解
  967. HBase能高速实现数据存储和访问源于HBase数据存储。
  968. 1. 连接Zookeeper,从Zookeeper中找到要读的数据。我们需要知道表中RowKey在region的位置。
  969. 2. 客户端查找HRegionServer,HRegionServer管理众多Region。
  970. 3. HMaster也需要连接Zookeeper,连接的作用是:HMaster需要知道哪些HRegionServer是活动
  971. 的及HRegionServer所在的位置,然后管理HRegionServer。
  972. 4. HBase内部是把数据写到HDFS上的,DFS有客户端。
  973. 5. Region中包含HLog、Store。一张表有几个列簇,就有几个Store。Store中有很多memStore
  974. 及StoreFile。StoreFile是对HFile的封装。StoreFile真正存储在HDFS上。
  975. 6. 写数据时,先往HLog上写一份,再往memStore上写一份。当memStore达到一定大小则往StoreFile上写。
  976. 若memStore数据有丢失,则从HLog上恢复。
  977. 7. 读数据先到memStore上读,再到StoreFile上读,之后合并。
  978. 2.2 HBase数据存储详解
  979.         1. HBase中的所有数据文件都存储在Hadoop HDFS文件系统上,主要包括两种文件类型:
  980.         1)HFile:HBase中KeyValue数据的存储格式,HFile是Hadoop的二进制格式文件,
  981. 实际上StoreFIle就是对HFile做了轻量级的包装,进行数据的存储。
  982.         2)HLog File:HBase中WAL(Write Ahead Log)的存储格式,物理上是Hadoop的Sequence File。
  983.         2. HRegionServer内部管理了一系列HRegion对象,每个HRegion对应了table中的一个region,
  984. HRegion中由多个HStore组成。每个HStore对应了Table中的一个column family的存储,可以看出
  985. 每个columnfamily其实就是一个集中的存储单元,因此最好将具备共同IO特性的column放在一个column family中,
  986. 这样最高效。
  987.         3. HStore存储是HBase存储的核心,由两部分组成,一部分是MemStore,一部分是StoreFile。
  988.         4. MemStore是 Sorted Memory Buffer,用户写入的数据首先会放入MemStore,当MemStore满了以后
  989. 会Flush成一个StoreFile(底层实现是HFile)。
  990.       5. HLog 文件结构:WAL意为Write ahead log,类似Mysql中的binlog,用来做灾难恢复。
  991. Hlog记录数据的所有变更,一旦数据修改,就可以从log中进行恢复。 
  992.         6. 每个HRegionServer维护一个HLog,而不是每个HRegion一个。这样不同region(来自不同table)
  993. 的日志会混在一起,这样做的目的是不断追加单个文件,相对于同时写多个文件而言,可以减少
  994. 磁盘寻址次数,因此可以提高对table的写性能。带来的麻烦是,如果一台HRegionServer下线,
  995. 为了恢复其上的region,需要将HRegionServer上的log进行拆分,然后分发到其它HRegionServer
  996. 上进行恢复。
  997. 2.3 用户写入数据流程
  998.     1. Client客户端写入数据后 -> 数据存入MemStore,一直到MemStore满之后 Flush成一个StoreFile,
  999. 直至增长到一定阈值 -> 触发Compact合并操作 -> 多个StoreFile合并成一个StoreFile。
  1000.     2. 同时进行版本合并和数据删除 -> 当StoreFiles Compact后,
  1001. 逐步形成越来越大的StoreFile ->单个StoreFile大小超过一定阈值后,
  1002. 触发Split操作,把当前Region分成2个Region,Region会下线,新分出的2个孩子Region会被HMaster分配到
  1003. 相应的HRegionServer上,使得原先1个Region的压力得以分流到2个Region上。
  1004. export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
  1005. export HADOOP_HOME=/opt/modules/hadoop-2.5.0
  1006. HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar $HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar
  1007. CellCounter: Count cells in HBase table
  1008. completebulkload: Complete a bulk data load.
  1009. copytable: Export a table from local cluster to peer cluster
  1010. export: Write table data to HDFS.
  1011. import: Import data written by Export.
  1012. importtsv: Import data in TSV format.
  1013. rowcounter: Count rows in HBase table
  1014. verifyrep: Compare the data from tables in two different clusters. WARNING: It doesn't work for incrementColumnValues'd cells since the timestamp is changed after being appended to the log.
  1015. ------------------------------------------HBase 与MapReduce集成------------------------------------------
  1016. (1)上传Jar包
  1017. (2)export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
  1018. (3)export HADOOP_HOME=/opt/modules/hadoop-2.5.0
  1019. (4)HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar $HADOOP_HOME/jars/hbase-mr-user2basic.jar
  1020. ------------------------------------------HBase Importtsv的使用----------------------------------------
  1021. 10001 21 male henan 13243169133
  1022. 10002 22 male yunnan 13243169133
  1023. 10003 23 male dali 13243169133
  1024. 10004 24 male henan 13243169133
  1025. 10005 25 male henan 13243169133
  1026. export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
  1027. export HADOOP_HOME=/opt/modules/hadoop-2.5.0
  1028. HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar \
  1029. $HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar importtsv \
  1030. -Dimporttsv.columns=HBASE_ROW_KEY,\
  1031. info:name,info:age,info:sex,info:address,info:phone \
  1032. student \
  1033. hdfs://hadoop.zxk.com:8020/user/root/hbase/importtsv
  1034. export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
  1035. export HADOOP_HOME=/opt/modules/hadoop-2.5.0
  1036. HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar \
  1037. $HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar importtsv \
  1038. -Dimporttsv.columns=HBASE_ROW_KEY,\
  1039. info:name,info:age,info:sex,info:address,info:phone \
  1040. -Dimporttsv.bulk.output=hdfs://hadoop.zxk.com:8020/user/root/hbase/hfileoutput \
  1041. student \
  1042. hdfs://hadoop.zxk.com:8020/user/root/hbase/importtsv
  1043. export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
  1044. export HADOOP_HOME=/opt/modules/hadoop-2.5.0
  1045. HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar \
  1046. $HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar \
  1047. completebulkload \
  1048. hdfs://hadoop.zxk.com:8020/user/root/hbase/hfileoutput \
  1049. student
  1050. ------------------------------------------HBase 表与命名空间NameSpace的使用----------------------------------------
  1051. 在HBase中,默认情况创建的表,都在【default】命名空间下,在Hbase中
  1052. 系统的命名空间
  1053. meta
  1054. namespace
  1055. 命名空间NameSpace中的命令
  1056. Group name: namespace
  1057. Commands: alter_namespace, create_namespace, describe_namespace, drop_namespace, list_namespace, list_namespace_tables
  1058. create 'ns1:t1', {NAME => 'f1', VERSIONS => 5}
  1059. create 'ns1:t1', 'cf' 等同于 create 'ns1:t1', {NAME => 'f1'}
  1060. create 'ns1:t2', {NAME => 'f1'}, {NAME => 'f2'}, {NAME => 'f3'}等同于create 'ns1:t3', 'f1', 'f2', 'f3'
  1061. ------------------------------------------HBase 表的预分区的创建----------------------------------------
  1062. 每一个Region会自动创建rowkeyd的范围[startkey,endkey):包括数值为startkey-->endkey-1
  1063. 默认情况的情况下,创建一个HBase表,自动为表分配一个Region。
  1064. 结合实际使用来看,无论是在测试环境还生产环境,我们创建好HBase一张表以后,需要往表中导入大量的数据。
  1065. 我们会将【文件中的数据】数据转化为 【hfile文件】,通过【bulk load】 加载到hbase 的表中去。每一个【Region】
  1066. 会被一个【RegionServer】管理。当数据过大,此时的Region分割为两个【Region】时,【RegionServer】会出问题
  1067. 解决方案:
  1068. 创建表时,多创建一些Region(依据表的数据rowkey进行设计,结合业务)
  1069. 例如:五个Region
  1070. 被多个RegionServer进行管理
  1071. 要点:
  1072. 在插入数据时,会向五个Region中分别插入对应的数据,均衡数据的插入
  1073. HBase 表的预分区创建
  1074. Region划分,依赖于rowkey,我们需要预先预估一些rowkey
  1075. 案例:
  1076. hbase> create 'ns1:t1', 'f1', SPLITS => ['10', '20', '30', '40']
  1077. hbase> create 't1', 'f1', SPLITS => ['10', '20', '30', '40']
  1078. hbase> create 't1', 'f1', SPLITS_FILE => 'splits.txt', OWNER => 'johndoe'
  1079. hbase> create 't1', {NAME => 'f1', VERSIONS => 5}, METADATA => { 'mykey' => 'myvalue' }
  1080. hbase> # Optionally pre-split the table into NUMREGIONS, using
  1081. hbase> # SPLITALGO ("HexStringSplit", "UniformSplit" or classname)
  1082. hbase> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}
  1083. hbase> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit', CONFIGURATION => {'hbase.hregion.scan.loadColumnFamiliesOnDemand' => 'true'}}
  1084. 案例:
  1085. create 'bflogs', 'info', SPLITS => ['20151001000000000', '20151011000000000', '20151021000000000']
  1086. 指定预估rowkey
  1087. 年月日时分秒毫秒
  1088. '20151001000000000'
  1089. '20151011000000000'
  1090. '20151021000000000'
  1091. create 'bflogs2', 'info', SPLITS_FILE => '/opt/datas/bflogs-split.txt'
  1092. 第三方式(很少使用)
  1093. create 't11', 'f11', {NUMREGIONS => 2, SPLITALGO => 'HexStringSplit'}
  1094. create 't12', 'f12', {NUMREGIONS => 4, SPLITALGO => 'UniformSplit'}
  1095. -----------------------------------------HBase 表的的创建思路----------------------------------------
  1096. 如何在海量数据中,获取需要的数据(查询的数据)。
  1097. 表的rowkey设计中:
  1098. 核心思想:
  1099. 依据rowkey查询最快
  1100. 对rowkey进行范围查询range
  1101. 前缀匹配
  1102. 索引表/辅助表(主表) 创建与设计
  1103. 列簇:info
  1104. 列:rowkey
  1105. 主表和索引表的数据 如何同步
  1106. >>> 程序,事务
  1107. >>> phoenix
  1108. >>> 只能使用JDBC方式,主表与索引表的数据才能同步
  1109. 创建索引表
  1110. >>> solr
  1111. lily
  1112. cloudera search
  1113. -----------------------------------------HBase 创建表时Sanppy压缩-----------------------------------------
  1114. 1)配置Haodop压缩
  1115. >>> bin/hadoop checknative
  1116. 2)配置HBase
  1117. >>> hadoop-snappy jar -> 放入到HBase的lib目录下
  1118. >>> 在HBase的lib目录下,创建native ,需要将本地库native ln -s 到 Hadoop 的lib目录的native下
  1119. >>> 配置HBase-site.xml
  1120. <property>
  1121. <name>hbase.regionserver.codecs</name>
  1122. <value>snappy</value>
  1123. </property>
  1124. -----------------------------------------HBase Memstore-----------------------------------------
  1125. 当RegionServer(RS)收到写请求的时候(write request),RS会将请求转至相应的Region。
  1126. 每一个Region都存储着一些列(a set of rows)。根据其列族的不同,将这些列数据存储在相应的列族中
  1127. (Column Family,简写CF)。不同的CFs中的数据存储在各自的HStore中,HStore由一个Memstore及一系列
  1128. HFile组成。
  1129. Memstore位于RS的主内存中,而HFiles被写入到HDFS中。当RS处理写请求的时候,数据首先写入到
  1130. Memstore,然后当到达一定的阀值的时候,Memstore中的数据会被刷到HFile中。
  1131. 用到Memstore最主要的原因是:存储在HDFS上的数据需要按照row key 排序。而HDFS本身被设计为
  1132. 顺序读写(sequential reads/writes),不允许修改。这样的话,HBase就不能够高效的写数据,因为要
  1133. 写入到HBase的数据不会被排序,这也就意味着没有为将来的检索优化。为了解决这个问题,HBase将最
  1134. 近接收到的数据缓存在内存中(in Memstore),在持久化到HDFS之前完成排序,然后再快速的顺序写入HDFS。
  1135. 需要注意的一点是实际的HFile中,不仅仅只是简单地排序的列数据的列表,详见Apache HBase I/O – HFile。
  1136. 除了解决“无序”问题外,Memstore还有一些其他的好处,例如:
  1137. 作为一个内存级缓存,缓存最近增加数据。一种显而易见的场合是,新插入数据总是比老数据频繁使用。
  1138. 在持久化写入之前,在内存中对Rows/Cells可以做某些优化。比如,当数据的version被设为1的时候,对于
  1139. 某些CF的一些数据,Memstore缓存了数个对该Cell的更新,在写入HFile的时候,仅需要保存一个最新的版本就好了,
  1140. 其他的都可以直接抛弃。
  1141. 有一点需要特别注意:每一次Memstore的flush,会为每一个CF创建一个新的HFile。在读方面相对来说就会
  1142. 简单一些:HBase首先检查请求的数据是否在Memstore,不在的话就到HFile中查找,最终返回merged的一个结果
  1143. 给用户。
  1144. HBase Memstore关注要点
  1145. 迫于以下几个原因,HBase用户或者管理员需要关注Memstore并且要熟悉它是如何被使用的:
  1146. Memstore有许多配置可以调整以取得好的性能和避免一些问题。HBase不会根据用户自己的使用模式来调整这些
  1147. 配置,你需要自己来调整。频繁的Memstore flush会严重影响HBase集群读性能,并有可能带来一些额外的负载。
  1148. Memstore flush的方式有可能影响你的HBase schema设计
  1149. 接下来详细讨论一下这些要点:
  1150. Configuring Memstore Flushes
  1151. 对Memstore Flush来说,主要有两组配置项:
  1152. 决定Flush触发时机
  1153. 决定Flush何时触发并且在Flush时候更新被阻断(block)
  1154. 第一组是关于触发“普通”flush,这类flush发生时,并不影响并行的写请求。该类型flush的配置项有:
  1155. hbase.hregion.memstore.flush.size
  1156. <property>
  1157. <name>hbase.hregion.memstore.flush.size</name>
  1158. <value>134217728</value>
  1159. </property>
  1160. base.regionserver.global.memstore.lowerLimit
  1161. <property>
  1162. <name>hbase.regionserver.global.memstore.lowerLimit</name>
  1163. <value>0.35</value>
  1164. </property>
  1165. 需要注意的是第一个设置是每个Memstore的大小,当你设置该配置项时,你需要考虑一下每台RS承载的
  1166. region总量。可能一开始你设置的该值比较小,后来随着region增多,那么就有可能因为第二个设置原因Memstore
  1167. 的flush触发会变早许多。
  1168. 第二组设置主要是出于安全考虑:有时候集群的“写负载”非常高,写入量一直超过flush的量,这时,
  1169. 我们就希望memstore不要超过一定的安全设置。在这种情况下,写操作就要被阻止(blocked)一直到memstore
  1170. 恢复到一个“可管理”(manageable)的大小。该类型flush配置项有:
  1171. hbase.regionserver.global.memstore.upperLimit
  1172. <property>
  1173. <name>hbase.regionserver.global.memstore.upperLimit</name>
  1174. <value>0.4</value>
  1175. </property>
  1176. hbase.hregion.memstore.block.multiplier
  1177. <property>
  1178. <name>hbase.hregion.memstore.block.multiplier</name>
  1179. <value>2</value>
  1180. </property>
  1181. 某个节点“写阻塞”对该节点来说影响很大,但是对于整个集群的影响更大。HBase设计为:
  1182. 每个Region仅属于一个RS但是“写负载”是均匀分布于整个集群(所有Region上)。
  1183. 有一个如此“慢”的节点,将会使得整个集群都会变慢(最明显的是反映在速度上)。
  1184. 提示:
  1185. 严重关切Memstore的大小和Memstore Flush Queue的大小。理想情况下,
  1186. Memstore的大小不应该达到hbase.regionserver.global.memstore.upperLimit的设置,
  1187. Memstore Flush Queue 的size不能持续增长。
  1188. 频繁的Memstore Flushes
  1189. 要避免“写阻塞”,貌似让Flush操作尽量的早于达到触发“写操作”的阈值为宜。
  1190. 但是,这将导致频繁的Flush操作,而由此带来的后果便是读性能下降以及额外的负载。
  1191. 每次的Memstore Flush都会为每个CF创建一个HFile。频繁的Flush就会创建大量的HFile。
  1192. 这样HBase在检索的时候,就不得不读取大量的HFile,读性能会受很大影响。
  1193. 为预防打开过多HFile及避免读性能恶化,HBase有专门的HFile合并处理(HFile Compaction Process)。
  1194. HBase会周期性的合并数个小HFile为一个大的HFile。明显的,有Memstore Flush产生的HFile越多,
  1195. 集群系统就要做更多的合并操作(额外负载)。更糟糕的是:Compaction处理是跟集群上的其他请求并行进行的。
  1196. 当HBase不能够跟上Compaction的时候(同样有阈值设置项),会在RS上出现“写阻塞”。像上面说到的,这是最最不希望的。
  1197. 提示:
  1198. 严重关切RS上Compaction Queue 的size。要在其引起问题前,阻止其持续增大。
  1199. 理想情况下,在不超过hbase.regionserver.global.memstore.upperLimit的情况下,Memstore应该尽可能多
  1200. 的使用内存(配置给Memstore部分的,而不是真个Heap的)。
  1201. ----------------------------------------------HBsse与Hive的集成----------------------------------------------
  1202. >> 数据存储在HBase中
  1203. >> hive 表的描述信息存储在hive中
  1204. hive-table <--映射--> hbase-table
  1205. hive-column<--映射-->hbase-rowkey,hbase-cf-column
  1206. handler
  1207. HBsse与Hive的集成两种方式
  1208. >> 管理表
  1209. 创建hive表的时候,指定数据存储在hbase表中。
  1210. 例如:
  1211. CREATE TABLE hbase_hive_table(key int, value string)
  1212. STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
  1213. WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:val")
  1214. TBLPROPERTIES ("hbase.table.name" = "xyz");
  1215. >> 外部表
  1216. 现在已经存在一个HBase表,需要对表中数据进行分析。
  1217. 例如:
  1218. CREATE EXTERNAL TABLE hbase_user(id int, name string,age int)
  1219. STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
  1220. WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:name,info:age")
  1221. TBLPROPERTIES ("hbase.table.name" = "user");
  1222. 本质:
  1223. Hive就是HBase客户端。
  1224. 需要一些配置,jar包
  1225. export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
  1226. export HIVE_HOME=/opt/modules/hive-0.13.1/lib
  1227. ln -s $HBASE_HOME/lib/hbase-common-0.98.6-hadoop2.jar $HIVE_HOME/hbase-common-0.98.6-hadoop2.jar
  1228. ln -s $HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar $HIVE_HOME/hbase-server-0.98.6-hadoop2.jar
  1229. ln -s $HBASE_HOME/lib/hbase-client-0.98.6-hadoop2.jar $HIVE_HOME/hbase-client-0.98.6-hadoop2.jar
  1230. ln -s $HBASE_HOME/lib/hbase-protocol-0.98.6-hadoop2.jar $HIVE_HOME/hbase-protocol-0.98.6-hadoop2.jar
  1231. ln -s $HBASE_HOME/lib/hbase-it-0.98.6-hadoop2.jar $HIVE_HOME/hbase-it-0.98.6-hadoop2.jar
  1232. ln -s $HBASE_HOME/lib/htrace-core-2.04.jar $HIVE_HOME/htrace-core-2.04.jar
  1233. ln -s $HBASE_HOME/lib/hbase-hadoop2-compat-0.98.6-hadoop2.jar $HIVE_HOME/hbase-hadoop2-compat-0.98.6-hadoop2.jar
  1234. ln -s $HBASE_HOME/lib/hbase-hadoop-compat-0.98.6-hadoop2.jar $HIVE_HOME/hbase-hadoop-compat-0.98.6-hadoop2.jar
  1235. ln -s $HBASE_HOME/lib/high-scale-lib-1.1.1.jar $HBASE_HOME/high-scale-lib-1.1.1.jar
  1236. ----------------------------------------------HBase与Hue的集成----------------------------------------------
  1237. 配置hue.ini
  1238. [hbase]
  1239. hbase_clusters=(Cluster|hadoop.zxk.com:9090)
  1240. hbase_conf_dir=/opt/modules/hbase-0.98.6-hadoop2/conf
  1241. 启动ThriftServer bin/hbase-daemon.sh start thrift
  1242. -----------------------------------------Scala运行MapReduce----------------------------------------
  1243. val lineadd = sc.textFile("hdfs://hadoop.zxk.com:8020/user/root/mapreduce/wordcount/input/wc.input")
  1244. val wordsAdd=lineadd.map(line =>line.split(" "))
  1245. val wordsAdd=lineadd.flatMap(line =>line.split(" "))
  1246. val keyvalRdds =wordsAdd.map(word=>(word,1))
  1247. val count= keyvalRdds.reduceByKey((a,b)=>(a+b))
  1248. sc.textFile("hdfs://hadoop.zxk.com:8020/user/root/mapreduce/wordcount/input/wc.input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
  1249. -----------------------------------------Scala sortByKey----------------------------------------
  1250. ## WordCount
  1251. val rdd = sc.textFile("hdfs://hadoop.zxk.com:8020/user/root/mapreduce/wordcount/input/wc.input")
  1252. val wordcount = rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)
  1253. wordcount.saveAsTextFile("hdfs://hadoop.zxk.com:8020/user/root/mapreduce/wordcount/sparkOutput89")
  1254. wordcount.sortByKey().collect # 默认情况是 升序
  1255. wordcount.sortByKey(true).collect # true情况是 升序
  1256. wordcount.sortByKey(false).collect # false情况是 升序
  1257. 按照value值进行降序
  1258. # Value Sort
  1259. wordcount.map(x => (x._2,x._1)).sortByKey(false).collect
  1260. wordcount.map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)).collect
  1261. # Top N
  1262. wordcount.map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)).take(3)
  1263. Group Top Key
  1264. WordCount 程序,前KEY值
  1265. 需求:
  1266. 类似MapReduce中的二次排序
  1267. 1) 按照第一个字段进行分组
  1268. 2) 对分组中的第二字段进行排序(降序)
  1269. 3) 获取每个分组Top N,比如获取前三个值
  1270. aa 78
  1271. bb 98
  1272. aa 80
  1273. cc 98
  1274. aa 69
  1275. cc 87
  1276. bb 97
  1277. cc 86
  1278. aa 97
  1279. bb 78
  1280. bb 34
  1281. cc 85
  1282. bb 92
  1283. cc 72
  1284. bb 32
  1285. bb 23
  1286. 功能分析:
  1287. (aa,list(78,80,69,97)) -> (aa,list(69,78,80,97)) -> (aa,list(69,78,80))
  1288. val rdd = sc.textFile("hdfs://hadoop.zxk.com:8020/user/root/spark/grouptop/input/score.input")
  1289. rdd.map(_.split(" ")).collect
  1290. Array(aa, 78)
  1291. rdd.map(_.split(" ")).map(x => (x(0),x(1))).collect
  1292. (aa,78)
  1293. rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.collect
  1294. (aa,CompactBuffer(78, 80, 69, 97))
  1295. rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map(
  1296. x => {
  1297. val xx = x._1
  1298. val yy = x._2
  1299. yy
  1300. }
  1301. ).collect
  1302. Iterable[String]
  1303. Iterable 方法:
  1304. def toList: List[A]
  1305. 返回包含此遍历的迭代器的所有元素的列表
  1306. rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map(
  1307. x => {
  1308. val xx = x._1
  1309. val yy = x._2
  1310. yy.toList
  1311. }
  1312. ).collect
  1313. List[String]
  1314. List(78, 80, 69, 97)
  1315. List 方法:
  1316. def sorted[B >: A]: List[A]
  1317. 根据排序对列表进行排序
  1318. rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map(
  1319. x => {
  1320. val xx = x._1
  1321. val yy = x._2
  1322. yy.toList.sorted
  1323. }
  1324. ).collect
  1325. List[String]
  1326. List(69, 78, 80, 97)
  1327. List 方法:
  1328. def reverse: List[A]
  1329. 返回新列表,在相反的顺序元素
  1330. rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map(
  1331. x => {
  1332. val xx = x._1
  1333. val yy = x._2
  1334. yy.toList.sorted.reverse
  1335. }
  1336. ).collect
  1337. List[String]
  1338. List(97, 80, 78, 69)
  1339. List 方法:
  1340. def take(n: Int): List[A]
  1341. 返回前n个元素
  1342. def takeRight(n: Int): List[A]
  1343. 返回最后n个元素
  1344. rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map(
  1345. x => {
  1346. val xx = x._1
  1347. val yy = x._2
  1348. yy.toList.sorted.reverse.take(3)
  1349. }
  1350. ).collect
  1351. 要求返回的是一个元组对
  1352. rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map(
  1353. x => {
  1354. val xx = x._1
  1355. val yy = x._2
  1356. (xx,yy.toList.sorted.reverse.take(3))
  1357. }
  1358. ).collect
  1359. val groupTopKeyRdd = rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map(
  1360. x => {
  1361. val xx = x._1
  1362. val yy = x._2
  1363. (xx,yy.toList.sorted.reverse.take(3))
  1364. }
  1365. )
  1366. groupTopKeyRdd.saveAsTextFile("hdfs://hadoop.zxk.com:8020/user/root/spark/grouptop/output")
  1367. -----------------------------------------Scala 提交jar包----------------------------------------
  1368. bin/spark-submit \
  1369. --master spark://hadoop.zxk.com:7077 \
  1370. jars/sparkApp.jar
  1371. Spark-----------------------------------------Streaming Demo-----------------------------------------
  1372. 从Socket实时读取数据,进行实时处理
  1373. # rpm -ivh nc-1.84-22.el6.x86_64.rpm
  1374. ## 运行nc针对于端口号9999
  1375. $ nc -lk 9999
  1376. ## 运行Demo
  1377. bin/run-example streaming.NetworkWordCount hadoop.zxk.com 9999
  1378. -----------------------------------------Initializing StreamingContext-----------------------------------------
  1379. import org.apache.spark._
  1380. import org.apache.spark.streaming._
  1381. import org.apache.spark.streaming.StreamingContext._
  1382. val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
  1383. val ssc = new StreamingContext(conf, Seconds(5))
  1384. // read data
  1385. val lines = ssc.socketTextStream("localhost", 9999)
  1386. // process
  1387. val words = lines.flatMap(_.split(" "))
  1388. val pairs = words.map(word => (word, 1))
  1389. val wordCounts = pairs.reduceByKey(_ + _)
  1390. wordCounts.print()
  1391. ssc.start() // Start the computation
  1392. ssc.awaitTermination() // Wait for the computation to terminate
  1393. -----------------------------------------HDFS -----------------------------------------
  1394. import org.apache.spark._
  1395. import org.apache.spark.streaming._
  1396. import org.apache.spark.streaming.StreamingContext._
  1397. val ssc = new StreamingContext(sc, Seconds(5))
  1398. // read data
  1399. val lines = ssc.textFileStream("hdfs://hadoop.zxk.com:8020/user/root/streaming/input/hdfs/")
  1400. // process
  1401. val words = lines.flatMap(_.split("\t"))
  1402. val pairs = words.map(word => (word, 1))
  1403. val wordCounts = pairs.reduceByKey(_ + _)
  1404. wordCounts.print()
  1405. ssc.start() // Start the computation
  1406. ssc.awaitTermination() // Wait for the computation to terminate
  1407. -----------------------------------如何在Spark-shell中执行某个scala代码-----------------------------------
  1408. :load /opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/HDFSWordCount.scala
  1409. -----------------------------------将处理数据保存到HDFS-----------------------------------
  1410. import org.apache.spark._
  1411. import org.apache.spark.streaming._
  1412. import org.apache.spark.streaming.StreamingContext._
  1413. val ssc = new StreamingContext(sc, Seconds(5))
  1414. // read data
  1415. val lines = ssc.textFileStream("hdfs://hadoop.zxk.com:8020/user/root/streaming/input/hdfs/")
  1416. // process
  1417. val words = lines.flatMap(_.split("\t"))
  1418. val pairs = words.map(word => (word, 1))
  1419. val wordCounts = pairs.reduceByKey(_ + _)
  1420. wordCounts.saveAsTextFiles("hdfs://hadoop.zxk.com:8020/user/root/streaming/output/")
  1421. ssc.start() // Start the computation
  1422. ssc.awaitTermination() // Wait for the computation to terminate
  1423. -----------------------------------Spark Streaming + Flume Integration-----------------------------------
  1424. Flume有三个组件
  1425. Source ---> Channel ---> Sink(Spark Streaming)
  1426. import org.apache.spark._
  1427. import org.apache.spark.streaming._
  1428. import org.apache.spark.streaming.StreamingContext._
  1429. import org.apache.spark.streaming.flume._
  1430. import org.apache.spark.storage.StorageLevel
  1431. val ssc = new StreamingContext(sc, Seconds(5))
  1432. // read data
  1433. val stream = FlumeUtils.createStream(ssc, "hadoop.zxk.com", 9999, StorageLevel.MEMORY_ONLY_SER_2)
  1434. stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
  1435. ssc.start() // Start the computation
  1436. ssc.awaitTermination() // Wait for the computation to terminate
  1437. ------------------
  1438. bin/spark-shell --jars \
  1439. /opt/modules/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/spark-streaming-flume_2.10-1.3.0.jar,/opt/modules/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/flume-avro-source-1.5.0-cdh5.3.6.jar,/opt/modules/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/flume-ng-sdk-1.5.0-cdh5.3.6.jar
  1440. ------
  1441. run command:
  1442. bin/flume-ng agent -c conf -n a2 -f conf/flume-spark-push.sh -Dflume.root.logger=DEBUG,console
  1443. -----------------------------------启动Kafka集群-----------------------------------
  1444. 启动Kafka集群
  1445. nohup bin/kafka-server-start.sh config/server.properties &
  1446. 创建topic命令
  1447. bin/kafka-topics.sh --create --zookeeper hadoop.zxk.com:2181 --replication-factor 1 --partitions 1 --topic test
  1448. 查看已用topic
  1449. bin/kafka-topics.sh --list --zookeeper hadoop.zxk.com:2181
  1450. 生产数据
  1451. bin/kafka-console-producer.sh --broker-list hadoop.zxk.com:9092 --topic test
  1452. 消费数据
  1453. bin/kafka-console-consumer.sh --zookeeper hadoop.zxk.com:2181 --topic test --from-beginning
  1454. -----------------------------------Spark Streaming + Kafka Integration-----------------------------------
  1455. import java.util.HashMap
  1456. import org.apache.spark._
  1457. import org.apache.spark.streaming._
  1458. import org.apache.spark.streaming.StreamingContext._
  1459. import org.apache.spark.streaming.kafka._
  1460. val ssc = new StreamingContext(sc, Seconds(5))
  1461. val topicMap = Map("test" -> 1)
  1462. // read data
  1463. val lines = KafkaUtils.createStream(ssc, "hadoop-senior.ibeifeng.com:2181", "testWordCountGroup", topicMap).map(_._2)
  1464. val words = lines.flatMap(_.split(" "))
  1465. val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
  1466. wordCounts.print()
  1467. ssc.start() // Start the computation
  1468. ssc.awaitTermination() // Wait for the computation to terminate
  1469. ------------------
  1470. bin/spark-shell --jars \
  1471. /opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/spark-streaming-kafka_2.10-1.3.0.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/kafka_2.10-0.8.2.1.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/kafka-clients-0.8.2.1.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/zkclient-0.3.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/metrics-core-2.2.0.jar
  1472. 第两种方式
  1473. -----------------------------------Spark Streaming + Kafka Integration-----------------------------------
  1474. import kafka.serializer.StringDecoder
  1475. import org.apache.spark._
  1476. import org.apache.spark.streaming._
  1477. import org.apache.spark.streaming.StreamingContext._
  1478. import org.apache.spark.streaming.kafka._
  1479. val ssc = new StreamingContext(sc, Seconds(5))
  1480. val kafkaParams = Map[String, String]("metadata.broker.list" -> "hadoop-senior.ibeifeng.com:9092")
  1481. val topicsSet = Set("test")
  1482. // read data
  1483. val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
  1484. val lines = messages.map(_._2)
  1485. val words = lines.flatMap(_.split(" "))
  1486. val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
  1487. wordCounts.print()
  1488. ssc.start() // Start the computation
  1489. ssc.awaitTermination() // Wait for the computation to terminate
  1490. ---------------------------------------------------------------------------------------------------------
  1491. bin/spark-shell --jars \
  1492. /opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/spark-streaming-kafka_2.10-1.3.0.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/kafka_2.10-0.8.2.1.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/kafka-clients-0.8.2.1.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/zkclient-0.3.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/metrics-core-2.2.0.jar
  1493. -----------------------------------UpdataStateByKey-----------------------------------
  1494. import kafka.serializer.StringDecoder
  1495. import org.apache.spark._
  1496. import org.apache.spark.streaming._
  1497. import org.apache.spark.streaming.StreamingContext._
  1498. import org.apache.spark.streaming.kafka._
  1499. val ssc = new StreamingContext(sc, Seconds(5))
  1500. ssc.checkpoint(".")
  1501. val kafkaParams = Map[String, String]("metadata.broker.list" -> "hadoop-senior.ibeifeng.com:9092")
  1502. val topicsSet = Set("test")
  1503. val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  1504. val currentCount = values.sum
  1505. val previousCount = state.getOrElse(0)
  1506. Some(currentCount + previousCount)
  1507. }
  1508. // read data
  1509. val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
  1510. val lines = messages.map(_._2)
  1511. val words = lines.flatMap(_.split(" "))
  1512. val wordDstream = words.map(x => (x, 1))
  1513. val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
  1514. stateDstream.print()
  1515. ssc.start() // Start the computation
  1516. ssc.awaitTermination() // Wait for the computation to terminate
  1517. -----------------------------------隐式转换--------------------------------------------------
  1518. 在scala中,有一个很重要的功能,就是隐式转换
  1519. 比如
  1520. A类对象,->通过一个函数将对象转换成另外一个对象 B类对象
  1521. /**
  1522. * Return a new "state" DStream where the state for each key is updated by applying
  1523. * the given function on the previous state of the key and the new values of each key.
  1524. * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
  1525. * @param updateFunc State update function. If `this` function returns None, then
  1526. * corresponding state key-value pair will be eliminated.
  1527. * @tparam S State type
  1528. */
  1529. def updateStateByKey[S: ClassTag](
  1530. updateFunc: (Seq[V], Option[S]) => Option[S]
  1531. ): DStream[(K, S)] = ssc.withScope {
  1532. updateStateByKey(updateFunc, defaultPartitioner())
  1533. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/389918
推荐阅读
相关标签
  

闽ICP备14008679号