当前位置:   article > 正文

HBase的简单学习四

HBase的简单学习四

HBase的进阶

1.1 hbase的写流程

Hbase读取数据的流程:
1)是由客户端发起读取数据的请求,首先会与zookeeper建立连接
2)从zookeeper中获取一个hbase:meta表位置信息,被哪一个regionserver所管理着
     hbase:meta表:hbase的元数据表,在这个表中存储了自定义表相关的元数据,包括表名,表有哪些列簇,表有哪些region,每个region存储的位置,每个region被哪个regionserver所管理,这个表也是存储在某一个region上的,并且这个meta表只会被一个regionserver所管理。这个表的位置信息只有zookeeper知道。
3)连接这个meta表对应的regionserver,从meta表中获取当前你要读取的这个表对应的regionsever是谁。
     当一个表多个region怎么办呢?
     如果我们获取数据是以get的方式,只会返回一个regionserver
     如果我们获取数据是以scan的方式,会将所有的region对应的regionserver的地址全部返回。
4)连接要读取表的对应的regionserver,从regionserver上的开始读取数据:
       读取顺序:memstore-->blockcache-->storefile-->Hfile中
       注意:如果是scan操作,就不仅仅去blockcache了,而是所有都会去找。

1.2 hbase的写流程

Hbase的写入数据流程:
1)由客户端发起写数据请求,首先会与zookeeper建立连接
2)从zookeeper中获取hbase:meta表被哪一个regionserver所管理
3)连接hbase:meta表中获取对应的regionserver地址 (从meta表中获取当前要写入数据的表对应的region所管理的regionserver) 只会返回一个regionserver地址
4)与要写入数据的regionserver建立连接,然后开始写入数据,将数据首先会写入到HLog,然后将数据写入到对应store模块中的memstore中
(可能会写多个),当这两个地方都写入完成之后,表示数据写入完成。


-------------------------后面的步骤是服务器内部的操作-----------------
异步操作
5)随着客户端不断地写入数据,memstore中的数据会越来多,当内存中的数据达到阈值(128M/1h)的时候,放入到blockchache中,生成新的memstore接收用户过来的数据,然后当blockcache的大小达到一定阈值(0.85)的时候,开始触发flush机制,将数据最终刷新到HDFS中形成小的Hfile文件。

6)随着不断地刷新,storefile不断地在HDFS上生成小HFIle文件,当小的HFile文件达到阈值的时候(3个及3个以上),就会触发Compaction机制,将小的HFile合并成一个大的HFile.

7)随着不断地合并,大的HFile文件会越来越大,当达到一定阈值(2.0版本之后最终10G)的时候,会触发分裂机制(split),将大的HFile文件进行一分为二,同时管理这个大的HFile的region也会被一分为二,形成两个新的region和两个新的HFile文件,一对一的进行管理,将原来旧的region和分裂之前大的HFile文件慢慢地就会下线处理。

1.3 Region的分裂策略

region中存储的是一张表的数据,当region中的数据条数过多的时候,会直接影响查询效率。当region过大的时候,region会被拆分为两个region,HMaster会将分裂的region分配到不同的regionserver上,这样可以让请求分散到不同的RegionServer上,已达到负载均衡 , 这也是HBase的一个优点 。

  • ConstantSizeRegionSplitPolicy

    0.94版本前,HBase region的默认切分策略

    当region中最大的store大小超过某个阈值(hbase.hregion.max.filesize=10G)之后就会触发切分,一个region等分为2个region。

    但是在生产线上这种切分策略却有相当大的弊端(切分策略对于大表和小表没有明显的区分):

    • 阈值(hbase.hregion.max.filesize)设置较大对大表比较友好,但是小表就有可能不会触发分裂,极端情况下可能就1个,形成热点,这对业务来说并不是什么好事。

    • 如果设置较小则对小表友好,但一个大表就会在整个集群产生大量的region,这对于集群的管理、资源使用、failover来说都不是一件好事。

  • IncreasingToUpperBoundRegionSplitPolicy

    0.94版本~2.0版本默认切分策略

    总体看和ConstantSizeRegionSplitPolicy思路相同,一个region中最大的store大小大于设置阈值就会触发切分。 但是这个阈值并不像ConstantSizeRegionSplitPolicy是一个固定的值,而是会在一定条件下不断调整,调整规则和region所属表在当前regionserver上的region个数有关系.

    region split阈值的计算公式是:

    • 设regioncount:是region所属表在当前regionserver上的region的个数

    • 阈值 = regioncount^3 * 128M * 2,当然阈值并不会无限增长,最大不超过MaxRegionFileSize(10G),当region中最大的store的大小达到该阈值的时候进行region split

    例如:

    • 第一次split阈值 = 1^3 * 256 = 256MB

    • 第二次split阈值 = 2^3 * 256 = 2048MB

    • 第三次split阈值 = 3^3 * 256 = 6912MB

    • 第四次split阈值 = 4^3 * 256 = 16384MB > 10GB,因此取较小的值10GB

    • 后面每次split的size都是10GB了

    特点

    • 相比ConstantSizeRegionSplitPolicy,可以自适应大表、小表;

    • 在集群规模比较大的情况下,对大表的表现比较优秀

    • 对小表不友好,小表可能产生大量的小region,分散在各regionserver上

    • 小表达不到多次切分条件,导致每个split都很小,所以分散在各个regionServer上

  • SteppingSplitPolicy

    2.0版本默认切分策略

    相比 IncreasingToUpperBoundRegionSplitPolicy 简单了一些 ​ region切分的阈值依然和待分裂region所属表在当前regionserver上的region个数有关系

    • 如果region个数等于1,切分阈值为flush size 128M

    • 否则为MaxRegionFileSize。

    这种切分策略对于大集群中的大表、小表会比 IncreasingToUpperBoundRegionSplitPolicy 更加友好,小表不会再产生大量的小region,而是适可而止。

  • KeyPrefixRegionSplitPolicy

    根据rowKey的前缀对数据进行分区,这里是指定rowKey的前多少位作为前缀,比如rowKey都是16位的,指定前5位是前缀,那么前5位相同的rowKey在相同的region中。

  • DelimitedKeyPrefixRegionSplitPolicy

    保证相同前缀的数据在同一个region中,例如rowKey的格式为:userid_eventtype_eventid,指定的delimiter为 _ ,则split的的时候会确保userid相同的数据在同一个region中。 按照分隔符进行切分,而KeyPrefixRegionSplitPolicy是按照指定位数切分。

  • BusyRegionSplitPolicy

    按照一定的策略判断Region是不是Busy状态,如果是即进行切分

    如果你的系统常常会出现热点Region,而你对性能有很高的追求,那么这种策略可能会比较适合你。它会通过拆分热点Region来缓解热点Region的压力,但是根据热点来拆分Region也会带来很多不确定性因素,因为你也不知道下一个被拆分的Region是哪个。

  • DisabledRegionSplitPolicy

    不启用自动拆分, 需要指定手动拆分

1.4  Compaction操作

 Minor Compaction:

  • 指选取一些小的、相邻的StoreFile将他们合并成一个更大的StoreFile,在这个过程中不会处理已经Deleted或Expired的Cell。一次 Minor Compaction 的结果是更少并且更大的StoreFile。

Major Compaction:

  • 指将所有的StoreFile合并成一个StoreFile,这个过程会清理三类没有意义的数据:被删除的数据TTL过期数据版本号超过设定版本号的数据。另外,一般情况下,major compaction时间会持续比较长,整个过程会消耗大量系统资源,对上层业务有比较大的影响。因此线上业务都会将关闭自动触发major compaction功能,改为手动在业务低峰期触发。

1.5 经典面试题

