当前位置:   article > 正文

Hadoop、hive、sqoop入门及完整小例子_sqoop,hive,hadoop

sqoop,hive,hadoop

Hadoop

MapReduce 和 HDFS

有自己的RPC和序列化机制

hadoop1.x

HDFS:在1.x中的NameNode只可能有一个,虽然可以通过SecondaryNameNodeNameNode进行数据同步备份,但是总会存在一定的时延,如果NameNode挂掉,但是如果有部份数据还没有同步到SecondaryNameNode上,是可能会存在着数据丢失的问题(冷备份、定期checkpoint、单点问题)。

MapReduce:

(1)JobTracker 是 Map-reduce 的集中处理点,存在单点故障;

(2)JobTracker 完成了太多的任务,造成了过多的资源消耗,当 map-reduce job 非常多的时候,会造成很大的内存开销,潜在来说,也增加了 JobTracker 失效的风险,这也是业界普遍总结出老 Hadoop 的 Map-Reduce 只能支持 4000 节点主机的上限;

 


流程:

(1)首先用户程序 (JobClient) 提交了一个 job,job 的信息会发送到 Job Tracker 中,Job Tracker 是 Map-reduce 框架的中心,他需要与集群中的机器定时通信 (heartbeat), 需要管理哪些程序应该跑在哪些机器上,需要管理所有 job 失败、重启等操作。

(2)TaskTracker 是 Map-reduce 集群中每台机器都有的一个部分,他做的事情主要是监视自己所在机器的资源情况。

(3)TaskTracker 同时监视当前机器的 tasks 运行状况。TaskTracker 需要把这些信息通过 heartbeat发送给JobTracker,JobTracker 会搜集这些信息以给新提交的 job 分配运行在哪些机器上。

hadoop2.x YARN+MapReduce

HDFS:HA

YARN 并不是下一代MapReduce(MRv2),下一代MapReduce与第一代MapReduce(MRv1)在编程接口、数据处理引擎(MapTask和ReduceTask)是完全一样的, 可认为MRv2重用了MRv1的这些模块,不同的是资源管理和作业管理系统,MRv1中资源管理和作业管理均是由JobTracker实现的,集两个功能于一身,而在MRv2中,将这两部分分开了。 其中,作业管理由ApplicationMaster实现,而资源管理由新增系统YARN完成,由于YARN具有通用性,因此YARN也可以作为其他计算框架的资源管理系统,不仅限于MapReduce,也是其他计算框架(例如Spark)的管理平台。


ResourceManager:全局资源管理器,基于应用程序对资源的需求进行调度的 ; 每一个应用程序需要不同类型的资源因此就需要不同的容器。资源按需调度,包括:内存,CPU,磁盘,网络等等。

NodeManager:节点代理,监控资源使用情况并且向调度器汇报。

ApplicationMaster:类似JobTracker,向调度器索要适当的资源容器,结合从 ResourceManager 获得的资源和 NodeManager 协同工作来运行和监控任务。

Container:类似TaskTracker,运行Map和Reduce任务。

编程

Mapper:

  1. protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
  2. String line = value.toString();
  3. String[] splited = line.split("\t");       
  4. for (String word : splited) {
  5. context.write(new Text(word), new LongWritable(1));
  6. }
  7. }


Reducer:

  1. protected void reduce(Text k2, Iterable<LongWritable> v2s, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
  2. long count = 0L;
  3. for (LongWritable v2 : v2s) {
  4. count += v2.get();
  5. }
  6. LongWritable v3 = new LongWritable(count);
  7. context.write(k2, v3);
  8. }


执行过程:

输入分片(Input split):跟HDFS块大小有关,输入分片调整,合并小文件

Map阶段

Combiner阶段:提高网络传输效率,原则是combiner的输入不会影响到reduce计算的最终输入,例如:如果计算只是求总数,最大值,最小值可以使用combiner,但是做平均值计算使用combiner的话,最终的reduce计算结果就会出错

Shuffle阶段:map输出一般比较大,先放到内存缓冲区,再spill到磁盘(对应溢出文件),多个溢出文件最后合并、partition(对应reducer)

Reduce阶段

