赞
踩
B+树是B树的一种变种,因此若想了解B+树,首先要了解B树的定义。B树又称多路平衡查找树,B树中所有结点的孩子个数的最大值称为B树的阶,通常用M表示。一般从查找效率考虑,通常要求M>=3。一棵M阶B树,有如下特性:
下图是一个B树的例子:
为了适应磁盘IO访问的特点以及适应范围查询的需求,B+树对B树进行了改进。对于一棵m阶的B+树,有如下特性:
下图是一个B+树的例子:
B+树和B树相比,主要有以下区别:
B+树通常用于数据库索引,例如Mysql的InnoDB存储引擎以及MyISAM存储引擎的索引文件中使用的就是B+树。一般来说,数据库的索引都比较大,不可能全部存储在内存中,因此索引往往以文件的形式存储的磁盘上。系统从磁盘读取文件到内存时,需要定位到文件所在的位置:文件所在柱面号,磁盘号,扇区号。这个操作时非常耗时的,远高于内存操作。考虑到磁盘IO是非常高昂的操作,操作系统在读取文件时做了一些优化,系统从磁盘读取文件到内存时是以磁盘块(block)为基本单位的,位于同一个磁盘块中的数据会被一次性读取出来,而不是需要什么取什么。每一次IO读取的数据我们称之为一页(page)。具体一页有多大数据跟操作系统有关,一般为4k或8k。
由于磁盘IO非常耗时,因此评价一个数据结构作为索引的优劣最重要的指标就是在查找过程中是否能够有效减少磁盘I/O的操作次数。Mysql选择使用B+树作为索引文件的数据结构,主要基于B+树的以下特点:
在InnoDB中,表数据文件本身就是按B+树组织的一个索引结构,它使用数据库主键作为Key,叶节点保存了完整的数据记录。InnoDB中有页(Page)的概念,页是其磁盘管理的最小单位。InnoDB中默认每个页的大小为16KB,可通过参数innodb_page_size将页的大小设置为4K、8K、16K。InnoDB中,B+Tree的高度一般都在24层。由于根节点常驻内存的,因此查找某一键值的行记录时最多只需要13次磁盘I/O操作。因为InnoDB的数据文件本身要按主键聚集,所以InnoDB要求表必须有主键(MyISAM可以没有),如果没有显式指定,则MySQL系统会自动选择一个可以唯一标识数据记录的列作为主键,如果不存在这种列,则MySQL自动为InnoDB表生成一个隐含字段作为主键,这个字段长度为6个字节,类型为长整形。聚集索引这种实现方式使得按主键的搜索十分高效,但是辅助索引搜索需要检索两遍索引:首先检索辅助索引获得主键,然后用主键到主索引中检索获得记录。
在B+树中插入数据时,需要注意以下几点:
数据插入说明:
下图是插入关键字:15后的结果:
上图中插入关键字:9后结点的关键字数量为:4,超过了B+树阶数M,因此需要进行结点分裂操作,分裂后的结果为:
在 B+树中做删除关键字的操作,采取如下的步骤:
数据删除说明:
删除关键字后,如果若当前结点中关键字个数小于>=[M/2],则直接完成删除操作。
上图中删除关键字:8,删除后的结果如下:
在删除关键字后,如果导致其结点中关键字个数<[M/2],若其兄弟结点中含有多余的关键字,可以从兄弟结点中借关键字。
上图中删除关键字:21,由于删除后结点只有一个关键字:25<[M/2],因此需要从兄弟结点中借用一个关键字:17,删除后的结果如下:
上图中删除关键字:9,由于删除后结点只有一个关键字:11<[M/2],并且兄弟结点也没有多余的关键字,因此需要与兄弟结点进行合并,删除后的结果如下:
接下来我们给出B+树的Go语言的实现,目前代码已经上传到github中,下载地址
首先给出B+树结点的定义,在此叶子结点与索引结点使用了相同的数据结构:
type BPItem struct {
Key int64
Val interface{}
}
type BPNode struct {
MaxKey int64
Nodes []*BPNode
Items []BPItem
Next *BPNode
}
其中:
type BPTree struct {
mutex sync.RWMutex
root *BPNode
width int
halfw int
其中:
func NewBPTree(width int) *BPTree { if width < 3 { width = 3 } var bt = &BPTree{} bt.root = NewLeafNode(width) bt.width = width bt.halfw = (bt.width + 1) / 2 return bt } //申请width+1是因为插入时可能暂时出现节点key大于申请width的情况,待后期再分裂处理 func NewLeafNode(width int) *BPNode { var node = &BPNode{} node.Items = make([]BPItem, width+1) node.Items = node.Items[0:0] return node }
func (t *BPTree) Get(key int64) interface{} { t.mutex.Lock() defer t.mutex.Unlock() node := t.root for i := 0; i < len(node.Nodes); i++ { if key <= node.Nodes[i].MaxKey { node = node.Nodes[i] i = 0 } } //没有到达叶子结点 if len(node.Nodes) > 0 { return nil } for i := 0; i < len(node.Items); i++ { if node.Items[i].Key == key { return node.Items[i].Val } } return nil }
func (t *BPTree) Set(key int64, value interface{}) { t.mutex.Lock() defer t.mutex.Unlock() t.setValue(nil, t.root, key, value) } func (t *BPTree) setValue(parent *BPNode, node *BPNode, key int64, value interface{}) { for i:=0; i < len(node.Nodes); i++ { if key <= node.Nodes[i].MaxKey || i== len(node.Nodes)-1 { t.setValue(node, node.Nodes[i], key, value) break } } //叶子结点,添加数据 if len(node.Nodes) < 1 { node.setValue(key, value) } //结点分裂 node_new := t.splitNode(node) if node_new != nil { //若父结点不存在,则创建一个父节点 if parent == nil { parent = NewIndexNode(t.width) parent.addChild(node) t.root = parent } //添加结点到父亲结点 parent.addChild(node_new) } }
代码中使用递归调用实现数据的插入。插入时首先定位到叶子结点,然后调用 BPNode.setValue()
来设置关键字:
func (node *BPNode) setValue(key int64, value interface{}) { item := BPItem{key, value} num := len(node.Items) if num < 1 { node.Items = append(node.Items, item) node.MaxKey = item.Key return } else if key < node.Items[0].Key { node.Items = append([]BPItem{item}, node.Items...) return } else if key > node.Items[num-1].Key { node.Items = append(node.Items, item) node.MaxKey = item.Key return } for i:=0; i < num; i++ { if node.Items[i].Key > key { node.Items = append(node.Items, BPItem{}) copy(node.Items[i+1:], node.Items[i:]) node.Items[i] = item return } else if node.Items[i].Key == key { node.Items[i] = item return } } }
添加关键字后若数量超过:BPTree.width
,则需要调用 BPNode.splitNode()
来进行结点分裂处理:
func (t *BPTree) splitNode(node *BPNode) *BPNode { if len(node.Nodes) > t.width { //创建新结点 halfw := t.width / 2 + 1 node2 := NewIndexNode(t.width) node2.Nodes = append(node2.Nodes, node.Nodes[halfw : len(node.Nodes)]...) node2.MaxKey = node2.Nodes[len(node2.Nodes)-1].MaxKey //修改原结点数据 node.Nodes = node.Nodes[0:halfw] node.MaxKey = node.Nodes[len(node.Nodes)-1].MaxKey return node2 } else if len(node.Items) > t.width { //创建新结点 halfw := t.width / 2 + 1 node2 := NewLeafNode(t.width) node2.Items = append(node2.Items, node.Items[halfw: len(node.Items)]...) node2.MaxKey = node2.Items[len(node2.Items)-1].Key //修改原结点数据 node.Next = node2 node.Items = node.Items[0:halfw] node.MaxKey = node.Items[len(node.Items)-1].Key return node2 } return nil }
func (t *BPTree) Remove(key int64) { t.mutex.Lock() defer t.mutex.Unlock() t.deleteItem(nil, t.root, key) } func (t *BPTree) deleteItem(parent *BPNode, node *BPNode, key int64) { for i:=0; i < len(node.Nodes); i++ { if key <= node.Nodes[i].MaxKey { t.deleteItem(node, node.Nodes[i], key) break } } if len(node.Nodes) < 1 { //删除记录后若结点的子项<m/2,则从兄弟结点移动记录,或者合并结点 node.deleteItem(key) if len(node.Items) < t.halfw { t.itemMoveOrMerge(parent, node) } } else { //若结点的子项<m/2,则从兄弟结点移动记录,或者合并结点 node.MaxKey = node.Nodes[len(node.Nodes)-1].MaxKey if len(node.Nodes) < t.halfw { t.childMoveOrMerge(parent, node) } } }
代码中使用递归调用实现删除操作。删除时首先定位到叶子结点,若找到则调用 BPNode.deleteItem()
来删除关键字:
func (node *BPNode) deleteItem(key int64) bool {
num := len(node.Items)
for i:=0; i < num; i++ {
if node.Items[i].Key > key {
return false
} else if node.Items[i].Key == key {
copy(node.Items[i:], node.Items[i+1:])
node.Items = node.Items[0:len(node.Items)-1]
node.MaxKey = node.Items[len(node.Items)-1].Key
return true
}
}
return false
}
删除关键字后,若关键字的数量小于BPTree.halfw
,则需要调用BPNode.itemMoveOrMerge()
函数。
BPNode.itemMoveOrMerge()
函数首先获取兄弟结点,并判断是否可以借用关键字,若可以进行关键字借用,否则与兄弟结点进行合并:
func (t *BPTree) itemMoveOrMerge(parent *BPNode, node *BPNode) { //获取兄弟结点 var node1 *BPNode = nil var node2 *BPNode = nil for i:=0; i < len(parent.Nodes); i++ { if parent.Nodes[i] == node { if i < len(parent.Nodes)-1 { node2 = parent.Nodes[i+1] } else if i > 0 { node1 = parent.Nodes[i-1] } break } } //将左侧结点的记录移动到删除结点 if node1 != nil && len(node1.Items) > t.halfw { item := node1.Items[len(node1.Items)-1] node1.Items = node1.Items[0:len(node1.Items)-1] node1.MaxKey = node1.Items[len(node1.Items)-1].Key node.Items = append([]BPItem{item}, node.Items...) return } //将右侧结点的记录移动到删除结点 if node2 != nil && len(node2.Items) > t.halfw { item := node2.Items[0] node2.Items = node1.Items[1:] node.Items = append(node.Items, item) node.MaxKey = node.Items[len(node.Items)-1].Key return } //与左侧结点进行合并 if node1 != nil && len(node1.Items) + len(node.Items) <= t.width { node1.Items = append(node1.Items, node.Items...) node1.Next = node.Next node1.MaxKey = node1.Items[len(node1.Items)-1].Key parent.deleteChild(node) return } //与右侧结点进行合并 if node2 != nil && len(node2.Items) + len(node.Items) <= t.width { node.Items = append(node.Items, node2.Items...) node.Next = node2.Next node.MaxKey = node.Items[len(node.Items)-1].Key parent.deleteChild(node2) return } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。