HBase适合存储PB级别的海量数据(百亿千亿量级条记录),如果根据记录主键Rowkey来查询,能在几十到百毫秒内返回数据。

那么HBase是如何做到的呢?

接下来,简单阐述一下数据的查询思路和过程。

查询过程

第1步:

项目有100亿业务数据,存储在一个HBase集群上(由多个服务器数据节点构成),每个数据节点上有若干个Region(区域),每个Region实际上就是HBase中一批数据的集合(一段连续范围rowkey的数据)。

我们现在开始根据主键RowKey来查询对应的记录,通过meta表可以帮我们迅速定位到该记录所在的数据节点,以及数据节点中的Region,目前我们有100亿条记录,占空间10TB。所有记录被切分成5000个Region,那么现在,每个Region就是2G。

由于记录在1个Region中,所以现在我们只要查询这2G的记录文件,就能找到对应记录。

第2步:

由于HBase存储数据是按照列族存储的。比如一条记录有400个字段,前100个字段是人员信息相关,这是一个列簇(列的集合);中间100个字段是公司信息相关,是一个列簇。另外100个字段是人员交易信息相关,也是一个列簇;最后还有100个字段是其他信息,也是一个列簇

这四个列簇是分开存储的,这时,假设2G的Region文件中,分为4个列族,那么每个列族就是500M。

到这里,我们只需要遍历这500M的列簇就可以找到对应的记录。

第3步:

如果要查询的记录在其中1个列族上,1个列族在HDFS中会包含1个或者多个HFile。

如果一个HFile一般的大小为100M,那么该列族包含5个HFile在磁盘上或内存中。

由于HBase的内存进入磁盘中的数据是排好序(字典顺序)的,要查询的记录有可能在最前面,也有可能在最后面,按平均来算,我们只需遍历2.5个HFile共250M,即可找到对应的记录。

第4步:

每个HFile中,是以键值对(key/value)方式存储,只要遍历文件中的key位置并判断符合条件即可

一般key是有限的长度,假设key/value比是1:24,最终只需要10M的数据量,就可获取的对应的记录。

如果数据在机械磁盘上,按其访问速度100M/S,只需0.1秒即可查到。

如果是SSD的话,0.01秒即可查到。

当然,扫描HFile时还可以通过布隆过滤器快速定位到对应的HFile,以及HBase是有内存缓存机制的,如果数据在内存中,效率会更高。

 二 HBase与Hive的集成

2.1 相关概念

HBase与Hive的对比

hive:

数据仓库:Hive的本质其实就相当于将HDFS中已经存储的文件在Mysql中做了一个双射关系,以方便使用HQL去管理查询。

用于数据分析、清洗:Hive适用于离线的数据分析和清洗,延迟较高。

基于HDFS、MapReduce:Hive存储的数据依旧在DataNode上,编写的HQL语句终将是转换为MapReduce代码执行。

HBase

数据库:是一种面向列族存储的非关系型数据库。

用于存储结构化和非结构化的数据:适用于单表非关系型数据的存储,不适合做关联查询,类似JOIN等操作。

基于HDFS:数据持久化存储的体现形式是HFile,存放于DataNode中,被ResionServer以region的形式进行管理。

延迟较低,接入在线业务使用:面对大量的企业数据,HBase可以直线单表大量数据的存储,同时提供了高效的数据访问速度。

2.1  配置相关文件

1.配置hive-site.xml 文件

  1. <property>
  2. <name>hive.zookeeper.quorum</name>
  3. <value>master,node1,node2</value>
  4. </property>
  5. <property>
  6. <name>hive.zookeeper.client.port</name>
  7. <value>2181</value>
  8. </property>

2.3 关联

1.

HBase中已经存储了某一张表,在Hive中创建一个外部表来关联HBase中的这张表

建立外部表的字段名要和hbase中的列名一致

前提是hbase中已经有表了

 示例,以后模仿这个

  1. create external table students_hbase
  2. (
  3. id string,
  4. name string,
  5. age string,
  6. gender string,
  7. clazz string
  8. )
  9. stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
  10. with serdeproperties ("hbase.columns.mapping" = "
  11. :key,
  12. info:name,
  13. info:age,
  14. info:gender,
  15. info:clazz
  16. ")
  17. tblproperties("hbase.table.name" = "default:students");

三 Phoenix

3.1 安装

1.准备好文件,上传到Linux中

2.解压 tar -zxvf phoenix-hbase-2.2-5.1.3-bin.tar.gz -C ../

3.2 配置文件

1.配置环境变量

/etc/profile

2.将phoenix-server-hbase-2.2-5.1.3.jar复制到所有节点的hbase lib目录下

3.3 启动

1.先要启动hbase

2.sqlline.py master,node1,node2

3.4 常用语法

1.建表语句

必须要设置主键

  1. CREATE TABLE IF NOT EXISTS student_p1 (
  2. id VARCHAR NOT NULL PRIMARY KEY,
  3. name VARCHAR,
  4. age BIGINT,
  5. gender VARCHAR ,
  6. clazz VARCHAR
  7. );

2.插入语句

语法 upsert into 表名 values(内容)

  1. upsert into STUDENT values('1500100004','葛德曜',24,'男','理科三班');
  2. upsert into STUDENT values('1500100005','宣谷芹',24,'男','理科六班');
  3. upsert into STUDENT values('1500100006','羿彦昌',24,'女','理科三班');

3.查询语句

跟sql语句差不多

4.删除数据

语法 delete from 表名 where 主键名=主键值;

delete from student_p1 where id='1500100004';

5. 删除表

drop table student_p1;

6.退出

!quit

3.5 注意事项

1.在phoenix中创建表之后,我们可以使用 !table或者show table 查看所有的表,并以大写的形式展现给我们,我们使用sql语句查询的时候不分表的大小写。

2.在phoenix中创建表之后,可以在hbase中可以通过scan加大写的表名查看phoenix中的表,但是在hbase中创建的表,在phoenix就无法查看。如果想查看,则需要在phoenix中进行表的映射。映射方式有两种:视图映射和表映射

3、如何在phoneix中使用hbase原本的数据表呢?
        视图映射:视图并不是真正意义上的表,而是在phoneix创建一个映射关系,以表的形式将hbase中原本数据映射过来,可以在基础之上编写sql语句进行分析,需要注意的是,我们在视图上sql分析的时候,表名和列名需要加双引号。删除视图不会影响原本hbase中的数据,视图无法做修改,只能查询,视图在phoneix中被看作成一个只读表。
        表映射:建表的语法来说与视图映射相差一个单词,其他的没啥区别。使用上,表映射可以直接在phoneix中对表数据进行增删改查。将phoneix中表映射删了,原来hbase中的表也对应删除。
        
    4、映射查询的时候,主键可以不用加双引号,非主键的列必须加双引号

3.7映射

3.7.1 视图映射

1.Phoenix创建的视图是只读的,所以只能用来做查询,无法通过视图对源数据进行修改等操作

2.在phoenix创建视图, primary key 对应到hbase中的rowkey

3.创建的映射要与想要映射的hbase中的表名一致,且视图要被双引号包起来。

4.视图映射相关建表案例

创建的是映射,所以是view

  1. CREATE view "students" (
  2. id VARCHAR NOT NULL PRIMARY KEY,
  3. "info"."name" VARCHAR,
  4. "info"."age" VARCHAR,
  5. "info"."gender" VARCHAR,
  6. "info"."clazz" VARCHAR
  7. ) column_encoded_bytes=0;

5.查询相关事项

a.映射创建好之后,想要查看映射内容,必须使用sql语句的时候将视图名用双引号包起来,才能查看

b.映射查询的时候,主键可以不用加双引号,非主键的列必须加双引号

c.删除视图不会影响原本hbase中的数据,视图无法做修改,只能查询,视图在phoneix中被看作成一个只读表。

drop view "视图名";

3.7.2 表映射

1.建表映射

1) 当HBase中已经存在表时,可以以类似创建视图的方式创建关联表,只需要将create view改为create table即可。

