赞
踩
Hbase读取数据的流程:
1)是由客户端发起读取数据的请求,首先会与zookeeper建立连接
2)从zookeeper中获取一个hbase:meta表位置信息,被哪一个regionserver所管理着
hbase:meta表:hbase的元数据表,在这个表中存储了自定义表相关的元数据,包括表名,表有哪些列簇,表有哪些region,每个region存储的位置,每个region被哪个regionserver所管理,这个表也是存储在某一个region上的,并且这个meta表只会被一个regionserver所管理。这个表的位置信息只有zookeeper知道。
3)连接这个meta表对应的regionserver,从meta表中获取当前你要读取的这个表对应的regionsever是谁。
当一个表多个region怎么办呢?
如果我们获取数据是以get的方式,只会返回一个regionserver
如果我们获取数据是以scan的方式,会将所有的region对应的regionserver的地址全部返回。
4)连接要读取表的对应的regionserver,从regionserver上的开始读取数据:
读取顺序:memstore-->blockcache-->storefile-->Hfile中
注意:如果是scan操作,就不仅仅去blockcache了,而是所有都会去找。
Hbase的写入数据流程:
1)由客户端发起写数据请求,首先会与zookeeper建立连接
2)从zookeeper中获取hbase:meta表被哪一个regionserver所管理
3)连接hbase:meta表中获取对应的regionserver地址 (从meta表中获取当前要写入数据的表对应的region所管理的regionserver) 只会返回一个regionserver地址
4)与要写入数据的regionserver建立连接,然后开始写入数据,将数据首先会写入到HLog,然后将数据写入到对应store模块中的memstore中
(可能会写多个),当这两个地方都写入完成之后,表示数据写入完成。
-------------------------后面的步骤是服务器内部的操作-----------------
异步操作
5)随着客户端不断地写入数据,memstore中的数据会越来多,当内存中的数据达到阈值(128M/1h)的时候,放入到blockchache中,生成新的memstore接收用户过来的数据,然后当blockcache的大小达到一定阈值(0.85)的时候,开始触发flush机制,将数据最终刷新到HDFS中形成小的Hfile文件。
6)随着不断地刷新,storefile不断地在HDFS上生成小HFIle文件,当小的HFile文件达到阈值的时候(3个及3个以上),就会触发Compaction机制,将小的HFile合并成一个大的HFile.
7)随着不断地合并,大的HFile文件会越来越大,当达到一定阈值(2.0版本之后最终10G)的时候,会触发分裂机制(split),将大的HFile文件进行一分为二,同时管理这个大的HFile的region也会被一分为二,形成两个新的region和两个新的HFile文件,一对一的进行管理,将原来旧的region和分裂之前大的HFile文件慢慢地就会下线处理。
region中存储的是一张表的数据,当region中的数据条数过多的时候,会直接影响查询效率。当region过大的时候,region会被拆分为两个region,HMaster会将分裂的region分配到不同的regionserver上,这样可以让请求分散到不同的RegionServer上,已达到负载均衡 , 这也是HBase的一个优点 。
ConstantSizeRegionSplitPolicy
0.94版本前,HBase region的默认切分策略
当region中最大的store大小超过某个阈值(hbase.hregion.max.filesize=10G)之后就会触发切分,一个region等分为2个region。
但是在生产线上这种切分策略却有相当大的弊端(切分策略对于大表和小表没有明显的区分):
阈值(hbase.hregion.max.filesize)设置较大对大表比较友好,但是小表就有可能不会触发分裂,极端情况下可能就1个,形成热点,这对业务来说并不是什么好事。
如果设置较小则对小表友好,但一个大表就会在整个集群产生大量的region,这对于集群的管理、资源使用、failover来说都不是一件好事。
IncreasingToUpperBoundRegionSplitPolicy
0.94版本~2.0版本默认切分策略
总体看和ConstantSizeRegionSplitPolicy思路相同,一个region中最大的store大小大于设置阈值就会触发切分。 但是这个阈值并不像ConstantSizeRegionSplitPolicy是一个固定的值,而是会在一定条件下不断调整,调整规则和region所属表在当前regionserver上的region个数有关系.
region split阈值的计算公式是:
设regioncount:是region所属表在当前regionserver上的region的个数
阈值 = regioncount^3 * 128M * 2,当然阈值并不会无限增长,最大不超过MaxRegionFileSize(10G),当region中最大的store的大小达到该阈值的时候进行region split
例如:
第一次split阈值 = 1^3 * 256 = 256MB
第二次split阈值 = 2^3 * 256 = 2048MB
第三次split阈值 = 3^3 * 256 = 6912MB
第四次split阈值 = 4^3 * 256 = 16384MB > 10GB,因此取较小的值10GB
后面每次split的size都是10GB了
特点
相比ConstantSizeRegionSplitPolicy,可以自适应大表、小表;
在集群规模比较大的情况下,对大表的表现比较优秀
对小表不友好,小表可能产生大量的小region,分散在各regionserver上
小表达不到多次切分条件,导致每个split都很小,所以分散在各个regionServer上
SteppingSplitPolicy
2.0版本默认切分策略
相比 IncreasingToUpperBoundRegionSplitPolicy 简单了一些 region切分的阈值依然和待分裂region所属表在当前regionserver上的region个数有关系
如果region个数等于1,切分阈值为flush size 128M
否则为MaxRegionFileSize。
这种切分策略对于大集群中的大表、小表会比 IncreasingToUpperBoundRegionSplitPolicy 更加友好,小表不会再产生大量的小region,而是适可而止。
KeyPrefixRegionSplitPolicy
根据rowKey的前缀对数据进行分区,这里是指定rowKey的前多少位作为前缀,比如rowKey都是16位的,指定前5位是前缀,那么前5位相同的rowKey在相同的region中。
DelimitedKeyPrefixRegionSplitPolicy
保证相同前缀的数据在同一个region中,例如rowKey的格式为:userid_eventtype_eventid,指定的delimiter为 _ ,则split的的时候会确保userid相同的数据在同一个region中。 按照分隔符进行切分,而KeyPrefixRegionSplitPolicy是按照指定位数切分。
BusyRegionSplitPolicy
按照一定的策略判断Region是不是Busy状态,如果是即进行切分
如果你的系统常常会出现热点Region,而你对性能有很高的追求,那么这种策略可能会比较适合你。它会通过拆分热点Region来缓解热点Region的压力,但是根据热点来拆分Region也会带来很多不确定性因素,因为你也不知道下一个被拆分的Region是哪个。
DisabledRegionSplitPolicy
不启用自动拆分, 需要指定手动拆分
Minor Compaction:
指选取一些小的、相邻的StoreFile将他们合并成一个更大的StoreFile,在这个过程中不会处理已经Deleted或Expired的Cell。一次 Minor Compaction 的结果是更少并且更大的StoreFile。
Major Compaction:
指将所有的StoreFile合并成一个StoreFile,这个过程会清理三类没有意义的数据:被删除的数据、TTL过期数据、版本号超过设定版本号的数据。另外,一般情况下,major compaction时间会持续比较长,整个过程会消耗大量系统资源,对上层业务有比较大的影响。因此线上业务都会将关闭自动触发major compaction功能,改为手动在业务低峰期触发。
HBase适合存储PB级别的海量数据(百亿千亿量级条记录),如果根据记录主键Rowkey来查询,能在几十到百毫秒内返回数据。
那么HBase是如何做到的呢?
接下来,简单阐述一下数据的查询思路和过程。
查询过程
第1步:
项目有100亿业务数据,存储在一个HBase集群上(由多个服务器数据节点构成),每个数据节点上有若干个Region(区域),每个Region实际上就是HBase中一批数据的集合(一段连续范围rowkey的数据)。
我们现在开始根据主键RowKey来查询对应的记录,通过meta表可以帮我们迅速定位到该记录所在的数据节点,以及数据节点中的Region,目前我们有100亿条记录,占空间10TB。所有记录被切分成5000个Region,那么现在,每个Region就是2G。
由于记录在1个Region中,所以现在我们只要查询这2G的记录文件,就能找到对应记录。
第2步:
由于HBase存储数据是按照列族存储的。比如一条记录有400个字段,前100个字段是人员信息相关,这是一个列簇(列的集合);中间100个字段是公司信息相关,是一个列簇。另外100个字段是人员交易信息相关,也是一个列簇;最后还有100个字段是其他信息,也是一个列簇
这四个列簇是分开存储的,这时,假设2G的Region文件中,分为4个列族,那么每个列族就是500M。
到这里,我们只需要遍历这500M的列簇就可以找到对应的记录。
第3步:
如果要查询的记录在其中1个列族上,1个列族在HDFS中会包含1个或者多个HFile。
如果一个HFile一般的大小为100M,那么该列族包含5个HFile在磁盘上或内存中。
由于HBase的内存进入磁盘中的数据是排好序(字典顺序)的,要查询的记录有可能在最前面,也有可能在最后面,按平均来算,我们只需遍历2.5个HFile共250M,即可找到对应的记录。
第4步:
每个HFile中,是以键值对(key/value)方式存储,只要遍历文件中的key位置并判断符合条件即可
一般key是有限的长度,假设key/value比是1:24,最终只需要10M的数据量,就可获取的对应的记录。
如果数据在机械磁盘上,按其访问速度100M/S,只需0.1秒即可查到。
如果是SSD的话,0.01秒即可查到。
当然,扫描HFile时还可以通过布隆过滤器快速定位到对应的HFile,以及HBase是有内存缓存机制的,如果数据在内存中,效率会更高。
HBase与Hive的对比
hive:
数据仓库:Hive的本质其实就相当于将HDFS中已经存储的文件在Mysql中做了一个双射关系,以方便使用HQL去管理查询。
用于数据分析、清洗:Hive适用于离线的数据分析和清洗,延迟较高。
基于HDFS、MapReduce:Hive存储的数据依旧在DataNode上,编写的HQL语句终将是转换为MapReduce代码执行。
HBase
数据库:是一种面向列族存储的非关系型数据库。
用于存储结构化和非结构化的数据:适用于单表非关系型数据的存储,不适合做关联查询,类似JOIN等操作。
基于HDFS:数据持久化存储的体现形式是HFile,存放于DataNode中,被ResionServer以region的形式进行管理。
延迟较低,接入在线业务使用:面对大量的企业数据,HBase可以直线单表大量数据的存储,同时提供了高效的数据访问速度。
1.配置hive-site.xml
文件
- <property>
- <name>hive.zookeeper.quorum</name>
- <value>master,node1,node2</value>
- </property>
-
- <property>
- <name>hive.zookeeper.client.port</name>
- <value>2181</value>
- </property>
1.
HBase中已经存储了某一张表,在Hive中创建一个外部表来关联HBase中的这张表
建立外部表的字段名要和hbase中的列名一致
前提是hbase中已经有表了
示例,以后模仿这个
- create external table students_hbase
- (
- id string,
- name string,
- age string,
- gender string,
- clazz string
- )
- stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
- with serdeproperties ("hbase.columns.mapping" = "
- :key,
- info:name,
- info:age,
- info:gender,
- info:clazz
- ")
- tblproperties("hbase.table.name" = "default:students");
1.准备好文件,上传到Linux中
2.解压 tar -zxvf phoenix-hbase-2.2-5.1.3-bin.tar.gz -C ../
1.配置环境变量
/etc/profile
2.将phoenix-server-hbase-2.2-5.1.3.jar复制到所有节点的hbase lib目录下
1.先要启动hbase
2.sqlline.py master,node1,node2
1.建表语句
必须要设置主键
- CREATE TABLE IF NOT EXISTS student_p1 (
- id VARCHAR NOT NULL PRIMARY KEY,
- name VARCHAR,
- age BIGINT,
- gender VARCHAR ,
- clazz VARCHAR
- );
2.插入语句
语法 upsert into 表名 values(内容)
- upsert into STUDENT values('1500100004','葛德曜',24,'男','理科三班');
- upsert into STUDENT values('1500100005','宣谷芹',24,'男','理科六班');
- upsert into STUDENT values('1500100006','羿彦昌',24,'女','理科三班');
3.查询语句
跟sql语句差不多
4.删除数据
语法 delete from 表名 where 主键名=主键值;
delete from student_p1 where id='1500100004';
5. 删除表
drop table student_p1;
6.退出
!quit
1.在phoenix中创建表之后,我们可以使用 !table或者show table 查看所有的表,并以大写的形式展现给我们,我们使用sql语句查询的时候不分表的大小写。
2.在phoenix中创建表之后,可以在hbase中可以通过scan加大写的表名查看phoenix中的表,但是在hbase中创建的表,在phoenix就无法查看。如果想查看,则需要在phoenix中进行表的映射。映射方式有两种:视图映射和表映射
3、如何在phoneix中使用hbase原本的数据表呢?
视图映射:视图并不是真正意义上的表,而是在phoneix创建一个映射关系,以表的形式将hbase中原本数据映射过来,可以在基础之上编写sql语句进行分析,需要注意的是,我们在视图上sql分析的时候,表名和列名需要加双引号。删除视图不会影响原本hbase中的数据,视图无法做修改,只能查询,视图在phoneix中被看作成一个只读表。
表映射:建表的语法来说与视图映射相差一个单词,其他的没啥区别。使用上,表映射可以直接在phoneix中对表数据进行增删改查。将phoneix中表映射删了,原来hbase中的表也对应删除。
4、映射查询的时候,主键可以不用加双引号,非主键的列必须加双引号
1.Phoenix创建的视图是只读的,所以只能用来做查询,无法通过视图对源数据进行修改等操作
2.在phoenix创建视图, primary key 对应到hbase中的rowkey
3.创建的映射要与想要映射的hbase中的表名一致,且视图要被双引号包起来。
4.视图映射相关建表案例
创建的是映射,所以是view
- CREATE view "students" (
- id VARCHAR NOT NULL PRIMARY KEY,
- "info"."name" VARCHAR,
- "info"."age" VARCHAR,
- "info"."gender" VARCHAR,
- "info"."clazz" VARCHAR
- ) column_encoded_bytes=0;
5.查询相关事项
a.映射创建好之后,想要查看映射内容,必须使用sql语句的时候将视图名用双引号包起来,才能查看
b.映射查询的时候,主键可以不用加双引号,非主键的列必须加双引号
c.删除视图不会影响原本hbase中的数据,视图无法做修改,只能查询,视图在phoneix中被看作成一个只读表。
drop view "视图名";
1.建表映射
1) 当HBase中已经存在表时,可以以类似创建视图的方式创建关联表,只需要将create view改为create table即可。
2)当HBase中不存在表时,可以直接使用create table指令创建需要的表,并且在创建指令中可以根据需要对HBase表结构进行显示的说明。
3)创建的映射要与想要映射的hbase中的表名一致,且表要被双引号包起来。
- CREATE table "students" (
- id VARCHAR NOT NULL PRIMARY KEY,
- "info"."name" VARCHAR,
- "info"."age" VARCHAR,
- "info"."gender" VARCHAR,
- "info"."clazz" VARCHAR
- ) column_encoded_bytes=0;
2.使用create table创建的关联表,如果对表进行了修改,源数据也会改变,同时如果关联表被删除,源表也会被删除。但是视图就不会,如果删除视图,源数据不会发生改变。
1.过滤出文科一班的学生总分前10的学生信息
过滤出文科一班的学生
select ID as id,"name" as name,"clazz" as clazz from "students" where "clazz"='文科一班';
将成绩表做切分转换
select regexp_split(ID,'-')[1] as student_id,regexp_split(ID,'-')[2] as subject_id,"score" as score from "scores";
在第二步的基础之上求每个学生的总分
select t1.student_id as student_id,sum(to_number(t1.score)) as sum_score from (select regexp_split(ID,'-')[1] as student_id,regexp_split(ID,'-')[2] as subject_id,"score" as score from "scores") t1 group by t1.student_id;
与步骤1的文科学生进行关联
select b1.id as student_id,b1.name as name,b1.clazz as clazz,to_char(b2.sum_score) as sum_score from (select ID as id,"name" as name,"clazz" as clazz from "students" where "clazz"='文科一班') b1 join (select t1.student_id as student_id,sum(to_number(t1.score)) as sum_score from (select regexp_split(ID,'-')[1] as student_id,regexp_split(ID,'-')[2] as subject_id,"score" as score from "scores") t1 group by t1.student_id) b2 on (b1.id=b2.student_id) order by b2.sum_score desc limit 10;
2.注意事项
通过这个例子遇到的注意点:
1、切分字符串的函数不是split,而是regexp_split
2、Phoenix中,数组的索引是从1开始的
3、给字段起别名之后的嵌套查询,就不需要再加双引号了,主键本身就可以不用加
4、sum函数中的数据类型必须是数值类型,如果是10的整数倍,会以科学计数法进行标识 580->5.8E+2(5.8*10^2)
5、to_number()转数值 to_char()转字符串
1.优点:
如果我们一次性入库hbase巨量数据,处理速度慢不说,还特别占用Region资源, 一个比较高效便捷的方法就是使用 “Bulk Loading”方法,即HBase提供的HFileOutputFormat类。
它是利用hbase的数据信息按照特定格式存储在hdfs内这一原理,直接生成这种hdfs内存储的数据格式文件,然后上传至合适位置,即完成巨量数据快速入库的办法。配合mapreduce完成,高效便捷,而且不占用region资源,增添负载。
2.限制:
仅适合初次数据导入,即表内数据为空,或者每次入库表内都无数据的情况。
HBase集群与Hadoop集群为同一集群,即HBase所基于的HDFS为生成HFile的MR的集群
代码编写:
提前在Hbase中创建好表
生成Hfile基本流程:
设置Mapper的输出KV类型:
K: ImmutableBytesWritable(代表行键)
V: KeyValue (代表cell)
2. 开发Mapper
读取你的原始数据,按你的需求做处理
输出rowkey作为K,输出一些KeyValue(Put)作为V
3. 配置job参数
a. Zookeeper的连接地址
b. 配置输出的OutputFormat为HFileOutputFormat2,并为其设置参数
4. 提交job
导入HFile到RegionServer的流程
构建一个表描述对象
构建一个region定位工具
然后用LoadIncrementalHFiles来doBulkload操作
- package com.shujia.jinjie;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.hbase.HBaseConfiguration;
- import org.apache.hadoop.hbase.KeyValue;
- import org.apache.hadoop.hbase.TableName;
- import org.apache.hadoop.hbase.client.*;
- import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
- import org.apache.hadoop.hbase.mapreduce.CellSortReducer;
- import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
- import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
- import org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner;
- import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
- import org.apache.hadoop.hbase.util.Bytes;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
- import java.io.FileInputStream;
- import java.io.IOException;
-
- class MyBulkLoadMapper extends Mapper<LongWritable,Text,ImmutableBytesWritable,KeyValue>{
- /*
- ==> tl_hefei_shushan_503.txt <==
- D55433A437AEC8D8D3DB2BCA56E9E64392A9D93C 117210031795040 83401 8340104 301 20180503190539 20180503233517 20180503
- 8827F3196977C6F752680505FEC0C7D3A18D4DFC \N \N \N \N \N \N \N
- */
- protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>.Context context) throws IOException, InterruptedException {
- String line = value.toString();
- //处理脏数据
- if (!(line.contains("shushan")) && !(line.contains("\\N"))){
- String[] strings = line.split("\t");
- String phoneNum = strings[0];
- String wg = strings[1];
- String city = strings[2];
- String qx = strings[3];
- String stayTime = strings[4];
- String startTime = strings[5];
- String endTime = strings[6];
- String date = strings[7];
-
- //因为手机号代表一个人,一个人可能会出现在多个区域,不能直接将手机号作为rk,为了数据的完整性
- //可以将手机号和进入网格的时间拼接
- byte[] rk = Bytes.toBytes(phoneNum + "-" + startTime);
- byte[] family = Bytes.toBytes("info");
-
- //创建输出key对象
- // ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable(rk);
- // ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable(rk);
- ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable(rk);
- //将其余的6列封装成单元格
- KeyValue keyValue1 = new KeyValue(rk, family, Bytes.toBytes("wg"), Bytes.toBytes(wg));
- context.write(immutableBytesWritable,keyValue1);
-
- KeyValue keyValue2 = new KeyValue(rk, family, Bytes.toBytes("city"), Bytes.toBytes(city));
- context.write(immutableBytesWritable,keyValue2);
-
- KeyValue keyValue3 = new KeyValue(rk, family, Bytes.toBytes("qx"), Bytes.toBytes(qx));
- context.write(immutableBytesWritable,keyValue3);
-
- KeyValue keyValue4 = new KeyValue(rk, family, Bytes.toBytes("stayTime"), Bytes.toBytes(stayTime));
- context.write(immutableBytesWritable,keyValue4);
-
- KeyValue keyValue5 = new KeyValue(rk, family, Bytes.toBytes("endTime"), Bytes.toBytes(endTime));
- context.write(immutableBytesWritable,keyValue5);
-
- KeyValue keyValue6 = new KeyValue(rk, family, Bytes.toBytes("date"), Bytes.toBytes(date));
- context.write(immutableBytesWritable,keyValue6);
-
- }
-
- }
- }
-
- public class BulkLoadingDemo {
- public static void main(String[] args) throws Exception{
- //获取集群配置文件,可以是hdfs集群也可以是hbase集群
- Configuration conf = HBaseConfiguration.create();
- //设置zookeeper集群信息
- conf.set("hbase.zookeeper.quorum","master:2181,node1:2181,node2:2181");
-
- //创建Job作业
- Job job = Job.getInstance(conf);
-
- //给job作业起一个名字
- job.setJobName("hbase使用bulkloading方式批量加载数据作业");
-
- //设置主类
- job.setJarByClass(BulkLoadingDemo.class);
- //设置Map类
- job.setMapperClass(MyBulkLoadMapper.class);
-
- //无需设置自定义reduce类,使用hbase中的reduce类
- job.setOutputFormatClass(HFileOutputFormat2.class);
-
- //设置map的输出key和value类型
- //key -- 行键
- //value -- 单元格(列)
- job.setMapOutputKeyClass(ImmutableBytesWritable.class);
- job.setMapOutputValueClass(KeyValue.class);
-
- //设置region中字典排序的过程的类
- job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
- job.setReducerClass(KeyValueSortReducer.class);
-
- //设置文件的输入输出路径
- FileInputFormat.setInputPaths(job,new Path("/bigdata29/data/dianxin_data"));
- //设置hfile文件
- FileOutputFormat.setOutputPath(job,new Path("/bigdata29/hbase/out2"));
-
- //获取数据库连接对象
- Connection conn = ConnectionFactory.createConnection(conf);
- //获取数据库操作对象
- Admin admin = conn.getAdmin();
- TableName tn = TableName.valueOf("dianxin_data_bulk");
- Table table = conn.getTable(tn);
- RegionLocator regionLocator = conn.getRegionLocator(tn);
-
- //使用HFileOutputFormat2类创建Hfile文件
- HFileOutputFormat2.configureIncrementalLoad(job,table,regionLocator);
-
- //提交作业并执行
- boolean b = job.waitForCompletion(true);
- if(b){
- System.out.println("====================== Hfile文件生成成功!! 在/bigdata29/hbase/out2目录下 ================================");
- }else {
- System.out.println("============= Hfile文件生成失败!! ==================");
- }
-
-
-
- }
-
-
-
-
-
-
- }
这个命令在Linux运行
hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles /bigdata29/hbase/out2 dianxin_data_bulk
HBase是三维有序存储的,通过rowkey(行键),column key(column family和qualifier)和TimeStamp(时间戳)这个三个维度可以对HBase中的数据进行快速定位。
HBase中rowkey可以唯一标识一行记录,在HBase查询的时候,有两种方式:
通过get方式,指定rowkey获取唯一一条记录
通过scan方式,设置startRow和stopRow参数进行范围匹配
全表扫描,即直接扫描整张表中所有行记录
1.长度不宜过长
rowkey是一个二进制码流,可以是任意字符串,最大长度 64kb ,实际应用中一般为10-100bytes,以 byte[] 形式保存,一般设计成定长。
建议越短越好,不要超过16个字节,原因如下:
数据的持久化文件HFile中是按照KeyValue存储的,如果rowkey过长,比如超过100字节,1000w行数据,光rowkey就要占用100*1000w=10亿个字节,将近1G数据,这样会极大影响HFile的存储效率;
MemStore将缓存部分数据到内存,如果rowkey字段过长,内存的有效利用率就会降低,系统不能缓存更多的数据,这样会降低检索效率。
目前操作系统都是64位系统,内存8字节对齐,控制在16个字节,8字节的整数倍利用了操作系统的最佳特性。
2.散列性
如果rowkey按照时间戳的方式递增,不要将时间放在二进制码的前面,建议将rowkey的高位作为散列字段,由程序随机生成,低位放时间字段,这样将提高数据均衡分布在每个RegionServer,以实现负载均衡的几率。如果没有散列字段,首字段直接是时间信息,所有的数据都会集中在一个RegionServer上,这样在数据检索的时候负载会集中在个别的RegionServer上,造成热点问题,会降低查询效率。
3.唯一性
必须在设计上保证其唯一性,rowkey是按照字典顺序排序存储的,因此,设计rowkey的时候,要充分利用这个排序的特点,将经常读取的数据存储到一块,将最近可能会被访问的数据放到一块。
1.什么是热点
HBase中的行是按照rowkey的字典顺序排序的,这种设计优化了scan操作,可以将相关的行以及会被一起读取的行存取在临近位置,便于scan。然而糟糕的rowkey设计是热点的源头。 热点发生在大量的client直接访问集群的一个或极少数个节点(访问可能是读,写或者其他操作)。大量访问会使热点region所在的单个机器超出自身承受能力,引起性能下降甚至region不可用,这也会影响同一个RegionServer上的其他region,由于主机无法服务其他region的请求。 设计良好的数据访问模式以使集群被充分,均衡的利用。
为了避免写热点,设计rowkey使得不同行在同一个region,但是在更多数据情况下,数据应该被写入集群的多个region,而不是一个。
下面是一些常见的避免热点的方法以及它们的优缺点:
加盐
这里所说的加盐不是密码学中的加盐,而是在rowkey的前面增加随机数,具体就是给rowkey分配一个随机前缀以使得它和之前的rowkey的开头不同。分配的前缀种类数量应该和你想使用数据分散到不同的region的数量一致。加盐之后的rowkey就会根据随机生成的前缀分散到各个region上,以避免热点。
哈希
哈希会使同一行永远用一个前缀加盐。哈希也可以使负载分散到整个集群,但是读却是可以预测的。使用确定的哈希可以让客户端重构完整的rowkey,可以使用get操作准确获取某一个行数据
反转
第三种防止热点的方法时反转固定长度或者数字格式的rowkey。这样可以使得rowkey中经常改变的部分(最没有意义的部分)放在前面。这样可以有效的随机rowkey,但是牺牲了rowkey的有序性。
反转rowkey的例子以手机号为rowkey,可以将手机号反转后的字符串作为rowkey,这样的就避免了以手机号那样比较固定开头导致热点问题
时间戳反转
一个常见的数据处理问题是快速获取数据的最近版本,使用反转的时间戳作为rowkey的一部分对这个问题十分有用,可以用 Long.Max_Value - timestamp 追加到key的末尾,例如 [key]reverse_timestamp , [key] 的最新值可以通过scan [key]获得[key]的第一条记录,因为HBase中rowkey是有序的,第一条记录是最后录入的数据。
示例
- # 原数据:以时间戳_user_id作为rowkey
- # 时间戳高位变化不大,太连续,最终可能会导致热点问题
- 1638584124_user_id
- 1638584135_user_id
- 1638584146_user_id
- 1638584157_user_id
- 1638584168_user_id
- 1638584179_user_id
- ...
-
- # 解决方案:加盐、反转、哈希
-
- # 加盐
- # 加上随即前缀,随机的打散
- # 该过程无法预测 前缀时随机的
- 00_1638584124_user_id
- 05_1638584135_user_id
- 03_1638584146_user_id
- 04_1638584157_user_id
- 02_1638584168_user_id
- 06_1638584179_user_id
-
- # 反转
- # 适用于高位变化不大,低位变化大的rowkey
- 4214858361_user_id
- 5314858361_user_id
- 6414858361_user_id
- 7514858361_user_id
- 8614858361_user_id
- 9714858361_user_id
-
- # 散列 md5、sha1、sha256......
- 25531D7065AE158AAB6FA53379523979_user_id
- 60F9A0072C0BD06C92D768DACF2DFDC3_user_id
- D2EFD883A6C0198DA3AF4FD8F82DEB57_user_id
- A9A4C265D61E0801D163927DE1299C79_user_id
- 3F41251355E092D7D8A50130441B58A5_user_id
- 5E6043C773DA4CF991B389D200B77379_user_id
-
- # 时间戳"反转"
- # rowkey:时间戳_user_id
- # rowkey是字典升序的,那么越新的记录会被排在最后面,不容易被获取到
- # 需求:让最新的记录排在最前面
-
- # 大数:9999999999
- # 大数-小数
-
- 1638584124_user_id => 8361415875_user_id
- 1638584135_user_id => 8361415864_user_id
- 1638584146_user_id => 8361415853_user_id
- 1638584157_user_id => 8361415842_user_id
- 1638584168_user_id => 8361415831_user_id
- 1638584179_user_id => 8361415820_user_id
-
- 1638586193_user_id => 8361413806_user_id
电信案例
要求
- 手机号,网格编号,城市编号,区县编号,停留时间,进入时间,离开时间,时间分区
- D55433A437AEC8D8D3DB2BCA56E9E64392A9D93C,117210031795040,83401,8340104,301,20180503190539,20180503233517,20180503
-
-
- 将用户位置数据保存到hbase
- 查询需求
- 1、通过手机号查询用户最近10条位置记录
-
- 2、获取用户某一天在一个城市中的所有位置
-
- 怎么设计hbase表
- 1、rowkey
- 2、时间戳
代码
- package com.shujia.jinjie;
-
-
- import com.shujia.utils.HBaseUtils;
- import org.apache.hadoop.hbase.CompareOperator;
- import org.apache.hadoop.hbase.TableName;
- import org.apache.hadoop.hbase.client.*;
- import org.apache.hadoop.hbase.filter.BinaryComparator;
- import org.apache.hadoop.hbase.filter.FilterList;
- import org.apache.hadoop.hbase.filter.PrefixFilter;
- import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
- import org.apache.hadoop.hbase.util.Bytes;
-
- import java.io.BufferedReader;
- import java.io.FileReader;
- import java.util.Scanner;
-
- /**
- * 查询需求
- * 1、通过手机号查询用户最近10条位置记录
- *
- * 2、获取用户某一天在一个城市中的所有位置
- */
- public class DianXinRowKeyDemo {
-
- //创建数据库连接对象
- //创建数据库操作对象
- private static Connection conn = HBaseUtils.CONNECTION;
- private static Admin admin = HBaseUtils.ADMIN;
- public static void main(String[] args) throws Exception{
-
- //建表
- // HBaseUtils.createOneTable("dianxin_tb1","info");
-
- //读取文件数据,添加到hbase中的dianxin_tb1表中
- // putsData();
-
- //通过手机号查询用户最近10条位置记录
- Scanner sc = new Scanner(System.in);
- System.out.println("请输入要查询的手机号:");
- String phoneNum = sc.next();
- // findUser(phoneNum);
-
- //2、获取用户某一天在一个城市中的所有位置
- System.out.println("请输入要查询的城市编号:");
- String city = sc.next();
- findUserPosition(phoneNum,city);
-
-
- }
-
- public static void findUserPosition(String number,String city) throws Exception{
- //将表封装成一个TableName对象
- TableName tn = TableName.valueOf("dianxin_tb1");
-
- //获取表对象
- Table table = conn.getTable(tn);
-
- //scan
- Scan scan = new Scan();
- //创建一个行键前缀过滤器对象
- //public PrefixFilter(byte[] prefix)
- PrefixFilter prefixFilter = new PrefixFilter(Bytes.toBytes(number));
-
- //过滤城市
- SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("city"),
- CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes(city)));
-
- //创建过滤器集合
- FilterList filterList = new FilterList();
- filterList.addFilter(prefixFilter);
- filterList.addFilter(singleColumnValueFilter);
-
- scan.setFilter(filterList);
- ResultScanner resultScanner = table.getScanner(scan);
- for (Result result : resultScanner) {
- String rk = Bytes.toString(result.getRow());
- //47BE1E866CFC071DB19D5E1C056BE28AE24C16E7-9496768070
- String[] strings = rk.split("-");
- String phoneNum = strings[0];
- long startTime = 20190000000000L - Long.parseLong(strings[1]);
- String wg = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("wg")));
- String city2 = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("city")));
- String qx = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("qx")));
- String stayTime = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("stayTime")));
- String endTime = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("endTime")));
- String date = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("date")));
-
- System.out.println("手机号:"+phoneNum+",进入网格时间:"+startTime+",离开网格时间:"+endTime
- +",网格编号:"+wg+",城市:"
- +city2+",区县:"+qx+",停留时间:"
- +stayTime+",日期:"+date);
- }
-
- }
-
- //查询函数
- public static void findUser(String number) throws Exception{
- //将表封装成一个TableName对象
- TableName tn = TableName.valueOf("dianxin_tb1");
-
- //获取表对象
- Table table = conn.getTable(tn);
-
- //scan
- Scan scan = new Scan();
- //创建一个行键前缀过滤器对象
- //public PrefixFilter(byte[] prefix)
- PrefixFilter prefixFilter = new PrefixFilter(Bytes.toBytes(number));
- //设置过滤器
- //public Scan setFilter(Filter filter)
- scan.setFilter(prefixFilter);
- scan.setLimit(10);
- //创建结果对象
- ResultScanner resultScanner = table.getScanner(scan);
- for (Result result : resultScanner) {
- String rk = Bytes.toString(result.getRow());
- //47BE1E866CFC071DB19D5E1C056BE28AE24C16E7-9496768070
- String[] strings = rk.split("-");
- String phoneNum = strings[0];
- long startTime = 20190000000000L - Long.parseLong(strings[1]);
- String wg = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("wg")));
- String city = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("city")));
- String qx = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("qx")));
- String stayTime = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("stayTime")));
- String endTime = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("endTime")));
- String date = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("date")));
-
- System.out.println("手机号:"+phoneNum+",进入网格时间:"+startTime+",离开网格时间:"+endTime
- +",网格编号:"+wg+",城市:"
- +city+",区县:"+qx+",停留时间:"
- +stayTime+",日期:"+date);
- }
-
- }
-
-
-
- //添加函数
- public static void putsData() throws Exception{
-
-
- //将表封装成一个TableName对象
- TableName tn = TableName.valueOf("dianxin_tb1");
-
- //获取表对象
- Table table = conn.getTable(tn);
-
- //获取IO对象
- BufferedReader br = new BufferedReader(new FileReader("hbase/data/dianxin_data"));
- String line=null;
- while ((line= br.readLine())!=null){
- if (!(line.contains("shushan")) && !(line.contains("\\N"))){
- String[] strings = line.split("\t");
- String phoneNum = strings[0];
- String wg = strings[1];
- String city = strings[2];
- String qx = strings[3];
- String stayTime = strings[4];
- String startTime = strings[5];
- String endTime = strings[6];
- String date = strings[7];
-
- //我们为了需求的查询更加方便,以手机号与进入网格的时间进行拼接
- //因为要查询最近的,可以使用时间戳反转对rowkey进行设计
- //20180503173254
- //20190000000000
- long new_time = 20190000000000L - Long.parseLong(startTime);
- String rk = phoneNum + "-" + new_time;
- HBaseUtils.putOneDataToTable("dianxin_tb1",rk,"info","wg",wg);
- HBaseUtils.putOneDataToTable("dianxin_tb1",rk,"info","city",city);
- HBaseUtils.putOneDataToTable("dianxin_tb1",rk,"info","qx",qx);
- HBaseUtils.putOneDataToTable("dianxin_tb1",rk,"info","stayTime",stayTime);
- HBaseUtils.putOneDataToTable("dianxin_tb1",rk,"info","endTime",endTime);
- HBaseUtils.putOneDataToTable("dianxin_tb1",rk,"info","date",date);
-
-
- }
-
- }
-
-
- }
- }
二级索引的本质就是建立各列值与行键之间的映射关系
Hbase的局限性:
HBase本身只提供基于行键和全表扫描的查询,而行键索引单一,对于多维度的查询困难。
HBase的一级索引就是rowkey,我们只能通过rowkey进行检索。如果我们相对hbase里面列族的列列进行一些组合查询,就需要采用HBase的二级索引方案来进行多条件的查询。
MapReduce方案
ITHBASE(Indexed-Transanctional HBase)方案
IHBASE(Index HBase)方案
Hbase Coprocessor(协处理器)方案
Solr+hbase方案 redis+hbase 方案
CCIndex(complementalclustering index)方案
1、创建单列索引
2、同时创建多个单列索引
3、创建联合索引(最多同时支持3个列)
4、只根据rowkey创建索引
使用整合MapReduce的方式创建hbase索引。主要的流程如下:
1.1扫描输入表,使用hbase继承类TableMapper
1.2获取rowkey和指定字段名称和字段值
1.3创建Put实例, value=” “, rowkey=班级,column=学号
1.4使用TableReducer将数据写入索引表
首先二级索引表必须先要存在
该Map继承的是TableMapper
该Reducer继承的是TableReducer
代码如下
- package com.shujia.jinjie;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.hbase.HBaseConfiguration;
- import org.apache.hadoop.hbase.client.Mutation;
- import org.apache.hadoop.hbase.client.Put;
- import org.apache.hadoop.hbase.client.Result;
- import org.apache.hadoop.hbase.client.Scan;
- import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
- import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
- import org.apache.hadoop.hbase.mapreduce.TableMapper;
- import org.apache.hadoop.hbase.mapreduce.TableReducer;
- import org.apache.hadoop.hbase.util.Bytes;
- import org.apache.hadoop.io.NullWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
-
- import java.io.IOException;
-
- //因为我们现在要读取的数据来自于hbase中的hfile文件,与hdfs上普通的block块文件有所区别,不能直接继承Mapper类
- //要继承hbase读取数据专属的Mapper类 TableMapper
- //public abstract class TableMapper<KEYOUT, VALUEOUT> extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT>
-
- class MyIndexMapper extends TableMapper<Text,NullWritable>{
- @Override
- protected void map(ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result, Text, NullWritable>.Context context) throws IOException, InterruptedException {
- //ImmutableBytesWritable key --相当于是读取到一行的行键
- //Result value --相当于读取到一行多列的封装
- //获取行键,
-
- String id = Bytes.toString(key.get());
- //获取姓名列
- // public byte[] getValue(byte[] family, byte[] qualifier)
- String name = Bytes.toString(value.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
- //将姓名与学号连接起来给到reduce
- context.write(new Text(id+"-"+name),NullWritable.get());
-
-
- }
- }
-
-
- //public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT> extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation>
- class MyIndexReducer extends TableReducer<Text,NullWritable,NullWritable>{
- @Override
- protected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text, NullWritable, NullWritable, Mutation>.Context context) throws IOException, InterruptedException {
- String string = key.toString();
- String id = string.split("-")[0];
- String name = string.split("-")[1];
-
- //将要添加的数据封装成Put类的对象
- Put put = new Put(Bytes.toBytes(name));
- //public Put addImmutable(byte[] family, byte[] qualifier, byte[] value)
- put.addColumn(Bytes.toBytes("info"),Bytes.toBytes(id),Bytes.toBytes(""));
- context.write(NullWritable.get(),put);
-
- }
- }
-
- public class HBaseIndexDemo1 {
- public static void main(String[] args) throws Exception {
- //获取集群配置文件,可以是hdfs集群也可以是hbase集群
- Configuration conf = HBaseConfiguration.create();
- //设置zookeeper集群信息
- conf.set("hbase.zookeeper.quorum","master:2181,node1:2181,node2:2181");
- //创建job对象
- Job job = Job.getInstance(conf);
- //给job任务取名字
- job.setJobName("给学生表创建二级索引表");
-
- //设置主类
- job.setJarByClass(HBaseIndexDemo1.class);
-
- //因为索引表的构建是建立列值与行键的映射关系,要获取所有的数据
- //scan扫描全表数据
- Scan scan = new Scan();
- //告诉输入的列值来自于哪一个列簇
- scan.addFamily(Bytes.toBytes("info"));
-
-
-
-
- /**
- * @param table The table name to read from.
- * @param scan The scan instance with the columns, time range etc.
- * @param mapper The mapper class to use.
- * @param outputKeyClass The class of the output key.
- * @param outputValueClass The class of the output value.
- * @param job The current job to adjust. Make sure the passed job is
- * carrying all necessary HBase configuration.
- * @throws IOException When setting up the details fails.
- *public static void initTableMapperJob
- * (String table,Scan scan,Class<? extends TableMapper> mapper,Class<?> outputKeyClass,Class<?> outputValueClass,Job job)
- */
- //初始化Map任务
- TableMapReduceUtil.initTableMapperJob("students2",scan,MyIndexMapper.class, Text.class, NullWritable.class,job);
- //初始化Reduce任务
- TableMapReduceUtil.initTableReducerJob("students2_index",MyIndexReducer.class,job);
- //提交作业到集群中允许
- boolean b = job.waitForCompletion(true);
- if (b) {
- System.out.println("================== students2索引表构建成功!!!============================");
- } else {
- System.out.println("================== students2索引表构建失败!!!============================");
- }
-
- }
- }
1.指定redis端口号
Jedis jedis = new Jedis("192.168.73.100", 12346);
2.在redis中构建映射关系(性别:学号)因为男生的学号有很多个,且不重复,所以我们在redis中采用set的数据类型存储
- public static void buildIndexInRedis() throws Exception {
-
- //获取要构建索引的原表
- Table students2 = conn.getTable(TableName.valueOf("students2"));
-
- Scan scan = new Scan();
- //获取男生的学号,放入到redis中
- //创建列值过滤器
- ValueFilter filter1 = new ValueFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("男")));
- scan.setFilter(filter1);
- ResultScanner resultScanner = students2.getScanner(scan);
- for (Result result : resultScanner) {
- //获取每一行的行键即可
- String id = Bytes.toString(result.getRow());
- //将学号以值的方式添加到redis键对应的值中
- //因为男生的学号有很多个,且不重复,所以我们在redis中采用set的数据类型存储
- jedis.sadd("性别:男", id);
- }
-
- //获取男生的学号,放入到redis中
- //创建列值过滤器
- ValueFilter filter2 = new ValueFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("女")));
- scan.setFilter(filter2);
- ResultScanner resultScanner2 = students2.getScanner(scan);
- for (Result result : resultScanner2) {
- //获取每一行的行键即可
- String id = Bytes.toString(result.getRow());
- //将学号以值的方式添加到redis键对应的值中
- //因为男生的学号有很多个,且不重复,所以我们在redis中采用set的数据类型存储
- jedis.sadd("性别:女", id);
- }
3.先通过查询redis中性别对应的学号,拿着学号去hbase原表中查询获取结果
- package com.shujia.jinjie;
-
- import com.shujia.utils.HBaseUtils;
- import com.shujia.utils.HBaseUtils;
- import org.apache.hadoop.hbase.CompareOperator;
- import org.apache.hadoop.hbase.TableName;
- import org.apache.hadoop.hbase.client.*;
- import org.apache.hadoop.hbase.filter.BinaryComparator;
- import org.apache.hadoop.hbase.filter.ValueFilter;
- import org.apache.hadoop.hbase.util.Bytes;
- import redis.clients.jedis.Jedis;
-
- import java.util.Scanner;
- import java.util.Set;
-
- /*
- 使用redis第三方的存储工具存储hbase索引(本质依旧是列值与行键产生映射关系)
- */
- public class HBaseWithRedisIndex {
- //1、获取hbase数据库连接对象和操作对象
- static Connection conn = HBaseUtils.CONNECTION;
- static Admin admin = HBaseUtils.ADMIN;
-
- //获取redis连接对象
- static Jedis jedis = new Jedis("192.168.73.100", 12346);
-
- public static void main(String[] args) throws Exception {
- //步骤1:在redis中构建映射关系(性别:学号)
- // buildIndexInRedis();
-
- //使用:先通过查询redis中性别对应的学号,拿着学号去hbase原表中查询获取结果
- Scanner sc = new Scanner(System.in);
- System.out.println("请输入您要查询的性别:");
- String gender = sc.next();
- selectGenderFromHbase(gender);
- }
-
- public static void selectGenderFromHbase(String gender) throws Exception {
- if ("男".equals(gender)) {
- selectIdFromRedis(gender);
- } else if ("女".equals(gender)) {
- selectIdFromRedis(gender);
- } else {
- System.out.println("没有该性别");
- }
- }
-
- //单独编写一个方法查询redis
- public static void selectIdFromRedis(String gender) throws Exception {
- Table students2 = conn.getTable(TableName.valueOf("students2"));
- //redis中set查询值的方法
- Set<String> ids = jedis.smembers("性别:"+gender);
- for (String id : ids) {
- // Result result = students2.get(new Get(Bytes.toBytes(id)).addColumn(Bytes.toBytes("info"), Bytes.toBytes("name")));
- Get get = new Get(Bytes.toBytes(id));
- //获取那一列的所有信息封装成一个结果对象
- get.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
- Result result = students2.get(get);
-
- String name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
- System.out.println("学号:" + id + ",姓名:" + name);
- }
- }
-
- public static void buildIndexInRedis() throws Exception {
-
- //获取要构建索引的原表
- Table students2 = conn.getTable(TableName.valueOf("students2"));
-
- Scan scan = new Scan();
- //获取男生的学号,放入到redis中
- //创建列值过滤器
- ValueFilter filter1 = new ValueFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("男")));
- scan.setFilter(filter1);
- ResultScanner resultScanner = students2.getScanner(scan);
- for (Result result : resultScanner) {
- //获取每一行的行键即可
- String id = Bytes.toString(result.getRow());
- //将学号以值的方式添加到redis键对应的值中
- //因为男生的学号有很多个,且不重复,所以我们在redis中采用set的数据类型存储
- jedis.sadd("性别:男", id);
- }
-
- //获取男生的学号,放入到redis中
- //创建列值过滤器
- ValueFilter filter2 = new ValueFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("女")));
- scan.setFilter(filter2);
- ResultScanner resultScanner2 = students2.getScanner(scan);
- for (Result result : resultScanner2) {
- //获取每一行的行键即可
- String id = Bytes.toString(result.getRow());
- //将学号以值的方式添加到redis键对应的值中
- //因为男生的学号有很多个,且不重复,所以我们在redis中采用set的数据类型存储
- jedis.sadd("性别:女", id);
- }
-
-
- }
- }
对于Hbase,如果想精确定位到某行记录,唯一的办法就是通过rowkey查询。如果不通过rowkey查找数据,就必须逐行比较每一行的值,对于较大的表,全表扫描的代价是不可接受的。
1.开启索引,配置相关文件
- # 关闭hbase集群
- stop-hbase.sh
-
- # 在/usr/local/soft/hbase-1.4.6/conf/hbase-site.xml中增加如下配置
-
- <property>
- <name>hbase.regionserver.wal.codec</name>
- <value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
- </property>
- <property>
- <name>hbase.rpc.timeout</name>
- <value>60000000</value>
- </property>
- <property>
- <name>hbase.client.scanner.timeout.period</name>
- <value>60000000</value>
- </property>
- <property>
- <name>phoenix.query.timeoutMs</name>
- <value>60000000</value>
- </property>
-
-
- # 同步到所有节点
- scp hbase-site.xml node1:`pwd`
- scp hbase-site.xml node2:`pwd`
-
- # 修改phoenix目录下的bin目录中的hbase-site.xml
- <property>
- <name>hbase.rpc.timeout</name>
- <value>60000000</value>
- </property>
- <property>
- <name>hbase.client.scanner.timeout.period</name>
- <value>60000000</value>
- </property>
- <property>
- <name>phoenix.query.timeoutMs</name>
- <value>60000000</value>
- </property>
-
-
- # 启动hbase
- start-hbase.sh
- # 重新进入phoenix客户端
- sqlline.py master,node1,node2
全局索引适合读多写少的场景。如果使用全局索引,读数据基本不损耗性能,所有的性能损耗都来源于写数据。数据表的添加、删除和修改都会更新相关的索引表(数据删除了,索引表中的数据也会删除;数据增加了,索引表的数据也会增加)
注意: 对于全局索引在默认情况下,在查询语句中检索的列如果不在索引表中,Phoenix不会使用索引表将,除非使用hint。
1.导入数据
首先创建一个shell命令,里面有建表语句,取名为DIANXIN.sql
- CREATE TABLE IF NOT EXISTS DIANXIN (
- mdn VARCHAR ,
- start_date VARCHAR ,
- end_date VARCHAR ,
- county VARCHAR,
- x DOUBLE ,
- y DOUBLE,
- bsid VARCHAR,
- grid_id VARCHAR,
- biz_type VARCHAR,
- event_type VARCHAR ,
- data_source VARCHAR ,
- CONSTRAINT PK PRIMARY KEY (mdn,start_date)
- ) column_encoded_bytes=0;
2.准备数据
DIANXIN.csv
3.输入命令
# 导入数据,Linux命令
psql.py master,node1,node2 DIANXIN.sql DIANXIN.csv
4.导入数据好的截图
5.创建全局索引
CREATE INDEX DIANXIN_INDEX ON DIANXIN ( end_date );
6 查询数据 ( 索引未生效)
select * from DIANXIN where end_date = '20180503154014';
很明显查询需要2秒多,
7 强制使用索引 (索引生效) hint 语法糖
语法糖:/*+ INDEX(DIANXIN DIANXIN_INDEX) */
select /*+ INDEX(DIANXIN DIANXIN_INDEX) */ * from DIANXIN where end_date = '20180503154014';
很明显查询零点几秒,加了语法糖,查询速度加快
8 取索引列,(索引生效)
取我们设置的单级索引列,速度一样快
select end_date from DIANXIN where end_date = '20180503154014';
9.#创建多列索引
CREATE INDEX DIANXIN_INDEX1 ON DIANXIN ( end_date,COUNTY );
取我们多级索引的某一个索引列单独查询速度很快的
10 查询所有列 (索引未生效)
select * from DIANXIN where end_date = '20180503154014' and COUNTY = '8340104';
这个语法糖写错了,就当没有写,查询需要1秒多
11 查询所有列 (索引生效)
select /*+ INDEX(DIANXIN DIANXIN_INDEX1) */ * from DIANXIN where end_date = '20180503154014' and COUNTY = '8340104';
加了语法糖,只要零点几秒
12 单条件 (索引未生效)
select end_date from DIANXIN where COUNTY = '8340103';
# 单条件 (索引生效) end_date 在前
select COUNTY from DIANXIN where end_date = '20180503154014';
因为创建多级索引是 CREATE INDEX DIANXIN_INDEX1 ON DIANXIN ( end_date,COUNTY )
end_date 在前面
13 删除索引
drop index 索引名 on DIANXIN;
drop index DIANXIN_INDEX on DIANXIN;
1.相关概念
本地索引适合写多读少的场景,或者存储空间有限的场景。和全局索引一样,Phoenix也会在查询的时候自动选择是否使用本地索引。本地索引因为索引数据和原数据存储在同一台机器上,避免网络数据传输的开销,所以更适合写多的场景。由于无法提前确定数据在哪个Region上,所以在读数据的时候,需要检查每个Region上的数据从而带来一些性能损耗。
注意:对于本地索引,查询中无论是否指定hint或者是查询的列是否都在索引表中,都会使用索引表。
2.创建本地索引
CREATE LOCAL INDEX DIANXIN_LOCAL_IDEX ON DIANXIN(grid_id);
3. 索引生效
select grid_id from dianxin where grid_id='117285031820040';
索引生效
select * from dianxin where grid_id='117285031820040';
1.相关概念
覆盖索引是把原数据存储在索引数据表中,这样在查询时不需要再去HBase的原表获取数据就,直接返回查询结果。
注意:查询是 select 的列和 where 的列都需要在索引中出现。
2.创建覆盖索引
CREATE INDEX DIANXIN_INDEX_COVER ON DIANXIN ( x,y ) INCLUDE ( county );
3.
# 查询所有列 (索引未生效)
select * from DIANXIN where x=117.288 and y =31.822;
# 强制使用索引 (索引生效)
select /*+ INDEX(DIANXIN DIANXIN_INDEX_COVER) */ * from DIANXIN where x=117.288 and y =31.822;
# 查询索引中的列 (索引生效) mdn是DIANXIN表的RowKey中的一部分
select x,y,county from DIANXIN where x=117.288 and y =31.822;
select mdn,x,y,county from DIANXIN where x=117.288 and y =31.822;
# 查询条件必须放在索引中 select 中的列可以放在INCLUDE (将数据保存在索引中)
select /*+ INDEX(DIANXIN DIANXIN_INDEX_COVER) */ x,y,count(*) from DIANXIN group by x,y;
1.导入依赖
- <!-- https://mvnrepository.com/artifact/org.apache.phoenix/phoenix-hbase-compat-2.2.5 -->
- <dependency>
- <groupId>org.apache.phoenix</groupId>
- <artifactId>phoenix-hbase-compat-2.2.5</artifactId>
- <version>5.1.3</version>
- </dependency>
-
- <!-- https://mvnrepository.com/artifact/org.apache.phoenix/phoenix-core -->
- <dependency>
- <groupId>org.apache.phoenix</groupId>
- <artifactId>phoenix-core</artifactId>
- <version>5.1.3</version>
- </dependency>
-
- <dependency>
- <groupId>com.lmax</groupId>
- <artifactId>disruptor</artifactId>
- <version>3.4.2</version>
- </dependency>
代码
- package com.shujia.jdbc;
-
- import java.sql.Connection;
- import java.sql.DriverManager;
- import java.sql.PreparedStatement;
- import java.sql.ResultSet;
-
- public class PhoenixJDBC {
- public static void main(String[] args) throws Exception {
- //注册驱动
- Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
-
- //创建与phoenix连接对象
- Connection conn = DriverManager.getConnection("jdbc:phoenix:master,node1,node2:2181");
-
- PreparedStatement prep = conn.prepareStatement("select /*+ INDEX(DIANXIN DIANXIN_INDEX) */ * from DIANXIN where end_date = ? and start_date = ?");
-
- prep.setString(1,"20180503154014");
- prep.setString(2,"20180503154614");
-
- ResultSet resultSet = prep.executeQuery();
- while (resultSet.next()){
- /*
- mdn VARCHAR ,
- start_date VARCHAR ,
- end_date VARCHAR ,
- county VARCHAR,
- x DOUBLE ,
- y DOUBLE,
- bsid VARCHAR,
- grid_id VARCHAR,
- biz_type VARCHAR,
- event_type VARCHAR ,
- data_source VARCHAR ,
- */
- String mdn = resultSet.getString("mdn");
- String start_date = resultSet.getString("start_date");
- String end_date = resultSet.getString("end_date");
- String county = resultSet.getString("county");
- double x = resultSet.getDouble("x");
- double y = resultSet.getDouble("y");
- String bsid = resultSet.getString("bsid");
- String grid_id = resultSet.getString("grid_id");
- String biz_type = resultSet.getString("biz_type");
- String event_type = resultSet.getString("event_type");
- String data_source = resultSet.getString("data_source");
-
- System.out.println(mdn+","+start_date+","+end_date+","+county+","+x+","+y+","+bsid
- +","+grid_id+","+biz_type+","+event_type+","+data_source);
- }
-
- //释放资源
- prep.close();
- conn.close();
-
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。