https://github.com/madushadhanushka/simple-sqlite 是从sqlite 2.5.0 抽取出的核心逻辑,适合用来学习sqlite的后端实现,如

  1. 如何实现page buffer
  2. 如何实现基于磁盘的B树
  3. 如何通过rollback journaling来实现事务支持和crash recovery

page buffer bool

page和pager实现

先看page的实现。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// page size 大小
#define SQLITE_PAGE_SIZE 1024

// 页号。0 代表 "not a page", 文件中的第一个页从1开始。
typedef unsigned int Pgno;

/*
** Each in-memory image of a page begins with the following header.
** This header is only visible to this pager module.  The client
** code that calls pager sees only the data that follows the header.
*/
typedef struct PgHdr PgHdr;
struct PgHdr {
  Pager *pPager;                 /* The pager to which this page belongs */
  Pgno pgno;                     /* The page number for this page */
  PgHdr *pNextHash, *pPrevHash;  /* Hash collision chain for PgHdr.pgno */
  int nRef;                      /* Number of users of this page */
  PgHdr *pNextFree, *pPrevFree;  /* Freelist of pages where nRef==0 */
  PgHdr *pNextAll, *pPrevAll;    /* A list of all pages */
  char inJournal;                /* TRUE if has been written to journal */
  char inCkpt;                   /* TRUE if written to the checkpoint journal */
  char dirty;                    /* TRUE if we need to write back changes */
  /* SQLITE_PAGE_SIZE bytes of page data follow this header */
  /* Pager.nExtra bytes of local data follow the page data */
};

每个page的大小为SQLITE_PAGE_SIZE。PgHdr是page前面的一段内存,用于簿记一些信息,如是否是脏页(dirty),是否已写入journal或checkpoint文件(inJournal, inCkpt),页号(pgno),引用数(nRef)等。此外还记录一些双链表的next/prev指针,pNextHash记录哈希表的冲突链,pNextFree记录空闲页,pNextAll记录所有页。

在内存中布局如下

1
|    sizeof(PgHdr)   |        SQLITE_PAGE_SIZE     |   Pager.nExtra      |

按照上面的布局,定义了如下的宏来做内存地址的转换

1
2
3
4
5
6
7
8
/*
** Convert a pointer to a PgHdr into a pointer to its data
** and back again.
*/

#define PGHDR_TO_DATA(P)  ((void*)(&(P)[1]))
#define DATA_TO_PGHDR(D)  (&((PgHdr*)(D))[-1])
#define PGHDR_TO_EXTRA(P) ((void*)&((char*)(&(P)[1]))[SQLITE_PAGE_SIZE])

我们看PGHDR_TO_DATA的实现。最简单的情况下, PgHdr* P = init xxx; 把P当作一个数组,P[1] 相当于 P + sizeof(PgHdr),指向了page的内存,&P[1]的类型仍然为PgHdr*, 再将其转为void* 类型。sqlite实现时大量采用了类似的技巧。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/*
** A open page cache is an instance of the following structure.
*/
struct Pager {
  char *zFilename;            /* Name of the database file */
  char *zJournal;             /* Name of the journal file */
  OsFile fd, jfd;             /* File descriptors for database and journal */
  OsFile cpfd;                /* File descriptor for the checkpoint journal */
  // page个数
  int dbSize;                 /* Number of pages in the file */
  int origDbSize;             /* dbSize before the current change */
  int ckptSize, ckptJSize;    /* Size of database and journal at ckpt_begin() */
  int nExtra;                 /* Add this many bytes to each in-memory page */
  // 用于做一些B树的清理工作
  void (*xDestructor)(void*); /* Call this routine when freeing pages */
  int nPage;                  /* Total number of in-memory pages */
  int nRef;                   /* Number of in-memory pages with PgHdr.nRef>0 */
  int mxPage;                 /* Maximum number of pages to hold in cache */
  int nHit, nMiss, nOvfl;     /* Cache hits, missing, and LRU overflows */
  u8 journalOpen;             /* True if journal file descriptors is valid */
  u8 ckptOpen;                /* True if the checkpoint journal is open */
  u8 ckptInUse;               /* True we are in a checkpoint */
  u8 noSync;                  /* Do not sync the journal if true */
  u8 state;                   /* SQLITE_UNLOCK, _READLOCK or _WRITELOCK */
  u8 errMask;                 /* One of several kinds of errors */
  u8 tempFile;                /* zFilename is a temporary file */
  u8 readOnly;                /* True for a read-only database */
  u8 needSync;                /* True if an fsync() is needed on the journal */
  u8 dirtyFile;               /* True if database file has changed in any way */
  u8 *aInJournal;             /* One bit for each page in the database file */
  u8 *aInCkpt;                /* One bit for each page in the database */
  PgHdr *pFirst, *pLast;      /* List of free pages */
  PgHdr *pAll;                /* List of all pages */
  // 用于根据页号快速定位到页
  PgHdr *aHash[N_PG_HASH];    /* Hash table to map page number of PgHdr */
};

Pager表示page cache实现,成员比较多,但都有注释。

1
2
3
4
5
6
int sqlitepager_open(
  Pager **ppPager,         /* Return the Pager structure here */
  const char *zFilename,   /* Name of the database file to open */
  int mxPage,              /* Max number of in-memory cache pages */
  int nExtra               /* Extra bytes append to each in-memory page */
)

sqlitepager_open用于创建一个Pager,其实现稍长,但逻辑比较简单,首先打开数据库文件,然后做成员的初始化。这里不介绍其具体实现。参数中的mxPage控制内存中的最大页数,nExtra是上面的page部局中最后额外分配的一段内存。

pager_lookup用于在内存中根据页号来查询页,如果没有缓存,不会从磁盘中加载。主要逻辑就是在哈希表中查找。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
/*
** Find a page in the hash table given its page number.  Return
** a pointer to the page or NULL if not found.
*/
static PgHdr *pager_lookup(Pager *pPager, Pgno pgno){
  PgHdr *p = pPager->aHash[pgno % N_PG_HASH];
  while( p && p->pgno!=pgno ){
    p = p->pNextHash;
  }
  return p;
}

pager_get和pager_lookup的不同在于,在没有命中缓存时会从磁盘中加载。在没有超过内存页数限制或者超过了但没有空闲页时,会malloc新的内存;否则从空闲页中重用page,如果所有的空闲页都是脏页,执行刷盘,这样所有的空闲页都不是脏页了。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
int sqlitepager_get(Pager *pPager, Pgno pgno, void **ppPage){
  PgHdr *pPg;

  /* Make sure we have not hit any critical errors.
  */ 
  if( pPager==0 || pgno==0 ){
    return SQLITE_ERROR;
  }
  if( pPager->errMask & ~(PAGER_ERR_FULL) ){
    return pager_errcode(pPager);
  }

  /* If this is the first page accessed, then get a read lock
  ** on the database file.
  */
  if( pPager->nRef==0 ){
    if( sqliteOsReadLock(&pPager->fd)!=SQLITE_OK ){
      *ppPage = 0;
      return SQLITE_BUSY;
    }
    pPager->state = SQLITE_READLOCK;

    /* If a journal file exists, try to play it back.
    */
    // nRef == 0,又存在journal文件,说明是崩溃后重启,需要从journal文件恢复
    if( sqliteOsFileExists(pPager->zJournal) ){
    }
    pPg = 0;
  }else{
    /* Search for page in cache */
    pPg = pager_lookup(pPager, pgno);
  }
  // 没有命中缓存
  if( pPg==0 ){
    /* The requested page is not in the page cache. */
    int h;
    pPager->nMiss++;
    // 没有超过内存上限,或者没有空闲页
    if( pPager->nPage<pPager->mxPage || pPager->pFirst==0 ){
      /* Create a new page */
      // 内存分配和我们上面的分析是一致的
      pPg = malloc( sizeof(*pPg) + SQLITE_PAGE_SIZE + pPager->nExtra  );
      memset(pPg, 0, sizeof(*pPg) + SQLITE_PAGE_SIZE + pPager->nExtra );
      pPg->pPager = pPager;
      pPg->pNextAll = pPager->pAll;
      if( pPager->pAll ){
        pPager->pAll->pPrevAll = pPg;
      }
      pPg->pPrevAll = 0;
      pPager->pAll = pPg;
      pPager->nPage++;
    }else{
      /* Recycle an older page.  First locate the page to be recycled.
      ** Try to find one that is not dirty and is near the head of
      ** of the free list */
      pPg = pPager->pFirst;
      // 当前没有任何引用的页为free页,但仍然可能为脏页
      while( pPg && pPg->dirty ){
        pPg = pPg->pNextFree;
      }

      // 如果找不到可用的空闲页,则将所有的修改同步磁盘。
      // 之所以同步所有的页而非只同步一个页,是因为防止短期后续出现需要同步的情况。
      if( pPg==0 ){
        int rc = syncAllPages(pPager);
        if( rc!=0 ){
          sqlitepager_rollback(pPager);
          *ppPage = 0;
          return SQLITE_IOERR;
        }
        pPg = pPager->pFirst;
      }
      assert( pPg->nRef==0 );
      assert( pPg->dirty==0 );

      /* Unlink the old page from the free list and the hash table
      */
      // 省略free链,hash链的维护逻辑
      pPager->nOvfl++;
    }
    pPg->pgno = pgno;
    // 设置是否在journal或checkpoint中
    if( pPager->aInJournal && (int)pgno<=pPager->origDbSize ){
      pPg->inJournal = (pPager->aInJournal[pgno/8] & (1<<(pgno&7)))!=0;
    }else{
      pPg->inJournal = 0;
    }
    if( pPager->aInCkpt && (int)pgno<=pPager->ckptSize ){
      pPg->inCkpt = (pPager->aInCkpt[pgno/8] & (1<<(pgno&7)))!=0;
    }else{
      pPg->inCkpt = 0;
    }
    pPg->dirty = 0;
    pPg->nRef = 1;
    REFINFO(pPg);
    pPager->nRef++;
    // 维护hash链
    h = pager_hash(pgno);
    pPg->pNextHash = pPager->aHash[h];
    pPager->aHash[h] = pPg;
    if( pPg->pNextHash ){
      assert( pPg->pNextHash->pPrevHash==0 );
      pPg->pNextHash->pPrevHash = pPg;
    }
    if( pPager->dbSize<0 ) sqlitepager_pagecount(pPager);
    // 对于磁盘上没有的页,置0
    if( pPager->dbSize<(int)pgno ){
      memset(PGHDR_TO_DATA(pPg), 0, SQLITE_PAGE_SIZE);
    }else{
      int rc;
      sqliteOsSeek(&pPager->fd, (pgno-1)*SQLITE_PAGE_SIZE);
      // 从磁盘读取
      rc = sqliteOsRead(&pPager->fd, PGHDR_TO_DATA(pPg), SQLITE_PAGE_SIZE);
      if( rc!=SQLITE_OK ){
        return rc;
      }
    }
    if( pPager->nExtra>0 ){
      memset(PGHDR_TO_EXTRA(pPg), 0, pPager->nExtra);
    }
  }else{
    /* The requested page is in the page cache. */
    pPager->nHit++;
    page_ref(pPg);
  }
  *ppPage = PGHDR_TO_DATA(pPg);
  return SQLITE_OK;
}

