当前位置:   article > 正文

从OpenTsdb来分析rowkey设计

opentsdb rowkey设计
讨论此问题前,先理解一个问题。

关于Hbase热点问题

当处理由连续事件得到的数据时,即时间上连续的数据。这些数据可能来自于某个传感器网络、证券交易或者一个监控系统。它们显著的特点就是rowkey中含有事件发生时间。带来的一个问题便是HBase对于row的不均衡分布,它们被存储在一个唯一的rowkey区间中,被称为region,区间的范围被称为Start Key和End Key。
对于单调递增的时间类型数据,很容易被散列到同一个Region中,这样它们会被存储在同一个服务器上,从而所有的访问和更新操作都会集中到这一台服务器上,从而在集群中形成一个hotspot,从而不能将集群的整体性能发挥出来。
要解决这个问题是非常容易的,只需要将所有的数据散列到全部的Region上即可。这是可以做到的,比如,在rowkey前面加上一个非线程序列。

OpenTsdb 数据库设计
我们来看一下它的建表语句
TSDB_TABLE=${TSDB_TABLE-'tsdb'}UID_TABLE=${UID_TABLE-'tsdb-uid'}create '$UID_TABLE',  {NAME => 'id', COMPRESSION => '$COMPRESSION', BLOOMFILTER => '$BLOOMFILTER'},  {NAME => 'name', COMPRESSION => '$COMPRESSION', BLOOMFILTER => '$BLOOMFILTER'}create '$TSDB_TABLE',  {NAME => 't', VERSIONS => 1, COMPRESSION => '$COMPRESSION', BLOOMFILTER => '$BLOOMFILTER'}

表 tsdb包含一个列族“T”,tsdb-uid包含两个列族 id和name,在hbase中列族名字越短越有利于存储,因为它占用的字节数越短。
tsdb 有标签的概念:我理解为就是指标,例如mysql.bytes_sent就是一个指标,每个指标在tsdb表中有一个唯一的标识unique ID 简称UID.

tsdb_uid是 tsdb 的辅助表,用于查找uid对应的指标或者指标对应的uid

新增一个指标,会在这个表中增加两行记录,一行rowkey是指标名,id是uid,另外一行rowkey是uid,name是指标名.
在tsdb_uid表中列族 id 和name 都分别包含三列
metrics 指标名或者指标uid
tagk 标签名或者标签名uid
tagv 标签值或者标签值uid

id 列族:用来将字符串映射到UID
name 列族:用来将一个UID映射到一个字符串

例如 我要存储 host=web、user=admin、project=uc 的web访问量(web.pv)
[img]http://dl2.iteye.com/upload/attachment/0111/4566/5d344f2d-fcc7-3184-a442-aa26f0b07c39.bmp[/img]


tsdb表中其实 仅用到和rowkey 列族T是没有用到的,知识因为HBase规定必须至少有一个列族。tsdb表的rowkey设计很值得我们去了解。
[table]
|监控指标UID(3byte)|部分时间戳(4byte)|标签1名UID(3byte)|标签1值UID(3byte)|....|
[/table]
在OpenTSDB中,所有数据存储在tsdb的表中,这是为了充分利用hbase有序和region分布式的特点。所有的值都保存在列族t中。
rowkey为<metric_uid><timestamp><tagk1><tagv1>[...<tagkN><tagvN>],
这样设计的目的是:OpenTsdb以监控指标为中心的查询进行优化,所以监控指标UID排在最前面,在同一个监控指标中,按时间戳进行排序,行健里时间戳存储四舍五入到60分钟,所以1个小时内统一监控指标的数据会存储在一起。
OpenTSDB的tsdb启动之后,会监控指定的socket端口(默认为4242),接收到监控数据,包括指标、时间戳、数据、tag标签,tag标签包括tag名称ID和tag值ID。例如:
继续用上面的例子
web.pv 1292148123 42 host=web user=admin project=uc
对于指标web.pv的ID为:[0, 0, 1],host标签名称的ID为:[0, 0, 2], web标签值的ID为:[0, 0, 3], 标签名称user的ID为:[0, 0, 4] admin标签值的ID为:[0, 0, 5],标签名称project的ID为:[0, 0, 6] uc标签值的ID为:[0, 0, 7],他们组成rowkey:
[color=red]
[0,0,1, 77,4,-99,32, 0,0,2, 0,0,3, 0,0,4, 0,0,5, 0,0,6, 0,0,7]
metricid timestamp tagk1 tagv1 tagk2 tagv2 tagk3 tagv3
[/color]

[b]可以看到,对于metric + tags相同的数据都会连续存放,且metic相同的数据也会连续存放,这样对于scan以及做aggregation都非常有帮助[/b]
对于时间戳为1292148123的数据点来说,其转换为以小时为单位的基准时间(去掉小时后的秒)为129214800,偏移为123,转换为二进制为1111011,因为该值为整数且长度为8位(对应为2byte,故最后3bit为100),故其对应的列族名为:0000011110110100,将其转换为十六进制为07B4