进阶小例子:

  1. public static class AverageMaper extends Mapper<Object,Text,Text,Text>
  2. {
  3. //private final static IntWritable one=new IntWritable(1);
  4. private static Text word=new Text();
  5. public void map(Object key,Text value,Context context) throws
  6. IOException,InterruptedException
  7. {
  8. StringTokenizer itr=new StringTokenizer(value.toString());
  9. while(itr.hasMoreTokens())
  10. {
  11. word.set(itr.nextToken());
  12. if(itr.hasMoreTokens())
  13. context.write(word, new Text(itr.nextToken()+",1"));
  14. }
  15. }
  16. }
  17. public static class AveragerCombine extends Reducer<Text,Text,Text,Text>
  18. {
  19. public void reduce(Text key,Iterable<Text> values,Context context) throws
  20. IOException,InterruptedException
  21. {
  22. int sum=0;
  23. int cnt=0;
  24. for(Text val:values)
  25. {
  26. String []str=val.toString().split(",");
  27. sum+=Integer.parseInt(str[0]);
  28. cnt+=Integer.parseInt(str[1]);
  29. }
  30. context.write(key,new Text(sum+","+cnt));
  31. }
  32. }
  33. public static class AveragerReduce extends Reducer<Text,Text,Text,DoubleWritable>
  34. {
  35. public void reduce(Text key,Iterable<Text> values,Context context) throws
  36. IOException,InterruptedException
  37. {
  38. int sum=0;
  39. int cnt=0;
  40. for(Text val:values)
  41. {
  42. String []str=val.toString().split(",");
  43. sum+=Integer.parseInt(str[0]);
  44. cnt+=Integer.parseInt(str[1]);
  45. }
  46. double res=(sum*1.0)/cnt;
  47. context.write(key, new DoubleWritable(res));
  48. }
  49. }