sqlite对page实现了引用计数,引用计数变为0时加入空闲链,变为非0时从空闲链移除。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
static void page_ref(PgHdr *pPg){
  if( pPg->nRef==0 ){
    /* The page is currently on the freelist.  Remove it. */
    if( pPg->pPrevFree ){
      pPg->pPrevFree->pNextFree = pPg->pNextFree;
    }else{
      pPg->pPager->pFirst = pPg->pNextFree;
    }
    if( pPg->pNextFree ){
      pPg->pNextFree->pPrevFree = pPg->pPrevFree;
    }else{
      pPg->pPager->pLast = pPg->pPrevFree;
    }
    pPg->pPager->nRef++;
  }
  pPg->nRef++;
  REFINFO(pPg);
}

事务及crash recovery实现

sqlite 3.7.0实现了wal journal,之前采用的是rollback journaling。所谓rollback journaling,是指在修改page前,先将page的内容备份到journal文件中,然后再修改内存中的page,在事务提交时成功刷新到磁盘后再移除journal文件。如果事务在成功提交之前出现crash,在重启时sqlite会从journal文件恢复page,使数据库恢复事务提交前的状态。

在事务开始时,需要调用sqlitepager_begin ,会创建journal文件和维护journal相关的标记位,记录当前文件的页数用于回滚。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
int sqlitepager_begin(void *pData){
  PgHdr *pPg = DATA_TO_PGHDR(pData);
  Pager *pPager = pPg->pPager;
  int rc = SQLITE_OK;
  if( pPager->state==SQLITE_READLOCK ){
    assert( pPager->aInJournal==0 );
    // 写锁
    rc = sqliteOsWriteLock(&pPager->fd);
    if( rc!=SQLITE_OK ){
      return rc;
    }
    // 用于记录哪些页已经写入journal文件了
    pPager->aInJournal  = malloc(  pPager->dbSize/8 + 1 );
    memset(pPager->aInJournal , 0,  pPager->dbSize/8 + 1);
    // 打开journal文件
    rc = sqliteOsOpenExclusive(pPager->zJournal, &pPager->jfd, 0);
    pPager->journalOpen = 1;
    pPager->needSync = 0;
    pPager->dirtyFile = 0;
    pPager->state = SQLITE_WRITELOCK;
    sqlitepager_pagecount(pPager);
    // 记录之前的page个数
    pPager->origDbSize = pPager->dbSize;
    // 写入magic和之前的page页数
    rc = sqliteOsWrite(&pPager->jfd, aJournalMagic, sizeof(aJournalMagic));
    if( rc==SQLITE_OK ){
      rc = sqliteOsWrite(&pPager->jfd, &pPager->dbSize, sizeof(Pgno));
    }
    if( rc!=SQLITE_OK ){
      rc = pager_unwritelock(pPager);
      if( rc==SQLITE_OK ) rc = SQLITE_FULL;
    }
  }
  return rc;
}

在修改page前,需要调用sqlitepager_write ,将page内容备份到journal文件中,并设置dirty标记。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
int sqlitepager_write(void *pData){
  PgHdr *pPg = DATA_TO_PGHDR(pData);
  Pager *pPager = pPg->pPager;
  int rc = SQLITE_OK;

  /* Check for errors
  */
  if( pPager->errMask ){ 
    return pager_errcode(pPager);
  }
  if( pPager->readOnly ){
    return SQLITE_PERM;
  }

  /* Mark the page as dirty.  If the page has already been written
  ** to the journal then we can return right away.
  */
  pPg->dirty = 1;
  if( pPg->inJournal && (pPg->inCkpt || pPager->ckptInUse==0) ){
    pPager->dirtyFile = 1;
    return SQLITE_OK;
  }

  /* If we get this far, it means that the page needs to be
  ** written to the transaction journal or the ckeckpoint journal
  ** or both.
  **
  ** First check to see that the transaction journal exists and
  ** create it if it does not.
  */
  assert( pPager->state!=SQLITE_UNLOCK );
  // 调用sqlitepager_begin,如果之前已经调用过,相当于空操作
  rc = sqlitepager_begin(pData);
  pPager->dirtyFile = 1;
  if( rc!=SQLITE_OK ) return rc;
  assert( pPager->state==SQLITE_WRITELOCK );
  assert( pPager->journalOpen );

  /* The transaction journal now exists and we have a write lock on the
  ** main database file.  Write the current page to the transaction 
  ** journal if it is not there already.
  */
  // 如果没有备份到journal且在文件原有范围内
  if( !pPg->inJournal && (int)pPg->pgno <= pPager->origDbSize ){
    // 写入页号
    rc = sqliteOsWrite(&pPager->jfd, &pPg->pgno, sizeof(Pgno));
    if( rc==SQLITE_OK ){
      // 写入page内容
      rc = sqliteOsWrite(&pPager->jfd, pData, SQLITE_PAGE_SIZE);
    }
    assert( pPager->aInJournal!=0 );
    // 设置标志位
    pPager->aInJournal[pPg->pgno/8] |= 1<<(pPg->pgno&7);
    pPager->needSync = !pPager->noSync;
    pPg->inJournal = 1;
  }

  /* Update the database size and return.
  */
  if( pPager->dbSize<(int)pPg->pgno ){
    pPager->dbSize = pPg->pgno;
  }
  return rc;
}

在事务回滚时,调用sqlitepager_rollback ,从journal文件恢复备份的page内容。对于journal文件中所有完整的页,将其读出后写入数据库文件的相应页,并更新到存在的page缓存。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
int sqlitepager_rollback(Pager *pPager){
  int rc;
  if( pPager->errMask!=0 && pPager->errMask!=PAGER_ERR_FULL ){
    if( pPager->state>=SQLITE_WRITELOCK ){
      pager_playback(pPager);
    }
    return pager_errcode(pPager);
  }
  if( pPager->state!=SQLITE_WRITELOCK ){
    return SQLITE_OK;
  }
  // 回放
  rc = pager_playback(pPager);
  if( rc!=SQLITE_OK ){
    rc = SQLITE_CORRUPT;
    pPager->errMask |= PAGER_ERR_CORRUPT;
  }
  // 设置页数为-1,后面有需要会自动获取实际的页数
  pPager->dbSize = -1;
  return rc;
}

static int pager_playback(Pager *pPager){
  int nRec;                /* Number of Records */
  int i;                   /* Loop counter */
  Pgno mxPg = 0;           /* Size of the original file in pages */
  unsigned char aMagic[sizeof(aJournalMagic)];
  int rc;

  /* Figure out how many records are in the journal.  Abort early if
  ** the journal is empty.
  */
  assert( pPager->journalOpen );
  sqliteOsSeek(&pPager->jfd, 0);
  rc = sqliteOsFileSize(&pPager->jfd, &nRec);
  if( rc!=SQLITE_OK ){
    goto end_playback;
  }
  // 校验至少需要存储一个完整的page
  nRec = (nRec - (sizeof(aMagic)+sizeof(Pgno))) / sizeof(PageRecord);
  if( nRec<=0 ){
    goto end_playback;
  }

  /* Read the beginning of the journal and truncate the
  ** database file back to its original size.
  */
  // 校验magic
  rc = sqliteOsRead(&pPager->jfd, aMagic, sizeof(aMagic));
  if( rc!=SQLITE_OK || memcmp(aMagic,aJournalMagic,sizeof(aMagic))!=0 ){
    rc = SQLITE_PROTOCOL;
    goto end_playback;
  }
  // 读取最大页数
  rc = sqliteOsRead(&pPager->jfd, &mxPg, sizeof(mxPg));
  if( rc!=SQLITE_OK ){
    goto end_playback;
  }
  // 丢弃最后不完整的页
  rc = sqliteOsTruncate(&pPager->fd, mxPg*SQLITE_PAGE_SIZE);
  if( rc!=SQLITE_OK ){
    goto end_playback;
  }
  // 恢复页数
  pPager->dbSize = mxPg;
  
  /* Copy original pages out of the journal and back into the database file.
  */
  for(i=nRec-1; i>=0; i--){
    rc = pager_playback_one_page(pPager, &pPager->jfd);
    if( rc!=SQLITE_OK ) break;
  }

end_playback:
  if( rc!=SQLITE_OK ){
    pager_unwritelock(pPager);
    pPager->errMask |= PAGER_ERR_CORRUPT;
    rc = SQLITE_CORRUPT;
  }else{
    rc = pager_unwritelock(pPager);
  }
  return rc;
}

static int pager_playback_one_page(Pager *pPager, OsFile *jfd){
  int rc;
  PgHdr *pPg;              /* An existing page in the cache */
  PageRecord pgRec;
  // 读页号和page内容
  rc = sqliteOsRead(jfd, &pgRec, sizeof(pgRec));
  if( rc!=SQLITE_OK ) return rc;

  /* Sanity checking on the page */
  if( pgRec.pgno>pPager->dbSize || pgRec.pgno==0 ) return SQLITE_CORRUPT;

  /* Playback the page.  Update the in-memory copy of the page
  ** at the same time, if there is one.
  */
  // 如果有缓存的话,也更新到缓存
  pPg = pager_lookup(pPager, pgRec.pgno);
  if( pPg ){
    memcpy(PGHDR_TO_DATA(pPg), pgRec.aData, SQLITE_PAGE_SIZE);
    memset(PGHDR_TO_EXTRA(pPg), 0, pPager->nExtra);
  }
  // 写入文件
  rc = sqliteOsSeek(&pPager->fd, (pgRec.pgno-1)*SQLITE_PAGE_SIZE);
  if( rc==SQLITE_OK ){
    rc = sqliteOsWrite(&pPager->fd, pgRec.aData, SQLITE_PAGE_SIZE);
  }
  return rc;
}

