赞
踩
MapReduce 和 HDFS
有自己的RPC和序列化机制
HDFS:在1.x中的NameNode只可能有一个,虽然可以通过SecondaryNameNode与NameNode进行数据同步备份,但是总会存在一定的时延,如果NameNode挂掉,但是如果有部份数据还没有同步到SecondaryNameNode上,是可能会存在着数据丢失的问题(冷备份、定期checkpoint、单点问题)。
MapReduce:
(1)JobTracker 是 Map-reduce 的集中处理点,存在单点故障;
(2)JobTracker 完成了太多的任务,造成了过多的资源消耗,当 map-reduce job 非常多的时候,会造成很大的内存开销,潜在来说,也增加了 JobTracker 失效的风险,这也是业界普遍总结出老 Hadoop 的 Map-Reduce 只能支持 4000 节点主机的上限;
流程:
(1)首先用户程序 (JobClient) 提交了一个 job,job 的信息会发送到 Job Tracker 中,Job Tracker 是 Map-reduce 框架的中心,他需要与集群中的机器定时通信 (heartbeat), 需要管理哪些程序应该跑在哪些机器上,需要管理所有 job 失败、重启等操作。
(2)TaskTracker 是 Map-reduce 集群中每台机器都有的一个部分,他做的事情主要是监视自己所在机器的资源情况。
(3)TaskTracker 同时监视当前机器的 tasks 运行状况。TaskTracker 需要把这些信息通过 heartbeat发送给JobTracker,JobTracker 会搜集这些信息以给新提交的 job 分配运行在哪些机器上。
HDFS:HA
YARN 并不是下一代MapReduce(MRv2),下一代MapReduce与第一代MapReduce(MRv1)在编程接口、数据处理引擎(MapTask和ReduceTask)是完全一样的, 可认为MRv2重用了MRv1的这些模块,不同的是资源管理和作业管理系统,MRv1中资源管理和作业管理均是由JobTracker实现的,集两个功能于一身,而在MRv2中,将这两部分分开了。 其中,作业管理由ApplicationMaster实现,而资源管理由新增系统YARN完成,由于YARN具有通用性,因此YARN也可以作为其他计算框架的资源管理系统,不仅限于MapReduce,也是其他计算框架(例如Spark)的管理平台。
ResourceManager:全局资源管理器,基于应用程序对资源的需求进行调度的 ; 每一个应用程序需要不同类型的资源因此就需要不同的容器。资源按需调度,包括:内存,CPU,磁盘,网络等等。
NodeManager:节点代理,监控资源使用情况并且向调度器汇报。
ApplicationMaster:类似JobTracker,向调度器索要适当的资源容器,结合从 ResourceManager 获得的资源和 NodeManager 协同工作来运行和监控任务。
Container:类似TaskTracker,运行Map和Reduce任务。
Mapper:
- protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
- String line = value.toString();
- String[] splited = line.split("\t");
- for (String word : splited) {
- context.write(new Text(word), new LongWritable(1));
- }
- }
- protected void reduce(Text k2, Iterable<LongWritable> v2s, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
- long count = 0L;
- for (LongWritable v2 : v2s) {
- count += v2.get();
- }
- LongWritable v3 = new LongWritable(count);
- context.write(k2, v3);
- }
输入分片(Input split):跟HDFS块大小有关,输入分片调整,合并小文件
Map阶段
Combiner阶段:提高网络传输效率,原则是combiner的输入不会影响到reduce计算的最终输入,例如:如果计算只是求总数,最大值,最小值可以使用combiner,但是做平均值计算使用combiner的话,最终的reduce计算结果就会出错
Shuffle阶段:map输出一般比较大,先放到内存缓冲区,再spill到磁盘(对应溢出文件),多个溢出文件最后合并、partition(对应reducer)
Reduce阶段
进阶小例子:
- public static class AverageMaper extends Mapper<Object,Text,Text,Text>
- {
- //private final static IntWritable one=new IntWritable(1);
- private static Text word=new Text();
- public void map(Object key,Text value,Context context) throws
- IOException,InterruptedException
- {
- StringTokenizer itr=new StringTokenizer(value.toString());
- while(itr.hasMoreTokens())
- {
- word.set(itr.nextToken());
- if(itr.hasMoreTokens())
- context.write(word, new Text(itr.nextToken()+",1"));
- }
- }
- }
- public static class AveragerCombine extends Reducer<Text,Text,Text,Text>
- {
- public void reduce(Text key,Iterable<Text> values,Context context) throws
- IOException,InterruptedException
- {
- int sum=0;
- int cnt=0;
- for(Text val:values)
- {
- String []str=val.toString().split(",");
- sum+=Integer.parseInt(str[0]);
- cnt+=Integer.parseInt(str[1]);
- }
- context.write(key,new Text(sum+","+cnt));
- }
- }
- public static class AveragerReduce extends Reducer<Text,Text,Text,DoubleWritable>
- {
- public void reduce(Text key,Iterable<Text> values,Context context) throws
- IOException,InterruptedException
- {
- int sum=0;
- int cnt=0;
- for(Text val:values)
- {
- String []str=val.toString().split(",");
- sum+=Integer.parseInt(str[0]);
- cnt+=Integer.parseInt(str[1]);
- }
- double res=(sum*1.0)/cnt;
- context.write(key, new DoubleWritable(res));
- }
- }
试验
- hdfs dfs -mkdir -p /user/who
- hdfs dfs -mkdir input
- hdfs dfs -put ./etc/hadoop/*.xml input
- hdfs dfs -ls /user/who/input 或者 hdfs dfs -ls input
-
- // output 目录不能存在
- hdfs dfs -rm -r output
- // hadoop grep 例子
- hadoop jar ./share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.1.jar grep input output 'dfs[a-z.]'
- hdfs dfs -cat output/* 或者先取回本地再查看 hdfs dfs -get output ./output
- // 直接文本查看
- hdfs dfs -text /user/who/input/core-site.xml
- // 本地验证
- find ./etc/ -type f -name *.xml|xargs grep 'dfs[a-z.]' - --color
- // hadoop wordcount 例子
- hadoop jar ./share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.1.jar wordcount input output
HDFS信息:http://192.168.56.102:50070/dfshealth.html
MapReduce查看:http://192.168.56.102:8088/cluster/apps
Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的sql查询功能,可以将sql语句转换为MapReduce任务进行运行。 其优点是学习成本低,可以通过类SQL语句快速实现简单的MapReduce统计,不必开发专门的MapReduce应用,十分适合数据仓库的统计分析。
创建目录并赋予权限:
hdfs dfs -mkdir -p /user/hive/warehouse
hdfs dfs -mkdir -p /user/hive/tmp
hdfs dfs -mkdir -p /user/hive/log
hdfs dfs -chmod g+w /user/hive/warehouse
hdfs dfs -chmod g+w /usr/hive/tmp
hdfs dfs -chmod g+w /usr/hive/log
- <property>
- <name>hive.exec.scratchdir</name>
- <value>/user/hive/tmp</value>
- <description>HDFS root scratch dir for Hive jobs which gets created with write all (733) permission. For each connecting user, an HDFS scratch dir: ${hive.exec.scratchdir}/<username> is created, with ${hive.scratch.dir.permission}.</description>
- </property>
- <property>
- <name>hive.metastore.warehouse.dir</name>
- <value>/user/hive/warehouse</value>
- <description>location of default database for the warehouse</description>
- </property>
- <property>
- <name>hive.querylog.location</name>
- <value>/user/hive/log</value>
- <description>Location of Hive run time structured log file</description>
- </property>
Hive MetaStore配置,默认使用derby
- <property>
- <name>javax.jdo.option.ConnectionURL</name>
- <value>jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true&characterEncoding=UTF-8&useSSL=false</value>
- </property>
- <property>
- <name>javax.jdo.option.ConnectionDriverName</name>
- <value>com.mysql.jdbc.Driver</value>
- </property>
- <property>
- <name>javax.jdo.option.ConnectionUserName</name>
- <value>hive</value>
- </property>
- <property>
- <name>javax.jdo.option.ConnectionPassword</name>
- <value>hive</value>
- </property>
许多关系型数据库都提供了命名空间的概念,用于划分不同的数据库或者Schema。例如MySQL支持的Database概念,PostgreSQL支持的namespace概念。Hive也有自己的schema的概念:
- CREATE DATABASE dbname;
- USE dbname;
- DROP DATABASE dbname;
- SHOW DATABASES; // 默认 default
(1)如何执行:
- // 进入hive命令行后可以直接执行dfs命令,如 dfs -ls /user/hive
- hive
- hive -e "show tables"
- hive -f create_table.hql
(2)表操作:
内部表和外部表的区别:
内部表:将数据移动到对应的warehouse目录下,速度非常快,不会对数据是否符合定义的Schema做校验,这个工作通常在读取的时候进行,称为Schema On Read。使用DROP语句删除后,其数据和表的元数据都被删除。
外部表:不会把数据移动到warehouse目录中。事实上,Hive甚至不会校验外部表的目录是否存在。这使得我们可以在创建表格之后再创建数据。当删除外部表时,Hive只删除元数据,而外部数据不动。
适用场景:大多数情况下,这两者的区别不是很明显。如果数据的所有处理都在Hive中进行,那么更倾向于选择内部表。但是如果Hive和其他工具针对相同的数据集做处理,外部表更合适。一种常见的模式是使用外部表访问存储的HDFS(通常由其他工具创建)中的初始数据,然后使用Hive转换数据并将其结果放在内部表中。相反,外部表可以用于将Hive的处理结果导出供其他应用使用。使用外部表的另一种场景是针对一个数据集,关联多个Schema。
外部表创建:
- CREATE EXTERNAL TABLE external_table (dummy STRING)
- LOCATION '/user/root/external_table';
表操作:
- SHOW TABLES;
- ALTER TABLE source RENAME TO target;
- ALTER TABLE source ADD COLUMNS (col3 STRING);
- DROP TABLE source;
- TRUNCATE TABLE my_table;
- CREATE TABLE
- IF NOT EXISTS useracc (
- dt string,
- time string,
- x_forwarded_for string,
- client_ip string,
- server_ip string,
- http_status string,
- content_length string,
- time_taken string,
- http_method string,
- url string,
- query_string string
- ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' STORED AS TEXTFILE;
create table if not exists table002 like table001;
- CREATE TABLE table002
- AS select ip, time from table001;
(3)插入数据:
直接插入:
insert into users values (0,'Nat'),(2,'Joe'),(3,'Kay'),(4,'Ann');
LOAD DATA LOCAL INPATH "/home/hadoopUser/data/test1.txt" INTO TABLE test1;
LOAD DATA INPATH "/input/test1.txt" OVERWRITE INTO TABLE test1;
从查询结果导入:
INSERT INTO TABLE test4 SELECT * FROM test1;
- CREATE TABLE table002
- AS select ip, time from table001;
多表插入:
- FROM records
- INSERT INTO TABLE stations_by_year
- SELECT year ,COUNT(DISTINCT station)
- GROUP BY year
- INSERT INTO TABLE record_by_year
- SELECT year,count(1)
- GROUP BY year
- INSERT INTO TABLE good_records_by_year
- SELECT year , count(1)
- WHERE temperature != 9999 AND quality in (0,1,4,5,9)
- GROUP BY year;
Hive将表划分为分区,Partition根据分区字段进行。分区可以让数据的部分查询变得更快。我们将用于分区的字段成为分区字段,但是在数据文件中,不存在这些字段的值,这些值是从目录中推断出来的。但是在SELECT语句中,我们依然可使用分区字段。
静态分区:
- create table if not exists partition_table002 like partition_table001;
- insert overwrite table partition_table002 partition (dt='20150617', ht='00') select name, ip from partition_table001 where dt='20150607';
动态分区:
- set hive.exec.dynamic.partition=true;
- set hive.exec.dynamic.partition.mode=nonstrict;
- insert overwrite table partition_table002 partition (dt, ht) select * from partition_table001 where dt='20150617';
- INSERT OVERWRITE TABLE T PARTITION (dt, ht) SELECT key, value, dt, ht FROM srcpart WHERE dt is not null and ht>10;
hive先获取select的最后两个位置的dt和ht参数值,然后将这两个值填写到insert语句partition中的两个dt和ht变量中,即动态分区是通过位置来对应分区值的。原始表select出来的值和输出partition的值的关系仅仅是通过位置来确定的,和名字并没有关系,比如这里dt和st的名称完全没有关系。
混合使用:
- INSERT OVERWRITE TABLE T PARTITION (ds='2010-03-03', hr) SELECT key, value, /*ds,*/ hr FROM srcpart WHERE ds is not null and hr>10; // right
- INSERT OVERWRITE TABLE T PARTITION (ds, hr = 11) SELECT key, value, ds/*, hr*/ FROM srcpart WHERE ds is not null and hr=11; // wrong
(5)分桶:
对于每一个表(table)或者分区,Hive可以进一步组织成桶。Hive也是针对某一列进行桶的组织。Hive采用对列值哈希,然后除以桶的个数求余的方式决定该条记录存放在哪个桶当中。采用桶能够带来一些好处,比如JOIN操作。对于JOIN操作两个表有一个相同的列,如果对这两个表都进行了桶操作。那么将保存相同列值的桶进行JOIN操作就可以,可以大大较少JOIN的数据量。
bucket主要作用:数据sampling;提升某些查询操作效率,例如mapside join。
- set hive.enforce.bucketing = true;
-
- create table student(id INT, age INT, name STRING)
- partitioned by(stat_date STRING)
- clustered by(id) sorted by(age) into 2 bucket
- row format delimited fields terminated by ',';
-
- from student_tmp
- insert overwrite table student partition(stat_date="20120802")
- select id,age,name where stat_date="20120802" sort by age;
抽样语法:TABLESAMPLE(BUCKET x OUT OF y)
select * from student tablesample(bucket 1 out of 2 on id);
y必须是table总bucket数的倍数或者因子。hive根据y的大小,决定抽样的比例。例如,table总共分了64份,当y=32时,抽取 (64/32=)2个bucket的数据,当y=128时,抽取(64/128=)1/2个bucket的数据。x表示从哪个bucket开始抽取。例 如,table总bucket数为32,tablesample(bucket 3 out of 16),表示总共抽取(32/16=)2个bucket的数据,分别为第3个bucket和第(3+16=)19个bucket的数据。
具体步骤参考另一篇文章:大数据串讲-从日志文件分析访问量最高的10个接口及响应访问量
具体参考另一篇文章:Apache Flume 日志收集案例
ELK(ElasticSearch、Logstash、Kibana)三搭档之一
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。