2)当HBase中不存在表时,可以直接使用create table指令创建需要的表,并且在创建指令中可以根据需要对HBase表结构进行显示的说明。

3)创建的映射要与想要映射的hbase中的表名一致,且表要被双引号包起来。

  1. CREATE table "students" (
  2. id VARCHAR NOT NULL PRIMARY KEY,
  3. "info"."name" VARCHAR,
  4. "info"."age" VARCHAR,
  5. "info"."gender" VARCHAR,
  6. "info"."clazz" VARCHAR
  7. ) column_encoded_bytes=0;

2.使用create table创建的关联表,如果对表进行了修改,源数据也会改变,同时如果关联表被删除,源表也会被删除。但是视图就不会,如果删除视图,源数据不会发生改变。

3.8 sql案例

1.过滤出文科一班的学生总分前10的学生信息

过滤出文科一班的学生
select ID as id,"name" as name,"clazz" as clazz from "students" where "clazz"='文科一班';

将成绩表做切分转换
select regexp_split(ID,'-')[1] as student_id,regexp_split(ID,'-')[2] as subject_id,"score" as score from "scores";

在第二步的基础之上求每个学生的总分
select t1.student_id as student_id,sum(to_number(t1.score)) as sum_score from (select regexp_split(ID,'-')[1] as student_id,regexp_split(ID,'-')[2] as subject_id,"score" as score from "scores") t1 group by t1.student_id;

与步骤1的文科学生进行关联
select b1.id as student_id,b1.name as name,b1.clazz as clazz,to_char(b2.sum_score) as sum_score from (select ID as id,"name" as name,"clazz" as clazz from "students" where "clazz"='文科一班') b1 join (select t1.student_id as student_id,sum(to_number(t1.score)) as sum_score from (select regexp_split(ID,'-')[1] as student_id,regexp_split(ID,'-')[2] as subject_id,"score" as score from "scores") t1 group by t1.student_id) b2 on (b1.id=b2.student_id) order by b2.sum_score desc limit 10;

2.注意事项

通过这个例子遇到的注意点:
1、切分字符串的函数不是split,而是regexp_split
2、Phoenix中,数组的索引是从1开始的
3、给字段起别名之后的嵌套查询,就不需要再加双引号了,主键本身就可以不用加
4、sum函数中的数据类型必须是数值类型,如果是10的整数倍,会以科学计数法进行标识 580->5.8E+2(5.8*10^2)
5、to_number()转数值  to_char()转字符串

四  bulkLoad实现批量导入

4.1相关概念

1.优点:

  1. 如果我们一次性入库hbase巨量数据,处理速度慢不说,还特别占用Region资源, 一个比较高效便捷的方法就是使用 “Bulk Loading”方法,即HBase提供的HFileOutputFormat类。

  2. 它是利用hbase的数据信息按照特定格式存储在hdfs内这一原理,直接生成这种hdfs内存储的数据格式文件,然后上传至合适位置,即完成巨量数据快速入库的办法。配合mapreduce完成,高效便捷,而且不占用region资源,增添负载。

2.限制:

  1. 仅适合初次数据导入,即表内数据为空,或者每次入库表内都无数据的情况。

  2. HBase集群与Hadoop集群为同一集群,即HBase所基于的HDFS为生成HFile的MR的集群

4.2 编写代码

代码编写:

提前在Hbase中创建好表

生成Hfile基本流程:

  1. 设置Mapper的输出KV类型:

    K: ImmutableBytesWritable(代表行键)

    V: KeyValue (代表cell)

2. 开发Mapper

读取你的原始数据,按你的需求做处理

输出rowkey作为K,输出一些KeyValue(Put)作为V

3. 配置job参数

a. Zookeeper的连接地址

b. 配置输出的OutputFormat为HFileOutputFormat2,并为其设置参数

4. 提交job

导入HFile到RegionServer的流程

构建一个表描述对象

构建一个region定位工具

然后用LoadIncrementalHFiles来doBulkload操作

  1. package com.shujia.jinjie;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.hbase.HBaseConfiguration;
  5. import org.apache.hadoop.hbase.KeyValue;
  6. import org.apache.hadoop.hbase.TableName;
  7. import org.apache.hadoop.hbase.client.*;
  8. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  9. import org.apache.hadoop.hbase.mapreduce.CellSortReducer;
  10. import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
  11. import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
  12. import org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner;
  13. import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
  14. import org.apache.hadoop.hbase.util.Bytes;
  15. import org.apache.hadoop.io.LongWritable;
  16. import org.apache.hadoop.io.Text;
  17. import org.apache.hadoop.mapreduce.Job;
  18. import org.apache.hadoop.mapreduce.Mapper;
  19. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  20. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  21. import java.io.FileInputStream;
  22. import java.io.IOException;
  23. class MyBulkLoadMapper extends Mapper<LongWritable,Text,ImmutableBytesWritable,KeyValue>{
  24. /*
  25. ==> tl_hefei_shushan_503.txt <==
  26. D55433A437AEC8D8D3DB2BCA56E9E64392A9D93C 117210031795040 83401 8340104 301 20180503190539 20180503233517 20180503
  27. 8827F3196977C6F752680505FEC0C7D3A18D4DFC \N \N \N \N \N \N \N
  28. */
  29. protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>.Context context) throws IOException, InterruptedException {
  30. String line = value.toString();
  31. //处理脏数据
  32. if (!(line.contains("shushan")) && !(line.contains("\\N"))){
  33. String[] strings = line.split("\t");
  34. String phoneNum = strings[0];
  35. String wg = strings[1];
  36. String city = strings[2];
  37. String qx = strings[3];
  38. String stayTime = strings[4];
  39. String startTime = strings[5];
  40. String endTime = strings[6];
  41. String date = strings[7];
  42. //因为手机号代表一个人,一个人可能会出现在多个区域,不能直接将手机号作为rk,为了数据的完整性
  43. //可以将手机号和进入网格的时间拼接
  44. byte[] rk = Bytes.toBytes(phoneNum + "-" + startTime);
  45. byte[] family = Bytes.toBytes("info");
  46. //创建输出key对象
  47. // ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable(rk);
  48. // ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable(rk);
  49. ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable(rk);
  50. //将其余的6列封装成单元格
  51. KeyValue keyValue1 = new KeyValue(rk, family, Bytes.toBytes("wg"), Bytes.toBytes(wg));
  52. context.write(immutableBytesWritable,keyValue1);
  53. KeyValue keyValue2 = new KeyValue(rk, family, Bytes.toBytes("city"), Bytes.toBytes(city));
  54. context.write(immutableBytesWritable,keyValue2);
  55. KeyValue keyValue3 = new KeyValue(rk, family, Bytes.toBytes("qx"), Bytes.toBytes(qx));
  56. context.write(immutableBytesWritable,keyValue3);
  57. KeyValue keyValue4 = new KeyValue(rk, family, Bytes.toBytes("stayTime"), Bytes.toBytes(stayTime));
  58. context.write(immutableBytesWritable,keyValue4);
  59. KeyValue keyValue5 = new KeyValue(rk, family, Bytes.toBytes("endTime"), Bytes.toBytes(endTime));
  60. context.write(immutableBytesWritable,keyValue5);
  61. KeyValue keyValue6 = new KeyValue(rk, family, Bytes.toBytes("date"), Bytes.toBytes(date));
  62. context.write(immutableBytesWritable,keyValue6);
  63. }
  64. }
  65. }
  66. public class BulkLoadingDemo {
  67. public static void main(String[] args) throws Exception{
  68. //获取集群配置文件,可以是hdfs集群也可以是hbase集群
  69. Configuration conf = HBaseConfiguration.create();
  70. //设置zookeeper集群信息
  71. conf.set("hbase.zookeeper.quorum","master:2181,node1:2181,node2:2181");
  72. //创建Job作业
  73. Job job = Job.getInstance(conf);
  74. //给job作业起一个名字
  75. job.setJobName("hbase使用bulkloading方式批量加载数据作业");
  76. //设置主类
  77. job.setJarByClass(BulkLoadingDemo.class);
  78. //设置Map类
  79. job.setMapperClass(MyBulkLoadMapper.class);
  80. //无需设置自定义reduce类,使用hbase中的reduce类
  81. job.setOutputFormatClass(HFileOutputFormat2.class);
  82. //设置map的输出keyvalue类型
  83. //key -- 行键
  84. //value -- 单元格(列)
  85. job.setMapOutputKeyClass(ImmutableBytesWritable.class);
  86. job.setMapOutputValueClass(KeyValue.class);
  87. //设置region中字典排序的过程的类
  88. job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
  89. job.setReducerClass(KeyValueSortReducer.class);
  90. //设置文件的输入输出路径
  91. FileInputFormat.setInputPaths(job,new Path("/bigdata29/data/dianxin_data"));
  92. //设置hfile文件
  93. FileOutputFormat.setOutputPath(job,new Path("/bigdata29/hbase/out2"));
  94. //获取数据库连接对象
  95. Connection conn = ConnectionFactory.createConnection(conf);
  96. //获取数据库操作对象
  97. Admin admin = conn.getAdmin();
  98. TableName tn = TableName.valueOf("dianxin_data_bulk");
  99. Table table = conn.getTable(tn);
  100. RegionLocator regionLocator = conn.getRegionLocator(tn);
  101. //使用HFileOutputFormat2类创建Hfile文件
  102. HFileOutputFormat2.configureIncrementalLoad(job,table,regionLocator);
  103. //提交作业并执行
  104. boolean b = job.waitForCompletion(true);
  105. if(b){
  106. System.out.println("====================== Hfile文件生成成功!! 在/bigdata29/hbase/out2目录下 ================================");
  107. }else {
  108. System.out.println("============= Hfile文件生成失败!! ==================");
  109. }
  110. }
  111. }

 这个命令在Linux运行

hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles /bigdata29/hbase/out2 dianxin_data_bulk

五 HBase的rowKey设计

5.1.相关概念

HBase是三维有序存储的,通过rowkey(行键),column key(column family和qualifier)和TimeStamp(时间戳)这个三个维度可以对HBase中的数据进行快速定位。

HBase中rowkey可以唯一标识一行记录,在HBase查询的时候,有两种方式:

通过get方式,指定rowkey获取唯一一条记录

通过scan方式,设置startRow和stopRow参数进行范围匹配

全表扫描,即直接扫描整张表中所有行记录

5.2.相关原则

1.长度不宜过长

rowkey是一个二进制码流,可以是任意字符串,最大长度 64kb ,实际应用中一般为10-100bytes,以 byte[] 形式保存,一般设计成定长。

建议越短越好,不要超过16个字节,原因如下:

数据的持久化文件HFile中是按照KeyValue存储的,如果rowkey过长,比如超过100字节,1000w行数据,光rowkey就要占用100*1000w=10亿个字节,将近1G数据,这样会极大影响HFile的存储效率;

MemStore将缓存部分数据到内存,如果rowkey字段过长,内存的有效利用率就会降低,系统不能缓存更多的数据,这样会降低检索效率。

目前操作系统都是64位系统,内存8字节对齐,控制在16个字节,8字节的整数倍利用了操作系统的最佳特性。

2.散列性

如果rowkey按照时间戳的方式递增,不要将时间放在二进制码的前面,建议将rowkey的高位作为散列字段,由程序随机生成,低位放时间字段,这样将提高数据均衡分布在每个RegionServer,以实现负载均衡的几率。如果没有散列字段,首字段直接是时间信息,所有的数据都会集中在一个RegionServer上,这样在数据检索的时候负载会集中在个别的RegionServer上,造成热点问题,会降低查询效率。

3.唯一性

必须在设计上保证其唯一性,rowkey是按照字典顺序排序存储的,因此,设计rowkey的时候,要充分利用这个排序的特点,将经常读取的数据存储到一块,将最近可能会被访问的数据放到一块。

5.3 设计原则

1.什么是热点

HBase中的行是按照rowkey的字典顺序排序的,这种设计优化了scan操作,可以将相关的行以及会被一起读取的行存取在临近位置,便于scan。然而糟糕的rowkey设计是热点的源头。 热点发生在大量的client直接访问集群的一个或极少数个节点(访问可能是读,写或者其他操作)。大量访问会使热点region所在的单个机器超出自身承受能力,引起性能下降甚至region不可用,这也会影响同一个RegionServer上的其他region,由于主机无法服务其他region的请求。 设计良好的数据访问模式以使集群被充分,均衡的利用。

为了避免写热点,设计rowkey使得不同行在同一个region,但是在更多数据情况下,数据应该被写入集群的多个region,而不是一个。

下面是一些常见的避免热点的方法以及它们的优缺点:

加盐

这里所说的加盐不是密码学中的加盐,而是在rowkey的前面增加随机数,具体就是给rowkey分配一个随机前缀以使得它和之前的rowkey的开头不同。分配的前缀种类数量应该和你想使用数据分散到不同的region的数量一致。加盐之后的rowkey就会根据随机生成的前缀分散到各个region上,以避免热点。

哈希

哈希会使同一行永远用一个前缀加盐。哈希也可以使负载分散到整个集群,但是读却是可以预测的。使用确定的哈希可以让客户端重构完整的rowkey,可以使用get操作准确获取某一个行数据

反转

第三种防止热点的方法时反转固定长度或者数字格式的rowkey。这样可以使得rowkey中经常改变的部分(最没有意义的部分)放在前面。这样可以有效的随机rowkey,但是牺牲了rowkey的有序性。

反转rowkey的例子以手机号为rowkey,可以将手机号反转后的字符串作为rowkey,这样的就避免了以手机号那样比较固定开头导致热点问题

时间戳反转

一个常见的数据处理问题是快速获取数据的最近版本,使用反转的时间戳作为rowkey的一部分对这个问题十分有用,可以用 Long.Max_Value - timestamp 追加到key的末尾,例如 [key]reverse_timestamp , [key] 的最新值可以通过scan [key]获得[key]的第一条记录,因为HBase中rowkey是有序的,第一条记录是最后录入的数据。

