https://github.com/madushadhanushka/simple-sqlite 是从sqlite 2.5.0 抽取出的核心逻辑,适合用来学习sqlite的后端实现,如
- 如何实现page buffer
- 如何实现基于磁盘的B树
- 如何通过rollback journaling来实现事务支持和crash recovery
page buffer bool¶
先看page的实现。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
// page size 大小
#define SQLITE_PAGE_SIZE 1024
// 页号。0 代表 "not a page", 文件中的第一个页从1开始。
typedef unsigned int Pgno;
/*
** Each in-memory image of a page begins with the following header.
** This header is only visible to this pager module. The client
** code that calls pager sees only the data that follows the header.
*/
typedef struct PgHdr PgHdr;
struct PgHdr {
Pager *pPager; /* The pager to which this page belongs */
Pgno pgno; /* The page number for this page */
PgHdr *pNextHash, *pPrevHash; /* Hash collision chain for PgHdr.pgno */
int nRef; /* Number of users of this page */
PgHdr *pNextFree, *pPrevFree; /* Freelist of pages where nRef==0 */
PgHdr *pNextAll, *pPrevAll; /* A list of all pages */
char inJournal; /* TRUE if has been written to journal */
char inCkpt; /* TRUE if written to the checkpoint journal */
char dirty; /* TRUE if we need to write back changes */
/* SQLITE_PAGE_SIZE bytes of page data follow this header */
/* Pager.nExtra bytes of local data follow the page data */
};
|
每个page的大小为SQLITE_PAGE_SIZE。PgHdr是page前面的一段内存,用于簿记一些信息,如是否是脏页(dirty),是否已写入journal或checkpoint文件(inJournal, inCkpt),页号(pgno),引用数(nRef)等。此外还记录一些双链表的next/prev指针,pNextHash记录哈希表的冲突链,pNextFree记录空闲页,pNextAll记录所有页。
在内存中布局如下
1
|
| sizeof(PgHdr) | SQLITE_PAGE_SIZE | Pager.nExtra |
|
按照上面的布局,定义了如下的宏来做内存地址的转换
1
2
3
4
5
6
7
8
|
/*
** Convert a pointer to a PgHdr into a pointer to its data
** and back again.
*/
#define PGHDR_TO_DATA(P) ((void*)(&(P)[1]))
#define DATA_TO_PGHDR(D) (&((PgHdr*)(D))[-1])
#define PGHDR_TO_EXTRA(P) ((void*)&((char*)(&(P)[1]))[SQLITE_PAGE_SIZE])
|
我们看PGHDR_TO_DATA的实现。最简单的情况下, PgHdr* P = init xxx;
把P当作一个数组,P[1] 相当于 P + sizeof(PgHdr),指向了page的内存,&P[1]的类型仍然为PgHdr*, 再将其转为void* 类型。sqlite实现时大量采用了类似的技巧。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
/*
** A open page cache is an instance of the following structure.
*/
struct Pager {
char *zFilename; /* Name of the database file */
char *zJournal; /* Name of the journal file */
OsFile fd, jfd; /* File descriptors for database and journal */
OsFile cpfd; /* File descriptor for the checkpoint journal */
// page个数
int dbSize; /* Number of pages in the file */
int origDbSize; /* dbSize before the current change */
int ckptSize, ckptJSize; /* Size of database and journal at ckpt_begin() */
int nExtra; /* Add this many bytes to each in-memory page */
// 用于做一些B树的清理工作
void (*xDestructor)(void*); /* Call this routine when freeing pages */
int nPage; /* Total number of in-memory pages */
int nRef; /* Number of in-memory pages with PgHdr.nRef>0 */
int mxPage; /* Maximum number of pages to hold in cache */
int nHit, nMiss, nOvfl; /* Cache hits, missing, and LRU overflows */
u8 journalOpen; /* True if journal file descriptors is valid */
u8 ckptOpen; /* True if the checkpoint journal is open */
u8 ckptInUse; /* True we are in a checkpoint */
u8 noSync; /* Do not sync the journal if true */
u8 state; /* SQLITE_UNLOCK, _READLOCK or _WRITELOCK */
u8 errMask; /* One of several kinds of errors */
u8 tempFile; /* zFilename is a temporary file */
u8 readOnly; /* True for a read-only database */
u8 needSync; /* True if an fsync() is needed on the journal */
u8 dirtyFile; /* True if database file has changed in any way */
u8 *aInJournal; /* One bit for each page in the database file */
u8 *aInCkpt; /* One bit for each page in the database */
PgHdr *pFirst, *pLast; /* List of free pages */
PgHdr *pAll; /* List of all pages */
// 用于根据页号快速定位到页
PgHdr *aHash[N_PG_HASH]; /* Hash table to map page number of PgHdr */
};
|
Pager表示page cache实现,成员比较多,但都有注释。
1
2
3
4
5
6
|
int sqlitepager_open(
Pager **ppPager, /* Return the Pager structure here */
const char *zFilename, /* Name of the database file to open */
int mxPage, /* Max number of in-memory cache pages */
int nExtra /* Extra bytes append to each in-memory page */
)
|
sqlitepager_open用于创建一个Pager,其实现稍长,但逻辑比较简单,首先打开数据库文件,然后做成员的初始化。这里不介绍其具体实现。参数中的mxPage控制内存中的最大页数,nExtra是上面的page部局中最后额外分配的一段内存。
pager_lookup用于在内存中根据页号来查询页,如果没有缓存,不会从磁盘中加载。主要逻辑就是在哈希表中查找。
1
2
3
4
5
6
7
8
9
10
11
|
/*
** Find a page in the hash table given its page number. Return
** a pointer to the page or NULL if not found.
*/
static PgHdr *pager_lookup(Pager *pPager, Pgno pgno){
PgHdr *p = pPager->aHash[pgno % N_PG_HASH];
while( p && p->pgno!=pgno ){
p = p->pNextHash;
}
return p;
}
|
pager_get和pager_lookup的不同在于,在没有命中缓存时会从磁盘中加载。在没有超过内存页数限制或者超过了但没有空闲页时,会malloc新的内存;否则从空闲页中重用page,如果所有的空闲页都是脏页,执行刷盘,这样所有的空闲页都不是脏页了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
int sqlitepager_get(Pager *pPager, Pgno pgno, void **ppPage){
PgHdr *pPg;
/* Make sure we have not hit any critical errors.
*/
if( pPager==0 || pgno==0 ){
return SQLITE_ERROR;
}
if( pPager->errMask & ~(PAGER_ERR_FULL) ){
return pager_errcode(pPager);
}
/* If this is the first page accessed, then get a read lock
** on the database file.
*/
if( pPager->nRef==0 ){
if( sqliteOsReadLock(&pPager->fd)!=SQLITE_OK ){
*ppPage = 0;
return SQLITE_BUSY;
}
pPager->state = SQLITE_READLOCK;
/* If a journal file exists, try to play it back.
*/
// nRef == 0,又存在journal文件,说明是崩溃后重启,需要从journal文件恢复
if( sqliteOsFileExists(pPager->zJournal) ){
}
pPg = 0;
}else{
/* Search for page in cache */
pPg = pager_lookup(pPager, pgno);
}
// 没有命中缓存
if( pPg==0 ){
/* The requested page is not in the page cache. */
int h;
pPager->nMiss++;
// 没有超过内存上限,或者没有空闲页
if( pPager->nPage<pPager->mxPage || pPager->pFirst==0 ){
/* Create a new page */
// 内存分配和我们上面的分析是一致的
pPg = malloc( sizeof(*pPg) + SQLITE_PAGE_SIZE + pPager->nExtra );
memset(pPg, 0, sizeof(*pPg) + SQLITE_PAGE_SIZE + pPager->nExtra );
pPg->pPager = pPager;
pPg->pNextAll = pPager->pAll;
if( pPager->pAll ){
pPager->pAll->pPrevAll = pPg;
}
pPg->pPrevAll = 0;
pPager->pAll = pPg;
pPager->nPage++;
}else{
/* Recycle an older page. First locate the page to be recycled.
** Try to find one that is not dirty and is near the head of
** of the free list */
pPg = pPager->pFirst;
// 当前没有任何引用的页为free页,但仍然可能为脏页
while( pPg && pPg->dirty ){
pPg = pPg->pNextFree;
}
// 如果找不到可用的空闲页,则将所有的修改同步磁盘。
// 之所以同步所有的页而非只同步一个页,是因为防止短期后续出现需要同步的情况。
if( pPg==0 ){
int rc = syncAllPages(pPager);
if( rc!=0 ){
sqlitepager_rollback(pPager);
*ppPage = 0;
return SQLITE_IOERR;
}
pPg = pPager->pFirst;
}
assert( pPg->nRef==0 );
assert( pPg->dirty==0 );
/* Unlink the old page from the free list and the hash table
*/
// 省略free链,hash链的维护逻辑
pPager->nOvfl++;
}
pPg->pgno = pgno;
// 设置是否在journal或checkpoint中
if( pPager->aInJournal && (int)pgno<=pPager->origDbSize ){
pPg->inJournal = (pPager->aInJournal[pgno/8] & (1<<(pgno&7)))!=0;
}else{
pPg->inJournal = 0;
}
if( pPager->aInCkpt && (int)pgno<=pPager->ckptSize ){
pPg->inCkpt = (pPager->aInCkpt[pgno/8] & (1<<(pgno&7)))!=0;
}else{
pPg->inCkpt = 0;
}
pPg->dirty = 0;
pPg->nRef = 1;
REFINFO(pPg);
pPager->nRef++;
// 维护hash链
h = pager_hash(pgno);
pPg->pNextHash = pPager->aHash[h];
pPager->aHash[h] = pPg;
if( pPg->pNextHash ){
assert( pPg->pNextHash->pPrevHash==0 );
pPg->pNextHash->pPrevHash = pPg;
}
if( pPager->dbSize<0 ) sqlitepager_pagecount(pPager);
// 对于磁盘上没有的页,置0
if( pPager->dbSize<(int)pgno ){
memset(PGHDR_TO_DATA(pPg), 0, SQLITE_PAGE_SIZE);
}else{
int rc;
sqliteOsSeek(&pPager->fd, (pgno-1)*SQLITE_PAGE_SIZE);
// 从磁盘读取
rc = sqliteOsRead(&pPager->fd, PGHDR_TO_DATA(pPg), SQLITE_PAGE_SIZE);
if( rc!=SQLITE_OK ){
return rc;
}
}
if( pPager->nExtra>0 ){
memset(PGHDR_TO_EXTRA(pPg), 0, pPager->nExtra);
}
}else{
/* The requested page is in the page cache. */
pPager->nHit++;
page_ref(pPg);
}
*ppPage = PGHDR_TO_DATA(pPg);
return SQLITE_OK;
}
|
sqlite对page实现了引用计数,引用计数变为0时加入空闲链,变为非0时从空闲链移除。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
static void page_ref(PgHdr *pPg){
if( pPg->nRef==0 ){
/* The page is currently on the freelist. Remove it. */
if( pPg->pPrevFree ){
pPg->pPrevFree->pNextFree = pPg->pNextFree;
}else{
pPg->pPager->pFirst = pPg->pNextFree;
}
if( pPg->pNextFree ){
pPg->pNextFree->pPrevFree = pPg->pPrevFree;
}else{
pPg->pPager->pLast = pPg->pPrevFree;
}
pPg->pPager->nRef++;
}
pPg->nRef++;
REFINFO(pPg);
}
|
事务及crash recovery实现¶
sqlite 3.7.0实现了wal journal,之前采用的是rollback journaling。所谓rollback journaling,是指在修改page前,先将page的内容备份到journal文件中,然后再修改内存中的page,在事务提交时成功刷新到磁盘后再移除journal文件。如果事务在成功提交之前出现crash,在重启时sqlite会从journal文件恢复page,使数据库恢复事务提交前的状态。
在事务开始时,需要调用sqlitepager_begin
,会创建journal文件和维护journal相关的标记位,记录当前文件的页数用于回滚。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
int sqlitepager_begin(void *pData){
PgHdr *pPg = DATA_TO_PGHDR(pData);
Pager *pPager = pPg->pPager;
int rc = SQLITE_OK;
if( pPager->state==SQLITE_READLOCK ){
assert( pPager->aInJournal==0 );
// 写锁
rc = sqliteOsWriteLock(&pPager->fd);
if( rc!=SQLITE_OK ){
return rc;
}
// 用于记录哪些页已经写入journal文件了
pPager->aInJournal = malloc( pPager->dbSize/8 + 1 );
memset(pPager->aInJournal , 0, pPager->dbSize/8 + 1);
// 打开journal文件
rc = sqliteOsOpenExclusive(pPager->zJournal, &pPager->jfd, 0);
pPager->journalOpen = 1;
pPager->needSync = 0;
pPager->dirtyFile = 0;
pPager->state = SQLITE_WRITELOCK;
sqlitepager_pagecount(pPager);
// 记录之前的page个数
pPager->origDbSize = pPager->dbSize;
// 写入magic和之前的page页数
rc = sqliteOsWrite(&pPager->jfd, aJournalMagic, sizeof(aJournalMagic));
if( rc==SQLITE_OK ){
rc = sqliteOsWrite(&pPager->jfd, &pPager->dbSize, sizeof(Pgno));
}
if( rc!=SQLITE_OK ){
rc = pager_unwritelock(pPager);
if( rc==SQLITE_OK ) rc = SQLITE_FULL;
}
}
return rc;
}
|
在修改page前,需要调用sqlitepager_write
,将page内容备份到journal文件中,并设置dirty标记。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
int sqlitepager_write(void *pData){
PgHdr *pPg = DATA_TO_PGHDR(pData);
Pager *pPager = pPg->pPager;
int rc = SQLITE_OK;
/* Check for errors
*/
if( pPager->errMask ){
return pager_errcode(pPager);
}
if( pPager->readOnly ){
return SQLITE_PERM;
}
/* Mark the page as dirty. If the page has already been written
** to the journal then we can return right away.
*/
pPg->dirty = 1;
if( pPg->inJournal && (pPg->inCkpt || pPager->ckptInUse==0) ){
pPager->dirtyFile = 1;
return SQLITE_OK;
}
/* If we get this far, it means that the page needs to be
** written to the transaction journal or the ckeckpoint journal
** or both.
**
** First check to see that the transaction journal exists and
** create it if it does not.
*/
assert( pPager->state!=SQLITE_UNLOCK );
// 调用sqlitepager_begin,如果之前已经调用过,相当于空操作
rc = sqlitepager_begin(pData);
pPager->dirtyFile = 1;
if( rc!=SQLITE_OK ) return rc;
assert( pPager->state==SQLITE_WRITELOCK );
assert( pPager->journalOpen );
/* The transaction journal now exists and we have a write lock on the
** main database file. Write the current page to the transaction
** journal if it is not there already.
*/
// 如果没有备份到journal且在文件原有范围内
if( !pPg->inJournal && (int)pPg->pgno <= pPager->origDbSize ){
// 写入页号
rc = sqliteOsWrite(&pPager->jfd, &pPg->pgno, sizeof(Pgno));
if( rc==SQLITE_OK ){
// 写入page内容
rc = sqliteOsWrite(&pPager->jfd, pData, SQLITE_PAGE_SIZE);
}
assert( pPager->aInJournal!=0 );
// 设置标志位
pPager->aInJournal[pPg->pgno/8] |= 1<<(pPg->pgno&7);
pPager->needSync = !pPager->noSync;
pPg->inJournal = 1;
}
/* Update the database size and return.
*/
if( pPager->dbSize<(int)pPg->pgno ){
pPager->dbSize = pPg->pgno;
}
return rc;
}
|
在事务回滚时,调用sqlitepager_rollback
,从journal文件恢复备份的page内容。对于journal文件中所有完整的页,将其读出后写入数据库文件的相应页,并更新到存在的page缓存。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
int sqlitepager_rollback(Pager *pPager){
int rc;
if( pPager->errMask!=0 && pPager->errMask!=PAGER_ERR_FULL ){
if( pPager->state>=SQLITE_WRITELOCK ){
pager_playback(pPager);
}
return pager_errcode(pPager);
}
if( pPager->state!=SQLITE_WRITELOCK ){
return SQLITE_OK;
}
// 回放
rc = pager_playback(pPager);
if( rc!=SQLITE_OK ){
rc = SQLITE_CORRUPT;
pPager->errMask |= PAGER_ERR_CORRUPT;
}
// 设置页数为-1,后面有需要会自动获取实际的页数
pPager->dbSize = -1;
return rc;
}
static int pager_playback(Pager *pPager){
int nRec; /* Number of Records */
int i; /* Loop counter */
Pgno mxPg = 0; /* Size of the original file in pages */
unsigned char aMagic[sizeof(aJournalMagic)];
int rc;
/* Figure out how many records are in the journal. Abort early if
** the journal is empty.
*/
assert( pPager->journalOpen );
sqliteOsSeek(&pPager->jfd, 0);
rc = sqliteOsFileSize(&pPager->jfd, &nRec);
if( rc!=SQLITE_OK ){
goto end_playback;
}
// 校验至少需要存储一个完整的page
nRec = (nRec - (sizeof(aMagic)+sizeof(Pgno))) / sizeof(PageRecord);
if( nRec<=0 ){
goto end_playback;
}
/* Read the beginning of the journal and truncate the
** database file back to its original size.
*/
// 校验magic
rc = sqliteOsRead(&pPager->jfd, aMagic, sizeof(aMagic));
if( rc!=SQLITE_OK || memcmp(aMagic,aJournalMagic,sizeof(aMagic))!=0 ){
rc = SQLITE_PROTOCOL;
goto end_playback;
}
// 读取最大页数
rc = sqliteOsRead(&pPager->jfd, &mxPg, sizeof(mxPg));
if( rc!=SQLITE_OK ){
goto end_playback;
}
// 丢弃最后不完整的页
rc = sqliteOsTruncate(&pPager->fd, mxPg*SQLITE_PAGE_SIZE);
if( rc!=SQLITE_OK ){
goto end_playback;
}
// 恢复页数
pPager->dbSize = mxPg;
/* Copy original pages out of the journal and back into the database file.
*/
for(i=nRec-1; i>=0; i--){
rc = pager_playback_one_page(pPager, &pPager->jfd);
if( rc!=SQLITE_OK ) break;
}
end_playback:
if( rc!=SQLITE_OK ){
pager_unwritelock(pPager);
pPager->errMask |= PAGER_ERR_CORRUPT;
rc = SQLITE_CORRUPT;
}else{
rc = pager_unwritelock(pPager);
}
return rc;
}
static int pager_playback_one_page(Pager *pPager, OsFile *jfd){
int rc;
PgHdr *pPg; /* An existing page in the cache */
PageRecord pgRec;
// 读页号和page内容
rc = sqliteOsRead(jfd, &pgRec, sizeof(pgRec));
if( rc!=SQLITE_OK ) return rc;
/* Sanity checking on the page */
if( pgRec.pgno>pPager->dbSize || pgRec.pgno==0 ) return SQLITE_CORRUPT;
/* Playback the page. Update the in-memory copy of the page
** at the same time, if there is one.
*/
// 如果有缓存的话,也更新到缓存
pPg = pager_lookup(pPager, pgRec.pgno);
if( pPg ){
memcpy(PGHDR_TO_DATA(pPg), pgRec.aData, SQLITE_PAGE_SIZE);
memset(PGHDR_TO_EXTRA(pPg), 0, pPager->nExtra);
}
// 写入文件
rc = sqliteOsSeek(&pPager->fd, (pgRec.pgno-1)*SQLITE_PAGE_SIZE);
if( rc==SQLITE_OK ){
rc = sqliteOsWrite(&pPager->fd, pgRec.aData, SQLITE_PAGE_SIZE);
}
return rc;
}
|
事务提交时需要调用sqlitepager_commit
,刷盘并释放写锁。看实现会先同步journal文件,然后将内存中的page写入磁盘并同步数据库文件。在释放写锁时会删除journal文件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
int sqlitepager_commit(Pager *pPager){
int rc;
PgHdr *pPg;
if( pPager->errMask==PAGER_ERR_FULL ){
rc = sqlitepager_rollback(pPager);
if( rc==SQLITE_OK ) rc = SQLITE_FULL;
return rc;
}
if( pPager->errMask!=0 ){
rc = pager_errcode(pPager);
return rc;
}
if( pPager->state!=SQLITE_WRITELOCK ){
return SQLITE_ERROR;
}
assert( pPager->journalOpen );
if( pPager->dirtyFile==0 ){
// 没有修改直接返回
/* Exit early (without doing the time-consuming sqliteOsSync() calls)
** if there have been no changes to the database file. */
rc = pager_unwritelock(pPager);
pPager->dbSize = -1;
return rc;
}
// 同步journal文件
if( pPager->needSync && sqliteOsSync(&pPager->jfd)!=SQLITE_OK ){
goto commit_abort;
}
// 写所有的脏页
for(pPg=pPager->pAll; pPg; pPg=pPg->pNextAll){
if( pPg->dirty==0 ) continue;
rc = sqliteOsSeek(&pPager->fd, (pPg->pgno-1)*SQLITE_PAGE_SIZE);
if( rc!=SQLITE_OK ) goto commit_abort;
rc = sqliteOsWrite(&pPager->fd, PGHDR_TO_DATA(pPg), SQLITE_PAGE_SIZE);
if( rc!=SQLITE_OK ) goto commit_abort;
}
// 同步数据库文件
if( !pPager->noSync && sqliteOsSync(&pPager->fd)!=SQLITE_OK ){
goto commit_abort;
}
// 释放写锁时会删除journal文件
rc = pager_unwritelock(pPager);
pPager->dbSize = -1;
return rc;
/* Jump here if anything goes wrong during the commit process.
*/
commit_abort:
rc = sqlitepager_rollback(pPager);
if( rc==SQLITE_OK ){
rc = SQLITE_FULL;
}
return rc;
}
|
某些情况下脏页有可能整页被丢弃,比如在B树删除时可能page被合并到相邻的页,这时可以调用sqlitepager_dont_write
来减少事务提交时的写盘量,通过去掉脏页标记使得事务提交时不需要刷新这个页到磁盘。
1
2
3
4
5
6
7
|
void sqlitepager_dont_write(Pager *pPager, Pgno pgno){
PgHdr *pPg;
pPg = pager_lookup(pPager, pgno);
if( pPg && pPg->dirty ){
pPg->dirty = 0;
}
}
|
某些情况下,在写数据页时可能之前的数据不需要备份到journal文件中,比如重用之前释放的磁盘页时,这时可以调用sqlitepager_dont_rollback
,其主要逻辑为设置在journal文件中的标志位。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
void sqlitepager_dont_rollback(void *pData){
PgHdr *pPg = DATA_TO_PGHDR(pData);
Pager *pPager = pPg->pPager;
if( pPager->state!=SQLITE_WRITELOCK || pPager->journalOpen==0 ) return;
if( !pPg->inJournal && (int)pPg->pgno <= pPager->origDbSize ){
assert( pPager->aInJournal!=0 );
pPager->aInJournal[pPg->pgno/8] |= 1<<(pPg->pgno&7);
// 只设置标志位,但不写入journal文件
pPg->inJournal = 1;
if( pPager->ckptInUse ){
pPager->aInCkpt[pPg->pgno/8] |= 1<<(pPg->pgno&7);
pPg->inCkpt = 1;
}
}
}
|
B树实现¶
这个版本的sqlite的表和索引都使用了B树实现,还没有实现B+树。
一个打开的数据库用下面的结构体表示
1
2
3
4
5
6
7
8
9
|
struct Btree {
Pager *pPager; /* The page cache */
BtCursor *pCursor; /* A list of all open cursors */
PageOne *page1; /* First page of the database */
u8 inTrans; /* True if a transaction is in progress */
u8 inCkpt; /* True if there is a checkpoint on the transaction */
u8 readOnly; /* True if the underlying file is readonly */
Hash locks; /* Key: root page number. Data: lock count */
};
|
数据库文件的第1个page,用PageOne
来表示,其中包含标识数据库文件的magic number,以及空闲页的信息。
1
2
3
4
5
6
7
8
9
|
struct PageOne {
// ** This file contains an SQLite 2.1 database **
char zMagic[MAGIC_SIZE]; /* String that identifies the file as a database */
// 0xdae37528
int iMagic; /* Integer to verify correct byte order */
Pgno freeList; /* First free page in a list of all free pages */
int nFree; /* Number of pages on the free list */
int aMeta[SQLITE_N_BTREE_META-1]; /* User defined integers */
};
|
一个数据库文件中可以包含多个B树,这些B树存储在第2个或者页号更大的page中。
空闲页表¶
PageOne.freeList指向第一个存储空闲页信息的页。空闲页信息使用FreelistInfo
存储
1
2
3
4
|
struct FreelistInfo {
int nFree; // 空闲页的个数
Pgno aFree[(OVERFLOW_SIZE-sizeof(int))/sizeof(Pgno)];
};
|
存储空闲页信息的页使用overflow页的格式存储,使用OverflowPage.iNext指向下一个存储空闲页信息的页,这些空闲页信息构成了一个链表。FreelistInfo存储在OverflowPage.aPayload处。
1
2
3
4
5
6
|
#define OVERFLOW_SIZE (SQLITE_PAGE_SIZE-sizeof(Pgno))
struct OverflowPage {
Pgno iNext;
char aPayload[OVERFLOW_SIZE];
};
|
添加一个页到空闲页表的实现如下。如果存在不满的存储空闲页信息的页,则追加到其中,否则创建一个新的存储空闲页信息的页。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
/*
** Add a page of the database file to the freelist. Either pgno or
** pPage but not both may be 0.
**
** sqlitepager_unref() is NOT called for pPage.
*/
static int freePage(Btree *pBt, void *pPage, Pgno pgno){
PageOne *pPage1 = pBt->page1;
OverflowPage *pOvfl = (OverflowPage*)pPage;
int rc;
int needUnref = 0;
MemPage *pMemPage;
if( pgno==0 ){
assert( pOvfl!=0 );
pgno = sqlitepager_pagenumber(pOvfl);
}
assert( pgno>2 );
rc = sqlitepager_write(pPage1);
if( rc ){
return rc;
}
// 计数加1
pPage1->nFree++;
if( pPage1->nFree>0 && pPage1->freeList ){
OverflowPage *pFreeIdx;
rc = sqlitepager_get(pBt->pPager, pPage1->freeList, (void**)&pFreeIdx);
if( rc==SQLITE_OK ){
FreelistInfo *pInfo = (FreelistInfo*)pFreeIdx->aPayload;
// 如果有空闲空间,则写入
if( pInfo->nFree<(sizeof(pInfo->aFree)/sizeof(pInfo->aFree[0])) ){
rc = sqlitepager_write(pFreeIdx);
if( rc==SQLITE_OK ){
pInfo->aFree[pInfo->nFree] = pgno;
pInfo->nFree++;
sqlitepager_unref(pFreeIdx);
// 取消pgno页的脏页标记
sqlitepager_dont_write(pBt->pPager, pgno);
return rc;
}
}
sqlitepager_unref(pFreeIdx);
}
}
// 运行到这里意味着所有的存储空闲页信息的页都已写满,需要用当前页来存储空闲页信息。
if( pOvfl==0 ){
assert( pgno>0 );
rc = sqlitepager_get(pBt->pPager, pgno, (void**)&pOvfl);
if( rc ) return rc;
needUnref = 1;
}
rc = sqlitepager_write(pOvfl);
if( rc ){
if( needUnref ) sqlitepager_unref(pOvfl);
return rc;
}
// 维护链表
pOvfl->iNext = pPage1->freeList;
pPage1->freeList = pgno;
// memset也保证了FreelistInfo.nFree == 0
memset(pOvfl->aPayload, 0, OVERFLOW_SIZE);
pMemPage = (MemPage*)pPage;
pMemPage->isInit = 0;
if( pMemPage->pParent ){
sqlitepager_unref(pMemPage->pParent);
pMemPage->pParent = 0;
}
if( needUnref ) rc = sqlitepager_unref(pOvfl);
return rc;
}
|
从空闲页表分配一个页的实现如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
static int allocatePage(Btree *pBt, MemPage **ppPage, Pgno *pPgno){
PageOne *pPage1 = pBt->page1;
int rc;
// 如果有空闲页,从空闲页分配;否则从磁盘加载,分配一个新页
if( pPage1->freeList ){
OverflowPage *pOvfl;
FreelistInfo *pInfo;
rc = sqlitepager_write(pPage1);
if( rc ) return rc;
pPage1->nFree--;
rc = sqlitepager_get(pBt->pPager, pPage1->freeList, (void**)&pOvfl);
if( rc ) return rc;
rc = sqlitepager_write(pOvfl);
if( rc ){
sqlitepager_unref(pOvfl);
return rc;
}
pInfo = (FreelistInfo*)pOvfl->aPayload;
if( pInfo->nFree==0 ){
// 这个空闲页信息页只的空闲页为0,把这个页本身分配出去。
*pPgno = pPage1->freeList;
pPage1->freeList = pOvfl->iNext;
*ppPage = (MemPage*)pOvfl;
}else{
// 从中拿一个页
pInfo->nFree--;
*pPgno = pInfo->aFree[pInfo->nFree];
rc = sqlitepager_get(pBt->pPager, *pPgno, (void**)ppPage);
sqlitepager_unref(pOvfl);
if( rc==SQLITE_OK ){
// 设置空闲页的内容不需要回滚
sqlitepager_dont_rollback(*ppPage);
rc = sqlitepager_write(*ppPage);
}
}
}else{
*pPgno = sqlitepager_pagecount(pBt->pPager) + 1;
// 从Pager分配一个新页
rc = sqlitepager_get(pBt->pPager, *pPgno, (void**)ppPage);
if( rc ) return rc;
rc = sqlitepager_write(*ppPage);
}
return rc;
}
|
B树节点页实现¶
节点使用下面的结构体来表示。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
struct MemPage {
union {
char aDisk[SQLITE_PAGE_SIZE]; /* Page data stored on disk */
PageHdr hdr; /* Overlay page header */
} u;
int isInit; /* True if auxiliary data is initialized */
MemPage *pParent; /* The parent of this page. NULL for root */
// 空闲字节数。
int nFree; /* Number of free bytes in u.aDisk[] */
// 存储的cell个数。
int nCell; /* Number of entries on this page */
// 是否已经overfull,即超出page size
int isOverfull; /* Some apCell[] points outside u.aDisk[] */
// 排序的Cell列表。
Cell *apCell[MX_CELL+2]; /* All data entires in sorted order */
};
#define EXTRA_SIZE (sizeof(MemPage)-SQLITE_PAGE_SIZE)
|
MemPage.u对应磁盘上的page,之后的数据只在内存中存储。在创建Pager时,会设置Pager.nExtra = EXTRA_SIZE以便在创建内存中的页时分配足够的内存。
每个节点的KV和左子指针,使用Cell结构来表示。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
struct CellHdr {
Pgno leftChild; /* Child page that comes before this cell */
u16 nKey; /* Number of bytes in the key */
u16 iNext; /* Index in MemPage.u.aDisk[] of next cell in sorted order */
u8 nKeyHi; /* Upper 8 bits of key size for keys larger than 64K bytes */
u8 nDataHi; /* Upper 8 bits of data size when the size is more than 64K */
u16 nData; /* Number of bytes of data */
};
#define NKEY(h) (h.nKey + h.nKeyHi*65536)
#define NDATA(h) (h.nData + h.nDataHi*65536)
// Cell的最小大小
#define MIN_CELL_SIZE (sizeof(CellHdr)+4)
// 一个页中最多可以存多少个Cell
#define MX_CELL ((SQLITE_PAGE_SIZE-sizeof(PageHdr))/MIN_CELL_SIZE)
#define USABLE_SPACE (SQLITE_PAGE_SIZE - sizeof(PageHdr))
// 不使用overflow页时Cell的最大字节数,保证一个page至少可以存4个Cell
#define MX_LOCAL_PAYLOAD ((USABLE_SPACE/4-(sizeof(CellHdr)+sizeof(Pgno)))&~3)
struct Cell {
CellHdr h; /* The cell header */
char aPayload[MX_LOCAL_PAYLOAD]; /* Key and data */
// 第一个overflow页的页号
Pgno ovfl; /* The first overflow page */
};
|
Cell由CellHdr和payload数据以及第一个overflow页的页号构成。虽然Cell的结构体的长度是固定的,但实际Cell是变长的,通过CellHdr来确定Cell的实际长度。payload至少4个字节,最多MX_LOCAL_PAYLOAD个字节,如果payload需要的空间大于MX_LOCAL_PAYLOAD时会分配overflow页。
CellHdr描述了key和value的长度,都使用了24个bit表示,为了节省空间拆分为高位8bit的nKeyHi和16 bit的nKey(如果不这么拆分的话要占用32个bit),定义了NKEY和NDATA宏来得到K和V的长度。CellHdr.leftChild指向了左子树的页号,其中存储的key都小于这个Cell的key。
分配一个Cell的实现如下,如果空间不足,会分配overflow页来存储。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
static int fillInCell(
Btree *pBt, /* The whole Btree. Needed to allocate pages */
Cell *pCell, /* Populate this Cell structure */
const void *pKey, int nKey, /* The key */
const void *pData,int nData /* The data */
){
OverflowPage *pOvfl, *pPrior;
Pgno *pNext;
int spaceLeft;
int n, rc;
int nPayload;
const char *pPayload;
char *pSpace;
pCell->h.leftChild = 0;
// 设置size
pCell->h.nKey = nKey & 0xffff;
pCell->h.nKeyHi = nKey >> 16;
pCell->h.nData = nData & 0xffff;
pCell->h.nDataHi = nData >> 16;
pCell->h.iNext = 0;
pNext = &pCell->ovfl;
pSpace = pCell->aPayload;
spaceLeft = MX_LOCAL_PAYLOAD;
pPayload = pKey;
pKey = 0;
nPayload = nKey;
pPrior = 0;
while( nPayload>0 ){
if( spaceLeft==0 ){
// 分配一个overflow页,页号设置到pCell->ovfl或者pOvfl->iNext中
rc = allocatePage(pBt, (MemPage**)&pOvfl, pNext);
if( rc ){
*pNext = 0;
}
if( pPrior ) sqlitepager_unref(pPrior);
if( rc ){
clearCell(pBt, pCell);
return rc;
}
pPrior = pOvfl;
spaceLeft = OVERFLOW_SIZE;
pSpace = pOvfl->aPayload;
pNext = &pOvfl->iNext;
}
n = nPayload;
if( n>spaceLeft ) n = spaceLeft;
// 挎包
memcpy(pSpace, pPayload, n);
nPayload -= n;
if( nPayload==0 && pData ){
// key写完后接着写value
pPayload = pData;
nPayload = nData;
pData = 0;
}else{
pPayload += n;
}
spaceLeft -= n;
pSpace += n;
}
*pNext = 0;
if( pPrior ){
sqlitepager_unref(pPrior);
}
return SQLITE_OK;
}
|
释放Cell涉及到两个函数。clearCell
负责释放Cell分配的overflow页。dropCell
负责将Cell非溢出的内存加入空闲链表中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
static int clearCell(Btree *pBt, Cell *pCell){
Pager *pPager = pBt->pPager;
OverflowPage *pOvfl;
Pgno ovfl, nextOvfl;
int rc;
if( NKEY(pCell->h) + NDATA(pCell->h) <= MX_LOCAL_PAYLOAD ){
return SQLITE_OK;
}
ovfl = pCell->ovfl;
pCell->ovfl = 0;
while( ovfl ){
// 释放overflow页
rc = sqlitepager_get(pPager, ovfl, (void**)&pOvfl);
if( rc ) return rc;
nextOvfl = pOvfl->iNext;
rc = freePage(pBt, pOvfl, ovfl);
if( rc ) return rc;
sqlitepager_unref(pOvfl);
ovfl = nextOvfl;
}
return SQLITE_OK;
}
static void dropCell(MemPage *pPage, int idx, int sz){
int j;
assert( idx>=0 && idx<pPage->nCell );
assert( sz==cellSize(pPage->apCell[idx]) );
assert( sqlitepager_iswriteable(pPage) );
// 释放Cell本身占的空间
freeSpace(pPage, Addr(pPage->apCell[idx]) - Addr(pPage), sz);
for(j=idx; j<pPage->nCell-1; j++){
pPage->apCell[j] = pPage->apCell[j+1];
}
pPage->nCell--;
}
/
** Most of the effort here is involved in coalesing adjacent
** free blocks into a single big free block.
*/
static void freeSpace(MemPage *pPage, int start, int size){
int end = start + size;
u16 *pIdx, idx;
FreeBlk *pFBlk;
FreeBlk *pNew;
FreeBlk *pNext;
assert( sqlitepager_iswriteable(pPage) );
assert( size == ROUNDUP(size) );
assert( start == ROUNDUP(start) );
pIdx = &pPage->u.hdr.firstFree;
idx = *pIdx;
while( idx!=0 && idx<start ){
pFBlk = (FreeBlk*)&pPage->u.aDisk[idx];
if( idx + pFBlk->iSize == start ){
pFBlk->iSize += size;
if( idx + pFBlk->iSize == pFBlk->iNext ){
// 如果可以和下一个区间合并
pNext = (FreeBlk*)&pPage->u.aDisk[pFBlk->iNext];
pFBlk->iSize += pNext->iSize;
pFBlk->iNext = pNext->iNext;
}
pPage->nFree += size;
return;
}
pIdx = &pFBlk->iNext;
idx = *pIdx;
}
pNew = (FreeBlk*)&pPage->u.aDisk[start];
// 检查是否可以和后面的区间合并
if( idx != end ){
pNew->iSize = size;
pNew->iNext = idx;
}else{
pNext = (FreeBlk*)&pPage->u.aDisk[idx];
pNew->iSize = size + pNext->iSize;
pNew->iNext = pNext->iNext;
}
*pIdx = start;
pPage->nFree += size;
}
|
一个MemPage逻辑上来说由多个排序的Cell组成,但在物理存储时即不保证有序也不保证连续。由于插入数据时不一定有序,因此要保证物理存储有序需要移动数据而导致额外的开销,因此实现时并不会移动数据,但通过CellHdr.iNext构成了一个有序的Cell链表。由于删除数据时也不会移动数据,因此Cell之间可能有空洞,这些空洞使用FreeBlk表示,并通过FreeBlk.iNext构成了一个链表。
1
2
3
4
|
struct FreeBlk {
u16 iSize; /* Number of bytes in this block of free space */
u16 iNext; /* Index in MemPage.u.aDisk[] of the next free block */
};
|
从磁盘加载page后,通过initPage维护内存中的辅助信息,将所有Cell加载到内存中的apCell数组,计算总的空闲字节数信息。initPage后,可以通过apCell随机访问一个Cell,而不需要通过Cell链表顺序访问。在插入时,新的Cell可能超出page size,在短暂的时间内apCell会有指向aDisk外的Cell。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
static int initPage(MemPage *pPage, Pgno pgnoThis, MemPage *pParent){
int idx; /* An index into pPage->u.aDisk[] */
Cell *pCell; /* A pointer to a Cell in pPage->u.aDisk[] */
FreeBlk *pFBlk; /* A pointer to a free block in pPage->u.aDisk[] */
int sz; /* The size of a Cell in bytes */
int freeSpace; /* Amount of free space on the page */
if( pPage->pParent ){
assert( pPage->pParent==pParent );
return SQLITE_OK;
}
if( pParent ){
pPage->pParent = pParent;
// 增加引用计数
sqlitepager_ref(pParent);
}
if( pPage->isInit ) return SQLITE_OK;
pPage->isInit = 1;
pPage->nCell = 0;
freeSpace = USABLE_SPACE;
idx = pPage->u.hdr.firstCell;
while( idx!=0 ){
// 剩余空间存不下一个Cell
if( idx>SQLITE_PAGE_SIZE-MIN_CELL_SIZE ) goto page_format_error;
// 起始地址在PageHdr内
if( idx<sizeof(PageHdr) ) goto page_format_error;
// 没有4字节对齐
if( idx!=ROUNDUP(idx) ) goto page_format_error;
pCell = (Cell*)&pPage->u.aDisk[idx];
sz = cellSize(pCell);
// 超过了page size
if( idx+sz > SQLITE_PAGE_SIZE ) goto page_format_error;
freeSpace -= sz;
// 加载到cell数组
pPage->apCell[pPage->nCell++] = pCell;
idx = pCell->h.iNext;
}
pPage->nFree = 0;
idx = pPage->u.hdr.firstFree;
while( idx!=0 ){
if( idx>SQLITE_PAGE_SIZE-sizeof(FreeBlk) ) goto page_format_error;
if( idx<sizeof(PageHdr) ) goto page_format_error;
pFBlk = (FreeBlk*)&pPage->u.aDisk[idx];
// 维护空闲byte数统计信息
pPage->nFree += pFBlk->iSize;
if( pFBlk->iNext>0 && pFBlk->iNext <= idx ) goto page_format_error;
idx = pFBlk->iNext;
}
if( pPage->nCell==0 && pPage->nFree==0 ){
/* As a special case, an uninitialized root page appears to be
** an empty database */
return SQLITE_OK;
}
if( pPage->nFree!=freeSpace ) goto page_format_error;
return SQLITE_OK;
page_format_error:
return SQLITE_CORRUPT;
}
|
B树操作¶
创建B树¶
先看创建数据库的实现。如果已经至少有2个页,则直接返回。否则创建2个页,并初始化第1个页为PageOne,对第2个页调用zeroPage来重置为一个包含0项的B树节点页。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
static int newDatabase(Btree *pBt){
MemPage *pRoot;
PageOne *pP1;
int rc;
if( sqlitepager_pagecount(pBt->pPager)>1 ) return SQLITE_OK;
pP1 = pBt->page1;
rc = sqlitepager_write(pBt->page1);
if( rc ) return rc;
rc = sqlitepager_get(pBt->pPager, 2, (void**)&pRoot);
if( rc ) return rc;
rc = sqlitepager_write(pRoot);
if( rc ){
sqlitepager_unref(pRoot);
return rc;
}
strcpy(pP1->zMagic, zMagicHeader);
pP1->iMagic = MAGIC;
zeroPage(pRoot);
sqlitepager_unref(pRoot);
return SQLITE_OK;
}
|
sqliteBtreeCreateTable
要做的事情很简单,分配一个页作为根页,返回页号。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
int sqliteBtreeCreateTable(Btree *pBt, int *piTable){
MemPage *pRoot;
Pgno pgnoRoot;
int rc;
if( !pBt->inTrans ){
return SQLITE_ERROR; /* Must start a transaction first */
}
if( pBt->readOnly ){
return SQLITE_READONLY;
}
// 如果有空闲的,用空闲的,否则分配一个新page
rc = allocatePage(pBt, &pRoot, &pgnoRoot);
if( rc ) return rc;
assert( sqlitepager_iswriteable(pRoot) );
zeroPage(pRoot);
// 不在使用中就调unref
sqlitepager_unref(pRoot);
// 返回根所在的页号
*piTable = (int)pgnoRoot;
return SQLITE_OK;
}
|
Cursor操作¶
Cursor用于B树的遍历或者定位,其定义如下。mPage指向当前的page,idx指向当前的项。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
/*
** A cursor is a pointer to a particular entry in the BTree.
** The entry is identified by its MemPage and the index in
** MemPage.apCell[] of the entry.
*/
struct BtCursor {
Btree *pBt; /* The Btree to which this cursor belongs */
BtCursor *pNext, *pPrev; /* Forms a linked list of all cursors */
Pgno pgnoRoot; /* The root page of this tree */
MemPage *pPage; /* Page that contains the entry */
int idx; /* Index of the entry in pPage->apCell[] */
u8 wrFlag; /* True if writable */
u8 bSkipNext; /* sqliteBtreeNext() is no-op if true */
u8 iMatch; /* compare result from last sqliteBtreeMoveto() */
};
|
调用sqliteBtreeCursor
创建一个Cursor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
int sqliteBtreeCursor(Btree *pBt, int iTable, int wrFlag, BtCursor **ppCur){
int rc;
BtCursor *pCur;
ptr nLock;
if( pBt->page1==0 ){
// 打开PageOne,并获取一个数据库读锁
rc = lockBtree(pBt);
if( rc!=SQLITE_OK ){
*ppCur = 0;
return rc;
}
}
if( wrFlag && pBt->readOnly ){
*ppCur = 0;
return SQLITE_READONLY;
}
pCur = sqliteMalloc( sizeof(*pCur) );
if( pCur==0 ){
rc = SQLITE_NOMEM;
goto create_cursor_exception;
}
// 设置root页号
pCur->pgnoRoot = (Pgno)iTable;
// 获取当前页
rc = sqlitepager_get(pBt->pPager, pCur->pgnoRoot, (void**)&pCur->pPage);
if( rc!=SQLITE_OK ){
goto create_cursor_exception;
}
// 初始化page在内存中的额外辅助信息
rc = initPage(pCur->pPage, pCur->pgnoRoot, 0);
if( rc!=SQLITE_OK ){
goto create_cursor_exception;
}
nLock = (ptr)sqliteHashFind(&pBt->locks, 0, iTable);
// nLock < 0说明已被别人上了写锁
// nLock > 0 说明已被别人上了读锁,如果wrFlag为true,则获取写锁失败
if( nLock<0 || (nLock>0 && wrFlag) ){
rc = SQLITE_LOCKED;
goto create_cursor_exception;
}
// 写锁为负,读锁为正,0即NULL表示没有锁
nLock = wrFlag ? -1 : nLock+1;
sqliteHashInsert(&pBt->locks, 0, iTable, (void*)nLock);
pCur->pBt = pBt;
pCur->wrFlag = wrFlag;
pCur->idx = 0;
// 维护cursor链表
pCur->pNext = pBt->pCursor;
if( pCur->pNext ){
pCur->pNext->pPrev = pCur;
}
pCur->pPrev = 0;
pBt->pCursor = pCur;
*ppCur = pCur;
return SQLITE_OK;
create_cursor_exception:
*ppCur = 0;
if( pCur ){
if( pCur->pPage ) sqlitepager_unref(pCur->pPage);
sqliteFree(pCur);
}
unlockBtreeIfUnused(pBt);
return rc;
}
|
这个版本的Cursor支持的操作为First, Last, Next, Moveto,分别指向第一个、最后一个、下一个、指定的key。
sqliteBtreeFirst
先将Cursor指向root,然后沿着左子指针不断移动。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
int sqliteBtreeFirst(BtCursor *pCur, int *pRes){
int rc;
if( pCur->pPage==0 ) return SQLITE_ABORT;
rc = moveToRoot(pCur);
if( rc ) return rc;
if( pCur->pPage->nCell==0 ){
*pRes = 1;
return SQLITE_OK;
}
*pRes = 0;
rc = moveToLeftmost(pCur);
pCur->bSkipNext = 0;
return rc;
}
static int moveToRoot(BtCursor *pCur){
MemPage *pNew;
int rc;
// 载入page,并修改cursor指向的page
rc = sqlitepager_get(pCur->pBt->pPager, pCur->pgnoRoot, (void**)&pNew);
if( rc ) return rc;
rc = initPage(pNew, pCur->pgnoRoot, 0);
if( rc ) return rc;
sqlitepager_unref(pCur->pPage);
pCur->pPage = pNew;
// 重置idx为0
pCur->idx = 0;
return SQLITE_OK;
}
static int moveToLeftmost(BtCursor *pCur){
Pgno pgno;
int rc;
// 沿着最左的child指针不断移动
while( (pgno = pCur->pPage->apCell[pCur->idx]->h.leftChild)!=0 ){
rc = moveToChild(pCur, pgno);
if( rc ) return rc;
}
return SQLITE_OK;
}
static int moveToChild(BtCursor *pCur, int newPgno){
int rc;
MemPage *pNewPage;
// 载入page,并修改cursor指向的page
rc = sqlitepager_get(pCur->pBt->pPager, newPgno, (void**)&pNewPage);
if( rc ) return rc;
rc = initPage(pNewPage, newPgno, pCur->pPage);
if( rc ) return rc;
sqlitepager_unref(pCur->pPage);
pCur->pPage = pNewPage;
pCur->idx = 0;
return SQLITE_OK;
}
|
sqliteBtreeNext
移动到下一个KV。如果这一层还没遍历完,先将idx指向下一个Cell,然后移动到该Cell下层的最左节点。否则向上层回溯,由于采用的是B树实现,回溯到上层的有效节点就可以返回了,因为中间节点也存储着数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
int sqliteBtreeNext(BtCursor *pCur, int *pRes){
int rc;
if( pCur->pPage==0 ){
if( pRes ) *pRes = 1;
return SQLITE_ABORT;
}
// 之前的操作已经指向了下一个元素,直接返回
if( pCur->bSkipNext && pCur->idx<pCur->pPage->nCell ){
pCur->bSkipNext = 0;
if( pRes ) *pRes = 0;
return SQLITE_OK;
}
// idx + 1
pCur->idx++;
// 已经遍历完这个page
if( pCur->idx>=pCur->pPage->nCell ){
// 如果存在右子树
if( pCur->pPage->u.hdr.rightChild ){
// 移动到右子树
rc = moveToChild(pCur, pCur->pPage->u.hdr.rightChild);
if( rc ) return rc;
// 移动到右子树下的最左节点
rc = moveToLeftmost(pCur);
if( rc ) return rc;
if( pRes ) *pRes = 0;
return SQLITE_OK;
}
do{
// pParent == 0 说明整个树已遍历完
if( pCur->pPage->pParent==0 ){
if( pRes ) *pRes = 1;
return SQLITE_OK;
}
// 移动到parent
rc = moveToParent(pCur);
if( rc ) return rc;
}while( pCur->idx>=pCur->pPage->nCell );
// B树的中间节点也存储数据,因此这里返回了。
if( pRes ) *pRes = 0;
return SQLITE_OK;
}
// 移动到该节点下的最左节点
rc = moveToLeftmost(pCur);
if( rc ) return rc;
if( pRes ) *pRes = 0;
return SQLITE_OK;
}
static int moveToParent(BtCursor *pCur){
Pgno oldPgno;
MemPage *pParent;
int i;
pParent = pCur->pPage->pParent;
if( pParent==0 ) return SQLITE_INTERNAL;
oldPgno = sqlitepager_pagenumber(pCur->pPage);
sqlitepager_ref(pParent);
sqlitepager_unref(pCur->pPage);
pCur->pPage = pParent;
pCur->idx = pParent->nCell;
for(i=0; i<pParent->nCell; i++){
if( pParent->apCell[i]->h.leftChild==oldPgno ){
// 设置idx为包含原child的Cell的下标
pCur->idx = i;
break;
}
}
return SQLITE_OK;
}
|
sqliteBtreeMoveto
将Cursor移动到指定key附近的KV。在key不存在时,有可能指向在该key之前或之后的key,使用*pRes的值来表示不同的结果(<0 之前的key,=0 恰好指向,>0 之后的key)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
int sqliteBtreeMoveto(BtCursor *pCur, const void *pKey, int nKey, int *pRes){
int rc;
if( pCur->pPage==0 ) return SQLITE_ABORT;
pCur->bSkipNext = 0;
// 先移动到root页
rc = moveToRoot(pCur);
if( rc ) return rc;
for(;;){
int lwr, upr;
Pgno chldPg;
MemPage *pPage = pCur->pPage;
int c = -1;
lwr = 0;
upr = pPage->nCell-1;
// 二分查找
while( lwr<=upr ){
pCur->idx = (lwr+upr)/2;
// 比较当前idx指向的Cell和key的大小关系,必要时会加载overflow页的数据。
rc = sqliteBtreeKeyCompare(pCur, pKey, nKey, 0, &c);
if( rc ) return rc;
if( c==0 ){
pCur->iMatch = c;
if( pRes ) *pRes = 0;
return SQLITE_OK;
}
if( c<0 ){
lwr = pCur->idx+1;
}else{
upr = pCur->idx-1;
}
}
assert( lwr==upr+1 );
if( lwr>=pPage->nCell ){
chldPg = pPage->u.hdr.rightChild;
}else{
chldPg = pPage->apCell[lwr]->h.leftChild;
}
// 没有child页了则返回
if( chldPg==0 ){
pCur->iMatch = c;
if( pRes ) *pRes = c;
return SQLITE_OK;
}
// 从该child处继续找
rc = moveToChild(pCur, chldPg);
if( rc ) return rc;
}
/* NOT REACHED */
}
|
插入删除¶
在插入KV时,先将Cursor移动到key附近,然后根据是否存在key做一些不同apCell
数组的维护工作,最后调用rebalance来做树平衡。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
/*
** Insert a new record into the BTree. The key is given by (pKey,nKey)
** and the data is given by (pData,nData). The cursor is used only to
** define what database the record should be inserted into. The cursor
** is left pointing at the new record.
*/
int sqliteBtreeInsert(
BtCursor *pCur, /* Insert data into the table of this cursor */
const void *pKey, int nKey, /* The key of the new record */
const void *pData, int nData /* The data of the new record */
){
Cell newCell;
int rc;
int loc;
int szNew;
MemPage *pPage;
Btree *pBt = pCur->pBt;
if( pCur->pPage==0 ){
return SQLITE_ABORT; /* A rollback destroyed this cursor */
}
if( !pCur->pBt->inTrans || nKey+nData==0 ){
return SQLITE_ERROR; /* Must start a transaction first */
}
if( !pCur->wrFlag ){
return SQLITE_PERM; /* Cursor not open for writing */
}
// 移动到key附近
rc = sqliteBtreeMoveto(pCur, pKey, nKey, &loc);
if( rc ) return rc;
pPage = pCur->pPage;
rc = sqlitepager_write(pPage);
if( rc ) return rc;
// 分配一个cell来存放key/value,可能有overflow 页
rc = fillInCell(pBt, &newCell, pKey, nKey, pData, nData);
if( rc ) return rc;
szNew = cellSize(&newCell);
if( loc==0 ){
// 恰好匹配,需要删除之前的Cell。
// 调用clearCell释放涉及的overflow页,调用dropCell来从apCell数组中删除。
newCell.h.leftChild = pPage->apCell[pCur->idx]->h.leftChild;
rc = clearCell(pBt, pPage->apCell[pCur->idx]);
if( rc ) return rc;
dropCell(pPage, pCur->idx, cellSize(pPage->apCell[pCur->idx]));
}else if( loc<0 && pPage->nCell>0 ){
assert( pPage->u.hdr.rightChild==0 ); /* Must be a leaf page */
// idx+1指向下一个位置,和下面的else得到相同的结果
pCur->idx++;
}else{
// rightChild说明是leaf页
assert( pPage->u.hdr.rightChild==0 ); /* Must be a leaf page */
}
// 将新的Cell插入,执行完成后有可能超出page size,后面的balance会处理。
insertCell(pPage, pCur->idx, &newCell, szNew);
rc = balance(pCur->pBt, pPage, pCur);
/* sqliteBtreePageDump(pCur->pBt, pCur->pgnoRoot, 1); */
/* fflush(stdout); */
return rc;
}
static void insertCell(MemPage *pPage, int i, Cell *pCell, int sz){
int idx, j;
assert( i>=0 && i<=pPage->nCell );
assert( sz==cellSize(pCell) );
assert( sqlitepager_iswriteable(pPage) );
// 分配空间不足,idx==0说明没有足够的空间。
idx = allocateSpace(pPage, sz);
for(j=pPage->nCell; j>i; j--){
pPage->apCell[j] = pPage->apCell[j-1];
}
pPage->nCell++;
if( idx<=0 ){
// 空间不足,标记已overfull
pPage->isOverfull = 1;
pPage->apCell[i] = pCell;
}else{
memcpy(&pPage->u.aDisk[idx], pCell, sz);
pPage->apCell[i] = (Cell*)&pPage->u.aDisk[idx];
}
}
static int allocateSpace(MemPage *pPage, int nByte){
FreeBlk *p;
u16 *pIdx;
int start;
int cnt = 0;
assert( sqlitepager_iswriteable(pPage) );
assert( nByte==ROUNDUP(nByte) );
// 空间不足或者之前已经不足过
if( pPage->nFree<nByte || pPage->isOverfull ) return 0;
pIdx = &pPage->u.hdr.firstFree;
p = (FreeBlk*)&pPage->u.aDisk[*pIdx];
while( p->iSize<nByte ){
assert( cnt++ < SQLITE_PAGE_SIZE/4 );
if( p->iNext==0 ){
// 没有一个足够大的空闲块,需要整理页。将所有的Cell移动到页的开始,最后有一个大的空闲块。
defragmentPage(pPage);
pIdx = &pPage->u.hdr.firstFree;
}else{
pIdx = &p->iNext;
}
p = (FreeBlk*)&pPage->u.aDisk[*pIdx];
}
// 正好匹配
if( p->iSize==nByte ){
start = *pIdx;
*pIdx = p->iNext;
}else{
// 分裂出一个新块
FreeBlk *pNew;
start = *pIdx;
pNew = (FreeBlk*)&pPage->u.aDisk[start + nByte];
pNew->iNext = p->iNext;
pNew->iSize = p->iSize - nByte;
*pIdx = start + nByte;
}
pPage->nFree -= nByte;
return start;
}
|
sqliteBtreeDelete
用于删除Cursor当前指向的KV。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
int sqliteBtreeDelete(BtCursor *pCur){
MemPage *pPage = pCur->pPage;
Cell *pCell;
int rc;
Pgno pgnoChild;
if( pCur->pPage==0 ){
return SQLITE_ABORT; /* A rollback destroyed this cursor */
}
if( !pCur->pBt->inTrans ){
return SQLITE_ERROR; /* Must start a transaction first */
}
if( pCur->idx >= pPage->nCell ){
return SQLITE_ERROR; /* The cursor is not pointing to anything */
}
if( !pCur->wrFlag ){
return SQLITE_PERM; /* Did not open this cursor for writing */
}
rc = sqlitepager_write(pPage);
if( rc ) return rc;
pCell = pPage->apCell[pCur->idx];
pgnoChild = pCell->h.leftChild;
clearCell(pCur->pBt, pCell);
if( pgnoChild ){
/*
** The entry we are about to delete is not a leaf so if we do not
** do something we will leave a hole on an internal page.
** We have to fill the hole by moving in a cell from a leaf. The
** next Cell after the one to be deleted is guaranteed to exist and
** to be a leaf so we can use it.
*/
// 如果不是叶节点,将该节点的下一个节点移动过来,以满足B树的要求。
BtCursor leafCur;
Cell *pNext;
int szNext;
getTempCursor(pCur, &leafCur);
// 找到next节点
rc = sqliteBtreeNext(&leafCur, 0);
if( rc!=SQLITE_OK ){
return SQLITE_CORRUPT;
}
rc = sqlitepager_write(leafCur.pPage);
if( rc ) return rc;
// 从当前page drop当前cell
dropCell(pPage, pCur->idx, cellSize(pCell));
pNext = leafCur.pPage->apCell[leafCur.idx];
szNext = cellSize(pNext);
// 修改next节点的leftChild值
pNext->h.leftChild = pgnoChild;
// 将next节点插入到当前的page
insertCell(pPage, pCur->idx, pNext, szNext);
// 平衡
rc = balance(pCur->pBt, pPage, pCur);
if( rc ) return rc;
pCur->bSkipNext = 1;
// 将next节点drop掉
dropCell(leafCur.pPage, leafCur.idx, szNext);
// 平衡
rc = balance(pCur->pBt, leafCur.pPage, pCur);
releaseTempCursor(&leafCur);
}else{
// drop cell
dropCell(pPage, pCur->idx, cellSize(pCell));
if( pCur->idx>=pPage->nCell ){
pCur->idx = pPage->nCell-1;
if( pCur->idx<0 ){
pCur->idx = 0;
pCur->bSkipNext = 1;
}else{
pCur->bSkipNext = 0;
}
}else{
pCur->bSkipNext = 1;
}
rc = balance(pCur->pBt, pPage, pCur);
}
return rc;
}
|
平衡操作¶
在看balance实现之前,先看几个辅助函数
relinkCellList
用于在插入或删除Cell后,更新MemPage.u.aDisk
中的next指针,这是因为在insertCell和dropCell中只处理了MemPage.apCell,没有处理aDisk中的iNext。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
static void relinkCellList(MemPage *pPage){
int i;
u16 *pIdx;
assert( sqlitepager_iswriteable(pPage) );
pIdx = &pPage->u.hdr.firstCell;
for(i=0; i<pPage->nCell; i++){
int idx = Addr(pPage->apCell[i]) - Addr(pPage);
assert( idx>0 && idx<SQLITE_PAGE_SIZE );
*pIdx = idx;
pIdx = &pPage->apCell[i]->h.iNext;
}
*pIdx = 0;
}
|
copyPage
用于拷贝存在overfull时的page,对于不在page的Cell,拷贝后的page仍然指向原内存。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
static void copyPage(MemPage *pTo, MemPage *pFrom){
uptr from, to;
int i;
memcpy(pTo->u.aDisk, pFrom->u.aDisk, SQLITE_PAGE_SIZE);
pTo->pParent = 0;
pTo->isInit = 1;
pTo->nCell = pFrom->nCell;
pTo->nFree = pFrom->nFree;
pTo->isOverfull = pFrom->isOverfull;
to = Addr(pTo);
from = Addr(pFrom);
for(i=0; i<pTo->nCell; i++){
uptr x = Addr(pFrom->apCell[i]);
// 如果在page范围内,更新,否则用from的值
if( x>from && x<from+SQLITE_PAGE_SIZE ){
*((uptr*)&pTo->apCell[i]) = x + to - from;
}else{
pTo->apCell[i] = pFrom->apCell[i];
}
}
}
|
reparentChildPages
用于在移动page后修改内存中的child页的pParent
指针,因为pParent
指针仅在内存中存储,所以在实现时不需要从磁盘加载没有缓存的page。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
static void reparentChildPages(Pager *pPager, MemPage *pPage){
int i;
for(i=0; i<pPage->nCell; i++){
reparentPage(pPager, pPage->apCell[i]->h.leftChild, pPage);
}
reparentPage(pPager, pPage->u.hdr.rightChild, pPage);
}
static void reparentPage(Pager *pPager, Pgno pgno, MemPage *pNewParent){
MemPage *pThis;
if( pgno==0 ) return;
assert( pPager!=0 );
// 只需要修改内存中缓存的页的parent,因此调lookup而不是get
pThis = sqlitepager_lookup(pPager, pgno);
if( pThis && pThis->isInit ){
if( pThis->pParent!=pNewParent ){
if( pThis->pParent ) sqlitepager_unref(pThis->pParent);
pThis->pParent = pNewParent;
if( pNewParent ) sqlitepager_ref(pNewParent);
}
sqlitepager_unref(pThis);
}
}
|
铺垫完成,下面来看B树的平衡实现。
balance
用于实现B树的平衡,它的基本思想很简单,找到页面相邻的两个兄弟页面,然后根据这三个页面的总空间需求,分配新的页面来存储,新的页面的free空间尽可能均匀。最终兄弟页面的个数可能会加1或减1,目标是页面的空间使用在66%到100%之间。
1 先看root page的处理。
- 如果Cell个数为0,且有rightChild,将rightChild的页拷贝过来作为新的root page。拷贝之后需要修改内存中child页的parent指针。
- 如果root页overfull,创建一个新的child页拷贝过去,然后将root页设置为空,并且rightChild指向新页,后面的逻辑会执行新页的split。通过这样的处理可以复用非root页的balance逻辑。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
static int balance(Btree *pBt, MemPage *pPage, BtCursor *pCur){
// 省略一些变量定义
pParent = pPage->pParent;
// 1 root页处理
if( pParent==0 ){
Pgno pgnoChild;
MemPage *pChild;
if( pPage->nCell==0 ){
// 如果root page没有cell但rightChild不为空
if( pPage->u.hdr.rightChild ){
/*
** The root page is empty. Copy the one child page
** into the root page and return. This reduces the depth
** of the BTree by one.
*/
pgnoChild = pPage->u.hdr.rightChild;
rc = sqlitepager_get(pBt->pPager, pgnoChild, (void**)&pChild);
if( rc ) return rc;
// 拷贝过来后重新初始化
memcpy(pPage, pChild, SQLITE_PAGE_SIZE);
pPage->isInit = 0;
rc = initPage(pPage, sqlitepager_pagenumber(pPage), 0);
assert( rc==SQLITE_OK );
// 拷贝之后,修改内存中的child页的parent指针
reparentChildPages(pBt->pPager, pPage);
if( pCur && pCur->pPage==pChild ){
sqlitepager_unref(pChild);
pCur->pPage = pPage;
sqlitepager_ref(pPage);
}
freePage(pBt, pChild, pgnoChild);
sqlitepager_unref(pChild);
}else{
relinkCellList(pPage);
}
return SQLITE_OK;
}
if( !pPage->isOverfull ){
/* It is OK for the root page to be less than half full.
*/
relinkCellList(pPage);
return SQLITE_OK;
}
/*
** If we get to here, it means the root page is overfull.
** When this happens, Create a new child page and copy the
** contents of the root into the child. Then make the root
** page an empty page with rightChild pointing to the new
** child. Then fall thru to the code below which will cause
** the overfull child page to be split.
*/
// root页overfull,创建一个新的child页拷贝过去,
// 然后将root页设置为空,并且rightChild指向新页,后面的逻辑会执行新页的split
rc = sqlitepager_write(pPage);
if( rc ) return rc;
rc = allocatePage(pBt, &pChild, &pgnoChild);
if( rc ) return rc;
assert( sqlitepager_iswriteable(pChild) );
// 和上面memcpy然后initPage的区别在于,copyPage会考虑overfull,这些cell仍然指向pFrom的原始内存
copyPage(pChild, pPage);
pChild->pParent = pPage;
sqlitepager_ref(pPage);
pChild->isOverfull = 1;
if( pCur && pCur->pPage==pPage ){
sqlitepager_unref(pPage);
pCur->pPage = pChild;
}else{
extraUnref = pChild;
}
zeroPage(pPage);
pPage->u.hdr.rightChild = pgnoChild;
pParent = pPage;
pPage = pChild;
}
// 需要修改parent页
rc = sqlitepager_write(pParent);
if( rc ) return rc;
|
2 在页的左右各取一个兄弟。如果页是最左或最右时,会取同一边的两个兄弟。如果兄弟个数少于等于3,会用上所有的兄弟页。这些页的Cell加上parent中分割的Cell会一起计算总的空间需求,用于分配新页。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
// 省略一些逻辑,设置idx为parent页指向本页的Cell的下标
// 2 在页左右各取一个兄弟。如果页是最左或最右时,会取同一边的两个兄弟。
// 如果兄弟个数少于等于3,会用上所有的兄弟页。
if( idx==pParent->nCell ){
nxDiv = idx - 2;
}else{
nxDiv = idx - 1;
}
if( nxDiv<0 ) nxDiv = 0;
nDiv = 0;
for(i=0, k=nxDiv; i<3; i++, k++){
if( k<pParent->nCell ){
// parent只分割Cell的index
idxDiv[i] = k;
// 获取parent中的分割Cell,会和兄弟page的Cell一起用于分配新页
apDiv[i] = pParent->apCell[k];
nDiv++;
pgnoOld[i] = apDiv[i]->h.leftChild;
}else if( k==pParent->nCell ){
pgnoOld[i] = pParent->u.hdr.rightChild;
}else{
break;
}
// 加载兄弟页
rc = sqlitepager_get(pBt->pPager, pgnoOld[i], (void**)&apOld[i]);
if( rc ) goto balance_cleanup;
rc = initPage(apOld[i], pgnoOld[i], pParent);
if( rc ) goto balance_cleanup;
nOld++;
}
|
3 将找到的兄弟页拷贝到内存中的临时页,因为这些兄弟页后续可能会修改。
1
2
3
4
5
6
7
8
9
|
// 3 将这些页拷贝到内存中新页,因为原始页有可能修改。
for(i=0; i<nOld; i++){
copyPage(&aOld[i], apOld[i]);
rc = freePage(pBt, apOld[i], pgnoOld[i]);
if( rc ) goto balance_cleanup;
sqlitepager_unref(apOld[i]);
apOld[i] = &aOld[i];
}
|
4 计算兄弟页和parent中的分割Cell需要使用的空间,把这些Cell都拷贝到apCell数组,然后计算需要分配多少新页,每个新页的剩余空间保证相对均匀。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
// 4 计算兄弟页和parent中的分割Cell需要使用的空间,把这些Cell都拷贝到apCell数组
Cell *apCell[MX_CELL*3+5]; /* All cells from pages being balanceed */
int szCell[MX_CELL*3+5]; /* Local size of all cells */
nCell = 0;
for(i=0; i<nOld; i++){
MemPage *pOld = apOld[i];
for(j=0; j<pOld->nCell; j++){
apCell[nCell] = pOld->apCell[j];
szCell[nCell] = cellSize(apCell[nCell]);
nCell++;
}
// 拷贝parent中的分割Cell
if( i<nOld-1 ){
szCell[nCell] = cellSize(apDiv[i]);
memcpy(&aTemp[i], apDiv[i], szCell[nCell]);
apCell[nCell] = &aTemp[i];
// 临时从parent中删除
dropCell(pParent, nxDiv, szCell[nCell]);
assert( apCell[nCell]->h.leftChild==pgnoOld[i] );
apCell[nCell]->h.leftChild = pOld->u.hdr.rightChild;
nCell++;
}
}
// 计算总的size
totalSize = 0;
for(i=0; i<nCell; i++){
totalSize += szCell[i];
}
int cntNew[4]; /* Index in apCell[] of cell after i-th page */
int szNew[4]; /* Combined size of cells place on i-th page */
for(subtotal=k=i=0; i<nCell; i++){
subtotal += szCell[i];
// #define USABLE_SPACE (SQLITE_PAGE_SIZE - sizeof(PageHdr))
// 分配一个新页
if( subtotal > USABLE_SPACE ){
szNew[k] = subtotal - szCell[i];
cntNew[k] = i;
subtotal = 0;
k++;
}
}
szNew[k] = subtotal;
cntNew[k] = nCell;
k++;
// 经过上一个循环,前面的页会接近满,而后面的页有可能接近空
// 下面的循环再进行调整,至少占满USABLE_SPACE/2
for(i=k-1; i>0; i--){
while( szNew[i]<USABLE_SPACE/2 ){
// 从前面挪Cell
cntNew[i-1]--;
assert( cntNew[i-1]>0 );
szNew[i] += szCell[cntNew[i-1]];
szNew[i-1] -= szCell[cntNew[i-1]-1];
}
}
assert( cntNew[0]>0 );
// 分配k个新页
for(i=0; i<k; i++){
rc = allocatePage(pBt, &apNew[i], &pgnoNew[i]);
if( rc ) goto balance_cleanup;
nNew++;
zeroPage(apNew[i]);
apNew[i]->isInit = 1;
}
|
5 将新页按照页号排序,然后写入Cell到新页,并把分割的Cell插入到parent中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
// 5 将新页按照页号排序,排序后磁盘文件是有序的,scan操作是线性的。注释中说排序可以让大的插入和删除快25%。
for(i=0; i<k-1; i++){
int minV = pgnoNew[i];
int minI = i;
for(j=i+1; j<k; j++){
if( pgnoNew[j]<minV ){
minI = j;
minV = pgnoNew[j];
}
}
if( minI>i ){
int t;
MemPage *pT;
t = pgnoNew[i];
pT = apNew[i];
pgnoNew[i] = pgnoNew[minI];
apNew[i] = apNew[minI];
pgnoNew[minI] = t;
apNew[minI] = pT
}
}
/*
** Evenly distribute the data in apCell[] across the new pages.
** Insert divider cells into pParent as necessary.
*/
j = 0;
for(i=0; i<nNew; i++){
MemPage *pNew = apNew[i];
while( j<cntNew[i] ){
assert( pNew->nFree>=szCell[j] );
// 插入到页中
insertCell(pNew, pNew->nCell, apCell[j], szCell[j]);
j++;
}
assert( pNew->nCell>0 );
assert( !pNew->isOverfull );
// 维护Cell的next指针
relinkCellList(pNew);
if( i<nNew-1 && j<nCell ){
// 按照B树的性质,右兄弟节点的子结点也大于自己,所以这里是设置rightChild
pNew->u.hdr.rightChild = apCell[j]->h.leftChild;
// 设置parent的分割Cell的leftChild为pgnoNew[i]
apCell[j]->h.leftChild = pgnoNew[i];
// 分割的Cell插入parent
insertCell(pParent, nxDiv, apCell[j], szCell[j]);
j++;
nxDiv++;
}
}
assert( j==nCell );
apNew[nNew-1]->u.hdr.rightChild = apOld[nOld-1]->u.hdr.rightChild;
if( nxDiv==pParent->nCell ){
pParent->u.hdr.rightChild = pgnoNew[nNew-1];
}else{
pParent->apCell[nxDiv]->h.leftChild = pgnoNew[nNew-1];
}
|
6 最后做一些维护工作。由于balance可能修改parent,所以对parent调用balance。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
/*
** Reparent children of all cells.
*/
for(i=0; i<nNew; i++){
reparentChildPages(pBt->pPager, apNew[i]);
}
reparentChildPages(pBt->pPager, pParent);
/*
** balance the parent page.
*/
rc = balance(pBt, pParent, pCur);
|
事务操作¶
多个读事务可以同时执行。
写事务执行时,其他读写事务都不能执行。
事务实现基本就是调用下Pager的事务操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
int sqliteBtreeBeginTrans(Btree *pBt){
int rc;
if( pBt->inTrans ) return SQLITE_ERROR;
if( pBt->page1==0 ){
// 加载页面,如果数据库不为空,校验magic number
rc = lockBtree(pBt);
if( rc!=SQLITE_OK ){
return rc;
}
}
if( pBt->readOnly ){
rc = SQLITE_OK;
}else{
rc = sqlitepager_begin(pBt->page1);
if( rc==SQLITE_OK ){
rc = newDatabase(pBt);
}
}
if( rc==SQLITE_OK ){
pBt->inTrans = 1;
pBt->inCkpt = 0;
}else{
unlockBtreeIfUnused(pBt);
}
return rc;
}
int sqliteBtreeRollback(Btree *pBt){
int rc;
BtCursor *pCur;
if( pBt->inTrans==0 ) return SQLITE_OK;
pBt->inTrans = 0;
pBt->inCkpt = 0;
for(pCur=pBt->pCursor; pCur; pCur=pCur->pNext){
if( pCur->pPage ){
sqlitepager_unref(pCur->pPage);
pCur->pPage = 0;
}
}
rc = pBt->readOnly ? SQLITE_OK : sqlitepager_rollback(pBt->pPager);
unlockBtreeIfUnused(pBt);
return rc;
}
int sqliteBtreeCommit(Btree *pBt){
int rc;
if( pBt->inTrans==0 ) return SQLITE_ERROR;
rc = pBt->readOnly ? SQLITE_OK : sqlitepager_commit(pBt->pPager);
pBt->inTrans = 0;
pBt->inCkpt = 0;
unlockBtreeIfUnused(pBt);
return rc;
}
|
对比BoltDB¶
- sqlite自己做page buffer管理,需要处理page淘汰、脏页写盘,而BoltDB使用系统mmap来管理内存,很多管理逻辑都委托给OS,实现相对简单。
- 这个版本的sqlite只有B树实现,而BoltDB使用B+树实现。
- sqlite的rebalance实现更加复杂,BoltDB相对简单。
代码技巧¶
sqlite实现中的一些代码技巧值得学习
-
将地址当作数组访问,然后方便的定位到该结构体前后的内存。
1
|
#define PGHDR_TO_DATA(P) ((void*)(&(P)[1]))
|
-
通过指针来简化一些处理,类似于Linus:利用二级指针删除单向链表 | 酷 壳 - CoolShell的技巧。
比如下面的代码中,如果不用指针,firstCell和iNext需要分别去设置,但通过指针把它们的设置逻辑统一了起来。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
static void relinkCellList(MemPage *pPage){
int i;
u16 *pIdx;
assert( sqlitepager_iswriteable(pPage) );
pIdx = &pPage->u.hdr.firstCell;
for(i=0; i<pPage->nCell; i++){
int idx = Addr(pPage->apCell[i]) - Addr(pPage);
assert( idx>0 && idx<SQLITE_PAGE_SIZE );
*pIdx = idx;
pIdx = &pPage->apCell[i]->h.iNext;
}
*pIdx = 0;
}
|