赞
踩
1、热点现象:
某个时段内,对HBase的读写请求集中到极少数的Region上,导致这些region所在的RegionServer处理请求量骤增,负载量明显偏大,而其他的RgionServer明显空闲。
这种一般是由于某个regionserver性能有问题或者是业务写入量太大或者是region有数据热点引起。
如上图,Region1 上的数据是 Region 2 的5倍,这样会导致 Region1 的访问频率比较高,进而影响这个 Region 所在机器的其他 Region。
010ea7f2d793d3866fa035c65fa61f8a的Memstore会如此巨大?答案就是前面我们说到的造成消费延迟的原因之一:数据热点。下面从region大小来分析数据热点的问题:
从上图中可以看到,其他的region基本都在3G左右,而这个region却有276G,热点台明显,基本写入都集中在了这个region了,难怪会造成消费延迟。根本原因还是表结构设计的不合理。
HBase中的行是按照rowkey的字典顺序排序的,这种设计优化了scan操作,可以将相关的行以及会被一起读取的行存取在临近位置,便于scan。然而糟糕的rowkey设计是热点的源头。
热点发生在大量的client直接访问集群的一个或极少数个节点(访问可能是读,写或者其他操作)。大量访问会使热点region所在的单个机器超出自身承受能力,引起性能下降甚至region不可用,这也会影响同一个RegionServer上的其他region,由于主机无法服务其他region的请求。
数据热点是如何出现的,这得从HBase的存储结构说起,HBase的表会被划分为1个或多个Region,被托管在RegionServer中。
由上图我们可以看出,Region有两个重要的属性:StartKey和EndKey,表示这个Region维护的rowkey的范围,当我们要读写数据时,如果rowkey落在某个start-end key范围内,那么就会定位到目标region并且读写到相关的数据。
默认情况下,当我们通过hbaseAdmin来创建一张表时,刚开始的时候只有一个Region,start-endkey无边界,也就是说无论来什么,本Region统统收,如下图。
一个Region,无边界
所有的rowkey都写入到这个region里,然后数据越来越多,region的size越来越大时,大到一定的阀值,hbase就会将region一分为二,成为2个region,这个过程称为分裂(region-split)。
如果我们就这样默认建表,表里不断的put数据,一般情况我们的rowkey还是顺序增大的,这样,存在的缺点比较明显:我们总是向最大的startkey所在的region写数据,因为我们的rowkey总是会比之前的大,并且hbase的是按升序方式排序的。所以写操作总是被定位到无上界的那个region中,之前分裂出来的region不会被写数据,所以这样产生的结果是不利的。
如果在写比较频繁的场景下,数据增长太快,split的次数也会增多,由于split是比较耗费资源的,所以我们并不希望这种事情经常发生。
在集群中为了得到更好的并行性,让每个节点提供的请求都是均衡的,我们也不希望,region不要经常split,因为split会使server有一段时间的停顿,如何能做到呢?
为了避免写热点,设计rowkey使得不同行在同一个region,但是在更多数据情况下,数据应该被写入集群的多个region,而不是一个。常见的方法有以下这些:
加盐:在rowkey的前面增加随机数,使得它和之前的rowkey的开头不同。分配的前缀种类数量应该和你想使用数据分散到不同的region的数量一致。加盐之后的rowkey就会根据随机生成的前缀分散到各个region上,以避免热点。
哈希:哈希可以使负载分散到整个集群,但是读却是可以预测的。使用确定的哈希可以让客户端重构完整的rowkey,可以使用get操作准确获取某一个行数据
反转:第三种防止热点的方法时反转固定长度或者数字格式的rowkey。这样可以使得rowkey中经常改变的部分(最没有意义的部分)放在前面。这样可以有效的随机rowkey,但是牺牲了rowkey的有序性。反转rowkey的例子以手机号为rowkey,可以将手机号反转后的字符串作为rowkey,这样的就避免了以手机号那样比较固定开头导致热点问题
时间戳反转:一个常见的数据处理问题是快速获取数据的最近版本,使用反转的时间戳作为rowkey的一部分对这个问题十分有用,可以用 Long.Max_Value - timestamp 追加到key的末尾,例如[key][reverse_timestamp],[key]的最新值可以通过scan [key]获得[key]的第一条记录,因为HBase中rowkey是有序的,第一条记录是最后录入的数据。
比如需要保存一个用户的操作记录,按照操作时间倒序排序,在设计rowkey的时候,可以这样设计[userId反转] [Long.Max_Value - timestamp],在查询用户的所有操作记录数据的时候,直接指定反转后的userId,startRow是[userId反转][000000000000],stopRow是[userId反转][Long.Max_Value - timestamp]
如果需要查询某段时间的操作记录,startRow是[user反转][Long.Max_Value - 起始时间],stopRow是[userId反转][Long.Max_Value - 结束时间]
HBase建表预分区:创建HBase表时,就预先根据可能的RowKey划分出多个region而不是默认的一个,从而可以将后续的读写操作负载均衡到不同的region上,避免热点现象。
预分区一开始就预建好了一部分region,这些region都维护着自己的start-end keys,我们将rowkey做一些处理,比如RowKey%i,写数据能均衡的命中这些预建的region,就能解决上面的那些缺点,大大提供性能。
Hbase RowKey的字典排序
如下: 假设存在table:test,family:info
scan表得到结果如下:
- //列族默认VERSIONS值为1,可以通过命令:alter 'tableName','familyName',VERSINOS =>5来设置
- //不同version保存不同时间戳的数据,默认是只显示最新version数据。要想显示所有,scan时应该加上{VERSIONS =>5}
- hbase(main):009:0> scan 'test', {VERSIONS =>5}
- ROW COLUMN+CELL
- 12Aabb column=info:name,timestamp=1519957330893,value=123
- 3aabb column=info:name,timestamp=1519963129863,value=3234
- 3aabb column=info:name,timestamp=1519962895984,value=234
- 3aabb column=info:name,timestamp=1519962889791,value=234
- 3aabb column=info:name,timestamp=1519960252203,value=999
- Aabb column=info:address,timestamp=1519969857611,value=cccc
- Aabb column=info:friend,timestamp=1519969916299,value=jack
- Aabb column=info:name,timestamp=1519957330890,value=123
- aabb column=info:name,timestamp=1519957330859,value=123
- 4 row(s) in 0.0530 seconds
HBase中row key用来检索表中的记录,支持以下三种方式:
在HBase中,row key可以是任意字符串,最大长度64KB,实际应用中一般为10~100bytes,存为byte[]字节数组,一般设计成定长的。
row key是按照字典序存储,因此,设计row key时,要充分利用这个排序特点,将经常一起读取的数据存储到一块,将最近可能会被访问的数据放在一块。
举个例子:如果最近写入HBase表中的数据是最可能被访问的,可以考虑将时间戳作为row key的一部分,由于是字典序排序,所以可以使用Long.MAX_VALUE - timestamp作为row key,这样能保证新写入的数据在读取时可以被快速命中。
1、 长度越小越好
Rowkey是一个二进制码流,Rowkey的长度被很多开发者建议说设计在10~100个字节,不过建议是越短越好,不要超过16个字节。
长度原则:100字节以内,8的倍数最好,可能的情况下越短越好。因为HFile是按照 keyvalue 存储的,过长的rowkey会影响存储效率;其次,过长的rowkey在memstore中较大,影响缓冲效果,降低检索效率。最后,操作系统大多为64位,8的倍数,充分利用操作系统的最佳性能。
原因如下:
(1)数据的持久化文件HFile中是按照KeyValue存储的,如果Rowkey过长比如100个字节,1000万列数据光Rowkey就要占用100*1000万=10亿个字节,将近1G数据,这会极大影响HFile的存储效率;
(2)MemStore将缓存部分数据到内存,如果Rowkey字段过长内存的有效利用率会降低,系统将无法缓存更多的数据,这会降低检索效率。因此Rowkey的字节长度越短越好。
(3)目前操作系统是都是64位系统,内存8字节对齐。控制在16个字节,8字节的整数倍利用操作系统的最佳特性。
2、 散列性:
如果Rowkey是按时间戳的方式递增,不要将时间放在二进制码的前面,建议将Rowkey的高位作为散列字段,由程序循环生成,低位放时间字段,这样将提高数据均衡分布在每个Regionserver实现负载均衡的几率。如果没有散列字段,首字段直接是时间信息将产生所有新数据都在一个 RegionServer上堆积的热点现象,这样在做数据检索的时候负载将会集中在个别RegionServer,降低查询效率。
a、避免热点的方法 - Salting
|
现在,假如你需要将上面这个 region 分散到 4个 region。你可以用4个不同的盐:'a', 'b', 'c', 'd'.在这个方案下,每一个字母前缀都会在不同的 region 中。加盐之后,你有了下面的 rowkey:
|
b、避免热点的方法 - Hashing Hash前缀
|
我们使用 md5 计算这些 RowKey 的 hash 值,然后取前 6 位和原来的 RowKey 拼接得到新的 RowKey:
|
c、避免热点的方法 - Reversing
Reversing 的原理是反转一段固定长度或者全部的键。比如我们有以下 URL ,电话号码:
|
这些 URL 其实属于同一个域名,但是由于前面不一样,导致数据不在一起存放。我们可以对其进行反转,如下:
|
而将rowkey散列化就是避免rowkey自增,这样也能解决上面所说的缺点。
3、唯一性
HBase按指定的条件获取一批记录时,使用的就是scan方法。 scan方法有以下特点:
(1)scan可以通过setCaching与setBatch方法提高速度(以空间换时间);
(2)scan可以通过setStartRow与setEndRow来限定范围。范围越小,性能越高。
通过巧妙的RowKey设计使我们批量获取记录集合中的元素挨在一起(应该在同一个Region下),可以在遍历结果时获得很好的性能。
(3)scan可以通过setFilter方法添加过滤器,这也是分页、多条件查询的基础。
我们在设计RowKey时可以这样做:采用UserID + CreateTime + FileID组成rowKey,这样既能满足多条件查询,又能有很快的查询速度。
需要注意以下几点:
(1)每条记录的RowKey,每个字段都需要填充到相同长度。假如预期我们最多有10万量级的用户,则userID应该统一填充至6位,如000001,000002…
(2)结尾添加全局唯一的FileID的用意也是使每个文件对应的记录全局唯一。避免当UserID与CreateTime相同时的两个不同文件记录相互覆盖。
RowKey存储上述文件记录,在HBase表中是下面的结构:
rowKey(userID 6 + time 8 + fileID 6) name category ….
00000120120902000001
00000120120904000002
00000120120906000003
………………
怎样用这张表?
在建立一个scan对象后,我们setStartRow(00000120120901),setEndRow(00000120120914)。
这样,scan时只扫描userID=1的数据,且时间范围限定在这个指定的时间段内,满足了按用户以及按时间范围对结果的筛选。并且由于记录集中存储,性能很好。
然后使用SingleColumnValueFilter(org.apache.hadoop.hbase.filter.SingleColumnValueFilter),共4个,分别约束name的上下限,与category的上下限。满足按同时按文件名以及分类名的前缀匹配。
(注意:使用SingleColumnValueFilter会影响查询性能,在真正处理海量数据时会消耗很大的资源,且需要较长的时间。
在后续的博文中我将多举几种应用场景下rowKey的,可以满足简单条件下海量数据瞬时返回的查询功能)
如果需要分页还可以再加一个PageFilter限制返回记录的个数。
以上,我们完成了高性能的支持多条件查询的HBase表结构设计。
接触过HBase的同学都知道,HBase每张表在底层存储上是由至少一个Region组成,Region实际上就是HBase表的分区。HBase新建一张表时默认Region即分区的数量为1,一般在生产环境中我们都会手动给Table提前做 “预分区”,使用合适的分区策略创建好一定数量的分区并使分区均匀分布在不同regionserver上。一个分区在达到一定大小时会自动Split,一分为二。
通常情况下,生产环境的每个regionserver节点上会有很多Region存在,我们一般比较关心每个节点上的Region数量,主要为了防止HBase分区过多影响到集群的稳定性。
分区过多会带来很多不好的影响,主要体现在以下几个方面。
频繁刷写
我们知道Region的一个列族对应一个MemStore,假设HBase表都有统一的1个列族配置,则每个Region只包含一个MemStore。通常HBase的一个MemStore默认大小为128 MB,见参数hbase.hregion.memstore.flush.size。当可用内存足够时,每个MemStore可以分配128 MB空间。当可用内存紧张时,假设每个Region写入压力相同,则理论上每个MemStore会平均分配可用内存空间。
因此,当节点Region过多时,每个MemStore分到的内存空间就会很小。这个时候,写入很小的数据量就会被强制Flush到磁盘,将会导致频繁刷写。频繁刷写磁盘,会对集群HBase与HDFS造成很大的压力,可能会导致不可预期的严重后果。
压缩风暴
因Region过多导致的频繁刷写,将在磁盘上产生非常多的HFile小文件,当小文件过多的时候HBase为了优化查询性能就会做Compaction操作,合并HFile减少文件数量。当小文件一直很多的时候,就会出现 “压缩风暴”。Compaction非常消耗系统io资源,还会降低数据写入的速度,严重的会影响正常业务的进行。
MSLAB内存消耗较大
MSLAB(MemStore-local allocation buffer)存在于每个MemStore中,主要是为了解决HBase内存碎片问题,默认会分配 2 MB 的空间用于缓存最新数据。如果Region数量过多,MSLAB总的空间占用就会比较大。比如当前节点有1000个包含1个列族的Region,MSLAB就会使用1.95GB的堆内存,即使没有数据写入也会消耗这么多内存。
Master assign region时间较长
HBase Region过多时Master分配Region的时间将会很长。特别体现在重启HBase时Region上线时间较长,严重的会达到小时级,造成业务长时间等待的后果。
影响MapReduce并发数
当使用MapReduce操作HBase时,通常Region数量就是MapReduce的任务数,Region数量过多会导致并发数过多,产生过多的任务。任务太多将会占用大量资源,当操作包含很多Region的大表时,占用过多资源会影响其他任务的执行。
关于每个regionserver节点分区数量大致合理的范围,HBase官网上也给出了定义:
Generally less regions makes for a smoother running cluster (you can always manually split the big regions later (if necessary) to spread the data, or request load, over the cluster); 20-200 regions per RS is a reasonable range.
可见,通常情况下每个节点拥有20-200个Region是比较正常的。借鉴于20-200这个区间范围,我们接下来具体讨论。
实际上,每个regionserver的最大Region数量由总的MemStore内存大小决定。我们知道每个Region的每个列族对应一个MemStore,假设HBase表都有统一的1个列族配置,那么每个Region只包含一个MemStore。一个MemStore大小通常在128~256 MB,见参数hbase.hregion.memstore.flush.size。默认情况下,RegionServer会将自身堆内存的40%(见参数hbase.regionserver.global.memstore.size)供给节点上所有MemStore使用,如果所有MemStore的总大小达到该配置大小,新的更新将会被阻塞并且会强制刷写磁盘。因此,每个节点最理想的Region数量应该由以下公式计算(假设HBase表都有统一的列族配置):
((RS memory) * (total memstore fraction)) / ((memstore size)*(column families))
其中:
RS memory:表示regionserver堆内存大小,即HBASE_HEAPSIZE。
total memstore fraction:表示所有MemStore占HBASE_HEAPSIZE的比例,HBase0.98版本以后由hbase.regionserver.global.memstore.size参数控制,老版本由hbase.regionserver.global.memstore.upperLimit参数控制,默认值0.4。
memstore size:即每个MemStore的大小,原生HBase中默认128M。
column families:即表的列族数量,通常情况下只设置1个,最多不超过3个。
举个例子,假如一个集群中每个regionserver的堆内存是32GB,那么节点上最理想的Region数量应该是32768*0.4/128 ≈ 102,所以,当前环境中单节点理想情况下大概有102个Region。
这种最理想情况是假设每个Region上的填充率都一样,包括数据写入的频次、写入数据的大小,但实际上每个Region的负载各不相同,可能有的Region特别活跃负载特别高,有的Region则比较空闲。所以,通常我们认为23倍的理想Region数量也是比较合理的,针对上面举例来说,大概200300个Region算是合理的。
如果实际的Region数量比2~3倍的计算值还要多,就要实际观察Region的刷写、压缩情况了,Region越多则风险越大。经验告诉我们,如果单节点Region数量过千,集群可能存在较大风险。
通过上述分析,我们大概知道在生产环境中,如果一个regionserver节点的Region数量在 20~200 我们认为是比较正常的,但是我们也要重点参考理论合理计算值。如果每个Region的负载比较均衡,分区数量在2~3倍的理论合理计算值通常认为也是比较正常的。
假设我们集群单节点Region数量比2~3倍计算值还要多,因为实际存在单节点分区数达到1000+/2000+的集群,遇到这种情况我们就要密切观察Region的刷写压缩情况了,主要从日志上分析,因为Region越多HBase集群的风险越大。经验告诉我们,如果单节点Region数量过千,集群可能存在较大风险。
分多少个:这个需要我们就要根据我们集群规模来进行安排,假设我们有5个regionServer,每个regionServer有20个region,那么总共就是100个region,最后的工作就是将000-fff分成100份。
最近一次发现导入10T数据到HBase(自动分配regions模式)表中,该表只占用55个regions匪夷所思, 根据每个regions存储文件的大小10G( hbase.hregion.max.filesize设置的值是10G), hbase表压缩方式为:“SNAPPY”格式.此类压缩比在60%左右. 根据以上计算,数据表最少分配600个regions. 实际通过查看hbase表各个resion的文件大小,发现有的regions文件有600G的数据量,这种数据存储严重异常.
通过数天排查(每个regionserver最大可以分配的regions个数),测试环境的集群中各种尝、验证,最终使用指定分隔点个数去分配regions的方式成功导入数据,并且数据分配也很均衡. (分隔点如何确定:根据全量的rowkey分割600份,然后抽取每份的最后一个rowkey值作为分割点)
此次排查思路主要参考了之前成功导入数据表的meta信息,
查看某个表的meta信息:
echo "scan 'hbase:meta'" | hbase shell | grep 'hbase_table_name' , 然后观察每个regions的 startKey 及 endKey
根据hbase的建表手册, 创建hbase表时预分regions有俩种方式:
法一:
1、如果整个导入的数据集是已知的,并且也知道所有Hbase表的Rowkey的分布情况,通过Region的startkey和endkey的方式预分区,此种方式完全可以满足存储在每个regions上的数据均衡分布, 这种方式有2种建表方式, 如果分割点比较少,可以在建表语句中直接指定,
1.1、例如:
- create 'card_active_quota',
- {NAME =>'n',VERSIONS => 1, COMPRESSION => 'SNAPPY'},
- {SPLITS => ['10', '20', '30', '40']}
-
-
- --VERSIONS为1,也就是说,默认情况只会存取一个版本的列数据,当再次插入的时候,后面的值会覆盖前面的值。
以上的分割点是:
第一regions是 startkey=>'' endKey=>'10',
第二个regions的startKey => '10' endKey => '20'
依次类推, 第五个regions的startkey=>'40' endKey=>''.
规律是第一个region是没有startkey的,最后一个region是没有stopkey的
- Java API
- 创建一个
- byte[][] splitKeys = {{1,2,3},{4,5,6}}
- admin.createTable(tableDesc,splitKeys)
要进行预分区,首先要明确rowkey的取值范围或构成逻辑,以我的rowkey组成为例:两位随机数+时间戳+客户号,两位随机数的范围从00-99,于是我划分了10个region来存储数据,每个region对应的rowkey范围如下:
-10,10-20,20-30,30-40,40-50,50-60,60-70,70-80,80-90,90-
如果分割点比较多,可以通过splits_file的文件行数预分区regions个数,例如像我们此次通过计算可能需要599个分割点,那么通过把分割点(文件中每一行代表一个分割点)写入文件中,创建表时直接引用分割文件即可,
例如:
- create 'card_active_quota',
- {NAME =>'n',VERSIONS => 1, COMPRESSION => 'SNAPPY'},
- {SPLITS_FILE => '/home/part/splits.txt'}
注意:splits.txt 内容格式:
每一行都认为是分隔点:
400258AD77AD659C7D9B8BB2D718488A016D9074DD39F2AC391AB573C2908017
7FF9001D147A700B73BDD18378C62C47C8D22680718503A7F6E078186086029A
BFEDF91AFE392EDF60CE378C8D2E5CAFDB8D6F0B249CC9A8AF4962788B1D8108
.......
注意:如果 进入hbase shell 是包含 splits.txt 的目录下,那么可以在建表语句中使用
SPLITS_FILE => 'splits.txt' ,同样也可以指定本地文件的绝对路径.
- create 'split_table_test',{NAME =>'cf', COMPRESSION => 'SNAPPY'}, {SPLITS_FILE => 'region_split_info.txt'}
-
- region_split_info.txt 内容:
-
- 0001|
- 0002|
- 0003|
- 0004|
- 0005|
- 0006|
- 0007|
- 0008|
- 0009|
-
- create 'testTable',
- {
- NAME => 'cf',
- DATA_BLOCK_ENCODING => 'NONE',
- BLOOMFILTER => 'ROW',
- REPLICATION_SCOPE=> '0',
- VERSIONS => '1',
- COMPRESSION => 'snappy',
- MIN_VERSIONS =>'0',
- TTL => '15552000',
- KEEP_DELETED_CELLS => 'false',
- BLOCKSIZE =>'65536',
- IN_MEMORY => 'false',
- BLOCKCACHE => 'true',
- METADATA =>{'ENCODE_ON_DISK' => 'true'}
- },
-
- {SPLITS_FILE=>'/app/soft/test/region_split_info.txt'}
下面,我们登陆一下master的web页面<Hmaster:60010>,查看一下hbase的表信息,找到刚刚新建的预分区表,进入查看region信息:
我们看到第一个region是没有startkey的,最后一个region是没有stopkey的。
1、第一种设计rowkey方式:随机数+messageId,如果想让最近的数据快速get到,可以将时间戳加上,我这里的region是0001|到0009|开头的,因为hbase的数据是字典序排序的,所以如果我生成的 rowkey=0002rer4343343422,则当前这条数据就会保存到0001|~0002|这个region里,因为我的messageId都是字母+数字,“|”的ASCII值大于字母、数字。
2、第二种设计rowkey的方式:通过messageId映射regionNo,这样既可以让数据均匀分布到各个region中,同时可以根据startkey和endkey可以get到同一批数据,messageId映射regionNo,使用一致性hash算法解决,一致性哈希算法在1997年由麻省理工学院的Karger等人在解决分布式Cache中提出的,设计目标是为了解决因特网中的热点(Hot spot)问题,第2种比较好
法二:
2、导入的数据集是规则的,那么可以通过Hbase的分区算法方式进行分割:目前系统有2种:1、HexStringSplit,2、UniformSplit,3 或在自定义一个实现类的方式进行预分区.如果将来的数据存储是十六进制的那么比较使用 “HexStringSplit”,作为pre-split的算法.
建表语句: rowkey的散列:HexStringSplit 预分区:15
- create 'card_active_quota',
- {NAME => 'n', VERSIONS => 1, COMPRESSION => 'SNAPPY'},
- {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}"
分区过多会带来很多不好的影响,主要体现在以下几个方面。
1、频繁刷写
我们知道 Region 的一个列族对应一个 MemStore ,假设 HBase 表都有统一的1个列族配置,则每个 Region 只包含一个 MemStore 。通常 HBase 的一个 MemStore 默认大小为 128 MB,见参数 hbase.hregion.memstore.flush.size 。当可用内存足够时,每个 MemStore 可以分配 128 MB空间。当可用内存紧张时,假设每个 Region 写入压力相同,则理论上每个 MemStore 会平均分配可用内存空间。
因此,当节点 Region 过多时,每个 MemStore 分到的内存空间就会很小。这个时候,写入很小的数据量就会被强制 Flush 到磁盘,将会导致频繁刷写。频繁刷写磁盘,会对集群 HBase 与 HDFS 造成很大的压力,可能会导致不可预期的严重后果。
2、压缩风暴
因 Region 过多导致的频繁刷写,将在磁盘上产生非常多的 HFile 小文件,当小文件过多时,HBase 为了优化查询性能就会做 Compaction 操作,合并 HFile 减少文件数量。当小文件一直很多的时候,就会出现 "压缩风暴"。Compaction 非常消耗系统 io 资源,还会降低数据写入的速度,严重的会影响正常业务的进行。
3、MSLAB 内存消耗较大
MSLAB(MemStore-local allocation buffer)存在于每个 MemStore 中,主要是为了解决 HBase 内存碎片问题,默认会分配 2 MB 的空间用于缓存最新数据。如果 Region 数量过多,MSLAB 总的空间占用就会比较大。比如当前节点有 1000 个包含 1 个列族的 Region ,MSLAB 就会使用 1.95GB 的堆内存,即使没有数据写入也会消耗这么多内存。
4、Master assign region 时间较长
HBase Region 过多时,Master 分配 Region 的时间将会很长。特别体现在重启 HBase 时 Region 上线时间较长,严重的会达到小时级,造成业务长时间等待的后果。
5、影响 MapReduce 并发数
当使用 MapReduce 操作 HBase 时,通常 Region 数量就是 MapReduce 的任务数,Region 数量过多会导致并发数过多,产生过多的任务。任务太多将会占用大量资源,当操作包含很多 Region 的大表时,占用过多资源会影响其他任务的执行。
关于每个 RegionServer 节点分区数量大致合理的范围,HBase 官网上也给出了定义:
Generally less regions makes for a smoother running cluster (you can always manually split the big regions later (if necessary) to spread the data, or request load, over the cluster); 20-200 regions per RS is a reasonable range.
可见,通常情况下每个节点拥有 20~200 个 Region 是比较正常的。借鉴于 20~200 这个区间范围,我们接下来具体讨论。
实际上,每个 RegionServer 的最大 Region 数量由总的 MemStore 内存大小决定。我们知道每个 Region 的每个列族对应一个 MemStore ,假设 HBase 表都有统一的 1 个列族配置,那么每个 Region 只包含一个 MemStore 。一个 MemStore 大小通常在 128~256 MB,见参数 hbase.hregion.memstore.flush.size 。默认情况下,RegionServer会将自身堆内存的40%(见参数hbase.regionserver.global.memstore.size)供给节点上所有MemStore使用,如果所有 MemStore 的总大小达到该配置大小,新的更新将会被阻塞并且会强制刷写磁盘。因此,每个节点最理想的 Region 数量应该由以下公式计算(假设 HBase 表都有统一的列族配置):
((RS memory) * (total memstore fraction)) / ((memstore size) * (column families))
其中:
RS memory:表示 RegionServer 堆内存大小,即 HBASE_HEAPSIZE 。
total memstore fraction:表示所有MemStore占HBASE_HEAPSIZE 的比例,HBase 0.98 版本以后由 hbase.regionserver.global.memstore.size 参数控制,老版本由 hbase.regionserver.global.memstore.upperLimit 参数控制,默认值 0.4 。
memstore size:即每个 MemStore 的大小,原生 HBase 中默认 128 M。
column families:即表的列族数量,通常情况下只设置 1 个,最多不超过 3 个。
举个例子,假如一个集群中每个 RegionServer 的堆内存是 32 GB,那么节点上最理想的 Region 数量应该是 32768 * 0.4 / 128 * 1 ≈ 102 ,所以,当前环境中单节点理想情况下大概有 102 个 Region 。
这种最理想情况是假设每个 Region 上的填充率都一样,包括数据写入的频次、写入数据的大小,但实际上每个 Region 的负载各不相同,可能有的 Region 特别活跃负载特别高,有的 Region 则比较空闲。所以,通常我们认为 2~3 倍的理想 Region 数量也是比较合理的,针对上面举例来说,大概 200~300 个 Region 算是合理的。
如果实际的 Region 数量比 2~3 倍的计算值还要多,就要实际观察 Region 的刷写、压缩情况了,Region 越多则风险越大。经验告诉我们,如果单节点 Region 数量过千,集群可能存在较大风险。
四、总结
通过上述分析,我们大概知道在生产环境中,如果一个 RegionServer 节点的Region 数量在 20~200 我们认为是比较正常的,但是我们也要重点参考理论合理计算值。如果每个 Region 的负载比较均衡,分区数量在 2~3 倍的理论合理计算值通常认为也是比较正常的。
假设我们集群单节点 Region 数量比 2~3 倍计算值还要多,因为实际存在单节点分区数达到 1000+/2000+ 的集群,遇到这种情况我们就要密切观察 Region 的刷写压缩情况了,主要从日志上分析,因为 Region 越多 HBase 集群的风险越大。经验告诉我们,如果单节点 Region 数量过千,集群可能存在较大风险。
不要在一张表里定义太多的column family。目前Hbase并不能很好的处理超过2~3个column family的表。
因为某个column family在flush的时候,它邻近的column family也会因关联效应被触发flush,最终导致系统产生更多的I/O。感兴趣的同学可以对自己的HBase集群进行实际测试,从得到的测试结果数据验证一下。
family过多,获取单个cell数据时候,会扫描同一个key的所有数据(列存储),性能明显降低
创建表的时候,可以通过HColumnDescriptor.setMaxVersions(int maxVersions)设置表中数据的最大版本,如果只需要保存最新版本的数据,那么可以设置setMaxVersions(1)。
Hbase中多版本(version)数据获取办法
从上面的表结构中,我们可以看到,VERSIONS为1,也就是说,默认情况只会存取一个版本的列数据,当再次插入的时候,后面的值会覆盖前面的值。
3、修改表结构,让Hbase表支持存储3个VERSIONS的版本列数据
再次查看表结构:
发现VERSIONS已经修改成了3.
4、插入3行数据
从上面可以看出,插入了3行数据到表中,并且3行数据的rowkey一致,然后使用get命令来获取这一行数据,发现只返回了最新的一行数据。
5、获取多行数据方法
1.7 Compact & Split
在HBase中,数据在更新时首先写入WAL 日志(HLog)和内存(MemStore)中,MemStore中的数据是排序的,当MemStore累计到一定阈值时,就会创建一个新的MemStore,并且将老的MemStore添加到flush队列,由单独的线程flush到磁盘上,成为一个StoreFile。于此同时, 系统会在zookeeper中记录一个redo point,表示这个时刻之前的变更已经持久化了(minor compact)。
StoreFile是只读的,一旦创建后就不可以再修改。因此Hbase的更新其实是不断追加的操作。当一个Store中的StoreFile达到一定的阈值后,就会进行一次合并(major compact),将对同一个key的修改合并到一起,形成一个大的StoreFile,当StoreFile的大小达到一定阈值后,又会对 StoreFile进行分割(split),等分为两个StoreFile。
由于对表的更新是不断追加的,处理读请求时,需要访问Store中全部的StoreFile和MemStore,将它们按照row key进行合并,由于StoreFile和MemStore都是经过排序的,并且StoreFile带有内存中索引,通常合并过程还是比较快的。
实际应用中,可以考虑必要时手动进行major compact,将同一个row key的修改进行合并形成一个大的StoreFile。同时,可以将StoreFile设置大些,减少split的发生。
hbase为了防止小文件(被刷到磁盘的menstore)过多,以保证保证查询效率,hbase需要在必要的时候将这些小的store file合并成相对较大的store file,这个过程就称之为compaction。在hbase中,主要存在两种类型的compaction:minor compaction和major compaction。
minor compaction:的是较小、很少文件的合并。
major compaction 的功能是将所有的store file合并成一个,触发major compaction的可能条件有:major_compact 命令、majorCompact() API、region server自动运行(相关参数:hbase.hregion.majoucompaction 默认为24 小时、hbase.hregion.majorcompaction.jetter 默认值为0.2 防止region server 在同一时间进行major compaction)。
hbase.hregion.majorcompaction.jetter参数的作用是:对参数hbase.hregion.majoucompaction 规定的值起到浮动的作用,假如两个参数都为默认值24和0,2,那么major compact最终使用的数值为:19.2~28.8 这个范围。
1、 关闭自动major compaction
2、 手动编程major compaction
Timer类,contab
minor compaction的运行机制要复杂一些,它由一下几个参数共同决定:
hbase.hstore.compaction.min :默认值为 3,表示至少需要三个满足条件的store file时,minor compaction才会启动
hbase.hstore.compaction.max 默认值为10,表示一次minor compaction中最多选取10个store file
hbase.hstore.compaction.min.size 表示文件大小小于该值的store file 一定会加入到minor compaction的store file中
hbase.hstore.compaction.max.size 表示文件大小大于该值的store file 一定会被minor compaction排除
hbase.hstore.compaction.ratio 将store file 按照文件年龄排序(older to younger),minor compaction总是从older store file开始选择
原则:在合理范围内能尽量少的减少列簇就尽量减少列簇,因为列簇是共享region的,每个列簇数据相差太大导致查询效率低下。
最优:将所有相关性很强的 key-value 都放在同一个列簇下,这样既能做到查询效率最高,也能保持尽可能少的访问不同的磁盘文件。以用户信息为例,可以将必须的基本信息存放在一个列族,而一些附加的额外信息可以放在另一列族。
创建多个HTable客户端用于写操作,提高写数据的吞吐量,一个例子:
- static final Configuration conf = HBaseConfiguration.create();
- static final String table_log_name = “user_log”;
- wTableLog = new HTable[tableN];
- for (int i = 0; i < tableN; i++) {
- wTableLog[i] = new HTable(conf, table_log_name);
- wTableLog[i].setWriteBufferSize(5 * 1024 * 1024); //5MB
- wTableLog[i].setAutoFlush(false);
2.2.1 Auto Flush
通过调用HTable.setAutoFlush(false)方法可以将HTable写客户端的自动flush关闭,这样可以批量写入数据到HBase,而不是有一条put就执行一次更新,只有当put填满客户端写缓存时,才实际向HBase服务端发起写请求。默认情况下auto flush是开启的。
2.2.2 Write Buffer
通过调用HTable.setWriteBufferSize(writeBufferSize)方法可以设置HTable客户端的写buffer大小,如果新设置的buffer小于当前写buffer中的数据时,buffer将会被flush到服务端。其中,writeBufferSize的单位是byte字节数,可以根据实际写入数据量的多少来设置该值。
3.2.1 Scanner Caching
hbase.client.scanner.caching配置项可以设置HBase scanner一次从服务端抓取的数据条数,默认情况下一次一条。通过将其设置成一个合理的值,可以减少scan过程中next()的时间开销,代价是scanner需要通过客户端的内存来维持这些被cache的行记录。
有三个地方可以进行配置:1)在HBase的conf配置文件中进行配置;(一般不用次全局配置!!!)2)通过调用HTable.setScannerCaching(int scannerCaching)进行配置;3)通过调用Scan.setCaching(int caching)进行配置。三者的优先级越来越高。
3.2.2 Scan Attribute Selection
scan时指定需要的Column Family,可以减少网络传输数据量,否则默认scan操作会返回整行所有Column Family的数据。
3.2.3 Close ResultScanner
通过scan取完数据后,记得要关闭ResultScanner,否则RegionServer可能会出现问题(对应的Server资源无法释放)。
通过调用HTable.put(Put)方法可以将一个指定的row key记录写入HBase,同样HBase提供了另一个方法:通过调用HTable.put(List<Put>)方法可以将指定的row key列表,批量写入多行记录,这样做的好处是批量执行,只需要一次网络I/O开销,这对于对数据实时性要求高,网络传输RTT高的情景下可能带来明显的性能提升。
通过调用HTable.get(Get)方法可以根据一个指定的row key获取一行记录,同样HBase提供了另一个方法:通过调用HTable.get(List<Get>)方法可以根据一个指定的row key列表,批量获取多行记录,这样做的好处是批量执行,只需要一次网络I/O开销,这对于对数据实时性要求高而且网络传输RTT高的情景下可能带来明显的性能提升。
在客户端开启多个HTable写线程,每个写线程负责一个HTable对象的flush操作,这样结合定时flush和写buffer(writeBufferSize),可以既保证在数据量小的时候,数据可以在较短时间内被flush(如1秒内),同时又保证在数据量大的时候,写buffer一满就及时进行flush。下面给个具体的例子:
- 、、多线程并发写
- for (int i = 0; i < threadN; i++) {
- Thread th = new Thread() {
- public void run() {
- while (true) {
- try {
- sleep(1000); //1 second
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- synchronized (wTableLog[i]) {
- try {
- wTableLog[i].flushCommits();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
- };
- th.setDaemon(true);
- th.start();
- }
- 、、3.4 多线程并发读
- public class DataReaderServer {
- //获取店铺一天内各分钟PV值的入口函数
- public static ConcurrentHashMap<String, String> getUnitMinutePV(long uid, long startStamp, long endStamp){
- long min = startStamp;
- int count = (int)((endStamp - startStamp) / (60*1000));
- List<String> lst = new ArrayList<String>();
- for (int i = 0; i <= count; i++) {
- min = startStamp + i * 60 * 1000;
- lst.add(uid + "_" + min);
- }
- return parallelBatchMinutePV(lst);
- }
- //多线程并发查询,获取分钟PV值
- private static ConcurrentHashMap<String, String> parallelBatchMinutePV(List<String> lstKeys){
- ConcurrentHashMap<String, String> hashRet = new ConcurrentHashMap<String, String>();
- int parallel = 3;
- List<List<String>> lstBatchKeys = null;
- if (lstKeys.size() < parallel ){
- lstBatchKeys = new ArrayList<List<String>>(1);
- lstBatchKeys.add(lstKeys);
- }
- else{
- lstBatchKeys = new ArrayList<List<String>>(parallel);
- for(int i = 0; i < parallel; i++ ){
- List<String> lst = new ArrayList<String>();
- lstBatchKeys.add(lst);
- }
-
- for(int i = 0 ; i < lstKeys.size() ; i ++ ){
- lstBatchKeys.get(i%parallel).add(lstKeys.get(i));
- }
- }
-
- List<Future< ConcurrentHashMap<String, String> >> futures = new ArrayList<Future< ConcurrentHashMap<String, String> >>(5);
-
- ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
- builder.setNameFormat("ParallelBatchQuery");
- ThreadFactory factory = builder.build();
- ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(lstBatchKeys.size(), factory);
-
- for(List<String> keys : lstBatchKeys){
- Callable< ConcurrentHashMap<String, String> > callable = new BatchMinutePVCallable(keys);
- FutureTask< ConcurrentHashMap<String, String> > future = (FutureTask< ConcurrentHashMap<String, String> >) executor.submit(callable);
- futures.add(future);
- }
- executor.shutdown();
-
- // Wait for all the tasks to finish
- try {
- boolean stillRunning = !executor.awaitTermination(
- 5000000, TimeUnit.MILLISECONDS);
- if (stillRunning) {
- try {
- executor.shutdownNow();
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- } catch (InterruptedException e) {
- try {
- Thread.currentThread().interrupt();
- } catch (Exception e1) {
- // TODO Auto-generated catch block
- e1.printStackTrace();
- }
- }
-
- // Look for any exception
- for (Future f : futures) {
- try {
- if(f.get() != null)
- {
- hashRet.putAll((ConcurrentHashMap<String, String>)f.get());
- }
- } catch (InterruptedException e) {
- try {
- Thread.currentThread().interrupt();
- } catch (Exception e1) {
- // TODO Auto-generated catch block
- e1.printStackTrace();
- }
- } catch (ExecutionException e) {
- e.printStackTrace();
- }
- }
-
- return hashRet;
- }
- //一个线程批量查询,获取分钟PV值
- protected static ConcurrentHashMap<String, String> getBatchMinutePV(List<String> lstKeys){
- ConcurrentHashMap<String, String> hashRet = null;
- List<Get> lstGet = new ArrayList<Get>();
- String[] splitValue = null;
- for (String s : lstKeys) {
- splitValue = s.split("_");
- long uid = Long.parseLong(splitValue[0]);
- long min = Long.parseLong(splitValue[1]);
- byte[] key = new byte[16];
- Bytes.putLong(key, 0, uid);
- Bytes.putLong(key, 8, min);
- Get g = new Get(key);
- g.addFamily(fp);
- lstGet.add(g);
- }
- Result[] res = null;
- try {
- res = tableMinutePV[rand.nextInt(tableN)].get(lstGet);
- } catch (IOException e1) {
- logger.error("tableMinutePV exception, e=" + e1.getStackTrace());
- }
-
- if (res != null && res.length > 0) {
- hashRet = new ConcurrentHashMap<String, String>(res.length);
- for (Result re : res) {
- if (re != null && !re.isEmpty()) {
- try {
- byte[] key = re.getRow();
- byte[] value = re.getValue(fp, cp);
- if (key != null && value != null) {
- hashRet.put(String.valueOf(Bytes.toLong(key,
- Bytes.SIZEOF_LONG)), String.valueOf(Bytes
- .toLong(value)));
- }
- } catch (Exception e2) {
- logger.error(e2.getStackTrace());
- }
- }
- }
- }
-
- return hashRet;
- }
- }
- //调用接口类,实现Callable接口
- class BatchMinutePVCallable implements Callable<ConcurrentHashMap<String, String>>{
- private List<String> keys;
-
- public BatchMinutePVCallable(List<String> lstKeys ) {
- this.keys = lstKeys;
- }
-
- public ConcurrentHashMap<String, String> call() throws Exception {
- return DataReadServer.getBatchMinutePV(keys);
- }
基于MR思想,直接生成Hbase底层的数据文件,不写wa,降低IO,几乎不影响读取
适合如克实时性要求不高的场景
hive导入hbase批量入库----单条put 、批量put 、Mapreduce、 bluckload_.-CSDN博客
对于频繁查询HBase的应用场景,可以考虑在应用程序中做缓存,当有新的查询请求时,首先在缓存中查找,如果存在则直接返回,不再查询HBase;否则对HBase发起读请求查询,然后在应用程序中将查询结果缓存起来。至于缓存的替换策略,可以考虑LRU等常用的策略。
HBase上Regionserver的内存分为两个部分,一部分作为Memstore,主要用来写;另外一部分作为BlockCache,主要用于读。
写请求会先写入Memstore,Regionserver会给每个region提供一个Memstore,当Memstore满64MB以后,会启动 flush刷新到磁盘。当Memstore的总大小超过限制时(heapsize * hbase.regionserver.global.memstore.upperLimit * 0.9),会强行启动flush进程,从最大的Memstore开始flush直到低于限制。
读请求先到Memstore中查数据,查不到就到BlockCache中查,再查不到就会到磁盘上读,并把读的结果放入BlockCache。由于BlockCache采用的是LRU策略,因此BlockCache达到上限(heapsize * hfile.block.cache.size * 0.85)后,会启动淘汰机制,淘汰掉最老的一批数据。
一个Regionserver上有一个BlockCache和N个Memstore,它们的大小之和不能大于等于heapsize * 0.8,否则HBase不能启动。默认BlockCache为0.2,而Memstore为0.4。对于注重读响应时间的系统,可以将 BlockCache设大些,比如设置BlockCache=0.4,Memstore=0.39,以加大缓存的命中率。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。