赞
踩
为什么不能用配有大量硬盘的数据库进行大规模分析?为什么需要Hadoop?
因为计算机硬盘的发展趋势是:寻址时间的提升远远不如传输速率的提升,如果访问包含大量地址的数据,读取就会消耗很多时间,如果使用Hadoop,更好的利用传输速率,读取花费的时间远远小于传输的时间,提高分析效率。
MapReduce任务过程分为两个阶段,map阶段和reduce阶段,每个阶段都是以键值对作为输入输出,类型由程序员确定。
提供MapReduce API,允许使用非Java的其他语言写map和reduce函数。Hadoop Streaming使用Unix标准流作为Hadoop和应用程序之间的接口,所以使用任何编程语言通过标准输入输出来写MapReduce程序。
文件系统的基本操作
hadoop有一个抽象的文件系统概念,HDFS只是其中一个实现,Java抽象类org.apache.hadoop.fs.FileSystem定义了hadoop中一个文件系统的客户端接口,该抽象类有几个具体实现:hadoop fs -ls file:///,列出本地根目录下所有文件,
接口
- package ex3_HDFS;
-
- import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
- import org.apache.hadoop.io.IOUtils;
-
- import java.io.IOException;
- import java.io.InputStream;
- import java.net.URL;
-
- //从Hadoop URL读取数据
- //通过URLStreamHander实例以标准输入输出方式显示Hadoop文件系统的文件
- public class URLCat {
- static {
- URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
- }
-
- public static void main(String[] args) throws Exception {
- InputStream in = null;
- try {
- in = new URL(args[0]).openStream();
- IOUtils.copyBytes(in, System.out, 4096, false);
- } finally {
- IOUtils.closeStream(in);
- }
- }
- }
-
- //返回默认文件系统(core-site.xml中指定的,没有指定使用默认的本地文件系统)
- public static FileSystem get(Configuration conf) throws IOException
-
- //通过给定的URI和权限确定要使用的文件系统,如果URI中没有给定,使用默认文件系统
- public static FileSystem get(URI uri, Configuration conf) throws IOException
-
- //作为给定用户访问文件系统,对于安全至关重要(参见10.4)
- public static FileSystem get(URI uri, Configuration conf, String user) throws IOException
-
- //Configuartion对象风阿航了客户端或服务器的配置,通过设置配置文件读取路径实现(如etc/hadoop/core-site.xml)
public static LocalFileSystem getLocal(Configurature conf) throws IOException
- public static FSDataInputStream open(Path f) throws IOException //缓冲区默认4KB
- public abstract static FSDataInputStream open(Path f, int buffersize) throws IOException
- package ex3_HDFS;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IOUtils;
-
- import java.io.IOException;
- import java.io.InputStream;
- import java.net.URI;
-
- //通过FileSystem API读取数据
- //直接使用FileSystem以标准输出格式显示hadoop文件系统中的文件
- public class FileSystemCat {
- public static void main(String[] args) throws IOException {
- String uri = args[0];
- Configuration conf = new Configuration();
- FileSystem fs = FileSystem.get(URI.create(uri), conf);//通过uri够构建URI
- // 对象,获得FileSystem对象
- InputStream in = null;
- try {
- in = fs.open(new Path(uri)); //通过uri构建Path对象
- IOUtils.copyBytes(in, System.out, 4096, false);
- } finally {
- IOUtils.closeStream(in);
- }
- }
- }
实际上open()返回的是FSDataInputStream对象,不是标准的java.io对象,是继承了java.io.DataInputStream的一个特殊类,并支持随机访问,可以从流的任意位置读取数据。
FSDataOutputStream对象与FSDataInputStream对象类似,也有一个查询文件当前位置的方法getPos(),没有seek(),对于输出流定位没有什么意义
- package org.apache.hadoop.fs
-
- public clsss FSDataInputStrean extends DataInpuStream
- implements Seekable, PositionedReadable {
- //implementation elided
- }
-
- //Seekable接口支持在文件中找到指定位置,并提供一个当前位置相对于起始位置的偏移量getPos()
- public interface Seekable {
- void seek(long pos) throws IOException;//定位大于文件位置会抛出异常
- long getPos() throws IOException;//与java.io.InputStream的skip()不同,seek可以移动到文件中任意一个绝对位置,skip只能相对于当前位置定位到另一个位置
- }
-
- //PositionedReadable接口,从指定偏移量处读取文件的一部分
- public interface PositionedReadable {
- public int read(long position, byte[] buffer, int offset, int length) throws IOException;//从文件指定position处,读取length长度字节数据到buffer的偏移量offset处,返回实际读取的字节数,需要检查这个值,有可能小于length
- public void readFully(long position, byte[] buffer, int offset, int length) throws IOException;//将指定长度length的字节数读到buffer中,在文件末尾会引发EOFException
- public void readFully(long position, byte[] buffer) throws IOException;
- }
-
- //所有这些方法会保留文件当前偏移量,并且是线程安全的(FSDataInputStream并不是为并发访问设计的,
- //因此最好新建多个实例),它提供了在读取文件主体时,访问文件其他部分的便利方法
- //seek()是一个相对开销高的操作,谨慎使用。建议用流数据来构建应用的访问模式比如MapReduce,而非执行大量的seek()
-
- package ex3_HDFS;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FSDataInputStream;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IOUtils;
-
-
- import java.io.IOException;
- import java.net.URI;
-
- //使用seek()方法,将hadoop文件系统中的一个文件在标准输出上显示两次
- public class FileSystemDoubleCat {
- public static void main(String[] args) throws IOException {
- String uri = args[0];
- Configuration conf = new Configuration();
- FileSystem fs = FileSystem.get(URI.create(uri), conf);
- FSDataInputStream in = null;
- try {
- in = fs.open(new Path(uri)); //返回的是FSDataInputStream对象
- IOUtils.copyBytes(in, System.out, 4096, false);
- in.seek(0); //go back to the start of the file
- IOUtils.copyBytes(in, System.out, 4096, false);
- } finally {
- IOUtils.closeStream(in);
- }
- }
- }
- public FSDataOutputStream create(Path f) throws IOExcepction
-
- //重载方法Progressable用于回调接口,可以把写进datanode的进度通知个应用
- package org.apache.hadoop.util;
- public interface Porgressable {
- public void progress();
- }
-
- //追加数据
- //可以创建无边界文件并非所有的文件系统都支持追加操作
- public FSDataOutputInstream append(Path f) throws IOException
- package ex3_HDFS;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IOUtils;
- import org.apache.hadoop.util.Progressable;
-
- import java.io.BufferedInputStream;
- import java.io.FileInputStream;
- import java.io.IOException;
- import java.io.InputStream;
- import java.io.OutputStream;
- import java.net.URI;
-
- public class FileCopyWithProgress {
- public static void main(String[] args) throws IOException {
- String localSrc = args[0];
- String dst = args[1];
- InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
-
- Configuration conf = new Configuration();
- FileSystem fs = FileSystem.get(URI.create(dst), conf);
- OutputStream out = fs.create(new Path(dst), new Progressable() {
- //每写一次打印一次
- @Override
- public void progress() { //匿名内部类,重写Progressable类progress方法
- System.out.printf(".");
- }
- });
-
- IOUtils.copyBytes(in, out, 4096, true);
- }
- }
- //一次性创建所有必要目录,返回true
- public boolean mkdirs(Path f) throws IOException
- //传入参数为文件,返回长度为1的FileStatus数组,参数为目录时,返回0或多个FileStatus对象
- //PathFilter可以限制匹配的文件和目录
- //传入多个Path时,相当于依次调用listStatus
- public FileStatus[] listStatus(Path f) throws IOException
- public FileStatus[] listStatus(Path f, PathFilter filter) throws IOException
- public FileStatus[] listStatus(Path[] files) throws IOException
- public FileStatus[] listStatus(Path[] files, PathFilter filter) throws IOException
- package ex3_HDFS;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FileStatus;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.FileUtil;
- import org.apache.hadoop.fs.Path;
-
- import java.io.IOException;
- import java.net.URI;
-
- //显示hadoop文件系统中的一组路径的文件信息
- public class ListStatus {
- public static void main(String[] args) throws IOException {
- String uri = args[0];
- Configuration conf = new Configuration();
- FileSystem fs = FileSystem.get(URI.create(uri), conf);//获取FS对象
-
- Path[] paths = new Path[args.length];
- for (int i = 0; i < paths.length; i++) { //创建Path对象素组
- paths[i] = new Path(args[i]);
- }
-
- FileStatus[] status = fs.listStatus(paths);//通过fs.listStatus()
- // 方法获得FileStatus数组
- Path[] listedPaths = FileUtil.stat2Paths(status);//再将FileStatus转为Path对象
- for (Path p : listedPaths) {
- System.out.println(p.toString());
- }
- }
- }
- public FileStatus[] globStatus(Path pathPattern) throws IOException
- public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException
通配符与unix bash shell相同
- package org.apache.hadoop.fs
- public interface PathFilter {
- boolean accept(Path path);
- } //与java.io.FileFilter一样,是Path对象而不是File对象
-
- //例子,用于排除匹配正则表达式的路径
- public class RegexExcludePathFilter implements PathFilter {
- private final String regex;
- public RegexExcludePathFilter(String regex) {
- this.regex = regex;
- }
- public boolean accept(Path path) {
- return !path.toString().matches(regex);
- }
- }
- public boolean delete(Path f, boolean recursive) throws IOException
- //recursive为true,非空目录及其内容才会删除,否则抛出IOException
描述文件读写的数据可见性,hdfs为性能牺牲了一些POSIX要求
旨在为所有的应用公平的分配资源,每启动一个新的作业,这个作业所在的队列(一个用户一个队列)就会分配一半的资源给这个作业
所有的yarn调度器都试图以本地为重,当本地节点正在运行时,应当放宽本地性需求,在同一机架上分配一个容器。然而,通过实践发现,如果等待一小段时间,能够增加在所请求节点上分配到容器的机会
不同应用占用的资源类型重点不同(有的占内存,有的占cpu),计算主导资源,主导资源多的分配容器多
Hadoop ChecksumFileSystem使用CRC-32计算校验和,HDFS用于校验和的是一个更有效的变体CRC-32C
hadoop的LocalFileSystem执行客户端的校验和验证,在写入filename文件时,文件系统会在包含每个文件块校验和的同一目录内新建.filename.crc隐藏文件,文件块的大小由file.bytes-per-checksum控制,默认512字节。文件块大小的元数据存储在.crc文件,所以当文件块的大小变化时,仍可以正确读回原文件
禁用校验和:
LocalFileSystem通过ChecksumFileSystem来完成自己的任务,有了这个类向其他文件系统中添加校验和就很容易了,一般用法:
- FileSystem rawFs = ... //底层文件系统称为源文件系统
- FileSystem checksummedFs = new ChecksumFileSystem(rawFs);//把源文件系统添加校验和
-
- //可以使用ChecksumFileSystem的实例checksummedFs的getRawFileSystem()获取它(源文件系统)
- //其他方法
- //getChecksumFile(),获取任意文件校验和路径
如果读取文件时检测到错误,会调用字节的reportChecksumFailure()方法,默认为空方法,LocalFileSystem会将这个错误的文件及其校验和移到同一存储设备名为bad_files的边际文件夹中
与hadoop结合使用的常见压缩算法:
是否支持切分是指:是否可以搜索数据流的任意位置并进一步往下读数据。可切分压缩尤其适合MapReduce
是压缩-解压缩算法的一种实现,在hadoop中,一个对CompressionCodec接口的实现代表一个codec
序列化是指将结构化对象转化为字节流以便在网络上传输或写进磁盘进行永久存储的过程,反序列化是将字节流转回结构化对象。
序列化用于分布式处理的两大领域,进程间通信,永久存储
- 客户端:提交MapReduce作业
- yarn资源管理器,负责计算机资源的分配
- yarn节点管理器,负责启动和监视机器上的计算容器
- MapReduce的application master 负责协调运行MapReduce作业任务,它和MapReduce任务在容器中运行
- 分布式文件系统hdfs,与其他实体间共享作业文件
作业的提交:Job的sumbit()方法创建一个内部的JobSubmiter实例并调用submitJobInternal()方法,提交过程:
- 向资源 管理器请求新ID,用于MapReduce作业,步骤2
- 检查作业的输出说明,如没有指定输出目录或已存在,不提交,错误抛回MapReduce程序
- 计算作业的输入分片,如无法计算,没有输入目录,不提交,错误抛回MapReduce程序
- 将运行作业所需要的资源(jar,配置文件,计算所得分片)复制到一个以作业ID命名目录下的共享文件系统,步骤3
- 调用资源管理器的submitApplication()提交作业,步骤4
- 资源管理器收到调用它的submitApplication消息后,将请求传递个yarn调度器,调度器分配一个容器,然后资源管理器在节点管理器的管理下在容器中启动application master进程,步骤5a,5b
- application master创建多个簿记对象以保持对作业进度的跟踪来进行作业的初始化,步骤6
- application master是一个java应用程序,主类是MRAppMaster
- 接受来自共享文件系统的、在客户端计算的输入分片,步骤7,对每个切片创建map任务对象及由mapreduce.job.reduces属性确定的多个reduce任务对象,分配任务ID
- application master必须决定如何运行构成MapReduce作业的各个任务,如果在新的容器中分配和运行任务的开销大于并行运行他们的开销时,就选择和自己在同一个JVM上运行,这样的作业称为uberized,或者作为uber任务运行
- 如果作业不适合作为uber任务运行,那么application master就会该作业中所有map任务和reduce任务向资源管理器请求容器,步骤8
- map任务的请求先发出,优先级高于reduce任务,因为所有的map任务必须在reduce的排序阶段启动前完成,直到有5%的map任务完成,为reduce任务的请求才会发出
- reduce任务能够在集群中任意位置运行,但是map任务有着本地化局限(所有的yarn调度器都试图以本地请求为重)
- 一旦资源管理器的调度器为任务分配了一个节点上的容器,application master就通过与节点管理器通信来启动容器,步骤9a,9b
- 该任务由一个主类为YarnChild的java应用程序执行,运行任务前,首先将任务需要的资源本地化(配置文件,jar文件,所有来自分布式缓存的文件),最后运行map或reduce任务
- yarn在指定的JVM中运行,因此用户定义的map,reduce函数甚至是YarnChild中的任何缺陷都不会影响到节点管理器
- 一个作业和它的每个任务都有一个状态(作业或任务的状态,map和reduce的进度,作业计数器的值,状态消息或描述)
- map或reduce任务运行时,每隔3秒钟,任务通过umbilical接口和父application master通信,报告自己的进度和状态,application master会形成一个作业的汇聚视图
- 作业期间,客户端每秒钟轮询一次application master,以接受最新状态
- application master收到最后一个任务完成的通知后,把作业的状态设置为成功,在Job轮询时,打印消息通知用户,从waitForCompletion返回。Job的统计信息和计数值输出到控制台
- 作业完成时,application master和任务容器清理其工作状态,OutputCommiter的commitJob()调用,作业信息由作业历史服务器存档
任务运行失败
- 任务长时间没有收到进度会标记为失败,可以设置超时mapreduce.task.timeout,通常为10分钟
- 任务失败4次后application master 将不会重新执行,mapreduce.map.maxattempts,mapreducer.reduce.maxattempts
- 不希望失败几个任务整个作业都失败,设置百分比,mapreduce.map.failure.maxpercent,mapreduce.reduce.failure.maxpercent
- 任务尝试是可以被终止的,不会被记入失败运行次数
application master运行失败
- application master失败2次不再尝试,mapreduce.am.max-atttempts
- yarn在集群上对yarn application master最大尝试次数做了限制,yarn.resourcemanager.am.max-attempts,默认2,要增加MapReduce application master尝试次数,必须增加集群上的yarn设置
- 恢复过程,application master向资源管理器发送周期性心跳,资源管理器检测到该失败在新的容器里开始新的master
- MapReduce application maser 可以使用作业历史恢复失败的,yarn.app.mapreduce.am.jon.recover.enable,false关闭
节点管理器运行失败
- 如果10分钟内节点管理器没有向资源管理器发送心跳信息,yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms,资源管理器将从节点池中移出该节点,该节点上的所有任务都要进行恢复
- 节点管理器失败次数过高,可能会被拉黑,mapreduce.job.maxtaskfailures.per.tracker
资源管理器运行失败
- 运行一对资源管理器,关于运行中的应用程序的信息存储在一个高可用的状态存存储区中,备机到主机的切换由故障转移器处理,使用zookeeper的leader选举机制以确保同一时刻只有一个资源管理器
map输出传入reducer的过程称为shuffle
- 每一个map任务都有一个环形内存缓冲区,线程根据最终要传入的reducer把数据划分成相应的分区,按键进行内存中排序(如果有一个combiner函数,它就在排序后的输出上运行,使map输出结果跟紧凑),一旦缓存内容达到阈值,就会新建一个溢出文件写入磁盘(如果溢出文件超过3个,combiner会在输出文件写到磁盘之前再次运行)。
- 现在tasktracker需要为分区文件运行reduce任务,reduce任务需要集群上若干个map任务的输出作为其特殊的分区文件,每个map任务完成时,reduce任务就开始复制其输出,这是reduce的复制阶段(小的话复制到JVM内存,大的话复制到磁盘,超过某个阈值,合并后写到磁盘中,如果指定combiner,则在合并期间运行它降低写入磁盘的数据量)
- 随着磁盘上副本增多,后台线程会将他们合并为更大的、排好序的文件
- 复制完所有map输出,reduce任务进入合并阶段,合并map输出,维持其顺序排序,循环进行(并不是每次合并设置的合并因子,读取文件合并后再写入磁盘,减少文件写入磁盘的数据量,最后一趟总是直接合并到reduce)
- 在reduce阶段,对已排序输出中的每个键调用reduce函数,函数的输出直接写到文件系统,一般为hdfs
- 调优shuffle过程提高mapreduce性能,总的原则是给shuffle尽可能多的内存
- 在map端,可以通过避免多次溢写磁盘来获得最佳性能
- 在reduce端,中间数据全部驻留内存就能获得最佳性能
- 任务执行环境,其属性可以从传递Mapper或reducer的所有方法的相关对象中获取
- 推测执行:对于一个作业中拖后腿的一些任务,如果比预期慢的时候,hadoop会尽量检测,并启用另外一个相同的任务作为备份,这就是所谓的推测执行。其中一个运行成功,另外一个都会被终止
- 推测执行是一种优化措施,默认启动,可以基于集群或某个作业单独为map或reduce启用会禁用该功能
- 推测执行是以降低集群效率为代价的,推测执行会减少整体的吞吐量。
- 关闭reduce任务推测执行有益,因为reduce任务都必须先取得所有的map输出,这可能大幅度增加集群上的网络传输
- 关于OutputCommitters:hadoop MapReduce使用一个提交协议来确保作业和任务都完全成功或失败,这个行为通过对作业使用OutputCommitter来实现
- 任务附属文件
Hive是一个构建在hadoop上的数据仓库框架,被设计用mapreduce操作hdfs数据
- 在非交互模式,-f指定文件中的命令,hive -f script.q
- 较短的脚本,-e内嵌命令,hive -e 'SELECT * FROM table'
- 强制不显示信息,-S
- CREATE TABLE records (year STRING, temperature INT, quality INT)
- ROW FORMAT DELIMITED
- FIELDS TERMINATED BY '\t';
-
- //row format HiveQL特有,声明数据文件的每一行是由\t分隔
- //fields terminated
- LOAD DATA LOCAL INPATH 'xxx/xxx.txt' //把本地文件放入仓库目录中,不加Local是在hdfs,复制还是移动??
- OVERWRITE INTO TABLE records; //overwrite删掉表对应目录中已有的所有文件,省略则简单的加入此文件
-
- //仓库目录由选项hive.metastore.warehouse.dir控制,默认/usr/hive/warehouse
- SELECT year, MAX(temperature)
- FROM records
- WHERE temperature != 1 AND quiality IN (1, 2 ,3)
- GROUP BY year;
-
- //data
- 2000 10 1
- 2000 12 1
- 2001 1 1
- 2001 2 1
conf目录下配置hive-site.xml,设置每次运行hive使用的选项,该目录下还有hive-default.xml(默认值)
- hive --config /xxx/xxx/hive-conf //指定目录而不是文件
- //或者设置环境变量
- HIVE_CONF_DIR
- hive -hiveconf fs.defaultFS=hdfs://localhost \
- -hiveconf mapreduce.framework.name=yarn \\
- -hiveconf yarn.resourcemanager.address=localhost:8032
-
-
- SET hive.enforce.bucketing=true; //使用SET更改设置
-
- SET hive.enforce.bucketing //只有属性名,查询
-
- SET-v //列出所有属性
- SET命令
- 命令行 -hiveconf选项
- hive-site.xml和hadoop本地配置文件(core-site.xml,hdfs-site.xml,mapred-site.xml,yarn-site.xml)
- hive-default和hadoop默认文件(core-default.xml,...)
执行引擎 默认mapreduce
日志记录 ${java.io.tmpdir}${user.name}/hive.log
修改日志目录hive -hiveconf hive.log.dir='/tmp/${user.name}'
配置文件conf/hive-log4j.properites
hive -hiveconf hive.root.logger=DEBUG,console 在对话中配置
hive的表由存储的数据和描述数据形式的元数据组成,数据一般存在hdfs中,元数据存放在关系型数据库中,而不是hdfs
- 托管表,把数据移入它的仓库目录,一张表对应一个文件夹/user/hive/warehouse/table_name
- drop 删除元数据和数据
- 外部表,hive到仓库以外的位置访问数据,hive不管理的数据(也可以是hdfs),创建表时使用external关键字
- drop只删除元数据
- 一般是初始数据集当做外部表使用,可以用来导出数据供其他应用使用
- //分区是在创建表的时候定义的
- CREATE TABLE logs (ts BIGINT, line STRING)
- PARTITIONED BY (dt STRING, country STRING);
-
- //把数据加载到分区时要显式指定分区值
- LOAD DATA LOCAL INPATH '/xxx/xxx/file1'
- INTO TABLE logs
- PARTITION (dt='2000-01-01', country='GB'); //文件系统级别分别会创建对应的子目录
-
- //创建表之后可以使用ALTER TABLE 增加或移出分区
-
- //显示分区
- SHOW PARTITION logs
- //指定划分桶的列和个数
- CREATE TABLE bucketed_users (id INT, name STRING)
- CLUSTERED BY(id) INTO 4 BUCKETS; //物理上每个桶就是一个文件
-
- //排序桶,进一步提升map端连接效率
- CREATE TABLE bucketed_users (id INT, name STRING)
- CLUSTERED BY(id) SORTED BY (id ASC) INTO 4 BUCKETS;
-
- //向分桶后的表中插入数据需要设置hive.enforce.bucketing为true
- INSERT OVERWRITE TABLE bucketed_users
- SELECT * FROM users;
-
- //TABLESAMPLE对表进行取样
- SELECT * FROM bucketed_users
- TABLESAMPLE(BUCKET 1 OUT OF 4 ON id); //返回1/4的桶
-
- SELECT * FROM bucketed_users
- TABLESAMPLE(BUCKET 1 OUT OF 2 ON id); //返回一半的桶
- 默认存储模式:分割的文本
- 二进制存储格式:顺序文件,Avro数据文件,Parquet文件,RCFile,ORCFile
- 使用定制的SerDe:RegexSerDe
- 存储句柄
- INSERT语句,使用PARTITION()指明分区,可以在SELECT中使用分区值动态指明分区
- 多表插入,FROM source 可以放在INSERT前面,之后可以写多个INSERT语句,使用一个源表
- CREATE TABLE...AS SELECT...,原子性的,查询失败不会创建新表
- 重命名,添加列....
- DROP TABLE table 删除元数据和数据
- TRUNCATE TABLE table 删除数据,保留元数据(表结构),类似方法:CREATE TABLE table LIKE existing_table
- 排序和聚集
- MapReduce脚本,TRANSFORM,MAP,REDUCE可以在hive中调用外部脚本或程序,运行之前使用ADD注册脚本,通过这一操作,hadoop把脚本传到hadoop集群上
- 连接
- 内连接,查询前使用EXPLAIN,查看使用了多少MapReduce作业,EXPLAIN EXTENDED更详细的信息
- 外连接,左外连接,右外连接
- 半连接
- map连接,表很小可以放入每个mapper的内存,map连接可以充分利用分桶的表,需要启动优化选项:SET hive.optimize.bucketmapjoin=true
- 子查询,只允许子查询出现在select的from语句中,或where子句的in或exists中,不支持相关子查询
- 视图,create view as select...,视图是select语句定义的虚表,创建视图时并不物化存储到磁盘,视图只读
必须用java写,hive本身也是用java写的,对于其他语言,可以使用select transform查询,流经用户的脚本
Hbase是一个在HDFS上开发的面向列的分布式数据库,如果需要实时的随机访问超大规模数据集,就可以使用Hbase这一Hadoop应用。可以简单的增加节点来扩展
- 列族,列族的前缀:列族修饰符,info:format,info:geo,列族的前缀必须预先给出,列族成员可以随时按需加入
- 单元格有版本
- 行是排序的
- 一个master主控机协调管理一个或多个regionserver从属机,负载启动一个全新的安装,把区域分配给注册的regionserver,活肤regionsrver故障,
- 依赖于zookeeper
- 从属节点在conf/regionservers文件中
- 使用SSH机制运行远程命令
- 站点配置在conf/hbase-site.xml,hbase-env.sh文件中
- 通过hadoop文件系统API存储数据
- Hbase保留名为hbase:meta的特殊目录表,维护这当前集群上所有区域的列表,区域名作为键
- 区域名:【表名, 起始行, 创建的时间戳.MD5哈希值.】
- create 'test', 'data' //新建一个test表,有一个data列族,列族属性为默认值
- list //列出空间里所有的表
- put 'test', 'row1', 'data:1', 'value1' //添加数据
- put 'test', 'row2', 'data:2', 'value2'
- put 'test', 'row3', 'data:3', 'value3'
- get 'test', 'row1' //获取row1
- scan 'test' //列出整个表
- disable 'test' //把表设为离线
- drop 'table'
- enable 'test' //重新设为在线
- create 'stations', {NAME => 'info'}
- create 'observation', {NAME => 'data'}
- //只对最新数据感兴趣,把version改为1,默认3
ZooKeeper是hadoop的分布式协调服务
- 指定基本时间单元
- tickTime=2000
- 指定存储数据的本地文件系统位置
- dataDir=/user/xxx
- 指定连接客户端监听的端口
- clientPort=2181
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。