示例

  1. # 原数据:以时间戳_user_id作为rowkey
  2. # 时间戳高位变化不大,太连续,最终可能会导致热点问题
  3. 1638584124_user_id
  4. 1638584135_user_id
  5. 1638584146_user_id
  6. 1638584157_user_id
  7. 1638584168_user_id
  8. 1638584179_user_id
  9. ...
  10. # 解决方案:加盐、反转、哈希
  11. # 加盐
  12. # 加上随即前缀,随机的打散
  13. # 该过程无法预测 前缀时随机的
  14. 00_1638584124_user_id
  15. 05_1638584135_user_id
  16. 03_1638584146_user_id
  17. 04_1638584157_user_id
  18. 02_1638584168_user_id
  19. 06_1638584179_user_id
  20. # 反转
  21. # 适用于高位变化不大,低位变化大的rowkey
  22. 4214858361_user_id
  23. 5314858361_user_id
  24. 6414858361_user_id
  25. 7514858361_user_id
  26. 8614858361_user_id
  27. 9714858361_user_id
  28. # 散列 md5、sha1、sha256......
  29. 25531D7065AE158AAB6FA53379523979_user_id
  30. 60F9A0072C0BD06C92D768DACF2DFDC3_user_id
  31. D2EFD883A6C0198DA3AF4FD8F82DEB57_user_id
  32. A9A4C265D61E0801D163927DE1299C79_user_id
  33. 3F41251355E092D7D8A50130441B58A5_user_id
  34. 5E6043C773DA4CF991B389D200B77379_user_id
  35. # 时间戳"反转"
  36. # rowkey:时间戳_user_id
  37. # rowkey是字典升序的,那么越新的记录会被排在最后面,不容易被获取到
  38. # 需求:让最新的记录排在最前面
  39. # 大数:9999999999
  40. # 大数-小数
  41. 1638584124_user_id => 8361415875_user_id
  42. 1638584135_user_id => 8361415864_user_id
  43. 1638584146_user_id => 8361415853_user_id
  44. 1638584157_user_id => 8361415842_user_id
  45. 1638584168_user_id => 8361415831_user_id
  46. 1638584179_user_id => 8361415820_user_id
  47. 1638586193_user_id => 8361413806_user_id

5.4实战案例

电信案例

要求

  1. 手机号,网格编号,城市编号,区县编号,停留时间,进入时间,离开时间,时间分区
  2. D55433A437AEC8D8D3DB2BCA56E9E64392A9D93C,117210031795040,83401,8340104,301,20180503190539,20180503233517,20180503
  3. 将用户位置数据保存到hbase
  4.     查询需求
  5.         1、通过手机号查询用户最近10条位置记录
  6.         2、获取用户某一天在一个城市中的所有位置
  7.     怎么设计hbase表
  8.         1、rowkey
  9.         2、时间戳

代码

  1. package com.shujia.jinjie;
  2. import com.shujia.utils.HBaseUtils;
  3. import org.apache.hadoop.hbase.CompareOperator;
  4. import org.apache.hadoop.hbase.TableName;
  5. import org.apache.hadoop.hbase.client.*;
  6. import org.apache.hadoop.hbase.filter.BinaryComparator;
  7. import org.apache.hadoop.hbase.filter.FilterList;
  8. import org.apache.hadoop.hbase.filter.PrefixFilter;
  9. import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
  10. import org.apache.hadoop.hbase.util.Bytes;
  11. import java.io.BufferedReader;
  12. import java.io.FileReader;
  13. import java.util.Scanner;
  14. /**
  15. * 查询需求
  16. * 1、通过手机号查询用户最近10条位置记录
  17. *
  18. * 2、获取用户某一天在一个城市中的所有位置
  19. */
  20. public class DianXinRowKeyDemo {
  21. //创建数据库连接对象
  22. //创建数据库操作对象
  23. private static Connection conn = HBaseUtils.CONNECTION;
  24. private static Admin admin = HBaseUtils.ADMIN;
  25. public static void main(String[] args) throws Exception{
  26. //建表
  27. // HBaseUtils.createOneTable("dianxin_tb1","info");
  28. //读取文件数据,添加到hbase中的dianxin_tb1表中
  29. // putsData();
  30. //通过手机号查询用户最近10条位置记录
  31. Scanner sc = new Scanner(System.in);
  32. System.out.println("请输入要查询的手机号:");
  33. String phoneNum = sc.next();
  34. // findUser(phoneNum);
  35. //2、获取用户某一天在一个城市中的所有位置
  36. System.out.println("请输入要查询的城市编号:");
  37. String city = sc.next();
  38. findUserPosition(phoneNum,city);
  39. }
  40. public static void findUserPosition(String number,String city) throws Exception{
  41. //将表封装成一个TableName对象
  42. TableName tn = TableName.valueOf("dianxin_tb1");
  43. //获取表对象
  44. Table table = conn.getTable(tn);
  45. //scan
  46. Scan scan = new Scan();
  47. //创建一个行键前缀过滤器对象
  48. //public PrefixFilter(byte[] prefix)
  49. PrefixFilter prefixFilter = new PrefixFilter(Bytes.toBytes(number));
  50. //过滤城市
  51. SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("city"),
  52. CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes(city)));
  53. //创建过滤器集合
  54. FilterList filterList = new FilterList();
  55. filterList.addFilter(prefixFilter);
  56. filterList.addFilter(singleColumnValueFilter);
  57. scan.setFilter(filterList);
  58. ResultScanner resultScanner = table.getScanner(scan);
  59. for (Result result : resultScanner) {
  60. String rk = Bytes.toString(result.getRow());
  61. //47BE1E866CFC071DB19D5E1C056BE28AE24C16E7-9496768070
  62. String[] strings = rk.split("-");
  63. String phoneNum = strings[0];
  64. long startTime = 20190000000000L - Long.parseLong(strings[1]);
  65. String wg = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("wg")));
  66. String city2 = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("city")));
  67. String qx = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("qx")));
  68. String stayTime = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("stayTime")));
  69. String endTime = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("endTime")));
  70. String date = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("date")));
  71. System.out.println("手机号:"+phoneNum+",进入网格时间:"+startTime+",离开网格时间:"+endTime
  72. +",网格编号:"+wg+",城市:"
  73. +city2+",区县:"+qx+",停留时间:"
  74. +stayTime+",日期:"+date);
  75. }
  76. }
  77. //查询函数
  78. public static void findUser(String number) throws Exception{
  79. //将表封装成一个TableName对象
  80. TableName tn = TableName.valueOf("dianxin_tb1");
  81. //获取表对象
  82. Table table = conn.getTable(tn);
  83. //scan
  84. Scan scan = new Scan();
  85. //创建一个行键前缀过滤器对象
  86. //public PrefixFilter(byte[] prefix)
  87. PrefixFilter prefixFilter = new PrefixFilter(Bytes.toBytes(number));
  88. //设置过滤器
  89. //public Scan setFilter(Filter filter)
  90. scan.setFilter(prefixFilter);
  91. scan.setLimit(10);
  92. //创建结果对象
  93. ResultScanner resultScanner = table.getScanner(scan);
  94. for (Result result : resultScanner) {
  95. String rk = Bytes.toString(result.getRow());
  96. //47BE1E866CFC071DB19D5E1C056BE28AE24C16E7-9496768070
  97. String[] strings = rk.split("-");
  98. String phoneNum = strings[0];
  99. long startTime = 20190000000000L - Long.parseLong(strings[1]);
  100. String wg = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("wg")));
  101. String city = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("city")));
  102. String qx = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("qx")));
  103. String stayTime = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("stayTime")));
  104. String endTime = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("endTime")));
  105. String date = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("date")));
  106. System.out.println("手机号:"+phoneNum+",进入网格时间:"+startTime+",离开网格时间:"+endTime
  107. +",网格编号:"+wg+",城市:"
  108. +city+",区县:"+qx+",停留时间:"
  109. +stayTime+",日期:"+date);
  110. }
  111. }
  112. //添加函数
  113. public static void putsData() throws Exception{
  114. //将表封装成一个TableName对象
  115. TableName tn = TableName.valueOf("dianxin_tb1");
  116. //获取表对象
  117. Table table = conn.getTable(tn);
  118. //获取IO对象
  119. BufferedReader br = new BufferedReader(new FileReader("hbase/data/dianxin_data"));
  120. String line=null;
  121. while ((line= br.readLine())!=null){
  122. if (!(line.contains("shushan")) && !(line.contains("\\N"))){
  123. String[] strings = line.split("\t");
  124. String phoneNum = strings[0];
  125. String wg = strings[1];
  126. String city = strings[2];
  127. String qx = strings[3];
  128. String stayTime = strings[4];
  129. String startTime = strings[5];
  130. String endTime = strings[6];
  131. String date = strings[7];
  132. //我们为了需求的查询更加方便,以手机号与进入网格的时间进行拼接
  133. //因为要查询最近的,可以使用时间戳反转对rowkey进行设计
  134. //20180503173254
  135. //20190000000000
  136. long new_time = 20190000000000L - Long.parseLong(startTime);
  137. String rk = phoneNum + "-" + new_time;
  138. HBaseUtils.putOneDataToTable("dianxin_tb1",rk,"info","wg",wg);
  139. HBaseUtils.putOneDataToTable("dianxin_tb1",rk,"info","city",city);
  140. HBaseUtils.putOneDataToTable("dianxin_tb1",rk,"info","qx",qx);
  141. HBaseUtils.putOneDataToTable("dianxin_tb1",rk,"info","stayTime",stayTime);
  142. HBaseUtils.putOneDataToTable("dianxin_tb1",rk,"info","endTime",endTime);
  143. HBaseUtils.putOneDataToTable("dianxin_tb1",rk,"info","date",date);
  144. }
  145. }
  146. }
  147. }