事务提交时需要调用sqlitepager_commit ,刷盘并释放写锁。看实现会先同步journal文件,然后将内存中的page写入磁盘并同步数据库文件。在释放写锁时会删除journal文件。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
int sqlitepager_commit(Pager *pPager){
  int rc;
  PgHdr *pPg;

  if( pPager->errMask==PAGER_ERR_FULL ){
    rc = sqlitepager_rollback(pPager);
    if( rc==SQLITE_OK ) rc = SQLITE_FULL;
    return rc;
  }
  if( pPager->errMask!=0 ){
    rc = pager_errcode(pPager);
    return rc;
  }
  if( pPager->state!=SQLITE_WRITELOCK ){
    return SQLITE_ERROR;
  }
  assert( pPager->journalOpen );
  if( pPager->dirtyFile==0 ){
    // 没有修改直接返回
    /* Exit early (without doing the time-consuming sqliteOsSync() calls)
    ** if there have been no changes to the database file. */
    rc = pager_unwritelock(pPager);
    pPager->dbSize = -1;
    return rc;
  }
  // 同步journal文件
  if( pPager->needSync && sqliteOsSync(&pPager->jfd)!=SQLITE_OK ){
    goto commit_abort;
  }
  // 写所有的脏页
  for(pPg=pPager->pAll; pPg; pPg=pPg->pNextAll){
    if( pPg->dirty==0 ) continue;
    rc = sqliteOsSeek(&pPager->fd, (pPg->pgno-1)*SQLITE_PAGE_SIZE);
    if( rc!=SQLITE_OK ) goto commit_abort;
    rc = sqliteOsWrite(&pPager->fd, PGHDR_TO_DATA(pPg), SQLITE_PAGE_SIZE);
    if( rc!=SQLITE_OK ) goto commit_abort;
  }
  // 同步数据库文件
  if( !pPager->noSync && sqliteOsSync(&pPager->fd)!=SQLITE_OK ){
    goto commit_abort;
  }
  // 释放写锁时会删除journal文件
  rc = pager_unwritelock(pPager);
  pPager->dbSize = -1;
  return rc;

  /* Jump here if anything goes wrong during the commit process.
  */
commit_abort:
  rc = sqlitepager_rollback(pPager);
  if( rc==SQLITE_OK ){
    rc = SQLITE_FULL;
  }
  return rc;
}

某些情况下脏页有可能整页被丢弃,比如在B树删除时可能page被合并到相邻的页,这时可以调用sqlitepager_dont_write 来减少事务提交时的写盘量,通过去掉脏页标记使得事务提交时不需要刷新这个页到磁盘。

1
2
3
4
5
6
7
void sqlitepager_dont_write(Pager *pPager, Pgno pgno){
  PgHdr *pPg;
  pPg = pager_lookup(pPager, pgno);
  if( pPg && pPg->dirty ){
    pPg->dirty = 0;
  }
}

某些情况下,在写数据页时可能之前的数据不需要备份到journal文件中,比如重用之前释放的磁盘页时,这时可以调用sqlitepager_dont_rollback ,其主要逻辑为设置在journal文件中的标志位。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
void sqlitepager_dont_rollback(void *pData){
  PgHdr *pPg = DATA_TO_PGHDR(pData);
  Pager *pPager = pPg->pPager;

  if( pPager->state!=SQLITE_WRITELOCK || pPager->journalOpen==0 ) return;
  if( !pPg->inJournal && (int)pPg->pgno <= pPager->origDbSize ){
    assert( pPager->aInJournal!=0 );
    pPager->aInJournal[pPg->pgno/8] |= 1<<(pPg->pgno&7);
    // 只设置标志位,但不写入journal文件
    pPg->inJournal = 1;
    if( pPager->ckptInUse ){
      pPager->aInCkpt[pPg->pgno/8] |= 1<<(pPg->pgno&7);
      pPg->inCkpt = 1;
    }
  }
}

B树实现

这个版本的sqlite的表和索引都使用了B树实现,还没有实现B+树。

一个打开的数据库用下面的结构体表示

1
2
3
4
5
6
7
8
9
struct Btree {
  Pager *pPager;        /* The page cache */
  BtCursor *pCursor;    /* A list of all open cursors */
  PageOne *page1;       /* First page of the database */
  u8 inTrans;           /* True if a transaction is in progress */
  u8 inCkpt;            /* True if there is a checkpoint on the transaction */
  u8 readOnly;          /* True if the underlying file is readonly */
  Hash locks;           /* Key: root page number.  Data: lock count */
};

数据库文件的第1个page,用PageOne 来表示,其中包含标识数据库文件的magic number,以及空闲页的信息。

1
2
3
4
5
6
7
8
9
struct PageOne {
  // ** This file contains an SQLite 2.1 database **
  char zMagic[MAGIC_SIZE]; /* String that identifies the file as a database */
  // 0xdae37528
  int iMagic;              /* Integer to verify correct byte order */
  Pgno freeList;           /* First free page in a list of all free pages */
  int nFree;               /* Number of pages on the free list */
  int aMeta[SQLITE_N_BTREE_META-1];  /* User defined integers */
};

一个数据库文件中可以包含多个B树,这些B树存储在第2个或者页号更大的page中。

空闲页表

PageOne.freeList指向第一个存储空闲页信息的页。空闲页信息使用FreelistInfo 存储

1
2
3
4
struct FreelistInfo {
  int nFree;  // 空闲页的个数
  Pgno aFree[(OVERFLOW_SIZE-sizeof(int))/sizeof(Pgno)];
};

存储空闲页信息的页使用overflow页的格式存储,使用OverflowPage.iNext指向下一个存储空闲页信息的页,这些空闲页信息构成了一个链表。FreelistInfo存储在OverflowPage.aPayload处。

1
2
3
4
5
6
#define OVERFLOW_SIZE (SQLITE_PAGE_SIZE-sizeof(Pgno))

struct OverflowPage {
  Pgno iNext;
  char aPayload[OVERFLOW_SIZE];
};

添加一个页到空闲页表的实现如下。如果存在不满的存储空闲页信息的页,则追加到其中,否则创建一个新的存储空闲页信息的页。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/*
** Add a page of the database file to the freelist.  Either pgno or
** pPage but not both may be 0. 
**
** sqlitepager_unref() is NOT called for pPage.
*/
static int freePage(Btree *pBt, void *pPage, Pgno pgno){
  PageOne *pPage1 = pBt->page1;
  OverflowPage *pOvfl = (OverflowPage*)pPage;
  int rc;
  int needUnref = 0;
  MemPage *pMemPage;

  if( pgno==0 ){
    assert( pOvfl!=0 );
    pgno = sqlitepager_pagenumber(pOvfl);
  }
  assert( pgno>2 );
  rc = sqlitepager_write(pPage1);
  if( rc ){
    return rc;
  }
  // 计数加1
  pPage1->nFree++;
  if( pPage1->nFree>0 && pPage1->freeList ){
    OverflowPage *pFreeIdx;
    rc = sqlitepager_get(pBt->pPager, pPage1->freeList, (void**)&pFreeIdx);
    if( rc==SQLITE_OK ){
      FreelistInfo *pInfo = (FreelistInfo*)pFreeIdx->aPayload;
      // 如果有空闲空间,则写入
      if( pInfo->nFree<(sizeof(pInfo->aFree)/sizeof(pInfo->aFree[0])) ){
        rc = sqlitepager_write(pFreeIdx);
        if( rc==SQLITE_OK ){
          pInfo->aFree[pInfo->nFree] = pgno;
          pInfo->nFree++;
          sqlitepager_unref(pFreeIdx);
          // 取消pgno页的脏页标记
          sqlitepager_dont_write(pBt->pPager, pgno);
          return rc;
        }
      }
      sqlitepager_unref(pFreeIdx);
    }
  }
  // 运行到这里意味着所有的存储空闲页信息的页都已写满,需要用当前页来存储空闲页信息。
  if( pOvfl==0 ){
    assert( pgno>0 );
    rc = sqlitepager_get(pBt->pPager, pgno, (void**)&pOvfl);
    if( rc ) return rc;
    needUnref = 1;
  }
  rc = sqlitepager_write(pOvfl);
  if( rc ){
    if( needUnref ) sqlitepager_unref(pOvfl);
    return rc;
  }
  // 维护链表
  pOvfl->iNext = pPage1->freeList;
  pPage1->freeList = pgno;
  // memset也保证了FreelistInfo.nFree == 0
  memset(pOvfl->aPayload, 0, OVERFLOW_SIZE);
  pMemPage = (MemPage*)pPage;
  pMemPage->isInit = 0;
  if( pMemPage->pParent ){
    sqlitepager_unref(pMemPage->pParent);
    pMemPage->pParent = 0;
  }
  if( needUnref ) rc = sqlitepager_unref(pOvfl);
  return rc;
}

