BoltDB is a KV store inspired by LMDB, built on an append-only B+ tree.
## Page implementation
boltdb does not implement its own page buffer manager; it simply mmaps the database file and reads pages straight from the mapping. The page size equals the OS memory page size.
The page header looks like this; the page's payload starts at ptr.
```go
type pgid uint64

type page struct {
    id       pgid    // page id
    flags    uint16  // page type
    count    uint16  // number of elements
    overflow uint32  // number of overflow pages following this one
    ptr      uintptr // data storage begins at ptr
}
```
page.flags takes the following values:
```go
const (
    branchPageFlag   = 0x01 // interior (branch) page
    leafPageFlag     = 0x02 // leaf page
    metaPageFlag     = 0x04 // meta page
    freelistPageFlag = 0x10 // free-page list page
)
```
boltdb is also a good lesson in writing low-level code in Go. For example, accessing a []byte as a page looks like this:
```go
// pageInBuffer retrieves a page reference from a given byte array based on the current page size.
func (db *DB) pageInBuffer(b []byte, id pgid) *page {
    return (*page)(unsafe.Pointer(&b[id*pgid(db.pageSize)]))
}
```
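To make the trick concrete, here is a minimal self-contained sketch (not bolt code; the 4 KB buffer size and the sample field values are arbitrary) that stamps a page header into a byte slice and reads it back through the same unsafe cast:

```go
package main

import (
    "fmt"
    "unsafe"
)

type pgid uint64

type page struct {
    id       pgid
    flags    uint16
    count    uint16
    overflow uint32
    ptr      uintptr
}

func main() {
    // Pretend this is one mmap'd 4 KB page.
    buf := make([]byte, 4096)

    // Casting the buffer's start to *page lets us write the header in place.
    p := (*page)(unsafe.Pointer(&buf[0]))
    p.id = 7
    p.flags = 0x02 // leafPageFlag
    p.count = 3

    // The same cast reads the header straight out of the raw bytes.
    q := (*page)(unsafe.Pointer(&buf[0]))
    fmt.Println(q.id, q.flags, q.count) // 7 2 3
}
```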
The first two pages of a boltdb database are meta pages, which store the metadata.
```go
type meta struct {
    magic    uint32 // 0xED0CDAED, used to validate the file
    version  uint32
    pageSize uint32 // page size
    flags    uint32 // appears to be unused
    root     bucket // root bucket
    freelist pgid   // freelist page
    pgid     pgid   // high-water-mark page id of the file; going past it means we must remap, or something is wrong
    txid     txid   // used to assign write transaction ids; the meta page with the larger txid is the newer one
    checksum uint64 // corruption check
}
```
The meta data is stored at page.ptr within the page struct, so reading the meta from a page looks like this:
```go
// meta returns a pointer to the metadata section of the page.
func (p *page) meta() *meta {
    return (*meta)(unsafe.Pointer(&p.ptr))
}
```
boltdb's two meta pages alternate as writes proceed; the latest meta page holds the current state.
```go
// meta retrieves the current meta page reference.
func (db *DB) meta() *meta {
    // We have to return the meta with the highest txid which doesn't fail
    // validation. Otherwise, we can cause errors when in fact the database is
    // in a consistent state. metaA is the one with the higher txid.
    metaA := db.meta0
    metaB := db.meta1
    if db.meta1.txid > db.meta0.txid {
        metaA = db.meta1
        metaB = db.meta0
    }

    // Use higher meta page if valid. Otherwise fallback to previous, if valid.
    if err := metaA.validate(); err == nil {
        return metaA
    } else if err := metaB.validate(); err == nil {
        return metaB
    }

    // This should never be reached, because both meta1 and meta0 were validated
    // on mmap() and we do fsync() on every write.
    panic("bolt.DB.meta(): invalid meta pages")
}
```
### Branch pages
Branch pages store the B+ tree's interior node data; a branch page consists of an array of branchPageElement entries followed by the key data.
```go
// branchPageElement represents a node on a branch page.
type branchPageElement struct {
    pos   uint32 // offset of the key, relative to this element's own position
    ksize uint32 // key size
    pgid  pgid   // page id of the child page
}
```
The branchPageElement array likewise starts at page.ptr within the page struct:
```go
// branchPageElement retrieves the branch node by index
func (p *page) branchPageElement(index uint16) *branchPageElement {
    return &((*[0x7FFFFFF]branchPageElement)(unsafe.Pointer(&p.ptr)))[index]
}

// branchPageElements retrieves a list of branch nodes.
func (p *page) branchPageElements() []branchPageElement {
    if p.count == 0 {
        return nil
    }
    return ((*[0x7FFFFFF]branchPageElement)(unsafe.Pointer(&p.ptr)))[:]
}
```
branchPageElement.pos holds the key's offset relative to the element's own position, so retrieving the key works like this:
```go
// key returns a byte slice of the node key.
func (n *branchPageElement) key() []byte {
    buf := (*[maxAllocSize]byte)(unsafe.Pointer(n))
    // n's own address plus n.pos gives the key's start address
    return (*[maxAllocSize]byte)(unsafe.Pointer(&buf[n.pos]))[:n.ksize]
}
```
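A hedged standalone sketch of this pos-relative addressing (hand-built buffer, sizes chosen for illustration; not bolt code): place one element at the start of a buffer, put the key right behind it, and recover the key the same way key() does:

```go
package main

import (
    "fmt"
    "unsafe"
)

type pgid uint64

type branchPageElement struct {
    pos   uint32
    ksize uint32
    pgid  pgid
}

func main() {
    buf := make([]byte, 64)

    // The element occupies the first 16 bytes; place the key right after it.
    elem := (*branchPageElement)(unsafe.Pointer(&buf[0]))
    elem.pos = uint32(unsafe.Sizeof(*elem)) // 16
    elem.ksize = 3
    copy(buf[elem.pos:], "abc")

    // Reproduce key(): start from the element's own address, jump pos bytes.
    b := (*[64]byte)(unsafe.Pointer(elem))
    key := b[elem.pos : elem.pos+elem.ksize]
    fmt.Printf("%s\n", key) // abc
}
```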
### Leaf pages
Leaf pages store the B+ tree's leaf node data. The layout is similar to branch pages, except values are present as well: a leaf page consists of an array of leafPageElement entries followed by the key/value data.
```go
// leafPageElement represents a node on a leaf page.
type leafPageElement struct {
    flags uint32 // 1 means this element is a nested bucket
    pos   uint32 // offset of the key/value pair, relative to this element
    ksize uint32 // key size
    vsize uint32 // value size
}
```
Accessing an element by index from the page:
```go
// leafPageElement retrieves the leaf node by index
func (p *page) leafPageElement(index uint16) *leafPageElement {
    n := &((*[0x7FFFFFF]leafPageElement)(unsafe.Pointer(&p.ptr)))[index]
    return n
}
```
Accessing the key and value:
```go
// key returns a byte slice of the node key.
func (n *leafPageElement) key() []byte {
    buf := (*[maxAllocSize]byte)(unsafe.Pointer(n))
    // the trailing :n.ksize caps the slice's capacity
    return (*[maxAllocSize]byte)(unsafe.Pointer(&buf[n.pos]))[:n.ksize:n.ksize]
}

// value returns a byte slice of the node value.
func (n *leafPageElement) value() []byte {
    buf := (*[maxAllocSize]byte)(unsafe.Pointer(n))
    return (*[maxAllocSize]byte)(unsafe.Pointer(&buf[n.pos+n.ksize]))[:n.vsize:n.vsize]
}
```
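The three-index slice expression ([:n.ksize:n.ksize]) caps the returned slice's capacity at its length, so a later append cannot silently overwrite the value bytes sitting right behind the key inside the page. A tiny demonstration:

```go
package main

import "fmt"

func main() {
    buf := []byte("keyvalue")

    key := buf[0:3:3] // full slice expression: len=3, cap=3
    fmt.Println(len(key), cap(key)) // 3 3

    // With capacity capped, append copies to a new array instead of
    // clobbering the adjacent "value" bytes.
    key = append(key, '!')
    fmt.Println(string(buf)) // keyvalue (unchanged)
}
```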
### Freelist pages
Freelist pages store the list of free pages so they can be reused.
```go
// freelist represents a list of all pages that are available for allocation.
// It also tracks pages that have been freed but are still in use by open transactions.
type freelist struct {
    ids []pgid // all free and available free page ids.
    // pages freed by in-flight transactions, tracked so pages can be reused safely
    pending map[txid][]pgid // mapping of soon-to-be free page ids by tx.
    // fast membership check for free/pending pages
    cache map[pgid]bool // fast lookup of all free and pending page ids.
}
```
Only freelist.ids needs to be persisted on disk; pending and cache are maintained in memory only. The read function below loads the array of free page ids from disk and then rebuilds the cache field via reindex.
```go
// read initializes the freelist from a freelist page.
func (f *freelist) read(p *page) {
    // If the page.count is at the max uint16 value (64k) then it's considered
    // an overflow and the size of the freelist is stored as the first element.
    idx, count := 0, int(p.count)
    if count == 0xFFFF {
        idx = 1
        count = int(((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[0])
    }

    // Copy the list of page ids from the freelist.
    if count == 0 {
        f.ids = nil
    } else {
        ids := ((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[idx:count]
        f.ids = make([]pgid, len(ids))
        copy(f.ids, ids)

        // Make sure they're sorted.
        sort.Sort(pgids(f.ids))
    }

    // Rebuild the page cache.
    f.reindex()
}
```
allocate checks whether n contiguous free pages exist; if so it returns the page id of the first one, otherwise it returns 0.
```go
// allocate returns the starting page id of a contiguous list of pages of a given size.
// If a contiguous block cannot be found then 0 is returned.
func (f *freelist) allocate(n int) pgid {
    if len(f.ids) == 0 {
        return 0
    }

    var initial, previd pgid
    for i, id := range f.ids {
        if id <= 1 {
            panic(fmt.Sprintf("invalid page allocation: %d", id))
        }

        // Reset initial page if this is not contiguous.
        if previd == 0 || id-previd != 1 {
            initial = id
        }

        // If we found a contiguous block then remove it and return it.
        if (id-initial)+1 == pgid(n) {
            // If we're allocating off the beginning then take the fast path
            // and just adjust the existing slice. This will use extra memory
            // temporarily but the append() in free() will realloc the slice
            // as is necessary.
            if (i + 1) == n {
                f.ids = f.ids[i+1:]
            } else {
                copy(f.ids[i-n+1:], f.ids[i+1:])
                f.ids = f.ids[:len(f.ids)-n]
            }

            // Remove from the free cache.
            for i := pgid(0); i < pgid(n); i++ {
                delete(f.cache, initial+i)
            }

            return initial
        }

        previd = id
    }
    return 0
}
```
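A simplified sketch of the contiguous-run scan in isolation (plain uint64 ids, no slice splicing or cache maintenance, so this illustrates the idea rather than reproducing the real allocate):

```go
package main

import "fmt"

// allocateRun finds n consecutive page ids in a sorted slice and
// returns the first id of the run, or 0 if no such run exists.
func allocateRun(ids []uint64, n int) uint64 {
    var initial, previd uint64
    for _, id := range ids {
        // Run broken (or first element): restart the run here.
        if previd == 0 || id-previd != 1 {
            initial = id
        }
        // Run long enough: return its starting id.
        if int(id-initial)+1 == n {
            return initial
        }
        previd = id
    }
    return 0
}

func main() {
    ids := []uint64{3, 4, 6, 7, 8, 12}
    fmt.Println(allocateRun(ids, 3)) // 6 (pages 6,7,8)
    fmt.Println(allocateRun(ids, 4)) // 0 (no run of 4)
}
```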
## The node structure
The page struct maps one-to-one onto the on-disk layout, which makes it awkward to use directly; the page.meta() function shown above exists precisely to ease access to the metadata fields. Since branch and leaf pages are by far the most common, boltdb defines a dedicated node type to represent a page loaded into memory, making branch and leaf data convenient to access. Inserts and deletes performed by a write transaction are first applied to the in-memory nodes and only flushed to disk when the transaction commits.
```go
// inode represents an internal node inside of a node.
// It can be used to point to elements in a page or point
// to an element which hasn't been added to a page yet.
// inode uniformly represents both branch and leaf elements.
type inode struct {
    flags uint32 // corresponds to leafPageElement.flags
    pgid  pgid
    key   []byte
    value []byte // nil for branch elements
}

type inodes []inode

// node represents an in-memory, deserialized page.
type node struct {
    bucket     *Bucket
    isLeaf     bool
    unbalanced bool   // set to true when a key is deleted
    spilled    bool   // whether the node has already been split and written to dirty pages
    key        []byte // the node's first key
    pgid       pgid
    parent     *node
    children   nodes
    // key/value array for leaf nodes, key array for branch nodes
    inodes inodes
}

type nodes []*node
```
Converting between node and page works as follows:
```go
// read initializes the node from a page.
func (n *node) read(p *page) {
    n.pgid = p.id
    n.isLeaf = ((p.flags & leafPageFlag) != 0)
    n.inodes = make(inodes, int(p.count))

    for i := 0; i < int(p.count); i++ {
        inode := &n.inodes[i]
        if n.isLeaf {
            elem := p.leafPageElement(uint16(i))
            inode.flags = elem.flags
            // read the key/value from the page into the inode struct
            inode.key = elem.key()
            inode.value = elem.value()
        } else {
            elem := p.branchPageElement(uint16(i))
            inode.pgid = elem.pgid
            inode.key = elem.key()
        }
        _assert(len(inode.key) > 0, "read: zero-length inode key")
    }

    // Save first key so we can find the node in the parent when we spill.
    if len(n.inodes) > 0 {
        n.key = n.inodes[0].key
        _assert(len(n.key) > 0, "read: zero-length node key")
    } else {
        n.key = nil
    }
}

// write writes the items onto one or more pages.
func (n *node) write(p *page) {
    // Initialize page.
    if n.isLeaf {
        p.flags |= leafPageFlag
    } else {
        p.flags |= branchPageFlag
    }

    if len(n.inodes) >= 0xFFFF {
        panic(fmt.Sprintf("inode overflow: %d (pgid=%d)", len(n.inodes), p.id))
    }
    p.count = uint16(len(n.inodes))

    // Stop here if there are no items to write.
    if p.count == 0 {
        return
    }

    // Loop over each item and write it to the page.
    // b points at where the next key/value (or key) will be written.
    b := (*[maxAllocSize]byte)(unsafe.Pointer(&p.ptr))[n.pageElementSize()*len(n.inodes):]
    for i, item := range n.inodes {
        _assert(len(item.key) > 0, "write: zero-length inode key")

        // Write the page element.
        if n.isLeaf {
            elem := p.leafPageElement(uint16(i))
            // pos is the address difference between b and elem
            elem.pos = uint32(uintptr(unsafe.Pointer(&b[0])) - uintptr(unsafe.Pointer(elem)))
            elem.flags = item.flags
            elem.ksize = uint32(len(item.key))
            elem.vsize = uint32(len(item.value))
        } else {
            elem := p.branchPageElement(uint16(i))
            elem.pos = uint32(uintptr(unsafe.Pointer(&b[0])) - uintptr(unsafe.Pointer(elem)))
            elem.ksize = uint32(len(item.key))
            elem.pgid = item.pgid
            _assert(elem.pgid != p.id, "write: circular dependency occurred")
        }

        // If the length of key+value is larger than the max allocation size
        // then we need to reallocate the byte array pointer.
        //
        // See: https://github.com/boltdb/bolt/pull/335
        klen, vlen := len(item.key), len(item.value)
        if len(b) < klen+vlen {
            b = (*[maxAllocSize]byte)(unsafe.Pointer(&b[0]))[:]
        }

        // Write data for the element to the end of the page.
        // copy the key and value, then advance b to the next slot
        copy(b[0:], item.key)
        b = b[klen:]
        copy(b[0:], item.value)
        b = b[vlen:]
    }

    // DEBUG ONLY: n.dump()
}
```
Inserting or deleting a KV pair first mutates the in-memory node structures, at which point the B+ tree may be unbalanced; rebalancing and writing to disk happen at commit time. Deleting a key removes the element at the index found by binary search. This only touches memory; the change is actually flushed to disk when the transaction commits.
```go
// del removes a key from the node.
func (n *node) del(key []byte) {
    // Find index of key.
    index := sort.Search(len(n.inodes), func(i int) bool { return bytes.Compare(n.inodes[i].key, key) != -1 })

    // Exit if the key isn't found.
    if index >= len(n.inodes) || !bytes.Equal(n.inodes[index].key, key) {
        return
    }

    // Delete inode from the node.
    // splice it out of inodes
    n.inodes = append(n.inodes[:index], n.inodes[index+1:]...)

    // Mark the node as needing rebalancing.
    n.unbalanced = true
}
```
Inserting a key works much the same way and likewise only touches memory.
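As an illustration, here is a self-contained sketch of that search-then-insert step (the inode type and the put helper are simplified stand-ins, not node.put's real signature):

```go
package main

import (
    "bytes"
    "fmt"
    "sort"
)

type inode struct{ key, value []byte }

// put binary-searches for the slot, replaces in place on an exact
// match, and otherwise shifts the tail right and inserts.
func put(inodes []inode, key, value []byte) []inode {
    index := sort.Search(len(inodes), func(i int) bool {
        return bytes.Compare(inodes[i].key, key) != -1
    })
    exact := index < len(inodes) && bytes.Equal(inodes[index].key, key)
    if !exact {
        inodes = append(inodes, inode{})
        copy(inodes[index+1:], inodes[index:])
    }
    inodes[index] = inode{key: key, value: value}
    return inodes
}

func main() {
    ns := []inode{{key: []byte("a")}, {key: []byte("c")}}
    ns = put(ns, []byte("b"), []byte("1"))
    for _, n := range ns {
        fmt.Printf("%s ", n.key) // a b c
    }
    fmt.Println()
}
```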
## Bucket implementation
A Bucket in boltdb is roughly analogous to a table in MySQL: it stores KV pairs, and buckets can be nested. There is a single root bucket, and every other bucket is a descendant of it. meta.root stores the page id of the root bucket's root page, and transaction initialization uses it to set the transaction's root:
```go
// init initializes the transaction.
func (tx *Tx) init(db *DB) {
    tx.db = db
    tx.pages = nil

    // Copy the meta page since it can be changed by the writer.
    tx.meta = &meta{}
    db.meta().copy(tx.meta)

    // Copy over the root bucket.
    tx.root = newBucket(tx)
    tx.root.bucket = &bucket{}
    // set the root
    *tx.root.bucket = tx.meta.root

    // Increment the transaction id and add a page cache for writable transactions.
    if tx.writable {
        tx.pages = make(map[pgid]*page)
        tx.meta.txid += txid(1)
    }
}
```
Each child bucket is stored inside its parent bucket's leaf pages, with leafPageElement.flags set to bucketLeafFlag.
```go
const (
    bucketLeafFlag = 0x01
)
```
The on-disk bucket struct is shown below; its main field is the page id of the bucket's root page. A bucket with no nested buckets and little data is not given a full page of its own: it is stored inline in the parent bucket's leaf page, in which case bucket.root is 0.
```go
// bucket represents the on-file representation of a bucket.
// This is stored as the "value" of a bucket key. If the bucket is small enough,
// then its root page can be stored inline in the "value", after the bucket
// header. In the case of inline buckets, the "root" will be 0.
type bucket struct {
    root     pgid   // page id of the bucket's root-level page
    sequence uint64 // monotonically incrementing, used by NextSequence()
}
```
The in-memory Bucket struct:
```go
// Bucket represents a collection of key/value pairs inside the database.
type Bucket struct {
    *bucket
    tx       *Tx                // the associated transaction
    buckets  map[string]*Bucket // subbucket cache
    page     *page              // inline page reference
    rootNode *node              // materialized node for the root page.
    nodes    map[pgid]*node     // node cache

    // Sets the threshold for filling nodes when they split. By default,
    // the bucket will fill to 50% but it can be useful to increase this
    // amount if you know that your write workloads are mostly append-only.
    //
    // This is non-persisted across transactions so it must be set in every Tx.
    FillPercent float64
}
```
Creating a bucket:
```go
// CreateBucket creates a new bucket at the given key and returns the new bucket.
// Returns an error if the key already exists, if the bucket name is blank, or if the bucket name is too long.
// The bucket instance is only valid for the lifetime of the transaction.
func (b *Bucket) CreateBucket(key []byte) (*Bucket, error) {
    if b.tx.db == nil {
        return nil, ErrTxClosed
    } else if !b.tx.writable {
        return nil, ErrTxNotWritable
    } else if len(key) == 0 {
        return nil, ErrBucketNameRequired
    }

    // Move cursor to correct position.
    c := b.Cursor()
    k, _, flags := c.seek(key)

    // Return an error if there is an existing key.
    if bytes.Equal(key, k) {
        // the key already exists; report a different error depending on
        // whether the existing key is a bucket
        if (flags & bucketLeafFlag) != 0 {
            return nil, ErrBucketExists
        }
        return nil, ErrIncompatibleValue
    }

    // Create empty, inline bucket.
    var bucket = Bucket{
        bucket:      &bucket{},
        rootNode:    &node{isLeaf: true},
        FillPercent: DefaultFillPercent,
    }
    var value = bucket.write()

    // Insert into node.
    key = cloneBytes(key)
    // write it into the node
    c.node().put(key, key, value, 0, bucketLeafFlag)

    // Since subbuckets are not allowed on inline buckets, we need to
    // dereference the inline page, if it exists. This will cause the bucket
    // to be treated as a regular, non-inline bucket for the rest of the tx.
    b.page = nil

    // open the bucket so we can return a Bucket object
    return b.Bucket(key), nil
}
```
Bucket keys share a namespace with ordinary KV pairs and may not collide with ordinary keys. The implementation serializes the new bucket into a []byte, writes it into the containing node, and then calls b.Bucket(key) to return the Bucket object.

Opening a bucket works as follows:
- Check that the key exists and that it is a bucket.
- Call openBucket to deserialize it into a Bucket struct.
```go
// Bucket retrieves a nested bucket by name.
// Returns nil if the bucket does not exist.
// The bucket instance is only valid for the lifetime of the transaction.
func (b *Bucket) Bucket(name []byte) *Bucket {
    if b.buckets != nil {
        // return straight from the cache if present
        if child := b.buckets[string(name)]; child != nil {
            return child
        }
    }

    // Move cursor to key.
    c := b.Cursor()
    k, v, flags := c.seek(name)

    // Return nil if the key doesn't exist or it is not a bucket.
    if !bytes.Equal(name, k) || (flags&bucketLeafFlag) == 0 {
        return nil
    }

    // Otherwise create a bucket and cache it.
    var child = b.openBucket(v)
    if b.buckets != nil {
        // store it in the cache
        b.buckets[string(name)] = child
    }

    return child
}

// Helper method that re-interprets a sub-bucket value
// from a parent into a Bucket
func (b *Bucket) openBucket(value []byte) *Bucket {
    var child = newBucket(b.tx)

    // If unaligned load/stores are broken on this arch and value is
    // unaligned simply clone to an aligned byte array.
    unaligned := brokenUnaligned && uintptr(unsafe.Pointer(&value[0]))&3 != 0
    if unaligned {
        value = cloneBytes(value)
    }

    // If this is a writable transaction then we need to copy the bucket entry.
    // Read-only transactions can point directly at the mmap entry.
    if b.tx.writable && !unaligned {
        // for a writable tx (or an unaligned value) allocate new memory and
        // copy; otherwise point directly into the mmap'd memory
        child.bucket = &bucket{}
        *child.bucket = *(*bucket)(unsafe.Pointer(&value[0]))
    } else {
        child.bucket = (*bucket)(unsafe.Pointer(&value[0]))
    }

    // Save a reference to the inline page if the bucket is inline.
    // handle inline pages; a freshly created bucket is always inline
    if child.root == 0 {
        child.page = (*page)(unsafe.Pointer(&value[bucketHeaderSize]))
    }

    return &child
}
```
## B+ tree operations and maintenance
### Cursor implementation
Data access in boltdb goes through a Cursor, which handles both lookups and ordered traversal.
```go
// elemRef represents a reference to an element on a given page/node.
type elemRef struct {
    page  *page // the page
    node  *node // the page materialized as a node, if it has been opened
    index int   // index of the element currently being visited
}

// Cursor represents an iterator that can traverse over all key/value pairs in a bucket in sorted order.
// Cursors see nested buckets with value == nil.
type Cursor struct {
    bucket *Bucket   // the bucket this cursor points into
    stack  []elemRef // the traversal stack
}
```
Put is implemented as follows: exploit the B+ tree's ordering to binary-search down the tree until reaching a leaf page, then insert there.
```go
// Put sets the value for a key in the bucket.
// If the key exist then its previous value will be overwritten.
// Supplied value must remain valid for the life of the transaction.
// Returns an error if the bucket was created from a read-only transaction, if the key is blank, if the key is too large, or if the value is too large.
func (b *Bucket) Put(key []byte, value []byte) error {
    if b.tx.db == nil {
        return ErrTxClosed
    } else if !b.Writable() {
        return ErrTxNotWritable
    } else if len(key) == 0 {
        return ErrKeyRequired
    } else if len(key) > MaxKeySize {
        return ErrKeyTooLarge
    } else if int64(len(value)) > MaxValueSize {
        return ErrValueTooLarge
    }

    // Move cursor to correct position.
    c := b.Cursor()
    // position the cursor at the key
    k, _, flags := c.seek(key)

    // Return an error if there is an existing key with a bucket value.
    if bytes.Equal(key, k) && (flags&bucketLeafFlag) != 0 {
        return ErrIncompatibleValue
    }

    // Insert into node.
    // The key is cloned but the value is not, because the value is required
    // to stay valid until the transaction ends while the key is not.
    key = cloneBytes(key)
    // update or insert
    c.node().put(key, key, value, 0, 0)

    return nil
}
```
Positioning the cursor at a given key is implemented as follows:
```go
// seek moves the cursor to a given key and returns it.
// If the key does not exist then the next key is used.
func (c *Cursor) seek(seek []byte) (key []byte, value []byte, flags uint32) {
    _assert(c.bucket.tx.db != nil, "tx closed")

    // Start from root page/node and traverse to correct page.
    // clear the stack
    c.stack = c.stack[:0]
    c.search(seek, c.bucket.root)
    ref := &c.stack[len(c.stack)-1]

    // If the cursor is pointing to the end of page/node then return nil.
    if ref.index >= ref.count() {
        return nil, nil, 0
    }

    // If this is a bucket then return a nil value.
    return c.keyValue()
}

// search recursively performs a binary search against a given page/node until it finds a given key.
func (c *Cursor) search(key []byte, pgid pgid) {
    // pageNode may return either a page or a node, which is why both
    // page-based and node-based search paths exist below. One might ask
    // why everything isn't unified into nodes, since node.read(p *page)
    // looks cheap. My understanding: Bucket.node() records the node in
    // Bucket.nodes for later B+ tree balancing, which read-only access
    // doesn't need, so the two paths stay separate.
    p, n := c.bucket.pageNode(pgid)
    if p != nil && (p.flags&(branchPageFlag|leafPageFlag)) == 0 {
        panic(fmt.Sprintf("invalid page type: %d: %x", p.id, p.flags))
    }
    e := elemRef{page: p, node: n}
    // push onto the stack
    c.stack = append(c.stack, e)

    // If we're on a leaf page/node then find the specific node.
    if e.isLeaf() {
        c.nsearch(key)
        return
    }

    if n != nil {
        c.searchNode(key, n)
        return
    }
    c.searchPage(key, p)
}

// nsearch searches the leaf node on the top of the stack for a key.
func (c *Cursor) nsearch(key []byte) {
    e := &c.stack[len(c.stack)-1]
    p, n := e.page, e.node

    // If we have a node then search its inodes.
    if n != nil {
        // binary search
        index := sort.Search(len(n.inodes), func(i int) bool {
            return bytes.Compare(n.inodes[i].key, key) != -1
        })
        // set the index
        e.index = index
        return
    }

    // If we have a page then search its leaf elements.
    inodes := p.leafPageElements()
    // binary search against the page
    index := sort.Search(int(p.count), func(i int) bool {
        return bytes.Compare(inodes[i].key(), key) != -1
    })
    // set the index
    e.index = index
}
```
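The bytes.Compare(...) != -1 predicate makes sort.Search return the first index whose key is greater than or equal to the target, which is exactly the "next key is used" behavior that seek documents. A tiny standalone illustration:

```go
package main

import (
    "bytes"
    "fmt"
    "sort"
)

func main() {
    keys := [][]byte{[]byte("apple"), []byte("cherry"), []byte("peach")}
    target := []byte("banana")

    // Same predicate as boltdb's search: first index whose key >= target.
    i := sort.Search(len(keys), func(i int) bool {
        return bytes.Compare(keys[i], target) != -1
    })
    fmt.Println(i, string(keys[i])) // 1 cherry: seek lands on the next key
}
```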
Next is implemented as follows; it moves the cursor to the next key. boltdb's B+ tree keeps no sibling pointers between adjacent leaf pages, so the next leaf page is found by backtracking up the stack, mainly to avoid unnecessary page modifications.
```go
// Next moves the cursor to the next item in the bucket and returns its key and value.
// If the cursor is at the end of the bucket then a nil key and value are returned.
// The returned key and value are only valid for the life of the transaction.
func (c *Cursor) Next() (key []byte, value []byte) {
    _assert(c.bucket.tx.db != nil, "tx closed")
    k, v, flags := c.next()
    if (flags & uint32(bucketLeafFlag)) != 0 {
        return k, nil
    }
    return k, v
}

// next moves to the next leaf element and returns the key and value.
// If the cursor is at the last leaf element then it stays there and returns nil.
func (c *Cursor) next() (key []byte, value []byte, flags uint32) {
    for {
        // Attempt to move over one element until we're successful.
        // Move up the stack as we hit the end of each page in our stack.
        var i int
        for i = len(c.stack) - 1; i >= 0; i-- {
            elem := &c.stack[i]
            if elem.index < elem.count()-1 {
                // this level still has unvisited elements: advance its index
                elem.index++
                break
            }
        }

        // If we've hit the root page then stop and return. This will leave the
        // cursor on the last element of the last page.
        if i == -1 {
            // every level has been fully visited, i.e. the cursor is on the last element
            return nil, nil, 0
        }

        // Otherwise start from where we left off in the stack and find the
        // first element of the first leaf page.
        c.stack = c.stack[:i+1]
        // descend to the first leaf node below
        c.first()

        // If this is an empty page then restart and move back up the stack.
        // https://github.com/boltdb/bolt/issues/450
        if c.stack[len(c.stack)-1].count() == 0 {
            continue
        }

        return c.keyValue()
    }
}

// first moves the cursor to the first leaf element under the last page in the stack.
func (c *Cursor) first() {
    for {
        // Exit when we hit a leaf page.
        var ref = &c.stack[len(c.stack)-1]
        if ref.isLeaf() {
            break
        }

        // Keep adding pages pointing to the first element to the stack.
        var pgid pgid
        if ref.node != nil {
            pgid = ref.node.inodes[ref.index].pgid
        } else {
            pgid = ref.page.branchPageElement(uint16(ref.index)).pgid
        }
        p, n := c.bucket.pageNode(pgid)
        // push the next level with index 0 so it is visited from its first element
        c.stack = append(c.stack, elemRef{page: p, node: n, index: 0})
    }
}
```
The cursor's remaining functions are standard B+ tree operations and are not covered here.
### B+ tree rebalancing
Both key insertion and deletion go through cursor.node():
```go
// delete
c.node().del(key)

// insert
c.node().put(key, key, value, 0, 0)
```
By the nature of a B+ tree, any page along the access path may end up modified, so cursor.node() materializes all of those pages into nodes and caches them in Bucket.nodes.
```go
func (c *Cursor) node() *node {
    _assert(len(c.stack) > 0, "accessing a node with a zero-length cursor stack")

    // If the top of the stack is a leaf node then just return it.
    if ref := &c.stack[len(c.stack)-1]; ref.node != nil && ref.isLeaf() {
        return ref.node
    }

    // Start from root and traverse down the hierarchy.
    var n = c.stack[0].node
    if n == nil {
        n = c.bucket.node(c.stack[0].page.id, nil)
    }
    for _, ref := range c.stack[:len(c.stack)-1] {
        _assert(!n.isLeaf, "expected branch node")
        n = n.childAt(int(ref.index))
    }
    _assert(n.isLeaf, "expected leaf node")
    return n
}

// node creates a node from a page and associates it with a given parent.
func (b *Bucket) node(pgid pgid, parent *node) *node {
    _assert(b.nodes != nil, "nodes map expected")

    // Retrieve node if it's already been created.
    if n := b.nodes[pgid]; n != nil {
        return n
    }

    // Otherwise create a node and cache it.
    n := &node{bucket: b, parent: parent}
    if parent == nil {
        b.rootNode = n
    } else {
        parent.children = append(parent.children, n)
    }

    // Use the inline page if this is an inline bucket.
    var p = b.page
    if p == nil {
        p = b.tx.page(pgid)
    }

    // Read the page into the node and cache it.
    n.read(p)
    // record it in the cache
    b.nodes[pgid] = n

    // Update statistics.
    b.tx.stats.NodeCount++

    return n
}
```
From the analysis above, rebalancing only needs to examine the nodes cached in Bucket.nodes to decide whether they need splitting or merging.
```go
// rebalance attempts to balance all nodes.
func (b *Bucket) rebalance() {
    for _, n := range b.nodes {
        // rebalance each cached node in turn
        n.rebalance()
    }
    for _, child := range b.buckets {
        // recurse into each child bucket
        child.rebalance()
    }
}
```
The node-level rebalance implementation merges underfilled pages into a sibling page; it is not rebalancing in the usual B+ tree sense.
```go
// rebalance attempts to combine the node with sibling nodes if the node fill
// size is below a threshold or if there are not enough keys.
func (n *node) rebalance() {
    // unbalanced is set when a key is deleted, never on insert, so after
    // node.rebalance() a node may still exceed the page size; that case is
    // handled elsewhere (during spill).
    if !n.unbalanced {
        return
    }
    n.unbalanced = false

    // Update statistics.
    n.bucket.tx.stats.Rebalance++

    // Ignore if node is above threshold (25%) and has enough keys.
    var threshold = n.bucket.tx.db.pageSize / 4
    // the node fills more than 25% of a page and has enough keys
    // (at least 1 for a leaf, otherwise at least 2)
    if n.size() > threshold && len(n.inodes) > n.minKeys() {
        return
    }

    // Root node has special handling.
    if n.parent == nil {
        // If root node is a branch and only has one node then collapse it.
        if !n.isLeaf && len(n.inodes) == 1 {
            // Move root's child up.
            child := n.bucket.node(n.inodes[0].pgid, n)
            n.isLeaf = child.isLeaf
            n.inodes = child.inodes[:]
            n.children = child.children

            // Reparent all child nodes being moved.
            for _, inode := range n.inodes {
                if child, ok := n.bucket.nodes[inode.pgid]; ok {
                    child.parent = n
                }
            }

            // Remove old child.
            child.parent = nil
            delete(n.bucket.nodes, child.pgid)
            child.free()
        }
        return
    }

    // If node has no keys then just remove it.
    if n.numChildren() == 0 {
        n.parent.del(n.key)
        n.parent.removeChild(n)
        delete(n.bucket.nodes, n.pgid)
        n.free()
        n.parent.rebalance()
        return
    }

    _assert(n.parent.numChildren() > 1, "parent must have at least 2 children")

    // Destination node is right sibling if idx == 0, otherwise left sibling.
    var target *node
    var useNextSibling = (n.parent.childIndex(n) == 0)
    if useNextSibling {
        target = n.nextSibling()
    } else {
        target = n.prevSibling()
    }

    // If both this node and the target node are too small then merge them.
    // merge unconditionally; splitting happens later in spill
    if useNextSibling {
        // Reparent all child nodes being moved.
        for _, inode := range target.inodes {
            if child, ok := n.bucket.nodes[inode.pgid]; ok {
                child.parent.removeChild(child)
                child.parent = n
                child.parent.children = append(child.parent.children, child)
            }
        }

        // Copy over inodes from target and remove target.
        n.inodes = append(n.inodes, target.inodes...)
        n.parent.del(target.key)
        n.parent.removeChild(target)
        delete(n.bucket.nodes, target.pgid)
        target.free()
    } else {
        // Reparent all child nodes being moved.
        for _, inode := range n.inodes {
            if child, ok := n.bucket.nodes[inode.pgid]; ok {
                child.parent.removeChild(child)
                child.parent = target
                child.parent.children = append(child.parent.children, child)
            }
        }

        // Copy over inodes to target and remove node.
        target.inodes = append(target.inodes, n.inodes...)
        n.parent.del(n.key)
        n.parent.removeChild(n)
        delete(n.bucket.nodes, n.pgid)
        n.free()
    }

    // Either this node or the target node was deleted from the parent so rebalance it.
    n.parent.rebalance()
}
```
After node.rebalance completes, some nodes may exceed the page size: rebalance does nothing about inserted elements, and merging siblings can itself produce an oversized node. This is handled when dirty pages are about to be allocated.
```go
// spill writes the nodes to dirty pages and splits nodes as it goes.
// Returns an error if dirty pages cannot be allocated.
func (n *node) spill() error {
    var tx = n.bucket.tx
    // already spilled: nothing to do
    if n.spilled {
        return nil
    }

    // Spill child nodes first. Child nodes can materialize sibling nodes in
    // the case of split-merge so we cannot use a range loop. We have to check
    // the children size on every loop iteration.
    sort.Sort(n.children)
    for i := 0; i < len(n.children); i++ {
        if err := n.children[i].spill(); err != nil {
            return err
        }
    }

    // We no longer need the child list because it's only used for spill tracking.
    n.children = nil

    // Split nodes into appropriate sizes. The first node will always be n.
    // depending on the page size, the node may split into several nodes
    var nodes = n.split(tx.db.pageSize)
    for _, node := range nodes {
        // Add node's page to the freelist if it's not new.
        if node.pgid > 0 {
            tx.db.freelist.free(tx.meta.txid, tx.page(node.pgid))
            node.pgid = 0
        }

        // Allocate contiguous space for the node.
        // allocate a fresh page for each node
        p, err := tx.allocate((node.size() / tx.db.pageSize) + 1)
        if err != nil {
            return err
        }

        // Write the node.
        if p.id >= tx.meta.pgid {
            panic(fmt.Sprintf("pgid (%d) above high water mark (%d)", p.id, tx.meta.pgid))
        }
        node.pgid = p.id
        node.write(p)
        node.spilled = true

        // Insert into parent inodes.
        if node.parent != nil {
            var key = node.key
            if key == nil {
                key = node.inodes[0].key
            }

            // propagate the new pgid/key up into the parent
            node.parent.put(key, node.inodes[0].key, nil, node.pgid, 0)
            node.key = node.inodes[0].key
            _assert(len(node.key) > 0, "spill: zero-length node key")
        }

        // Update the statistics.
        tx.stats.Spill++
    }

    // If the root node split and created a new root then we need to spill that
    // as well. We'll clear out the children to make sure it doesn't try to respill.
    if n.parent != nil && n.parent.pgid == 0 {
        n.children = nil
        return n.parent.spill()
    }

    return nil
}
```
When dirty pages are allocated, nodes larger than the page size are split into multiple pages. This is one way a disk-page-based B+ tree differs from an in-memory one: an in-memory B+ tree usually has a fixed key count per node and splits only when that count is exceeded, whereas here splitting is driven by serialized size.
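A rough, hypothetical sketch of such a size-driven split (a fixed per-page overhead and fill factor are assumed; the real node.split also enforces a minimum key count per page):

```go
package main

import "fmt"

// splitBySize cuts a list of serialized item sizes into chunks whose
// total stays under fillPercent * pageSize, keeping at least two items
// per chunk so a split always makes progress.
func splitBySize(sizes []int, pageSize int, fill float64) [][]int {
    threshold := int(float64(pageSize) * fill)
    var out [][]int
    var cur []int
    curSize := 16 // assumed page-header overhead
    for _, s := range sizes {
        if len(cur) >= 2 && curSize+s > threshold {
            out = append(out, cur)
            cur, curSize = nil, 16
        }
        cur = append(cur, s)
        curSize += s
    }
    return append(out, cur)
}

func main() {
    sizes := []int{900, 900, 900, 900, 900, 900}
    fmt.Println(splitBySize(sizes, 4096, 0.5)) // [[900 900] [900 900] [900 900]]
}
```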
## Transaction handling
boltdb allows only one write transaction at a time but any number of concurrent read transactions. Because of the append-only B+ tree design, a write transaction never updates existing data pages, so read transactions proceed normally while a write transaction runs.
### Read transactions
boltdb provides the View helper for running read transactions; whether the function succeeds or fails, Rollback is always called at the end to close the transaction.
```go
// View executes a function within the context of a managed read-only transaction.
// Any error that is returned from the function is returned from the View() method.
//
// Attempting to manually rollback within the function will cause a panic.
func (db *DB) View(fn func(*Tx) error) error {
    t, err := db.Begin(false)
    if err != nil {
        return err
    }

    // Make sure the transaction rolls back in the event of a panic.
    defer func() {
        if t.db != nil {
            t.rollback()
        }
    }()

    // Mark as a managed tx so that the inner function cannot manually rollback.
    t.managed = true

    // If an error is returned from the function then pass it through.
    err = fn(t)
    t.managed = false
    if err != nil {
        _ = t.Rollback()
        return err
    }

    // Rollback here releases the pages held by the read transaction.
    if err := t.Rollback(); err != nil {
        return err
    }

    return nil
}

func (db *DB) beginTx() (*Tx, error) {
    // Lock the meta pages while we initialize the transaction. We obtain
    // the meta lock before the mmap lock because that's the order that the
    // write transaction will obtain them.
    db.metalock.Lock()

    // Obtain a read-only lock on the mmap. When the mmap is remapped it will
    // obtain a write lock so all transactions must finish before it can be
    // remapped.
    // A read transaction holds the mmaplock read lock for its whole lifetime,
    // which normally does not block write transactions. To keep a long-running
    // read transaction from blocking writers, set InitialMmapSize large enough
    // that write transactions never need to remap.
    db.mmaplock.RLock()

    // Exit if the database is not open yet.
    if !db.opened {
        db.mmaplock.RUnlock()
        db.metalock.Unlock()
        return nil, ErrDatabaseNotOpen
    }

    // Create a transaction associated with the database.
    t := &Tx{}
    t.init(db)

    // Keep track of transaction until it closes.
    // running read transactions are recorded so pages can be freed safely
    db.txs = append(db.txs, t)
    n := len(db.txs)

    // Unlock the meta pages.
    db.metalock.Unlock()

    // Update the transaction stats.
    db.statlock.Lock()
    db.stats.TxN++
    db.stats.OpenTxN = n
    db.statlock.Unlock()

    return t, nil
}
```
When a read transaction finishes it must be closed so its resources are released; read transactions are closed by calling Rollback.
```go
func (tx *Tx) close() {
    if tx.db == nil {
        return
    }
    if tx.writable {
        // ...
    } else {
        tx.db.removeTx(tx)
    }

    // Clear all references.
    tx.db = nil
    tx.meta = nil
    tx.root = Bucket{tx: tx}
    tx.pages = nil
}

// removeTx removes a transaction from the database.
func (db *DB) removeTx(tx *Tx) {
    // Release the read lock on the mmap.
    db.mmaplock.RUnlock()

    // Use the meta lock to restrict access to the DB object.
    db.metalock.Lock()

    // Remove the transaction.
    for i, t := range db.txs {
        if t == tx {
            last := len(db.txs) - 1
            db.txs[i] = db.txs[last]
            db.txs[last] = nil
            db.txs = db.txs[:last]
            break
        }
    }
    n := len(db.txs)

    // Unlock the meta pages.
    db.metalock.Unlock()

    // Merge statistics.
    db.statlock.Lock()
    db.stats.OpenTxN = n
    db.stats.TxStats.add(&tx.stats)
    db.statlock.Unlock()
}
```
### Write transactions
boltdb provides the Update helper for running write transactions: it commits automatically on success and rolls back automatically on failure.
```go
// Update executes a function within the context of a read-write managed transaction.
// If no error is returned from the function then the transaction is committed.
// If an error is returned then the entire transaction is rolled back.
// Any error that is returned from the function or returned from the commit is
// returned from the Update() method.
//
// Attempting to manually commit or rollback within the function will cause a panic.
func (db *DB) Update(fn func(*Tx) error) error {
    t, err := db.Begin(true)
    if err != nil {
        return err
    }

    // Make sure the transaction rolls back in the event of a panic.
    defer func() {
        if t.db != nil {
            t.rollback()
        }
    }()

    // Mark as a managed tx so that the inner function cannot manually commit.
    t.managed = true

    // If an error is returned from the function then rollback and return error.
    err = fn(t)
    t.managed = false
    if err != nil {
        _ = t.Rollback()
        return err
    }

    return t.Commit()
}

// beginRWTx starts a write transaction.
func (db *DB) beginRWTx() (*Tx, error) {
    // If the database was opened with Options.ReadOnly, return an error.
    if db.readOnly {
        return nil, ErrDatabaseReadOnly
    }

    // Obtain writer lock. This is released by the transaction when it closes.
    // This enforces only one writer transaction at a time.
    db.rwlock.Lock()

    // Once we have the writer lock then we can lock the meta pages so that
    // we can set up the transaction.
    db.metalock.Lock()
    defer db.metalock.Unlock()

    // Exit if the database is not open yet.
    if !db.opened {
        db.rwlock.Unlock()
        return nil, ErrDatabaseNotOpen
    }

    // Create a transaction associated with the database.
    t := &Tx{writable: true}
    // For a write transaction, init creates the pages map, which records the
    // pages that need to be written; tx.allocate does tx.pages[p.id] = p.
    t.init(db)
    db.rwtx = t

    // Free any pages associated with closed read-only transactions.
    var minid txid = 0xFFFFFFFFFFFFFFFF
    for _, t := range db.txs {
        if t.meta.txid < minid {
            minid = t.meta.txid
        }
    }
    if minid > 0 {
        // release pages that were used by transactions that have finished
        db.freelist.release(minid - 1)
    }

    return t, nil
}
```
Rolling back a transaction is comparatively simple:
```go
func (tx *Tx) rollback() {
    if tx.db == nil {
        return
    }
    if tx.writable {
        // the pages this write transaction was about to free no longer
        // get freed; put them back
        tx.db.freelist.rollback(tx.meta.txid)
        // reload the freelist from disk
        tx.db.freelist.reload(tx.db.page(tx.db.meta().freelist))
    }
    tx.close()
}

func (tx *Tx) close() {
    if tx.db == nil {
        return
    }
    if tx.writable {
        // Remove transaction ref & writer lock.
        tx.db.rwtx = nil
        // release the lock so other write transactions can run
        tx.db.rwlock.Unlock()

        // Merge statistics.
        tx.db.statlock.Lock()
        // ... (update various statistics)
        tx.db.statlock.Unlock()
    } else {
        tx.db.removeTx(tx)
    }

    // Clear all references.
    tx.db = nil
    tx.meta = nil
    tx.root = Bucket{tx: tx}
    tx.pages = nil
}
```
Committing a transaction works as follows:
- Call rebalance to merge underfilled pages.
- Call spill to allocate dirty pages and split oversized pages.
- Allocate a new freelist page and write it to disk.
- Write the dirty pages to disk.
- Write the new meta page.
```go
// Commit writes all changes to disk and updates the meta page.
// Returns an error if a disk write error occurs, or if Commit is
// called on a read-only transaction.
func (tx *Tx) Commit() error {
    _assert(!tx.managed, "managed tx commit not allowed")
    if tx.db == nil {
        return ErrTxClosed
    } else if !tx.writable {
        return ErrTxNotWritable
    }

    // TODO(benbjohnson): Use vectorized I/O to write out dirty pages.

    // Rebalance nodes which have had deletions.
    var startTime = time.Now()
    // merge underfilled pages
    tx.root.rebalance()
    if tx.stats.Rebalance > 0 {
        tx.stats.RebalanceTime += time.Since(startTime)
    }

    // spill data onto dirty pages.
    startTime = time.Now()
    // Bucket.spill allocates dirty pages and splits oversized pages
    if err := tx.root.spill(); err != nil {
        tx.rollback()
        return err
    }
    tx.stats.SpillTime += time.Since(startTime)

    // Free the old root bucket.
    tx.meta.root.root = tx.root.root
    opgid := tx.meta.pgid

    // Free the freelist and allocate new pages for it. This will overestimate
    // the size of the freelist but not underestimate the size (which would be bad).
    tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist))
    // allocate a new freelist page
    p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1)
    if err != nil {
        tx.rollback()
        return err
    }
    // write the freelist page
    if err := tx.db.freelist.write(p); err != nil {
        tx.rollback()
        return err
    }
    tx.meta.freelist = p.id

    // If the high water mark has moved up then attempt to grow the database.
    // the file grew, so call truncate
    if tx.meta.pgid > opgid {
        if err := tx.db.grow(int(tx.meta.pgid+1) * tx.db.pageSize); err != nil {
            tx.rollback()
            return err
        }
    }

    // Write dirty pages to disk.
    startTime = time.Now()
    // write the dirty pages, and return in-memory pages without overflow
    // to the page pool for later reuse
    if err := tx.write(); err != nil {
        tx.rollback()
        return err
    }

    // If strict mode is enabled then perform a consistency check.
    // Only the first consistency error is reported in the panic.
    if tx.db.StrictMode {
        ch := tx.Check()
        var errs []string
        for {
            err, ok := <-ch
            if !ok {
                break
            }
            errs = append(errs, err.Error())
        }
        if len(errs) > 0 {
            panic("check fail: " + strings.Join(errs, "\n"))
        }
    }

    // Write meta to disk.
    if err := tx.writeMeta(); err != nil {
        tx.rollback()
        return err
    }
    tx.stats.WriteTime += time.Since(startTime)

    // Finalize the transaction.
    tx.close()

    // Execute commit handlers now that the locks have been removed.
    for _, fn := range tx.commitHandlers {
        fn()
    }

    return nil
}
```
Bucket's spill implementation:
```go
// spill writes all the nodes for this bucket to dirty pages.
func (b *Bucket) spill() error {
    // Spill all child buckets first.
    for name, child := range b.buckets {
        // If the child bucket is small enough and it has no child buckets then
        // write it inline into the parent bucket's page. Otherwise spill it
        // like a normal bucket and make the parent value a pointer to the page.
        var value []byte
        if child.inlineable() {
            child.free()
            // inline-page handling
            value = child.write()
        } else {
            // spill the child bucket
            if err := child.spill(); err != nil {
                return err
            }

            // Update the child bucket header in this bucket.
            value = make([]byte, unsafe.Sizeof(bucket{}))
            var bucket = (*bucket)(unsafe.Pointer(&value[0]))
            *bucket = *child.bucket
        }

        // Skip writing the bucket if there are no materialized nodes.
        if child.rootNode == nil {
            continue
        }

        // Update parent node.
        var c = b.Cursor()
        k, _, flags := c.seek([]byte(name))
        if !bytes.Equal([]byte(name), k) {
            panic(fmt.Sprintf("misplaced bucket header: %x -> %x", []byte(name), k))
        }
        if flags&bucketLeafFlag == 0 {
            panic(fmt.Sprintf("unexpected bucket header flag: %x", flags))
        }
        // write the serialized bucket into the node
        c.node().put([]byte(name), []byte(name), value, 0, bucketLeafFlag)
    }

    // Ignore if there's not a materialized root node.
    if b.rootNode == nil {
        return nil
    }

    // Spill nodes.
    // call node.spill
    if err := b.rootNode.spill(); err != nil {
        return err
    }
    b.rootNode = b.rootNode.root()

    // Update the root node for this bucket.
    if b.rootNode.pgid >= b.tx.meta.pgid {
        panic(fmt.Sprintf("pgid (%d) above high water mark (%d)", b.rootNode.pgid, b.tx.meta.pgid))
    }
    b.root = b.rootNode.pgid

    return nil
}
```
## MVCC
Multi-version concurrency control (MVCC) means the database keeps multiple versions of its data at once, so that transactions can be isolated yet run concurrently. A write transaction does not modify data in place; it creates a new version of the data.

boltdb supports many concurrent read transactions but only one write transaction at any moment. As the transaction-handling analysis above shows, a write transaction never modifies existing pages; it allocates new pages and writes into those, so concurrently running read transactions keep reading the old version undisturbed. When the write transaction commits it updates a meta page, and transactions created afterwards initialize from the newest meta page, thereby reading the newest version of the data.
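The snapshot behavior is easy to observe through bolt's public API. A small sketch (assuming the github.com/boltdb/bolt import path; the mvcc.db file name and the bucket/key names are arbitrary): a read transaction opened before a write commits keeps seeing the old value, while a fresh read sees the new one.

```go
package main

import (
    "fmt"
    "log"

    "github.com/boltdb/bolt"
)

func main() {
    db, err := bolt.Open("mvcc.db", 0600, nil)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // Seed k = v1.
    if err := db.Update(func(tx *bolt.Tx) error {
        b, err := tx.CreateBucketIfNotExists([]byte("kv"))
        if err != nil {
            return err
        }
        return b.Put([]byte("k"), []byte("v1"))
    }); err != nil {
        log.Fatal(err)
    }

    // Open a read transaction; it pins the meta page of this moment.
    rtx, err := db.Begin(false)
    if err != nil {
        log.Fatal(err)
    }
    defer rtx.Rollback()

    // A writer commits v2; it writes new pages, never touching the old ones.
    if err := db.Update(func(tx *bolt.Tx) error {
        return tx.Bucket([]byte("kv")).Put([]byte("k"), []byte("v2"))
    }); err != nil {
        log.Fatal(err)
    }

    // The old reader still sees v1; a fresh reader sees v2.
    fmt.Printf("old snapshot: %s\n", rtx.Bucket([]byte("kv")).Get([]byte("k")))
    _ = db.View(func(tx *bolt.Tx) error {
        fmt.Printf("new snapshot: %s\n", tx.Bucket([]byte("kv")).Get([]byte("k")))
        return nil
    })
}
```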
## Crash recovery
boltdb has no dedicated crash recovery; it guarantees integrity by ordering its modifications: the tree structure is written first, the meta page after. If the machine crashes during a write transaction's commit and the meta page has not yet been modified, the DB reopens with the previous state; if the meta page was written incompletely, the DB reopens using the other meta page. Note that with mmap, merely ordering the writes does not guarantee the order in which they reach disk, so boltdb explicitly calls fdatasync to ensure earlier modifications are durable before later operations proceed.
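A minimal sketch of that ordering discipline in plain Go (not bolt's actual I/O code; offsets and payloads are placeholders): flush the tree pages, sync, then write the meta page, sync again. A crash between the two syncs still leaves a valid old meta pointing at a consistent tree.

```go
package main

import (
    "log"
    "os"
)

// commit writes the tree pages, syncs, then writes the meta page, syncs.
// The second sync is the commit point.
func commit(f *os.File, dirtyPages, metaPage []byte, metaOff int64) error {
    if _, err := f.WriteAt(dirtyPages, int64(2*4096)); err != nil { // past the two meta pages
        return err
    }
    if err := f.Sync(); err != nil { // barrier: tree durable before meta
        return err
    }
    if _, err := f.WriteAt(metaPage, metaOff); err != nil {
        return err
    }
    return f.Sync() // meta durable: commit point
}

func main() {
    f, err := os.CreateTemp("", "commit-order-*.db")
    if err != nil {
        log.Fatal(err)
    }
    defer os.Remove(f.Name())
    defer f.Close()
    if err := commit(f, []byte("tree pages"), []byte("meta"), 0); err != nil {
        log.Fatal(err)
    }
}
```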
The boltdb code is well worth reading and learning from: in roughly 4,000 lines it implements an efficient KV database.
- boltdb leans on mmap to simplify the buffer manager, and by ordering its disk writes it needs no dedicated crash-recovery handling.
- boltdb implements an append-only B+ tree.
- A neat touch of the page allocator is that it only ever hands out contiguous pages, which simplifies all downstream logic.