六 二级索引

6.1相关概念

二级索引的本质就是建立各列值与行键之间的映射关系

Hbase的局限性:

  HBase本身只提供基于行键和全表扫描的查询,而行键索引单一,对于多维度的查询困难。

6.2 常见的二级索引

HBase的一级索引就是rowkey,我们只能通过rowkey进行检索。如果我们相对hbase里面列族的列列进行一些组合查询,就需要采用HBase的二级索引方案来进行多条件的查询。

  1. MapReduce方案

  2. ITHBASE(Indexed-Transanctional HBase)方案

  3. IHBASE(Index HBase)方案

  4. Hbase Coprocessor(协处理器)方案

  5. Solr+hbase方案 redis+hbase 方案

  6. CCIndex(complementalclustering index)方案

6.3 二级索引的种类

1、创建单列索引

  2、同时创建多个单列索引

  3、创建联合索引(最多同时支持3个列)

  4、只根据rowkey创建索引

6.4 Mapreduce 创建二级索引

使用整合MapReduce的方式创建hbase索引。主要的流程如下:

1.1扫描输入表,使用hbase继承类TableMapper

1.2获取rowkey和指定字段名称和字段值

1.3创建Put实例, value=” “, rowkey=班级,column=学号

1.4使用TableReducer将数据写入索引表

首先二级索引表必须先要存在

该Map继承的是TableMapper

该Reducer继承的是TableReducer

代码如下

  1. package com.shujia.jinjie;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.hbase.HBaseConfiguration;
  4. import org.apache.hadoop.hbase.client.Mutation;
  5. import org.apache.hadoop.hbase.client.Put;
  6. import org.apache.hadoop.hbase.client.Result;
  7. import org.apache.hadoop.hbase.client.Scan;
  8. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  9. import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
  10. import org.apache.hadoop.hbase.mapreduce.TableMapper;
  11. import org.apache.hadoop.hbase.mapreduce.TableReducer;
  12. import org.apache.hadoop.hbase.util.Bytes;
  13. import org.apache.hadoop.io.NullWritable;
  14. import org.apache.hadoop.io.Text;
  15. import org.apache.hadoop.mapreduce.Job;
  16. import org.apache.hadoop.mapreduce.Mapper;
  17. import org.apache.hadoop.mapreduce.Reducer;
  18. import java.io.IOException;
  19. //因为我们现在要读取的数据来自于hbase中的hfile文件,与hdfs上普通的block块文件有所区别,不能直接继承Mapper类
  20. //要继承hbase读取数据专属的Mapper类 TableMapper
  21. //public abstract class TableMapper<KEYOUT, VALUEOUT> extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT>
  22. class MyIndexMapper extends TableMapper<Text,NullWritable>{
  23. @Override
  24. protected void map(ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result, Text, NullWritable>.Context context) throws IOException, InterruptedException {
  25. //ImmutableBytesWritable key --相当于是读取到一行的行键
  26. //Result value --相当于读取到一行多列的封装
  27. //获取行键,
  28. String id = Bytes.toString(key.get());
  29. //获取姓名列
  30. // public byte[] getValue(byte[] family, byte[] qualifier)
  31. String name = Bytes.toString(value.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
  32. //将姓名与学号连接起来给到reduce
  33. context.write(new Text(id+"-"+name),NullWritable.get());
  34. }
  35. }
  36. //public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT> extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation>
  37. class MyIndexReducer extends TableReducer<Text,NullWritable,NullWritable>{
  38. @Override
  39. protected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text, NullWritable, NullWritable, Mutation>.Context context) throws IOException, InterruptedException {
  40. String string = key.toString();
  41. String id = string.split("-")[0];
  42. String name = string.split("-")[1];
  43. //将要添加的数据封装成Put类的对象
  44. Put put = new Put(Bytes.toBytes(name));
  45. //public Put addImmutable(byte[] family, byte[] qualifier, byte[] value)
  46. put.addColumn(Bytes.toBytes("info"),Bytes.toBytes(id),Bytes.toBytes(""));
  47. context.write(NullWritable.get(),put);
  48. }
  49. }
  50. public class HBaseIndexDemo1 {
  51. public static void main(String[] args) throws Exception {
  52. //获取集群配置文件,可以是hdfs集群也可以是hbase集群
  53. Configuration conf = HBaseConfiguration.create();
  54. //设置zookeeper集群信息
  55. conf.set("hbase.zookeeper.quorum","master:2181,node1:2181,node2:2181");
  56. //创建job对象
  57. Job job = Job.getInstance(conf);
  58. //给job任务取名字
  59. job.setJobName("给学生表创建二级索引表");
  60. //设置主类
  61. job.setJarByClass(HBaseIndexDemo1.class);
  62. //因为索引表的构建是建立列值与行键的映射关系,要获取所有的数据
  63. //scan扫描全表数据
  64. Scan scan = new Scan();
  65. //告诉输入的列值来自于哪一个列簇
  66. scan.addFamily(Bytes.toBytes("info"));
  67. /**
  68. * @param table The table name to read from.
  69. * @param scan The scan instance with the columns, time range etc.
  70. * @param mapper The mapper class to use.
  71. * @param outputKeyClass The class of the output key.
  72. * @param outputValueClass The class of the output value.
  73. * @param job The current job to adjust. Make sure the passed job is
  74. * carrying all necessary HBase configuration.
  75. * @throws IOException When setting up the details fails.
  76. *public static void initTableMapperJob
  77. * (String table,Scan scan,Class<? extends TableMapper> mapper,Class<?> outputKeyClass,Class<?> outputValueClass,Job job)
  78. */
  79. //初始化Map任务
  80. TableMapReduceUtil.initTableMapperJob("students2",scan,MyIndexMapper.class, Text.class, NullWritable.class,job);
  81. //初始化Reduce任务
  82. TableMapReduceUtil.initTableReducerJob("students2_index",MyIndexReducer.class,job);
  83. //提交作业到集群中允许
  84. boolean b = job.waitForCompletion(true);
  85. if (b) {
  86. System.out.println("================== students2索引表构建成功!!!============================");
  87. } else {
  88. System.out.println("================== students2索引表构建失败!!!============================");
  89. }
  90. }
  91. }

6.5 Redis创建二级索引

1.指定redis端口号

Jedis jedis = new Jedis("192.168.73.100", 12346);