从空闲页表分配一个页的实现如下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
static int allocatePage(Btree *pBt, MemPage **ppPage, Pgno *pPgno){
  PageOne *pPage1 = pBt->page1;
  int rc;
  // 如果有空闲页,从空闲页分配;否则从磁盘加载,分配一个新页
  if( pPage1->freeList ){
    OverflowPage *pOvfl;
    FreelistInfo *pInfo;

    rc = sqlitepager_write(pPage1);
    if( rc ) return rc;
    pPage1->nFree--;
    rc = sqlitepager_get(pBt->pPager, pPage1->freeList, (void**)&pOvfl);
    if( rc ) return rc;
    rc = sqlitepager_write(pOvfl);
    if( rc ){
      sqlitepager_unref(pOvfl);
      return rc;
    }
    pInfo = (FreelistInfo*)pOvfl->aPayload;
    if( pInfo->nFree==0 ){
      // 这个空闲页信息页只的空闲页为0,把这个页本身分配出去。
      *pPgno = pPage1->freeList;
      pPage1->freeList = pOvfl->iNext;
      *ppPage = (MemPage*)pOvfl;
    }else{
      // 从中拿一个页
      pInfo->nFree--;
      *pPgno = pInfo->aFree[pInfo->nFree];
      rc = sqlitepager_get(pBt->pPager, *pPgno, (void**)ppPage);
      sqlitepager_unref(pOvfl);
      if( rc==SQLITE_OK ){
          // 设置空闲页的内容不需要回滚
        sqlitepager_dont_rollback(*ppPage);
        rc = sqlitepager_write(*ppPage);
      }
    }
  }else{
    *pPgno = sqlitepager_pagecount(pBt->pPager) + 1;
    // 从Pager分配一个新页
    rc = sqlitepager_get(pBt->pPager, *pPgno, (void**)ppPage);
    if( rc ) return rc;
    rc = sqlitepager_write(*ppPage);
  }
  return rc;
}

B树节点页实现

节点使用下面的结构体来表示。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
struct MemPage {
  union {
    char aDisk[SQLITE_PAGE_SIZE];  /* Page data stored on disk */
    PageHdr hdr;                   /* Overlay page header */
  } u;
  int isInit;                    /* True if auxiliary data is initialized */
  MemPage *pParent;              /* The parent of this page.  NULL for root */
  // 空闲字节数。
  int nFree;                     /* Number of free bytes in u.aDisk[] */
  // 存储的cell个数。
  int nCell;                     /* Number of entries on this page */
  // 是否已经overfull,即超出page size
  int isOverfull;                /* Some apCell[] points outside u.aDisk[] */
  // 排序的Cell列表。
  Cell *apCell[MX_CELL+2];       /* All data entires in sorted order */
};

#define EXTRA_SIZE (sizeof(MemPage)-SQLITE_PAGE_SIZE)

MemPage.u对应磁盘上的page,之后的数据只在内存中存储。在创建Pager时,会设置Pager.nExtra = EXTRA_SIZE以便在创建内存中的页时分配足够的内存。

每个节点的KV和左子指针,使用Cell结构来表示。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
struct CellHdr {
  Pgno leftChild; /* Child page that comes before this cell */
  u16 nKey;       /* Number of bytes in the key */
  u16 iNext;      /* Index in MemPage.u.aDisk[] of next cell in sorted order */
  u8 nKeyHi;      /* Upper 8 bits of key size for keys larger than 64K bytes */
  u8 nDataHi;     /* Upper 8 bits of data size when the size is more than 64K */
  u16 nData;      /* Number of bytes of data */
};

#define NKEY(h)  (h.nKey + h.nKeyHi*65536)
#define NDATA(h) (h.nData + h.nDataHi*65536)

// Cell的最小大小
#define MIN_CELL_SIZE  (sizeof(CellHdr)+4)
// 一个页中最多可以存多少个Cell
#define MX_CELL ((SQLITE_PAGE_SIZE-sizeof(PageHdr))/MIN_CELL_SIZE)
#define USABLE_SPACE  (SQLITE_PAGE_SIZE - sizeof(PageHdr))
// 不使用overflow页时Cell的最大字节数,保证一个page至少可以存4个Cell
#define MX_LOCAL_PAYLOAD ((USABLE_SPACE/4-(sizeof(CellHdr)+sizeof(Pgno)))&~3)

struct Cell {
  CellHdr h;                        /* The cell header */
  char aPayload[MX_LOCAL_PAYLOAD];  /* Key and data */
  // 第一个overflow页的页号
  Pgno ovfl;                        /* The first overflow page */
};

Cell由CellHdr和payload数据以及第一个overflow页的页号构成。虽然Cell的结构体的长度是固定的,但实际Cell是变长的,通过CellHdr来确定Cell的实际长度。payload至少4个字节,最多MX_LOCAL_PAYLOAD个字节,如果payload需要的空间大于MX_LOCAL_PAYLOAD时会分配overflow页。

CellHdr描述了key和value的长度,都使用了24个bit表示,为了节省空间拆分为高位8bit的nKeyHi和16 bit的nKey(如果不这么拆分的话要占用32个bit),定义了NKEY和NDATA宏来得到K和V的长度。CellHdr.leftChild指向了左子树的页号,其中存储的key都小于这个Cell的key。

分配一个Cell的实现如下,如果空间不足,会分配overflow页来存储。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
static int fillInCell(
  Btree *pBt,              /* The whole Btree.  Needed to allocate pages */
  Cell *pCell,             /* Populate this Cell structure */
  const void *pKey, int nKey,    /* The key */
  const void *pData,int nData    /* The data */
){
  OverflowPage *pOvfl, *pPrior;
  Pgno *pNext;
  int spaceLeft;
  int n, rc;
  int nPayload;
  const char *pPayload;
  char *pSpace;

  pCell->h.leftChild = 0;
  // 设置size
  pCell->h.nKey = nKey & 0xffff;
  pCell->h.nKeyHi = nKey >> 16;
  pCell->h.nData = nData & 0xffff;
  pCell->h.nDataHi = nData >> 16;
  pCell->h.iNext = 0;

  pNext = &pCell->ovfl;
  pSpace = pCell->aPayload;
  spaceLeft = MX_LOCAL_PAYLOAD;
  pPayload = pKey;
  pKey = 0;
  nPayload = nKey;
  pPrior = 0;
  while( nPayload>0 ){
    if( spaceLeft==0 ){
      // 分配一个overflow页,页号设置到pCell->ovfl或者pOvfl->iNext中
      rc = allocatePage(pBt, (MemPage**)&pOvfl, pNext);
      if( rc ){
        *pNext = 0;
      }
      if( pPrior ) sqlitepager_unref(pPrior);
      if( rc ){
        clearCell(pBt, pCell);
        return rc;
      }
      pPrior = pOvfl;
      spaceLeft = OVERFLOW_SIZE;
      pSpace = pOvfl->aPayload;
      pNext = &pOvfl->iNext;
    }
    n = nPayload;
    if( n>spaceLeft ) n = spaceLeft;
    // 挎包
    memcpy(pSpace, pPayload, n);
    nPayload -= n;
    if( nPayload==0 && pData ){
      // key写完后接着写value
      pPayload = pData;
      nPayload = nData;
      pData = 0;
    }else{
      pPayload += n;
    }
    spaceLeft -= n;
    pSpace += n;
  }
  *pNext = 0;
  if( pPrior ){
    sqlitepager_unref(pPrior);
  }
  return SQLITE_OK;
}

释放Cell涉及到两个函数。clearCell 负责释放Cell分配的overflow页。dropCell 负责将Cell非溢出的内存加入空闲链表中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
static int clearCell(Btree *pBt, Cell *pCell){
  Pager *pPager = pBt->pPager;
  OverflowPage *pOvfl;
  Pgno ovfl, nextOvfl;
  int rc;

  if( NKEY(pCell->h) + NDATA(pCell->h) <= MX_LOCAL_PAYLOAD ){
    return SQLITE_OK;
  }
  ovfl = pCell->ovfl;
  pCell->ovfl = 0;
  while( ovfl ){
    // 释放overflow页
    rc = sqlitepager_get(pPager, ovfl, (void**)&pOvfl);
    if( rc ) return rc;
    nextOvfl = pOvfl->iNext;
    rc = freePage(pBt, pOvfl, ovfl);
    if( rc ) return rc;
    sqlitepager_unref(pOvfl);
    ovfl = nextOvfl;
  }
  return SQLITE_OK;
}

static void dropCell(MemPage *pPage, int idx, int sz){
  int j;
  assert( idx>=0 && idx<pPage->nCell );
  assert( sz==cellSize(pPage->apCell[idx]) );
  assert( sqlitepager_iswriteable(pPage) );
  // 释放Cell本身占的空间
  freeSpace(pPage, Addr(pPage->apCell[idx]) - Addr(pPage), sz);
  for(j=idx; j<pPage->nCell-1; j++){
    pPage->apCell[j] = pPage->apCell[j+1];
  }
  pPage->nCell--;
}

/
** Most of the effort here is involved in coalesing adjacent
** free blocks into a single big free block.
*/
static void freeSpace(MemPage *pPage, int start, int size){
  int end = start + size;
  u16 *pIdx, idx;
  FreeBlk *pFBlk;
  FreeBlk *pNew;
  FreeBlk *pNext;

  assert( sqlitepager_iswriteable(pPage) );
  assert( size == ROUNDUP(size) );
  assert( start == ROUNDUP(start) );
  pIdx = &pPage->u.hdr.firstFree;
  idx = *pIdx;
  while( idx!=0 && idx<start ){
    pFBlk = (FreeBlk*)&pPage->u.aDisk[idx];
    if( idx + pFBlk->iSize == start ){
      pFBlk->iSize += size;
      if( idx + pFBlk->iSize == pFBlk->iNext ){
        // 如果可以和下一个区间合并
        pNext = (FreeBlk*)&pPage->u.aDisk[pFBlk->iNext];
        pFBlk->iSize += pNext->iSize;
        pFBlk->iNext = pNext->iNext;
      }
      pPage->nFree += size;
      return;
    }
    pIdx = &pFBlk->iNext;
    idx = *pIdx;
  }
  pNew = (FreeBlk*)&pPage->u.aDisk[start];
  // 检查是否可以和后面的区间合并
  if( idx != end ){
    pNew->iSize = size;
    pNew->iNext = idx;
  }else{
    pNext = (FreeBlk*)&pPage->u.aDisk[idx];
    pNew->iSize = size + pNext->iSize;
    pNew->iNext = pNext->iNext;
  }
  *pIdx = start;
  pPage->nFree += size;
}

一个MemPage逻辑上来说由多个排序的Cell组成,但在物理存储时即不保证有序也不保证连续。由于插入数据时不一定有序,因此要保证物理存储有序需要移动数据而导致额外的开销,因此实现时并不会移动数据,但通过CellHdr.iNext构成了一个有序的Cell链表。由于删除数据时也不会移动数据,因此Cell之间可能有空洞,这些空洞使用FreeBlk表示,并通过FreeBlk.iNext构成了一个链表。