试验


  1. hdfs dfs -mkdir -p /user/who
  2. hdfs dfs -mkdir input
  3. hdfs dfs -put ./etc/hadoop/*.xml input
  4. hdfs dfs -ls /user/who/input 或者 hdfs dfs -ls input
  5. // output 目录不能存在
  6. hdfs dfs -rm -r output
  7. // hadoop grep 例子
  8. hadoop jar ./share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.1.jar grep input output 'dfs[a-z.]'
  9. hdfs dfs -cat output/* 或者先取回本地再查看 hdfs dfs -get output ./output
  10. // 直接文本查看
  11. hdfs dfs -text /user/who/input/core-site.xml
  12. // 本地验证
  13. find ./etc/ -type f -name *.xml|xargs grep 'dfs[a-z.]' - --color
  14. // hadoop wordcount 例子
  15. hadoop jar ./share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.1.jar wordcount input output

HDFS信息:http://192.168.56.102:50070/dfshealth.html

MapReduce查看:http://192.168.56.102:8088/cluster/apps

Hive

Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的sql查询功能,可以将sql语句转换为MapReduce任务进行运行。 其优点是学习成本低,可以通过类SQL语句快速实现简单的MapReduce统计,不必开发专门的MapReduce应用,十分适合数据仓库的统计分析。

关键配置

创建目录并赋予权限:

hdfs dfs -mkdir -p /user/hive/warehouse
hdfs dfs -mkdir -p /user/hive/tmp
hdfs dfs -mkdir -p /user/hive/log
hdfs dfs -chmod g+w /user/hive/warehouse
hdfs dfs -chmod g+w /usr/hive/tmp
hdfs dfs -chmod g+w /usr/hive/log

配置目录:

  1. <property>
  2. <name>hive.exec.scratchdir</name>
  3. <value>/user/hive/tmp</value>
  4. <description>HDFS root scratch dir for Hive jobs which gets created with write all (733) permission. For each connecting user, an HDFS scratch dir: ${hive.exec.scratchdir}/<username> is created, with ${hive.scratch.dir.permission}.</description>
  5. </property>
  6. <property>
  7. <name>hive.metastore.warehouse.dir</name>
  8. <value>/user/hive/warehouse</value>
  9. <description>location of default database for the warehouse</description>
  10. </property>
  11. <property>
  12. <name>hive.querylog.location</name>
  13. <value>/user/hive/log</value>
  14. <description>Location of Hive run time structured log file</description>
  15. </property>

Hive MetaStore配置,默认使用derby

  1. <property>
  2. <name>javax.jdo.option.ConnectionURL</name>
  3. <value>jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true&characterEncoding=UTF-8&useSSL=false</value>
  4. </property>
  5. <property>
  6. <name>javax.jdo.option.ConnectionDriverName</name>
  7. <value>com.mysql.jdbc.Driver</value>
  8. </property>
  9. <property>
  10. <name>javax.jdo.option.ConnectionUserName</name>
  11. <value>hive</value>
  12. </property>
  13. <property>
  14. <name>javax.jdo.option.ConnectionPassword</name>
  15. <value>hive</value>
  16. </property>

注意
:mysql-connector-java-*.jar 放入Hive的lib目录

语法

许多关系型数据库都提供了命名空间的概念,用于划分不同的数据库或者Schema。例如MySQL支持的Database概念,PostgreSQL支持的namespace概念。Hive也有自己的schema的概念:

  1. CREATE DATABASE dbname;
  2. USE dbname;
  3. DROP DATABASE dbname;
  4. SHOW DATABASES; // 默认 default

(1)如何执行:

  1. // 进入hive命令行后可以直接执行dfs命令,如 dfs -ls /user/hive
  2. hive
  3. hive -e "show tables"
  4. hive -f create_table.hql

(2)表操作:

内部表和外部表的区别:

内部表:将数据移动到对应的warehouse目录下,速度非常快,不会对数据是否符合定义的Schema做校验,这个工作通常在读取的时候进行,称为Schema On Read。使用DROP语句删除后,其数据和表的元数据都被删除。

外部表:不会把数据移动到warehouse目录中。事实上,Hive甚至不会校验外部表的目录是否存在。这使得我们可以在创建表格之后再创建数据。当删除外部表时,Hive只删除元数据,而外部数据不动。

适用场景:大多数情况下,这两者的区别不是很明显。如果数据的所有处理都在Hive中进行,那么更倾向于选择内部表。但是如果Hive和其他工具针对相同的数据集做处理,外部表更合适。一种常见的模式是使用外部表访问存储的HDFS(通常由其他工具创建)中的初始数据,然后使用Hive转换数据并将其结果放在内部表中。相反,外部表可以用于将Hive的处理结果导出供其他应用使用。使用外部表的另一种场景是针对一个数据集,关联多个Schema。

外部表创建:

  1. CREATE EXTERNAL TABLE external_table (dummy STRING)
  2. LOCATION '/user/root/external_table';

表操作:

  1. SHOW TABLES;
  2. ALTER TABLE source RENAME TO target;
  3. ALTER TABLE source ADD COLUMNS (col3 STRING);
  4. DROP TABLE source;
  5. TRUNCATE TABLE my_table;

直接创建:

  1. CREATE TABLE
  2. IF NOT EXISTS useracc (
  3. dt string,
  4. time string,
  5. x_forwarded_for string,
  6. client_ip string,
  7. server_ip string,
  8. http_status string,
  9. content_length string,
  10. time_taken string,
  11. http_method string,
  12. url string,
  13. query_string string
  14. ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' STORED AS TEXTFILE;

复制表结构:

create table if not exists table002 like table001;

CTAS语句:

  1. CREATE TABLE table002
  2. AS select ip, time from table001;

(3)插入数据:

直接插入:

insert into users values (0,'Nat'),(2,'Joe'),(3,'Kay'),(4,'Ann');

本地文件系统导入:

LOAD DATA LOCAL INPATH "/home/hadoopUser/data/test1.txt"  INTO TABLE test1;

HDFS导入:

LOAD DATA INPATH "/input/test1.txt" OVERWRITE INTO TABLE test1;  

从查询结果导入:

INSERT INTO TABLE test4 SELECT * FROM test1; 

CTAS语句:

  1. CREATE TABLE table002
  2. AS select ip, time from table001;

多表插入:

  1. FROM records
  2. INSERT INTO TABLE stations_by_year
  3. SELECT year ,COUNT(DISTINCT station)
  4. GROUP BY year
  5. INSERT INTO TABLE record_by_year
  6. SELECT year,count(1)
  7. GROUP BY year
  8. INSERT INTO TABLE good_records_by_year
  9. SELECT year , count(1)
  10. WHERE temperature != 9999 AND quality in (0,1,4,5,9)
  11. GROUP BY year;

(4)分区:

Hive将表划分为分区,Partition根据分区字段进行。分区可以让数据的部分查询变得更快。我们将用于分区的字段成为分区字段,但是在数据文件中,不存在这些字段的值,这些值是从目录中推断出来的。但是在SELECT语句中,我们依然可使用分区字段。

静态分区:

  1. create table if not exists partition_table002 like partition_table001;
  2. insert overwrite table partition_table002 partition (dt='20150617', ht='00') select name, ip from partition_table001 where dt='20150607';

动态分区:

  1. set hive.exec.dynamic.partition=true;
  2. set hive.exec.dynamic.partition.mode=nonstrict;
  3. insert overwrite table partition_table002 partition (dt, ht) select * from partition_table001 where dt='20150617';
  4. INSERT OVERWRITE TABLE T PARTITION (dt, ht) SELECT key, value, dt, ht FROM srcpart WHERE dt is not null and ht>10;

hive先获取select的最后两个位置的dt和ht参数值,然后将这两个值填写到insert语句partition中的两个dt和ht变量中,即动态分区是通过位置来对应分区值的。原始表select出来的值和输出partition的值的关系仅仅是通过位置来确定的,和名字并没有关系,比如这里dt和st的名称完全没有关系。

混合使用:

  1. INSERT OVERWRITE TABLE T PARTITION (ds='2010-03-03', hr) SELECT key, value, /*ds,*/ hr FROM srcpart WHERE ds is not null and hr>10; // right
  2. INSERT OVERWRITE TABLE T PARTITION (ds, hr = 11) SELECT key, value, ds/*, hr*/ FROM srcpart WHERE ds is not null and hr=11; // wrong