2.在redis中构建映射关系(性别:学号)因为男生的学号有很多个,且不重复,所以我们在redis中采用set的数据类型存储

  1. public static void buildIndexInRedis() throws Exception {
  2. //获取要构建索引的原表
  3. Table students2 = conn.getTable(TableName.valueOf("students2"));
  4. Scan scan = new Scan();
  5. //获取男生的学号,放入到redis中
  6. //创建列值过滤器
  7. ValueFilter filter1 = new ValueFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("男")));
  8. scan.setFilter(filter1);
  9. ResultScanner resultScanner = students2.getScanner(scan);
  10. for (Result result : resultScanner) {
  11. //获取每一行的行键即可
  12. String id = Bytes.toString(result.getRow());
  13. //将学号以值的方式添加到redis键对应的值中
  14. //因为男生的学号有很多个,且不重复,所以我们在redis中采用set的数据类型存储
  15. jedis.sadd("性别:男", id);
  16. }
  17. //获取男生的学号,放入到redis中
  18. //创建列值过滤器
  19. ValueFilter filter2 = new ValueFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("女")));
  20. scan.setFilter(filter2);
  21. ResultScanner resultScanner2 = students2.getScanner(scan);
  22. for (Result result : resultScanner2) {
  23. //获取每一行的行键即可
  24. String id = Bytes.toString(result.getRow());
  25. //将学号以值的方式添加到redis键对应的值中
  26. //因为男生的学号有很多个,且不重复,所以我们在redis中采用set的数据类型存储
  27. jedis.sadd("性别:女", id);
  28. }

3.先通过查询redis中性别对应的学号,拿着学号去hbase原表中查询获取结果

  1. package com.shujia.jinjie;
  2. import com.shujia.utils.HBaseUtils;
  3. import com.shujia.utils.HBaseUtils;
  4. import org.apache.hadoop.hbase.CompareOperator;
  5. import org.apache.hadoop.hbase.TableName;
  6. import org.apache.hadoop.hbase.client.*;
  7. import org.apache.hadoop.hbase.filter.BinaryComparator;
  8. import org.apache.hadoop.hbase.filter.ValueFilter;
  9. import org.apache.hadoop.hbase.util.Bytes;
  10. import redis.clients.jedis.Jedis;
  11. import java.util.Scanner;
  12. import java.util.Set;
  13. /*
  14. 使用redis第三方的存储工具存储hbase索引(本质依旧是列值与行键产生映射关系)
  15. */
  16. public class HBaseWithRedisIndex {
  17. //1、获取hbase数据库连接对象和操作对象
  18. static Connection conn = HBaseUtils.CONNECTION;
  19. static Admin admin = HBaseUtils.ADMIN;
  20. //获取redis连接对象
  21. static Jedis jedis = new Jedis("192.168.73.100", 12346);
  22. public static void main(String[] args) throws Exception {
  23. //步骤1:在redis中构建映射关系(性别:学号)
  24. // buildIndexInRedis();
  25. //使用:先通过查询redis中性别对应的学号,拿着学号去hbase原表中查询获取结果
  26. Scanner sc = new Scanner(System.in);
  27. System.out.println("请输入您要查询的性别:");
  28. String gender = sc.next();
  29. selectGenderFromHbase(gender);
  30. }
  31. public static void selectGenderFromHbase(String gender) throws Exception {
  32. if ("男".equals(gender)) {
  33. selectIdFromRedis(gender);
  34. } else if ("女".equals(gender)) {
  35. selectIdFromRedis(gender);
  36. } else {
  37. System.out.println("没有该性别");
  38. }
  39. }
  40. //单独编写一个方法查询redis
  41. public static void selectIdFromRedis(String gender) throws Exception {
  42. Table students2 = conn.getTable(TableName.valueOf("students2"));
  43. //redis中set查询值的方法
  44. Set<String> ids = jedis.smembers("性别:"+gender);
  45. for (String id : ids) {
  46. // Result result = students2.get(new Get(Bytes.toBytes(id)).addColumn(Bytes.toBytes("info"), Bytes.toBytes("name")));
  47. Get get = new Get(Bytes.toBytes(id));
  48. //获取那一列的所有信息封装成一个结果对象
  49. get.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
  50. Result result = students2.get(get);
  51. String name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
  52. System.out.println("学号:" + id + ",姓名:" + name);
  53. }
  54. }
  55. public static void buildIndexInRedis() throws Exception {
  56. //获取要构建索引的原表
  57. Table students2 = conn.getTable(TableName.valueOf("students2"));
  58. Scan scan = new Scan();
  59. //获取男生的学号,放入到redis中
  60. //创建列值过滤器
  61. ValueFilter filter1 = new ValueFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("男")));
  62. scan.setFilter(filter1);
  63. ResultScanner resultScanner = students2.getScanner(scan);
  64. for (Result result : resultScanner) {
  65. //获取每一行的行键即可
  66. String id = Bytes.toString(result.getRow());
  67. //将学号以值的方式添加到redis键对应的值中
  68. //因为男生的学号有很多个,且不重复,所以我们在redis中采用set的数据类型存储
  69. jedis.sadd("性别:男", id);
  70. }
  71. //获取男生的学号,放入到redis中
  72. //创建列值过滤器
  73. ValueFilter filter2 = new ValueFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("女")));
  74. scan.setFilter(filter2);
  75. ResultScanner resultScanner2 = students2.getScanner(scan);
  76. for (Result result : resultScanner2) {
  77. //获取每一行的行键即可
  78. String id = Bytes.toString(result.getRow());
  79. //将学号以值的方式添加到redis键对应的值中
  80. //因为男生的学号有很多个,且不重复,所以我们在redis中采用set的数据类型存储
  81. jedis.sadd("性别:女", id);
  82. }
  83. }
  84. }

6.6 Phoenix二级索引

对于Hbase,如果想精确定位到某行记录,唯一的办法就是通过rowkey查询。如果不通过rowkey查找数据,就必须逐行比较每一行的值,对于较大的表,全表扫描的代价是不可接受的。

1.开启索引,配置相关文件

  1. # 关闭hbase集群
  2. stop-hbase.sh
  3. # 在/usr/local/soft/hbase-1.4.6/conf/hbase-site.xml中增加如下配置
  4. <property>
  5. <name>hbase.regionserver.wal.codec</name>
  6. <value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
  7. </property>
  8. <property>
  9. <name>hbase.rpc.timeout</name>
  10. <value>60000000</value>
  11. </property>
  12. <property>
  13. <name>hbase.client.scanner.timeout.period</name>
  14. <value>60000000</value>
  15. </property>
  16. <property>
  17. <name>phoenix.query.timeoutMs</name>
  18. <value>60000000</value>
  19. </property>
  20. # 同步到所有节点
  21. scp hbase-site.xml node1:`pwd`
  22. scp hbase-site.xml node2:`pwd`
  23. # 修改phoenix目录下的bin目录中的hbase-site.xml
  24. <property>
  25. <name>hbase.rpc.timeout</name>
  26. <value>60000000</value>
  27. </property>
  28. <property>
  29. <name>hbase.client.scanner.timeout.period</name>
  30. <value>60000000</value>
  31. </property>
  32. <property>
  33. <name>phoenix.query.timeoutMs</name>
  34. <value>60000000</value>
  35. </property>
  36. # 启动hbase
  37. start-hbase.sh
  38. # 重新进入phoenix客户端
  39. sqlline.py master,node1,node2

6.6.1 全局索引

全局索引适合读多写少的场景。如果使用全局索引,读数据基本不损耗性能,所有的性能损耗都来源于写数据。数据表的添加、删除和修改都会更新相关的索引表(数据删除了,索引表中的数据也会删除;数据增加了,索引表的数据也会增加)

注意: 对于全局索引在默认情况下,在查询语句中检索的列如果不在索引表中,Phoenix不会使用索引表将,除非使用hint。

1.导入数据