1
2
3
4
struct FreeBlk {
  u16 iSize;      /* Number of bytes in this block of free space */
  u16 iNext;      /* Index in MemPage.u.aDisk[] of the next free block */
};

从磁盘加载page后,通过initPage维护内存中的辅助信息,将所有Cell加载到内存中的apCell数组,计算总的空闲字节数信息。initPage后,可以通过apCell随机访问一个Cell,而不需要通过Cell链表顺序访问。在插入时,新的Cell可能超出page size,在短暂的时间内apCell会有指向aDisk外的Cell。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
static int initPage(MemPage *pPage, Pgno pgnoThis, MemPage *pParent){
  int idx;           /* An index into pPage->u.aDisk[] */
  Cell *pCell;       /* A pointer to a Cell in pPage->u.aDisk[] */
  FreeBlk *pFBlk;    /* A pointer to a free block in pPage->u.aDisk[] */
  int sz;            /* The size of a Cell in bytes */
  int freeSpace;     /* Amount of free space on the page */

  if( pPage->pParent ){
    assert( pPage->pParent==pParent );
    return SQLITE_OK;
  }
  if( pParent ){
    pPage->pParent = pParent;
    // 增加引用计数
    sqlitepager_ref(pParent);
  }
  if( pPage->isInit ) return SQLITE_OK;
  pPage->isInit = 1;
  pPage->nCell = 0;
  freeSpace = USABLE_SPACE;
  idx = pPage->u.hdr.firstCell;
  while( idx!=0 ){
    // 剩余空间存不下一个Cell  
    if( idx>SQLITE_PAGE_SIZE-MIN_CELL_SIZE ) goto page_format_error;
    // 起始地址在PageHdr内
    if( idx<sizeof(PageHdr) ) goto page_format_error;
    // 没有4字节对齐
    if( idx!=ROUNDUP(idx) ) goto page_format_error;
    pCell = (Cell*)&pPage->u.aDisk[idx];
    sz = cellSize(pCell);
    // 超过了page size
    if( idx+sz > SQLITE_PAGE_SIZE ) goto page_format_error;
    freeSpace -= sz;
    // 加载到cell数组
    pPage->apCell[pPage->nCell++] = pCell;
    idx = pCell->h.iNext;
  }
  pPage->nFree = 0;
  idx = pPage->u.hdr.firstFree;
  while( idx!=0 ){
    if( idx>SQLITE_PAGE_SIZE-sizeof(FreeBlk) ) goto page_format_error;
    if( idx<sizeof(PageHdr) ) goto page_format_error;
    pFBlk = (FreeBlk*)&pPage->u.aDisk[idx];
    // 维护空闲byte数统计信息
    pPage->nFree += pFBlk->iSize;
    if( pFBlk->iNext>0 && pFBlk->iNext <= idx ) goto page_format_error;
    idx = pFBlk->iNext;
  }
  if( pPage->nCell==0 && pPage->nFree==0 ){
    /* As a special case, an uninitialized root page appears to be
    ** an empty database */
    return SQLITE_OK;
  }
  if( pPage->nFree!=freeSpace ) goto page_format_error;
  return SQLITE_OK;

page_format_error:
  return SQLITE_CORRUPT;
}

B树操作

创建B树

先看创建数据库的实现。如果已经至少有2个页,则直接返回。否则创建2个页,并初始化第1个页为PageOne,对第2个页调用zeroPage来重置为一个包含0项的B树节点页。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
static int newDatabase(Btree *pBt){
  MemPage *pRoot;
  PageOne *pP1;
  int rc;
  if( sqlitepager_pagecount(pBt->pPager)>1 ) return SQLITE_OK;
  pP1 = pBt->page1;
  rc = sqlitepager_write(pBt->page1);
  if( rc ) return rc;
  rc = sqlitepager_get(pBt->pPager, 2, (void**)&pRoot);
  if( rc ) return rc;
  rc = sqlitepager_write(pRoot);
  if( rc ){
    sqlitepager_unref(pRoot);
    return rc;
  }
  strcpy(pP1->zMagic, zMagicHeader);
  pP1->iMagic = MAGIC;
  zeroPage(pRoot);
  sqlitepager_unref(pRoot);
  return SQLITE_OK;
}

sqliteBtreeCreateTable 要做的事情很简单,分配一个页作为根页,返回页号。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
int sqliteBtreeCreateTable(Btree *pBt, int *piTable){
  MemPage *pRoot;
  Pgno pgnoRoot;
  int rc;
  if( !pBt->inTrans ){
    return SQLITE_ERROR;  /* Must start a transaction first */
  }
  if( pBt->readOnly ){
    return SQLITE_READONLY;
  }
  // 如果有空闲的,用空闲的,否则分配一个新page
  rc = allocatePage(pBt, &pRoot, &pgnoRoot);
  if( rc ) return rc;
  assert( sqlitepager_iswriteable(pRoot) );
  zeroPage(pRoot);
  // 不在使用中就调unref
  sqlitepager_unref(pRoot);
  // 返回根所在的页号
  *piTable = (int)pgnoRoot;
  return SQLITE_OK;
}

Cursor操作

Cursor用于B树的遍历或者定位,其定义如下。mPage指向当前的page,idx指向当前的项。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
/*
** A cursor is a pointer to a particular entry in the BTree.
** The entry is identified by its MemPage and the index in
** MemPage.apCell[] of the entry.
*/
struct BtCursor {
  Btree *pBt;               /* The Btree to which this cursor belongs */
  BtCursor *pNext, *pPrev;  /* Forms a linked list of all cursors */
  Pgno pgnoRoot;            /* The root page of this tree */
  MemPage *pPage;           /* Page that contains the entry */
  int idx;                  /* Index of the entry in pPage->apCell[] */
  u8 wrFlag;                /* True if writable */
  u8 bSkipNext;             /* sqliteBtreeNext() is no-op if true */
  u8 iMatch;                /* compare result from last sqliteBtreeMoveto() */
};

调用sqliteBtreeCursor 创建一个Cursor

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
int sqliteBtreeCursor(Btree *pBt, int iTable, int wrFlag, BtCursor **ppCur){
  int rc;
  BtCursor *pCur;
  ptr nLock;

  if( pBt->page1==0 ){
    // 打开PageOne,并获取一个数据库读锁
    rc = lockBtree(pBt);
    if( rc!=SQLITE_OK ){
      *ppCur = 0;
      return rc;
    }
  }
  if( wrFlag && pBt->readOnly ){
    *ppCur = 0;
    return SQLITE_READONLY;
  }
  pCur = sqliteMalloc( sizeof(*pCur) );
  if( pCur==0 ){
    rc = SQLITE_NOMEM;
    goto create_cursor_exception;
  }
  // 设置root页号
  pCur->pgnoRoot = (Pgno)iTable;
  // 获取当前页
  rc = sqlitepager_get(pBt->pPager, pCur->pgnoRoot, (void**)&pCur->pPage);
  if( rc!=SQLITE_OK ){
    goto create_cursor_exception;
  }
  // 初始化page在内存中的额外辅助信息
  rc = initPage(pCur->pPage, pCur->pgnoRoot, 0);
  if( rc!=SQLITE_OK ){
    goto create_cursor_exception;
  }
  nLock = (ptr)sqliteHashFind(&pBt->locks, 0, iTable);
  // nLock < 0说明已被别人上了写锁
  // nLock > 0 说明已被别人上了读锁,如果wrFlag为true,则获取写锁失败
  if( nLock<0 || (nLock>0 && wrFlag) ){
    rc = SQLITE_LOCKED;
    goto create_cursor_exception;
  }
  // 写锁为负,读锁为正,0即NULL表示没有锁
  nLock = wrFlag ? -1 : nLock+1;
  sqliteHashInsert(&pBt->locks, 0, iTable, (void*)nLock);
  pCur->pBt = pBt;
  pCur->wrFlag = wrFlag;
  pCur->idx = 0;
  // 维护cursor链表
  pCur->pNext = pBt->pCursor;
  if( pCur->pNext ){
    pCur->pNext->pPrev = pCur;
  }
  pCur->pPrev = 0;
  pBt->pCursor = pCur;
  *ppCur = pCur;
  return SQLITE_OK;

create_cursor_exception:
  *ppCur = 0;
  if( pCur ){
    if( pCur->pPage ) sqlitepager_unref(pCur->pPage);
    sqliteFree(pCur);
  }
  unlockBtreeIfUnused(pBt);
  return rc;
}

这个版本的Cursor支持的操作为First, Last, Next, Moveto,分别指向第一个、最后一个、下一个、指定的key。

sqliteBtreeFirst 先将Cursor指向root,然后沿着左子指针不断移动。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
int sqliteBtreeFirst(BtCursor *pCur, int *pRes){
  int rc;
  if( pCur->pPage==0 ) return SQLITE_ABORT;
  rc = moveToRoot(pCur);
  if( rc ) return rc;
  if( pCur->pPage->nCell==0 ){
    *pRes = 1;
    return SQLITE_OK;
  }
  *pRes = 0;
  rc = moveToLeftmost(pCur);
  pCur->bSkipNext = 0;
  return rc;
}

static int moveToRoot(BtCursor *pCur){
  MemPage *pNew;
  int rc;
  // 载入page,并修改cursor指向的page
  rc = sqlitepager_get(pCur->pBt->pPager, pCur->pgnoRoot, (void**)&pNew);
  if( rc ) return rc;
  rc = initPage(pNew, pCur->pgnoRoot, 0);
  if( rc ) return rc;
  sqlitepager_unref(pCur->pPage);
  pCur->pPage = pNew;
  // 重置idx为0
  pCur->idx = 0;
  return SQLITE_OK;
}

static int moveToLeftmost(BtCursor *pCur){
  Pgno pgno;
  int rc;
  // 沿着最左的child指针不断移动
  while( (pgno = pCur->pPage->apCell[pCur->idx]->h.leftChild)!=0 ){
    rc = moveToChild(pCur, pgno);
    if( rc ) return rc;
  }
  return SQLITE_OK;
}

static int moveToChild(BtCursor *pCur, int newPgno){
  int rc;
  MemPage *pNewPage;

  // 载入page,并修改cursor指向的page
  rc = sqlitepager_get(pCur->pBt->pPager, newPgno, (void**)&pNewPage);
  if( rc ) return rc;
  rc = initPage(pNewPage, newPgno, pCur->pPage);
  if( rc ) return rc;
  sqlitepager_unref(pCur->pPage);
  pCur->pPage = pNewPage;
  pCur->idx = 0;
  return SQLITE_OK;
}