[b]OPENTSDB两种查询数据下hbase表设计优势的体现:[/b]
1:自动补全功能
[color=red]了解此功能前首先了解一下hbase 的block cache 功能[/color]

HBase上Regionserver的内存分为两个部分,一部分作为Memstore,主要用来写;另外一部分作为BlockCache,主要用于读。
写请求会先写入Memstore,Regionserver会给每个region提供一个Memstore,当Memstore满64MB以后,会启动 flush刷新到磁盘。当Memstore的总大小超过限制时(heapsize * hbase.regionserver.global.memstore.upperLimit * 0.9),会强行启动flush进程,从最大的Memstore开始flush直到低于限制。
读请求先到Memstore中查数据,查不到就到BlockCache中查,再查不到就会到磁盘上读,并把读的结果放入BlockCache。由于BlockCache采用的是LRU策略,因此BlockCache达到上限(heapsize * hfile.block.cache.size * 0.85)后,会启动淘汰机制,淘汰掉最老的一批数据。

hbase 在每张表的rowkey上建立索引,定位起始位置会非常快,然后通过block cache 查询数据,连续的数据例如 mysql_xxx 会连续存放在一起,scan tsdb_uid 表的 name_uid映射记录即可快速查询。

2:读取时间序列数据
使用一种建立在行健上的正则表达式
可以试着读一下源代码。
/**   * Sets the server-side regexp filter on the scanner.   * In order to find the rows with the relevant tags, we use a   * server-side filter that matches a regular expression on the row key.   * @param scanner The scanner on which to add the filter.   */  private void createAndSetFilter(final Scanner scanner) {    if (group_bys != null) {      Collections.sort(group_bys, Bytes.MEMCMP);    }    final short name_width = tsdb.tag_names.width();    final short value_width = tsdb.tag_values.width();    final short tagsize = (short) (name_width + value_width);    // Generate a regexp for our tags.  Say we have 2 tags: { 0 0 1 0 0 2 }    // and { 4 5 6 9 8 7 }, the regexp will be:    // "^.{7}(?:.{6})*\\Q\000\000\001\000\000\002\\E(?:.{6})*\\Q\004\005\006\011\010\007\\E(?:.{6})*$"    final StringBuilder buf = new StringBuilder(        15  // "^.{N}" + "(?:.{M})*" + "$"        + ((13 + tagsize) // "(?:.{M})*\\Q" + tagsize bytes + "\\E"           * (tags.size() + (group_bys == null ? 0 : group_bys.size() * 3))));    // In order to avoid re-allocations, reserve a bit more w/ groups ^^^    // Alright, let's build this regexp.  From the beginning...    buf.append("(?s)"  // Ensure we use the DOTALL flag.               + "^.{")       // ... start by skipping the metric ID and timestamp.       .append(tsdb.metrics.width() + Const.TIMESTAMP_BYTES)       .append("}");    final Iterator<byte[]> tags = this.tags.iterator();    final Iterator<byte[]> group_bys = (this.group_bys == null                                        ? new ArrayList<byte[]>(0).iterator()                                        : this.group_bys.iterator());    byte[] tag = tags.hasNext() ? tags.next() : null;    byte[] group_by = group_bys.hasNext() ? group_bys.next() : null;    // Tags and group_bys are already sorted.  We need to put them in the    // regexp in order by ID, which means we just merge two sorted lists.    do {      // Skip any number of tags.      buf.append("(?:.{").append(tagsize).append("})*\\Q");      if (isTagNext(name_width, tag, group_by)) {        addId(buf, tag);        tag = tags.hasNext() ? tags.next() : null;      } else {  // Add a group_by.        addId(buf, group_by);        final byte[][] value_ids = (group_by_values == null                                    ? null                                    : group_by_values.get(group_by));        if (value_ids == null) {  // We don't want any specific ID...          buf.append(".{").append(value_width).append('}');  // Any value ID.        } else {  // We want specific IDs.  List them: /(AAA|BBB|CCC|..)/          buf.append("(?:");          for (final byte[] value_id : value_ids) {            buf.append("\\Q");            addId(buf, value_id);            buf.append('|');          }          // Replace the pipe of the last iteration.          buf.setCharAt(buf.length() - 1, ')');        }        group_by = group_bys.hasNext() ? group_bys.next() : null;      }    } while (tag != group_by);  // Stop when they both become null.    // Skip any number of tags before the end.    buf.append("(?:.{").append(tagsize).append("})*$");    scanner.setKeyRegexp(buf.toString(), CHARSET);   }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/561393
推荐阅读
  

闽ICP备14008679号