首先创建一个shell命令,里面有建表语句,取名为DIANXIN.sql

  1. CREATE TABLE IF NOT EXISTS DIANXIN (
  2. mdn VARCHAR ,
  3. start_date VARCHAR ,
  4. end_date VARCHAR ,
  5. county VARCHAR,
  6. x DOUBLE ,
  7. y DOUBLE,
  8. bsid VARCHAR,
  9. grid_id VARCHAR,
  10. biz_type VARCHAR,
  11. event_type VARCHAR ,
  12. data_source VARCHAR ,
  13. CONSTRAINT PK PRIMARY KEY (mdn,start_date)
  14. ) column_encoded_bytes=0;

2.准备数据

DIANXIN.csv

3.输入命令

# 导入数据,Linux命令
psql.py master,node1,node2 DIANXIN.sql DIANXIN.csv

4.导入数据好的截图

5.创建全局索引
CREATE INDEX DIANXIN_INDEX ON DIANXIN ( end_date );

6 查询数据 ( 索引未生效)
select * from DIANXIN where end_date = '20180503154014';

很明显查询需要2秒多,

7  强制使用索引 (索引生效) hint  语法糖

语法糖:/*+ INDEX(DIANXIN DIANXIN_INDEX) */
select /*+ INDEX(DIANXIN DIANXIN_INDEX) */  * from DIANXIN where end_date = '20180503154014';

很明显查询零点几秒,加了语法糖,查询速度加快

8 取索引列,(索引生效)

取我们设置的单级索引列,速度一样快
select end_date from DIANXIN where end_date = '20180503154014';

9.#创建多列索引
CREATE INDEX DIANXIN_INDEX1 ON DIANXIN ( end_date,COUNTY );

取我们多级索引的某一个索引列单独查询速度很快的

10 查询所有列 (索引未生效)
select  * from DIANXIN where end_date = '20180503154014'  and COUNTY = '8340104';

这个语法糖写错了,就当没有写,查询需要1秒多

11 查询所有列 (索引生效)
select /*+ INDEX(DIANXIN DIANXIN_INDEX1) */ * from DIANXIN where end_date = '20180503154014' and COUNTY = '8340104';

加了语法糖,只要零点几秒

12  单条件  (索引未生效)
select end_date from DIANXIN where  COUNTY = '8340103';
# 单条件  (索引生效) end_date 在前
select COUNTY from DIANXIN where end_date = '20180503154014';

因为创建多级索引是 CREATE INDEX DIANXIN_INDEX1 ON DIANXIN ( end_date,COUNTY )

end_date 在前面

13 删除索引
drop index 索引名 on DIANXIN;
drop index DIANXIN_INDEX on DIANXIN;

6.6.2 本地索引

1.相关概念

本地索引适合写多读少的场景,或者存储空间有限的场景。和全局索引一样,Phoenix也会在查询的时候自动选择是否使用本地索引。本地索引因为索引数据和原数据存储在同一台机器上,避免网络数据传输的开销,所以更适合写多的场景。由于无法提前确定数据在哪个Region上,所以在读数据的时候,需要检查每个Region上的数据从而带来一些性能损耗。

注意:对于本地索引,查询中无论是否指定hint或者是查询的列是否都在索引表中,都会使用索引表。

 2.创建本地索引
CREATE LOCAL INDEX DIANXIN_LOCAL_IDEX ON DIANXIN(grid_id);

3. 索引生效
select grid_id from dianxin where grid_id='117285031820040';

索引生效
select * from dianxin where grid_id='117285031820040';

6.6.3 覆盖索引

1.相关概念

覆盖索引是把原数据存储在索引数据表中,这样在查询时不需要再去HBase的原表获取数据就,直接返回查询结果。

注意:查询是 select 的列和 where 的列都需要在索引中出现。

2.创建覆盖索引
CREATE INDEX DIANXIN_INDEX_COVER ON DIANXIN ( x,y ) INCLUDE ( county );

3.

# 查询所有列 (索引未生效)
select * from DIANXIN where x=117.288 and y =31.822;

# 强制使用索引 (索引生效)
select /*+ INDEX(DIANXIN DIANXIN_INDEX_COVER) */ * from DIANXIN where x=117.288 and y =31.822;

# 查询索引中的列 (索引生效) mdn是DIANXIN表的RowKey中的一部分
select x,y,county from DIANXIN where x=117.288 and y =31.822;
select mdn,x,y,county from DIANXIN where x=117.288 and y =31.822;

# 查询条件必须放在索引中  select 中的列可以放在INCLUDE (将数据保存在索引中)
select /*+ INDEX(DIANXIN DIANXIN_INDEX_COVER) */ x,y,count(*) from DIANXIN group by x,y;

6.7 Phoenix JCBC

1.导入依赖

  1. <!-- https://mvnrepository.com/artifact/org.apache.phoenix/phoenix-hbase-compat-2.2.5 -->
  2. <dependency>
  3. <groupId>org.apache.phoenix</groupId>
  4. <artifactId>phoenix-hbase-compat-2.2.5</artifactId>
  5. <version>5.1.3</version>
  6. </dependency>
  7. <!-- https://mvnrepository.com/artifact/org.apache.phoenix/phoenix-core -->
  8. <dependency>
  9. <groupId>org.apache.phoenix</groupId>
  10. <artifactId>phoenix-core</artifactId>
  11. <version>5.1.3</version>
  12. </dependency>
  13. <dependency>
  14. <groupId>com.lmax</groupId>
  15. <artifactId>disruptor</artifactId>
  16. <version>3.4.2</version>
  17. </dependency>

代码

  1. package com.shujia.jdbc;
  2. import java.sql.Connection;
  3. import java.sql.DriverManager;
  4. import java.sql.PreparedStatement;
  5. import java.sql.ResultSet;
  6. public class PhoenixJDBC {
  7. public static void main(String[] args) throws Exception {
  8. //注册驱动
  9. Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
  10. //创建与phoenix连接对象
  11. Connection conn = DriverManager.getConnection("jdbc:phoenix:master,node1,node2:2181");
  12. PreparedStatement prep = conn.prepareStatement("select /*+ INDEX(DIANXIN DIANXIN_INDEX) */ * from DIANXIN where end_date = ? and start_date = ?");
  13. prep.setString(1,"20180503154014");
  14. prep.setString(2,"20180503154614");
  15. ResultSet resultSet = prep.executeQuery();
  16. while (resultSet.next()){
  17. /*
  18. mdn VARCHAR ,
  19. start_date VARCHAR ,
  20. end_date VARCHAR ,
  21. county VARCHAR,
  22. x DOUBLE ,
  23. y DOUBLE,
  24. bsid VARCHAR,
  25. grid_id VARCHAR,
  26. biz_type VARCHAR,
  27. event_type VARCHAR ,
  28. data_source VARCHAR ,
  29. */
  30. String mdn = resultSet.getString("mdn");
  31. String start_date = resultSet.getString("start_date");
  32. String end_date = resultSet.getString("end_date");
  33. String county = resultSet.getString("county");
  34. double x = resultSet.getDouble("x");
  35. double y = resultSet.getDouble("y");
  36. String bsid = resultSet.getString("bsid");
  37. String grid_id = resultSet.getString("grid_id");
  38. String biz_type = resultSet.getString("biz_type");
  39. String event_type = resultSet.getString("event_type");
  40. String data_source = resultSet.getString("data_source");
  41. System.out.println(mdn+","+start_date+","+end_date+","+county+","+x+","+y+","+bsid
  42. +","+grid_id+","+biz_type+","+event_type+","+data_source);
  43. }
  44. //释放资源
  45. prep.close();
  46. conn.close();
  47. }
  48. }

七 HBase的调优

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/510650
推荐阅读
相关标签
  

闽ICP备14008679号