sqliteBtreeNext 移动到下一个KV。如果这一层还没遍历完,先将idx指向下一个Cell,然后移动到该Cell下层的最左节点。否则向上层回溯,由于采用的是B树实现,回溯到上层的有效节点就可以返回了,因为中间节点也存储着数据。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
int sqliteBtreeNext(BtCursor *pCur, int *pRes){
  int rc;
  if( pCur->pPage==0 ){
    if( pRes ) *pRes = 1;
    return SQLITE_ABORT;
  }
  // 之前的操作已经指向了下一个元素,直接返回
  if( pCur->bSkipNext && pCur->idx<pCur->pPage->nCell ){
    pCur->bSkipNext = 0;
    if( pRes ) *pRes = 0;
    return SQLITE_OK;
  }
  // idx + 1
  pCur->idx++;
  // 已经遍历完这个page
  if( pCur->idx>=pCur->pPage->nCell ){
    // 如果存在右子树
    if( pCur->pPage->u.hdr.rightChild ){
      // 移动到右子树
      rc = moveToChild(pCur, pCur->pPage->u.hdr.rightChild);
      if( rc ) return rc;
      // 移动到右子树下的最左节点
      rc = moveToLeftmost(pCur);
      if( rc ) return rc;
      if( pRes ) *pRes = 0;
      return SQLITE_OK;
    }
    do{
      // pParent == 0 说明整个树已遍历完
      if( pCur->pPage->pParent==0 ){
        if( pRes ) *pRes = 1;
        return SQLITE_OK;
      }
      // 移动到parent
      rc = moveToParent(pCur);
      if( rc ) return rc;
    }while( pCur->idx>=pCur->pPage->nCell );
    // B树的中间节点也存储数据,因此这里返回了。
    if( pRes ) *pRes = 0;
    return SQLITE_OK;
  }
  // 移动到该节点下的最左节点
  rc = moveToLeftmost(pCur);
  if( rc ) return rc;
  if( pRes ) *pRes = 0;
  return SQLITE_OK;
}

static int moveToParent(BtCursor *pCur){
  Pgno oldPgno;
  MemPage *pParent;
  int i;
  pParent = pCur->pPage->pParent;
  if( pParent==0 ) return SQLITE_INTERNAL;
  oldPgno = sqlitepager_pagenumber(pCur->pPage);
  sqlitepager_ref(pParent);
  sqlitepager_unref(pCur->pPage);
  pCur->pPage = pParent;
  pCur->idx = pParent->nCell;
  for(i=0; i<pParent->nCell; i++){
    if( pParent->apCell[i]->h.leftChild==oldPgno ){
      // 设置idx为包含原child的Cell的下标
      pCur->idx = i;
      break;
    }
  }
  return SQLITE_OK;
}

sqliteBtreeMoveto 将Cursor移动到指定key附近的KV。在key不存在时,有可能指向在该key之前或之后的key,使用*pRes的值来表示不同的结果(<0 之前的key,=0 恰好指向,>0 之后的key)。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
int sqliteBtreeMoveto(BtCursor *pCur, const void *pKey, int nKey, int *pRes){
  int rc;
  if( pCur->pPage==0 ) return SQLITE_ABORT;
  pCur->bSkipNext = 0;
  // 先移动到root页
  rc = moveToRoot(pCur);
  if( rc ) return rc;
  for(;;){
    int lwr, upr;
    Pgno chldPg;
    MemPage *pPage = pCur->pPage;
    int c = -1;
    lwr = 0;
    upr = pPage->nCell-1;
    // 二分查找
    while( lwr<=upr ){
      pCur->idx = (lwr+upr)/2;
      // 比较当前idx指向的Cell和key的大小关系,必要时会加载overflow页的数据。
      rc = sqliteBtreeKeyCompare(pCur, pKey, nKey, 0, &c);
      if( rc ) return rc;
      if( c==0 ){
        pCur->iMatch = c;
        if( pRes ) *pRes = 0;
        return SQLITE_OK;
      }
      if( c<0 ){
        lwr = pCur->idx+1;
      }else{
        upr = pCur->idx-1;
      }
    }
    assert( lwr==upr+1 );
    if( lwr>=pPage->nCell ){
      chldPg = pPage->u.hdr.rightChild;
    }else{
      chldPg = pPage->apCell[lwr]->h.leftChild;
    }
    // 没有child页了则返回
    if( chldPg==0 ){
      pCur->iMatch = c;
      if( pRes ) *pRes = c;
      return SQLITE_OK;
    }
    // 从该child处继续找
    rc = moveToChild(pCur, chldPg);
    if( rc ) return rc;
  }
  /* NOT REACHED */
}

插入删除

在插入KV时,先将Cursor移动到key附近,然后根据是否存在key做一些不同apCell 数组的维护工作,最后调用rebalance来做树平衡。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
/*
** Insert a new record into the BTree.  The key is given by (pKey,nKey)
** and the data is given by (pData,nData).  The cursor is used only to
** define what database the record should be inserted into.  The cursor
** is left pointing at the new record.
*/
int sqliteBtreeInsert(
  BtCursor *pCur,                /* Insert data into the table of this cursor */
  const void *pKey, int nKey,    /* The key of the new record */
  const void *pData, int nData   /* The data of the new record */
){
  Cell newCell;
  int rc;
  int loc;
  int szNew;
  MemPage *pPage;
  Btree *pBt = pCur->pBt;

  if( pCur->pPage==0 ){
    return SQLITE_ABORT;  /* A rollback destroyed this cursor */
  }
  if( !pCur->pBt->inTrans || nKey+nData==0 ){
    return SQLITE_ERROR;  /* Must start a transaction first */
  }
  if( !pCur->wrFlag ){
    return SQLITE_PERM;   /* Cursor not open for writing */
  }
  // 移动到key附近
  rc = sqliteBtreeMoveto(pCur, pKey, nKey, &loc);
  if( rc ) return rc;
  pPage = pCur->pPage;
  rc = sqlitepager_write(pPage);
  if( rc ) return rc;
  // 分配一个cell来存放key/value,可能有overflow 页
  rc = fillInCell(pBt, &newCell, pKey, nKey, pData, nData);
  if( rc ) return rc;
  szNew = cellSize(&newCell);
  if( loc==0 ){
    // 恰好匹配,需要删除之前的Cell。
    // 调用clearCell释放涉及的overflow页,调用dropCell来从apCell数组中删除。
    newCell.h.leftChild = pPage->apCell[pCur->idx]->h.leftChild;
    rc = clearCell(pBt, pPage->apCell[pCur->idx]);
    if( rc ) return rc;
    dropCell(pPage, pCur->idx, cellSize(pPage->apCell[pCur->idx]));
  }else if( loc<0 && pPage->nCell>0 ){
    assert( pPage->u.hdr.rightChild==0 );  /* Must be a leaf page */
    // idx+1指向下一个位置,和下面的else得到相同的结果
    pCur->idx++;
  }else{
      // rightChild说明是leaf页
    assert( pPage->u.hdr.rightChild==0 );  /* Must be a leaf page */
  }
  // 将新的Cell插入,执行完成后有可能超出page size,后面的balance会处理。
  insertCell(pPage, pCur->idx, &newCell, szNew);
  rc = balance(pCur->pBt, pPage, pCur);
  /* sqliteBtreePageDump(pCur->pBt, pCur->pgnoRoot, 1); */
  /* fflush(stdout); */
  return rc;
}

static void insertCell(MemPage *pPage, int i, Cell *pCell, int sz){
  int idx, j;
  assert( i>=0 && i<=pPage->nCell );
  assert( sz==cellSize(pCell) );
  assert( sqlitepager_iswriteable(pPage) );
  // 分配空间不足,idx==0说明没有足够的空间。
  idx = allocateSpace(pPage, sz);
  for(j=pPage->nCell; j>i; j--){
    pPage->apCell[j] = pPage->apCell[j-1];
  }
  pPage->nCell++;
  if( idx<=0 ){
    // 空间不足,标记已overfull
    pPage->isOverfull = 1;
    pPage->apCell[i] = pCell;
  }else{
    memcpy(&pPage->u.aDisk[idx], pCell, sz);
    pPage->apCell[i] = (Cell*)&pPage->u.aDisk[idx];
  }
}

static int allocateSpace(MemPage *pPage, int nByte){
  FreeBlk *p;
  u16 *pIdx;
  int start;
  int cnt = 0;

  assert( sqlitepager_iswriteable(pPage) );
  assert( nByte==ROUNDUP(nByte) );
  // 空间不足或者之前已经不足过
  if( pPage->nFree<nByte || pPage->isOverfull ) return 0;
  pIdx = &pPage->u.hdr.firstFree;
  p = (FreeBlk*)&pPage->u.aDisk[*pIdx];
  while( p->iSize<nByte ){
    assert( cnt++ < SQLITE_PAGE_SIZE/4 );
    if( p->iNext==0 ){
      // 没有一个足够大的空闲块,需要整理页。将所有的Cell移动到页的开始,最后有一个大的空闲块。
      defragmentPage(pPage);
      pIdx = &pPage->u.hdr.firstFree;
    }else{
      pIdx = &p->iNext;
    }
    p = (FreeBlk*)&pPage->u.aDisk[*pIdx];
  }
  // 正好匹配
  if( p->iSize==nByte ){
    start = *pIdx;
    *pIdx = p->iNext;
  }else{
    // 分裂出一个新块
    FreeBlk *pNew;
    start = *pIdx;
    pNew = (FreeBlk*)&pPage->u.aDisk[start + nByte];
    pNew->iNext = p->iNext;
    pNew->iSize = p->iSize - nByte;
    *pIdx = start + nByte;
  }
  pPage->nFree -= nByte;
  return start;
}