(5)分桶:

对于每一个表(table)或者分区,Hive可以进一步组织成桶。Hive也是针对某一列进行桶的组织。Hive采用对列值哈希,然后除以桶的个数求余的方式决定该条记录存放在哪个桶当中。采用桶能够带来一些好处,比如JOIN操作。对于JOIN操作两个表有一个相同的列,如果对这两个表都进行了桶操作。那么将保存相同列值的桶进行JOIN操作就可以,可以大大较少JOIN的数据量。

bucket主要作用:数据sampling;提升某些查询操作效率,例如mapside join。

  1. set hive.enforce.bucketing = true;
  2. create table student(id INT, age INT, name STRING)
  3. partitioned by(stat_date STRING)
  4. clustered by(id) sorted by(age) into 2 bucket
  5. row format delimited fields terminated by ',';
  6. from student_tmp
  7. insert overwrite table student partition(stat_date="20120802")
  8. select id,age,name where stat_date="20120802" sort by age;

'set hive.enforce.bucketing = true'  可以自动控制上一轮reduce的数量从而适配bucket的个数,当然,用户也可以自主设置mapred.reduce.tasks去适配bucket 个数,推荐使用'set hive.enforce.bucketing = true' 。

抽样语法:TABLESAMPLE(BUCKET x OUT OF y)

 select * from student tablesample(bucket 1 out of 2 on id);

y必须是table总bucket数的倍数或者因子。hive根据y的大小,决定抽样的比例。例如,table总共分了64份,当y=32时,抽取 (64/32=)2个bucket的数据,当y=128时,抽取(64/128=)1/2个bucket的数据。x表示从哪个bucket开始抽取。例 如,table总bucket数为32,tablesample(bucket 3 out of  16),表示总共抽取(32/16=)2个bucket的数据,分别为第3个bucket和第(3+16=)19个bucket的数据。

试验

具体步骤参考另一篇文章:大数据串讲-从日志文件分析访问量最高的10个接口及响应访问量

Sqoop


Sqoop(发音:skup)是一款开源的工具,主要用于在Hadoop(Hive)与传统的数据库(mysql、postgresql...)间进行数据的传递,可以将一个关系型数据库 (例如 : MySQL ,Oracle ,Postgres等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。
Sqoop项目开始于2009年,最早是作为Hadoop的一个第三方模块存在,后来为了让使用者能够快速部署,也为了让开发人员能够更快速的迭代开发,Sqoop独立成为一个 Apache项目。

数据收集

手动SCP

Rsync命令同步文件

Sqoop从关系型数据库导入

Flume收集

具体参考另一篇文章:Apache Flume 日志收集案例

Logstash收集

ELK(ElasticSearch、Logstash、Kibana)三搭档之一



声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/782540
推荐阅读
相关标签
  

闽ICP备14008679号