sqliteBtreeDelete 用于删除Cursor当前指向的KV。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
int sqliteBtreeDelete(BtCursor *pCur){
  MemPage *pPage = pCur->pPage;
  Cell *pCell;
  int rc;
  Pgno pgnoChild;

  if( pCur->pPage==0 ){
    return SQLITE_ABORT;  /* A rollback destroyed this cursor */
  }
  if( !pCur->pBt->inTrans ){
    return SQLITE_ERROR;  /* Must start a transaction first */
  }
  if( pCur->idx >= pPage->nCell ){
    return SQLITE_ERROR;  /* The cursor is not pointing to anything */
  }
  if( !pCur->wrFlag ){
    return SQLITE_PERM;   /* Did not open this cursor for writing */
  }
  rc = sqlitepager_write(pPage);
  if( rc ) return rc;
  pCell = pPage->apCell[pCur->idx];
  pgnoChild = pCell->h.leftChild;
  clearCell(pCur->pBt, pCell);
  if( pgnoChild ){
    /*
    ** The entry we are about to delete is not a leaf so if we do not
    ** do something we will leave a hole on an internal page.
    ** We have to fill the hole by moving in a cell from a leaf.  The
    ** next Cell after the one to be deleted is guaranteed to exist and
    ** to be a leaf so we can use it.
    */
    // 如果不是叶节点,将该节点的下一个节点移动过来,以满足B树的要求。
    BtCursor leafCur;
    Cell *pNext;
    int szNext;
    getTempCursor(pCur, &leafCur);
    // 找到next节点
    rc = sqliteBtreeNext(&leafCur, 0);
    if( rc!=SQLITE_OK ){
      return SQLITE_CORRUPT;
    }
    rc = sqlitepager_write(leafCur.pPage);
    if( rc ) return rc;
    // 从当前page drop当前cell
    dropCell(pPage, pCur->idx, cellSize(pCell));
    pNext = leafCur.pPage->apCell[leafCur.idx];
    szNext = cellSize(pNext);
    // 修改next节点的leftChild值
    pNext->h.leftChild = pgnoChild;
    // 将next节点插入到当前的page
    insertCell(pPage, pCur->idx, pNext, szNext);
    // 平衡
    rc = balance(pCur->pBt, pPage, pCur);
    if( rc ) return rc;
    pCur->bSkipNext = 1;
    // 将next节点drop掉
    dropCell(leafCur.pPage, leafCur.idx, szNext);
    // 平衡
    rc = balance(pCur->pBt, leafCur.pPage, pCur);
    releaseTempCursor(&leafCur);
  }else{
    // drop cell  
    dropCell(pPage, pCur->idx, cellSize(pCell));
    if( pCur->idx>=pPage->nCell ){
      pCur->idx = pPage->nCell-1;
      if( pCur->idx<0 ){ 
        pCur->idx = 0;
        pCur->bSkipNext = 1;
      }else{
        pCur->bSkipNext = 0;
      }
    }else{
      pCur->bSkipNext = 1;
    }
    rc = balance(pCur->pBt, pPage, pCur);
  }
  return rc;
}

平衡操作

在看balance实现之前,先看几个辅助函数

relinkCellList 用于在插入或删除Cell后,更新MemPage.u.aDisk 中的next指针,这是因为在insertCell和dropCell中只处理了MemPage.apCell,没有处理aDisk中的iNext。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
static void relinkCellList(MemPage *pPage){
  int i;
  u16 *pIdx;
  assert( sqlitepager_iswriteable(pPage) );
  pIdx = &pPage->u.hdr.firstCell;
  for(i=0; i<pPage->nCell; i++){
    int idx = Addr(pPage->apCell[i]) - Addr(pPage);
    assert( idx>0 && idx<SQLITE_PAGE_SIZE );
    *pIdx = idx;
    pIdx = &pPage->apCell[i]->h.iNext;
  }
  *pIdx = 0;
}

copyPage 用于拷贝存在overfull时的page,对于不在page的Cell,拷贝后的page仍然指向原内存。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
static void copyPage(MemPage *pTo, MemPage *pFrom){
  uptr from, to;
  int i;
  memcpy(pTo->u.aDisk, pFrom->u.aDisk, SQLITE_PAGE_SIZE);
  pTo->pParent = 0;
  pTo->isInit = 1;
  pTo->nCell = pFrom->nCell;
  pTo->nFree = pFrom->nFree;
  pTo->isOverfull = pFrom->isOverfull;
  to = Addr(pTo);
  from = Addr(pFrom);
  for(i=0; i<pTo->nCell; i++){
    uptr x = Addr(pFrom->apCell[i]);
    // 如果在page范围内,更新,否则用from的值
    if( x>from && x<from+SQLITE_PAGE_SIZE ){
      *((uptr*)&pTo->apCell[i]) = x + to - from;
    }else{
      pTo->apCell[i] = pFrom->apCell[i];
    }
  }
}

reparentChildPages 用于在移动page后修改内存中的child页的pParent 指针,因为pParent 指针仅在内存中存储,所以在实现时不需要从磁盘加载没有缓存的page。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static void reparentChildPages(Pager *pPager, MemPage *pPage){
  int i;
  for(i=0; i<pPage->nCell; i++){
    reparentPage(pPager, pPage->apCell[i]->h.leftChild, pPage);
  }
  reparentPage(pPager, pPage->u.hdr.rightChild, pPage);
}

static void reparentPage(Pager *pPager, Pgno pgno, MemPage *pNewParent){
  MemPage *pThis;

  if( pgno==0 ) return;
  assert( pPager!=0 );
  // 只需要修改内存中缓存的页的parent,因此调lookup而不是get
  pThis = sqlitepager_lookup(pPager, pgno);
  if( pThis && pThis->isInit ){
    if( pThis->pParent!=pNewParent ){
      if( pThis->pParent ) sqlitepager_unref(pThis->pParent);
      pThis->pParent = pNewParent;
      if( pNewParent ) sqlitepager_ref(pNewParent);
    }
    sqlitepager_unref(pThis);
  }
}

铺垫完成,下面来看B树的平衡实现。

balance 用于实现B树的平衡,它的基本思想很简单,找到页面相邻的两个兄弟页面,然后根据这三个页面的总空间需求,分配新的页面来存储,新的页面的free空间尽可能均匀。最终兄弟页面的个数可能会加1或减1,目标是页面的空间使用在66%到100%之间。

1 先看root page的处理。

  • 如果Cell个数为0,且有rightChild,将rightChild的页拷贝过来作为新的root page。拷贝之后需要修改内存中child页的parent指针。
  • 如果root页overfull,创建一个新的child页拷贝过去,然后将root页设置为空,并且rightChild指向新页,后面的逻辑会执行新页的split。通过这样的处理可以复用非root页的balance逻辑。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
static int balance(Btree *pBt, MemPage *pPage, BtCursor *pCur){
  // 省略一些变量定义

  pParent = pPage->pParent;
  // 1 root页处理
  if( pParent==0 ){
    Pgno pgnoChild;
    MemPage *pChild;
    if( pPage->nCell==0 ){
      // 如果root page没有cell但rightChild不为空
      if( pPage->u.hdr.rightChild ){
        /*
        ** The root page is empty.  Copy the one child page
        ** into the root page and return.  This reduces the depth
        ** of the BTree by one.
        */
        pgnoChild = pPage->u.hdr.rightChild;
        rc = sqlitepager_get(pBt->pPager, pgnoChild, (void**)&pChild);
        if( rc ) return rc;
        // 拷贝过来后重新初始化
        memcpy(pPage, pChild, SQLITE_PAGE_SIZE);
        pPage->isInit = 0;
        rc = initPage(pPage, sqlitepager_pagenumber(pPage), 0);
        assert( rc==SQLITE_OK );
        // 拷贝之后,修改内存中的child页的parent指针
        reparentChildPages(pBt->pPager, pPage);
        if( pCur && pCur->pPage==pChild ){
          sqlitepager_unref(pChild);
          pCur->pPage = pPage;
          sqlitepager_ref(pPage);
        }
        freePage(pBt, pChild, pgnoChild);
        sqlitepager_unref(pChild);
      }else{
        relinkCellList(pPage);
      }
      return SQLITE_OK;
    }
    if( !pPage->isOverfull ){
      /* It is OK for the root page to be less than half full.
      */
      relinkCellList(pPage);
      return SQLITE_OK;
    }
    /*
    ** If we get to here, it means the root page is overfull.
    ** When this happens, Create a new child page and copy the
    ** contents of the root into the child.  Then make the root
    ** page an empty page with rightChild pointing to the new
    ** child.  Then fall thru to the code below which will cause
    ** the overfull child page to be split.
    */
    // root页overfull,创建一个新的child页拷贝过去,
    // 然后将root页设置为空,并且rightChild指向新页,后面的逻辑会执行新页的split
    rc = sqlitepager_write(pPage);
    if( rc ) return rc;
    rc = allocatePage(pBt, &pChild, &pgnoChild);
    if( rc ) return rc;
    assert( sqlitepager_iswriteable(pChild) );
    // 和上面memcpy然后initPage的区别在于,copyPage会考虑overfull,这些cell仍然指向pFrom的原始内存
    copyPage(pChild, pPage);
    pChild->pParent = pPage;
    sqlitepager_ref(pPage);
    pChild->isOverfull = 1;
    if( pCur && pCur->pPage==pPage ){
      sqlitepager_unref(pPage);
      pCur->pPage = pChild;
    }else{
      extraUnref = pChild;
    }
    zeroPage(pPage);
    pPage->u.hdr.rightChild = pgnoChild;
    pParent = pPage;
    pPage = pChild;
  }
  // 需要修改parent页
  rc = sqlitepager_write(pParent);
  if( rc ) return rc;

2 在页的左右各取一个兄弟。如果页是最左或最右时,会取同一边的两个兄弟。如果兄弟个数少于等于3,会用上所有的兄弟页。这些页的Cell加上parent中分割的Cell会一起计算总的空间需求,用于分配新页。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
  // 省略一些逻辑,设置idx为parent页指向本页的Cell的下标

  // 2 在页左右各取一个兄弟。如果页是最左或最右时,会取同一边的两个兄弟。
  // 如果兄弟个数少于等于3,会用上所有的兄弟页。
  if( idx==pParent->nCell ){
    nxDiv = idx - 2;
  }else{
    nxDiv = idx - 1;
  }
  if( nxDiv<0 ) nxDiv = 0;
  nDiv = 0;
  for(i=0, k=nxDiv; i<3; i++, k++){
    if( k<pParent->nCell ){
      // parent只分割Cell的index
      idxDiv[i] = k;
      // 获取parent中的分割Cell,会和兄弟page的Cell一起用于分配新页
      apDiv[i] = pParent->apCell[k];
      nDiv++;
      pgnoOld[i] = apDiv[i]->h.leftChild;
    }else if( k==pParent->nCell ){
      pgnoOld[i] = pParent->u.hdr.rightChild;
    }else{
      break;
    }
    // 加载兄弟页
    rc = sqlitepager_get(pBt->pPager, pgnoOld[i], (void**)&apOld[i]);
    if( rc ) goto balance_cleanup;
    rc = initPage(apOld[i], pgnoOld[i], pParent);
    if( rc ) goto balance_cleanup;
    nOld++;
  }

3 将找到的兄弟页拷贝到内存中的临时页,因为这些兄弟页后续可能会修改。

1
2
3
4
5
6
7
8
9

  // 3 将这些页拷贝到内存中新页,因为原始页有可能修改。
  for(i=0; i<nOld; i++){
    copyPage(&aOld[i], apOld[i]);
    rc = freePage(pBt, apOld[i], pgnoOld[i]);
    if( rc ) goto balance_cleanup;
    sqlitepager_unref(apOld[i]);
    apOld[i] = &aOld[i];
  }

4 计算兄弟页和parent中的分割Cell需要使用的空间,把这些Cell都拷贝到apCell数组,然后计算需要分配多少新页,每个新页的剩余空间保证相对均匀。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
  
  // 4 计算兄弟页和parent中的分割Cell需要使用的空间,把这些Cell都拷贝到apCell数组
  Cell *apCell[MX_CELL*3+5];   /* All cells from pages being balanceed */
  int szCell[MX_CELL*3+5];     /* Local size of all cells */
  nCell = 0;
  for(i=0; i<nOld; i++){
    MemPage *pOld = apOld[i];
    for(j=0; j<pOld->nCell; j++){
      apCell[nCell] = pOld->apCell[j];
      szCell[nCell] = cellSize(apCell[nCell]);
      nCell++;
    }
    // 拷贝parent中的分割Cell
    if( i<nOld-1 ){
      szCell[nCell] = cellSize(apDiv[i]);
      memcpy(&aTemp[i], apDiv[i], szCell[nCell]);
      apCell[nCell] = &aTemp[i];
      // 临时从parent中删除
      dropCell(pParent, nxDiv, szCell[nCell]);
      assert( apCell[nCell]->h.leftChild==pgnoOld[i] );
      apCell[nCell]->h.leftChild = pOld->u.hdr.rightChild;
      nCell++;
    }
  }

  // 计算总的size
  totalSize = 0;
  for(i=0; i<nCell; i++){
    totalSize += szCell[i];
  }

  int cntNew[4];               /* Index in apCell[] of cell after i-th page */
  int szNew[4];                /* Combined size of cells place on i-th page */
  for(subtotal=k=i=0; i<nCell; i++){
    subtotal += szCell[i];
    // #define USABLE_SPACE  (SQLITE_PAGE_SIZE - sizeof(PageHdr))
    // 分配一个新页
    if( subtotal > USABLE_SPACE ){
      szNew[k] = subtotal - szCell[i];
      cntNew[k] = i;
      subtotal = 0;
      k++;
    }
  }
  szNew[k] = subtotal;
  cntNew[k] = nCell;
  k++;
  // 经过上一个循环,前面的页会接近满,而后面的页有可能接近空
  // 下面的循环再进行调整,至少占满USABLE_SPACE/2
  for(i=k-1; i>0; i--){
    while( szNew[i]<USABLE_SPACE/2 ){
      // 从前面挪Cell
      cntNew[i-1]--;
      assert( cntNew[i-1]>0 );
      szNew[i] += szCell[cntNew[i-1]];
      szNew[i-1] -= szCell[cntNew[i-1]-1];
    }
  }
  assert( cntNew[0]>0 );
  
  // 分配k个新页
  for(i=0; i<k; i++){
    rc = allocatePage(pBt, &apNew[i], &pgnoNew[i]);
    if( rc ) goto balance_cleanup;
    nNew++;
    zeroPage(apNew[i]);
    apNew[i]->isInit = 1;
  }

5 将新页按照页号排序,然后写入Cell到新页,并把分割的Cell插入到parent中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
  // 5 将新页按照页号排序,排序后磁盘文件是有序的,scan操作是线性的。注释中说排序可以让大的插入和删除快25%。
  for(i=0; i<k-1; i++){
    int minV = pgnoNew[i];
    int minI = i;
    for(j=i+1; j<k; j++){
      if( pgnoNew[j]<minV ){
        minI = j;
        minV = pgnoNew[j];
      }
    }
    if( minI>i ){
      int t;
      MemPage *pT;
      t = pgnoNew[i];
      pT = apNew[i];
      pgnoNew[i] = pgnoNew[minI];
      apNew[i] = apNew[minI];
      pgnoNew[minI] = t;
      apNew[minI] = pT
    }
  }

  /*
  ** Evenly distribute the data in apCell[] across the new pages.
  ** Insert divider cells into pParent as necessary.
  */
  j = 0;
  for(i=0; i<nNew; i++){
    MemPage *pNew = apNew[i];
    while( j<cntNew[i] ){
      assert( pNew->nFree>=szCell[j] );
      // 插入到页中
      insertCell(pNew, pNew->nCell, apCell[j], szCell[j]);
      j++;
    }
    assert( pNew->nCell>0 );
    assert( !pNew->isOverfull );
    // 维护Cell的next指针
    relinkCellList(pNew);
    if( i<nNew-1 && j<nCell ){
      // 按照B树的性质,右兄弟节点的子结点也大于自己,所以这里是设置rightChild
      pNew->u.hdr.rightChild = apCell[j]->h.leftChild;
      // 设置parent的分割Cell的leftChild为pgnoNew[i]
      apCell[j]->h.leftChild = pgnoNew[i];
      // 分割的Cell插入parent
      insertCell(pParent, nxDiv, apCell[j], szCell[j]);
      j++;
      nxDiv++;
    }
  }
  assert( j==nCell );
  apNew[nNew-1]->u.hdr.rightChild = apOld[nOld-1]->u.hdr.rightChild;
  if( nxDiv==pParent->nCell ){
    pParent->u.hdr.rightChild = pgnoNew[nNew-1];
  }else{
    pParent->apCell[nxDiv]->h.leftChild = pgnoNew[nNew-1];
  }

6 最后做一些维护工作。由于balance可能修改parent,所以对parent调用balance。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13

  /*
  ** Reparent children of all cells.
  */
  for(i=0; i<nNew; i++){
    reparentChildPages(pBt->pPager, apNew[i]);
  }
  reparentChildPages(pBt->pPager, pParent);

  /*
  ** balance the parent page.
  */
  rc = balance(pBt, pParent, pCur);

事务操作

多个读事务可以同时执行。

写事务执行时,其他读写事务都不能执行。

事务实现基本就是调用下Pager的事务操作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
int sqliteBtreeBeginTrans(Btree *pBt){
  int rc;
  if( pBt->inTrans ) return SQLITE_ERROR;
  if( pBt->page1==0 ){
      // 加载页面,如果数据库不为空,校验magic number
    rc = lockBtree(pBt);
    if( rc!=SQLITE_OK ){
      return rc;
    }
  }
  if( pBt->readOnly ){
    rc = SQLITE_OK;
  }else{
    rc = sqlitepager_begin(pBt->page1);
    if( rc==SQLITE_OK ){
      rc = newDatabase(pBt);
    }
  }
  if( rc==SQLITE_OK ){
    pBt->inTrans = 1;
    pBt->inCkpt = 0;
  }else{
    unlockBtreeIfUnused(pBt);
  }
  return rc;
}

int sqliteBtreeRollback(Btree *pBt){
  int rc;
  BtCursor *pCur;
  if( pBt->inTrans==0 ) return SQLITE_OK;
  pBt->inTrans = 0;
  pBt->inCkpt = 0;
  for(pCur=pBt->pCursor; pCur; pCur=pCur->pNext){
    if( pCur->pPage ){
      sqlitepager_unref(pCur->pPage);
      pCur->pPage = 0;
    }
  }
  rc = pBt->readOnly ? SQLITE_OK : sqlitepager_rollback(pBt->pPager);
  unlockBtreeIfUnused(pBt);
  return rc;
}

int sqliteBtreeCommit(Btree *pBt){
  int rc;
  if( pBt->inTrans==0 ) return SQLITE_ERROR;
  rc = pBt->readOnly ? SQLITE_OK : sqlitepager_commit(pBt->pPager);
  pBt->inTrans = 0;
  pBt->inCkpt = 0;
  unlockBtreeIfUnused(pBt);
  return rc;
}

对比BoltDB

  1. sqlite自己做page buffer管理,需要处理page淘汰、脏页写盘,而BoltDB使用系统mmap来管理内存,很多管理逻辑都委托给OS,实现相对简单。
  2. 这个版本的sqlite只有B树实现,而BoltDB使用B+树实现。
  3. sqlite的rebalance实现更加复杂,BoltDB相对简单。

代码技巧

sqlite实现中的一些代码技巧值得学习

  1. 将地址当作数组访问,然后方便的定位到该结构体前后的内存。

    1
    
    #define PGHDR_TO_DATA(P)  ((void*)(&(P)[1]))
    
  2. 通过指针来简化一些处理,类似于Linus:利用二级指针删除单向链表 | 酷 壳 - CoolShell的技巧。

    比如下面的代码中,如果不用指针,firstCell和iNext需要分别去设置,但通过指针把它们的设置逻辑统一了起来。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    
    static void relinkCellList(MemPage *pPage){
      int i;
      u16 *pIdx;
      assert( sqlitepager_iswriteable(pPage) );
      pIdx = &pPage->u.hdr.firstCell;
      for(i=0; i<pPage->nCell; i++){
        int idx = Addr(pPage->apCell[i]) - Addr(pPage);
        assert( idx>0 && idx<SQLITE_PAGE_SIZE );
        *pIdx = idx;
        pIdx = &pPage->apCell[i]->h.iNext;
      }
      *pIdx = 0;
    }