CMU15445 (Fall 2020) 数据库系统 Project#2 - B+ Tree 详解(上篇)
阅读原文时间:2023年08月18日阅读:12

前言

考虑到 B+ 树较为复杂,CMU15-445 将 B+ 树实验拆成了两部分,这篇博客将介绍 Checkpoint#1 部分的实现过程,搭配教材 《DataBase System Concepts》食用更佳。

B+ 树索引

许多查询只涉及文件中的少量记录,例如“找出物理系所有教师”的查询就只涉及教师记录中的一小部分,如果数据库系统读取表中每一个元组并检查系名称是否为“物理”,这样是十分低效的。理想情况下,数据库系统应能直接定位这些记录。为了支持这种快速访问方式,数据库系统使用了索引。

有两种基本的索引类型:

  • 顺序索引:基于值的顺序排序。
  • 散列索引:基于将值平均分布到若干散列桶中。一个值所属的散列桶是由一个函数决定的,该函数称为散列函数(hash function)。

对于等值查询,哈希索引的速度最快,时间复杂度为 \(O(1)\) ,但是对于范围查询,哈希索引表示无能为力。这时候顺序索引就可以派上用场了,目前数据库系统用的最多的顺序索引是 B+ 树索引,时间复杂度为 \(O(logN)\)。下图显示了一棵 B+ 树,它是右下角数据表第二列的索引:

可以看到 B+ 树是一种多路平衡搜索树,由叶节点和内部节点组成,其中叶节点可以保存整条记录或者是记录的行 ID,内部节点只会保存键和指向子节点的指针,这样数据库系统就能读入更多的内部节点到内存中,减少磁盘 IO 次数。除了根节点外,其他节点都是至少半满的。

代码实现

B+ Tree Parent Page

B+ 树的每个节点对应着一个 Page,内部节点对应 BPlusTreeInternalPage,叶节点对应 BPlusTreeLeafPage,这两种页结构相似,所以先实现父类 BPlusTreePage,它的结构如下图所示(图片使用 Excalidraw 绘制):

header 占用了 24 字节,各个字段的大小和描述如下表所示:

Variable Name

Size

Description

page_type_

4

Page Type (internal or leaf)

lsn_

4

Log sequence number (Used in Project 4)

size_

4

Number of Key & Value pairs in page

max_size_

4

Max number of Key & Value pairs in page

parent_page_id_

4

Parent Page Id

page_id_

4

Self Page Id

BPlusTreePage 声明如下:

#define MappingType std::pair<KeyType, ValueType>

#define INDEX_TEMPLATE_ARGUMENTS template <typename KeyType, typename ValueType, typename KeyComparator>

// define page type enum
enum class IndexPageType { INVALID_INDEX_PAGE = 0, LEAF_PAGE, INTERNAL_PAGE };

class BPlusTreePage {
 public:
  bool IsLeafPage() const;
  bool IsRootPage() const;
  void SetPageType(IndexPageType page_type);

  int GetSize() const;
  void SetSize(int size);
  void IncreaseSize(int amount);

  int GetMaxSize() const;
  void SetMaxSize(int max_size);
  int GetMinSize() const;

  page_id_t GetParentPageId() const;
  void SetParentPageId(page_id_t parent_page_id);

  page_id_t GetPageId() const;
  void SetPageId(page_id_t page_id);

  void SetLSN(lsn_t lsn = INVALID_LSN);

 private:
  IndexPageType page_type_ __attribute__((__unused__));
  lsn_t lsn_ __attribute__((__unused__));
  int size_ __attribute__((__unused__));
  int max_size_ __attribute__((__unused__));
  page_id_t parent_page_id_ __attribute__((__unused__));
  page_id_t page_id_ __attribute__((__unused__));
};

该类的实现较为简单,但是 GetMinSize() 函数值得细细推敲。根据教材中的描述,当最大键值对数量为 \(N\) 时,节点的最少键值对数量存在三种情况:

  • 根节点:

    • 根节点是叶节点时,内部至少需要 1 个键值对,这个很好理解,比如往空的 B+ 树插入一个键值对后,新建的根节点肯定只会有一个键值对;
    • 根节点是内部节点是,内部至少需要 2 个键值对,除去最左侧的无效键,还需要一个键与输入的键进行比较;
  • 内部节点:节点插入数据之后可能溢出,这时需要进行分裂操作,为了简化分裂代码的编写,内部节点和根节点会留出一个键值对的位置作为哨兵,实际最大键值对数量为 \(N-1\),加上最左侧的无效键,最小键值对数量为 \(\lceil (N-2)/2 \rceil+1\);

  • 根节点:最小键值对数量为 \(\lceil (N-1)/2 \rceil\)

    /*

    • Helper methods to get/set page type
    • Page type enum class is defined in b_plus_tree_page.h
      */
      bool BPlusTreePage::IsLeafPage() const { return page_type_ == IndexPageType::LEAF_PAGE; }
      bool BPlusTreePage::IsRootPage() const { return parent_page_id_ == INVALID_PAGE_ID; }
      void BPlusTreePage::SetPageType(IndexPageType page_type) { page_type_ = page_type; }

    /*

    • Helper methods to get/set size (number of key/value pairs stored in that
    • page)
      */
      int BPlusTreePage::GetSize() const { return size_; }
      void BPlusTreePage::SetSize(int size) { size_ = size; }
      void BPlusTreePage::IncreaseSize(int amount) { size_ += amount; }

    /*

    • Helper methods to get/set max size (capacity) of the page
      */
      int BPlusTreePage::GetMaxSize() const { return max_size_; }
      void BPlusTreePage::SetMaxSize(int size) { max_size_ = size; }

    /*

    • Helper method to get min page size

    • Generally, min page size == max page size / 2
      */
      int BPlusTreePage::GetMinSize() const {
      // 数据库系统概念中文第六版 Page #486
      // 叶节点最少有一个键,而内部节点除了无效的第一个键外还需一个键
      if (IsRootPage()) {
      return IsLeafPage() ? 1 : 2;
      }

      // 内部节点的最小键值对数量为 ⌈(max_size_ - 1 - 1)/2⌉ + 1
      if (!IsLeafPage()) {
      return (max_size_ - 2 + 1) / 2 + 1;
      }

      // 叶节点最小键值对数量为 ⌈(max_size_ - 1)/2⌉
      return (max_size_ - 1 + 1) / 2;
      }

    /*

    • Helper methods to get/set parent page id
      */
      page_id_t BPlusTreePage::GetParentPageId() const { return parent_page_id_; }
      void BPlusTreePage::SetParentPageId(page_id_t parent_page_id) { parent_page_id_ = parent_page_id; }

    /*

    • Helper methods to get/set self page id
      */
      page_id_t BPlusTreePage::GetPageId() const { return page_id_; }
      void BPlusTreePage::SetPageId(page_id_t page_id) { page_id_ = page_id; }

    /*

    • Helper methods to set lsn
      */
      void BPlusTreePage::SetLSN(lsn_t lsn) { lsn_ = lsn; }

B+ Tree Internal Page

内部节点页的结构如下图所示,它比父类多了一个 array 数组成员,用于存放 key | page_id 键值对,其中 page_id 是子节点的页 id:

,声明如下述代码所示。可以看到 array 的长度声明为 0,这种写法比较神奇,它的意思是 array 的实际长度是在运行时确定的。实验一中我们实现了缓冲池管理器,用于缓存 Page,而节点页就存放在 char Page::data_[PAGE_SIZE] 中,当 PAGE_SIZE 为 4096 字节时,array 最多能放 INTERNAL_PAGE_SIZE 个键值对:

#define B_PLUS_TREE_INTERNAL_PAGE_TYPE BPlusTreeInternalPage<KeyType, ValueType, KeyComparator>
#define INTERNAL_PAGE_HEADER_SIZE 24
#define INTERNAL_PAGE_SIZE ((PAGE_SIZE - INTERNAL_PAGE_HEADER_SIZE) / (sizeof(MappingType)))
/**
 * Store n indexed keys and n+1 child pointers (page_id) within internal page.
 * Pointer PAGE_ID(i) points to a subtree in which all keys K satisfy:
 * K(i) <= K < K(i+1).
 * NOTE: since the number of keys does not equal to number of child pointers,
 * the first key always remains invalid. That is to say, any search/lookup
 * should ignore the first key.
 */
INDEX_TEMPLATE_ARGUMENTS
class BPlusTreeInternalPage : public BPlusTreePage {
 public:
  void Init(page_id_t page_id, page_id_t parent_id = INVALID_PAGE_ID, int max_size = INTERNAL_PAGE_SIZE);

  KeyType KeyAt(int index) const;
  void SetKeyAt(int index, const KeyType &key);
  int ValueIndex(const ValueType &value) const;
  ValueType ValueAt(int index) const;

  ValueType Lookup(const KeyType &key, const KeyComparator &comparator) const;
  void PopulateNewRoot(const ValueType &old_value, const KeyType &new_key, const ValueType &new_value);
  int InsertNodeAfter(const ValueType &old_value, const KeyType &new_key, const ValueType &new_value);
  void Remove(int index);
  ValueType RemoveAndReturnOnlyChild();

  // Split and Merge utility methods
  void MoveAllTo(BPlusTreeInternalPage *recipient, const KeyType &middle_key, BufferPoolManager *buffer_pool_manager);
  void MoveHalfTo(BPlusTreeInternalPage *recipient, BufferPoolManager *buffer_pool_manager);

 private:
  void CopyNFrom(MappingType *items, int size, BufferPoolManager *buffer_pool_manager);
  MappingType array[0];
};

先来实现几个简单的函数,其中 Init() 函数必须在内部节点页在缓冲池中被创建出来后调用:

/*
 * Init method after creating a new internal page
 * Including set page type, set current size, set page id, set parent id and set max page size
 */
INDEX_TEMPLATE_ARGUMENTS
void B_PLUS_TREE_INTERNAL_PAGE_TYPE::Init(page_id_t page_id, page_id_t parent_id, int max_size) {
  SetPageType(IndexPageType::INTERNAL_PAGE);
  SetSize(0);
  SetPageId(page_id);
  SetParentPageId(parent_id);
  SetMaxSize(max_size);
}

/*
 * Helper method to get/set the key associated with input "index"(a.k.a array offset)
 */
INDEX_TEMPLATE_ARGUMENTS
KeyType B_PLUS_TREE_INTERNAL_PAGE_TYPE::KeyAt(int index) const { return array[index].first; }

INDEX_TEMPLATE_ARGUMENTS
void B_PLUS_TREE_INTERNAL_PAGE_TYPE::SetKeyAt(int index, const KeyType &key) { array[index].first = key; }

/*
 * Helper method to find and return array index(or offset), so that its value equals to input "value"
 */
INDEX_TEMPLATE_ARGUMENTS
int B_PLUS_TREE_INTERNAL_PAGE_TYPE::ValueIndex(const ValueType &value) const {
  for (int i = 0; i < GetSize(); ++i) {
    if (ValueAt(i) == value) {
      return i;
    }
  }
  return -1;
}

/*
 * Helper method to get the value associated with input "index"(a.k.a array offset)
 */
INDEX_TEMPLATE_ARGUMENTS
ValueType B_PLUS_TREE_INTERNAL_PAGE_TYPE::ValueAt(int index) const { return array[index].second; }

B+ Tree Leaf Page

BPlusTreeLeafPage 的结构和 BPlusTreeInternalPage 相似,只是在 header 区域多了一个 next_page_id_ 字段,该字段是下一个叶节点的指针,用于将叶节点串成单向链表。同时 array 内部键值对存放的是 key | RID,指向实际数据的位置。

类声明如下:

#define B_PLUS_TREE_LEAF_PAGE_TYPE BPlusTreeLeafPage<KeyType, ValueType, KeyComparator>
#define LEAF_PAGE_HEADER_SIZE 28
#define LEAF_PAGE_SIZE ((PAGE_SIZE - LEAF_PAGE_HEADER_SIZE) / sizeof(MappingType))

/**
 * Store indexed key and record id(record id = page id combined with slot id,
 * see include/common/rid.h for detailed implementation) together within leaf
 * page. Only support unique key.
 */
INDEX_TEMPLATE_ARGUMENTS
class BPlusTreeLeafPage : public BPlusTreePage {
 public:
  void Init(page_id_t page_id, page_id_t parent_id = INVALID_PAGE_ID, int max_size = LEAF_PAGE_SIZE);
  // helper methods
  page_id_t GetNextPageId() const;
  void SetNextPageId(page_id_t next_page_id);
  KeyType KeyAt(int index) const;
  int KeyIndex(const KeyType &key, const KeyComparator &comparator) const;
  const MappingType &GetItem(int index);

  // insert and delete methods
  int Insert(const KeyType &key, const ValueType &value, const KeyComparator &comparator);
  bool Lookup(const KeyType &key, ValueType *value, const KeyComparator &comparator) const;
  int RemoveAndDeleteRecord(const KeyType &key, const KeyComparator &comparator);

  // Split and Merge utility methods
  void MoveHalfTo(BPlusTreeLeafPage *recipient, BufferPoolManager *buffer_pool_manager);
  void MoveAllTo(BPlusTreeLeafPage *recipient);
  void MoveFirstToEndOf(BPlusTreeLeafPage *recipient);
  void MoveLastToFrontOf(BPlusTreeLeafPage *recipient);

 private:
  void CopyNFrom(MappingType *items, int size);
  page_id_t next_page_id_;
  MappingType array[0];
};

先来实现几个简单的函数:

/**
 * Init method after creating a new leaf page
 * Including set page type, set current size to zero, set page id/parent id, set next page id and set max size
 */
INDEX_TEMPLATE_ARGUMENTS
void B_PLUS_TREE_LEAF_PAGE_TYPE::Init(page_id_t page_id, page_id_t parent_id, int max_size) {
  SetPageType(IndexPageType::LEAF_PAGE);
  SetSize(0);
  SetPageId(page_id);
  SetParentPageId(parent_id);
  SetNextPageId(INVALID_PAGE_ID);
  SetMaxSize(max_size);
}

/**
 * Helper methods to set/get next page id
 */
INDEX_TEMPLATE_ARGUMENTS
page_id_t B_PLUS_TREE_LEAF_PAGE_TYPE::GetNextPageId() const { return next_page_id_; }

INDEX_TEMPLATE_ARGUMENTS
void B_PLUS_TREE_LEAF_PAGE_TYPE::SetNextPageId(page_id_t next_page_id) { next_page_id_ = next_page_id; }

/*
 * Helper method to find and return the key associated with input "index"(a.k.a array offset)
 */
INDEX_TEMPLATE_ARGUMENTS
KeyType B_PLUS_TREE_LEAF_PAGE_TYPE::KeyAt(int index) const { return array[index].first; }

/*
 * Helper method to find and return the key & value pair associated with input "index"(a.k.a array offset)
 */
INDEX_TEMPLATE_ARGUMENTS
const MappingType &B_PLUS_TREE_LEAF_PAGE_TYPE::GetItem(int index) { return array[index]; }

至此可以画出整棵 B+ 树的结构:

B+ 树的搜索较为简单,先从根节点开始,将内部节点与搜索的 key 进行二分搜索,找出包含 key 的子节点的指针,然后向缓冲池要子节点页,接着在子节点上重复上述过程直到叶节点为止,最后在叶节点上进行二分搜索。

/**
 * Return the only value that associated with input key
 * This method is used for point query
 * @return : true means key exists
 */
INDEX_TEMPLATE_ARGUMENTS
bool BPLUSTREE_TYPE::GetValue(const KeyType &key, std::vector<ValueType> *result, Transaction *transaction) {
  if (IsEmpty()) {
    return false;
  }

  // 在叶节点中寻找 key
  LeafPage *leaf = ToLeafPage(FindLeafPage(key));

  ValueType value;
  auto success = leaf->Lookup(key, &value, comparator_);
  if (success) {
    result->push_back(value);
  }

  buffer_pool_manager_->UnpinPage(leaf->GetPageId(), false);
  return success;
}

/*
 * Helper function to decide whether current b+tree is empty
 */
INDEX_TEMPLATE_ARGUMENTS
bool BPLUSTREE_TYPE::IsEmpty() const { return root_page_id_ == INVALID_PAGE_ID; }

/**
 * Find leaf page containing particular key, if leftMost flag == true, find
 * the left most leaf page
 */
INDEX_TEMPLATE_ARGUMENTS
Page *BPLUSTREE_TYPE::FindLeafPage(const KeyType &key, bool leftMost) {
  auto page_id = root_page_id_;
  auto page = buffer_pool_manager_->FetchPage(page_id);
  if (!page) {
    return nullptr;
  }

  // 定位到包含 key 的叶节点
  auto node = ToTreePage(page);
  while (!node->IsLeafPage()) {
    InternalPage *inode = ToInternalPage(node);

    // 寻找下一个包含 key 的节点
    if (!leftMost) {
      page_id = inode->Lookup(key, comparator_);
    } else {
      page_id = inode->ValueAt(0);
    }

    // 移动到子节点
    buffer_pool_manager_->UnpinPage(page->GetPageId(), false);
    page = buffer_pool_manager_->FetchPage(page_id);
    node = ToTreePage(page);
  }

  return page;
}

由于经常要进行 Page –> BPlusTreePage –> BPlusTreeLeafPage/BplusTreeInternalPage 的转换,所以在类声明内部添加了几个转换函数:

BPlusTreePage *ToTreePage(Page *page) { return reinterpret_cast<BPlusTreePage *>(page->GetData()); }
InternalPage *ToInternalPage(Page *page) { return reinterpret_cast<InternalPage *>(page->GetData()); }
LeafPage *ToLeafPage(Page *page) { return reinterpret_cast<LeafPage *>(page->GetData()); }

InternalPage *ToInternalPage(BPlusTreePage *page) { return reinterpret_cast<InternalPage *>(page); }
LeafPage *ToLeafPage(BPlusTreePage *page) { return reinterpret_cast<LeafPage *>(page); }

内部节点二分查找 InternalPage::LoopUp() 的实现如下所示,这里使用了一种大一统的二分查找写法,无需考虑索引是否加减 1 的问题以及 leftright 指针比较的时候要不要加等号的问题,只要写好分区条件就行,非常好用,墙裂推荐花几分钟看下 B 站的这个教程「二分查找为什么总是写错?」

因为第一个键无效,所以 left 的初始值为 0,而将索引分给 left 区域的条件就是 KeyAt(mid) 小于等于 key,这样循环结束的时候 left 对应的就是最后一个小于等于 key 的键,指针指向的子节点必包含 key

/*
 * Find and return the child pointer(page_id) which points to the child page
 * that contains input "key"
 * Start the search from the second key(the first key should always be invalid)
 */
INDEX_TEMPLATE_ARGUMENTS
ValueType B_PLUS_TREE_INTERNAL_PAGE_TYPE::Lookup(const KeyType &key, const KeyComparator &comparator) const {
  // 二分查找大一统写法:https://www.bilibili.com/video/BV1d54y1q7k7/
  int left = 0, right = GetSize();
  while (left + 1 < right) {
    int mid = left + (right - left) / 2;
    if (comparator(KeyAt(mid), key) <= 0) {
      left = mid;
    } else {
      right = mid;
    }
  }
  return ValueAt(left);
}

叶节点二分查找的代码为:

/*
 * For the given key, check to see whether it exists in the leaf page. If it
 * does, then store its corresponding value in input "value" and return true.
 * If the key does not exist, then return false
 */
INDEX_TEMPLATE_ARGUMENTS
bool B_PLUS_TREE_LEAF_PAGE_TYPE::Lookup(const KeyType &key, ValueType *value, const KeyComparator &comparator) const {
  int left = 0, right = GetSize() - 1;
  while (left <= right) {
    int mid = left + (right - left) / 2;
    int cmp = comparator(KeyAt(mid), key);
    if (cmp == 0) {
      *value = array[mid].second;
      return true;
    } else if (cmp > 0) {
      right = mid - 1;
    } else {
      left = mid + 1;
    }
  }
  return false;
}

B+ 树的插入较为复杂,定位到需要插入的叶节点后需要考虑节点溢出的问题,可以在这个算法可视化网站查看 B+ 树的插入过程。教材中给出了插入算法的伪代码:

总体流程就是先判断树是否为空,如果是空的就新建一个叶节点作为根节点并插入键值对,否则将键值对插到对应的叶节点 \(N\) 中:

/**
 * Insert constant key & value pair into b+ tree
 * if current tree is empty, start new tree, update root page id and insert
 * entry, otherwise insert into leaf page.
 * @return: since we only support unique key, if user try to insert duplicate
 * keys return false, otherwise return true.
 */
INDEX_TEMPLATE_ARGUMENTS
bool BPLUSTREE_TYPE::Insert(const KeyType &key, const ValueType &value, Transaction *transaction) {
  if (!IsEmpty()) {
    return InsertIntoLeaf(key, value, transaction);
  }

  StartNewTree(key, value);
  return true;
}

/**
 * Insert constant key & value pair into an empty tree
 * User needs to first ask for new page from buffer pool manager(NOTICE: throw
 * an "out of memory" exception if returned value is nullptr), then update b+
 * tree's root page id and insert entry directly into leaf page.
 */
INDEX_TEMPLATE_ARGUMENTS
void BPLUSTREE_TYPE::StartNewTree(const KeyType &key, const ValueType &value) {
  // 创建一个叶节点作为根节点,并插入新数据
  LeafPage *root = ToLeafPage(NewPage(&root_page_id_));
  root->Init(root_page_id_, INVALID_PAGE_ID, leaf_max_size_);
  root->Insert(key, value, comparator_);

  UpdateRootPageId(1);
  buffer_pool_manager_->UnpinPage(root_page_id_, true);
}

INDEX_TEMPLATE_ARGUMENTS
Page *BPLUSTREE_TYPE::NewPage(page_id_t *page_id) {
  auto page = buffer_pool_manager_->NewPage(page_id);
  if (!page) {
    throw Exception(ExceptionType::OUT_OF_MEMORY, "Can't create new page");
  }

  return page;
}

如果叶节点 \(N\) 没有满,那么直接插入就行,否则需要对叶节点 \(N\) 进行分裂操作:

  1. 在缓冲池中新建一个页为叶节点 \(N'\);
  2. 将 \(N\) 的右半部分键值对移动到 \(N'\) 中,并更新 next_page_id_ 指针;
  3. 将 \(N'\) 最左侧的键 \(K'\) 和 \(N'\) 的指针插到 \(N\) 的父节点中

对应代码为:

/**
 * Insert constant key & value pair into leaf page
 * User needs to first find the right leaf page as insertion target, then look
 * through leaf page to see whether insert key exist or not. If exist, return
 * immdiately, otherwise insert entry. Remember to deal with split if necessary.
 * @return: since we only support unique key, if user try to insert duplicate
 * keys return false, otherwise return true.
 */
INDEX_TEMPLATE_ARGUMENTS
bool BPLUSTREE_TYPE::InsertIntoLeaf(const KeyType &key, const ValueType &value, Transaction *transaction) {
  // 定位到包含 key 的叶节点
  LeafPage *leaf = ToLeafPage(FindLeafPage(key));

  // 不能插入相同的键
  ValueType exist_value;
  if (leaf->Lookup(key, &exist_value, comparator_)) {
    buffer_pool_manager_->UnpinPage(leaf->GetPageId(), false);
    return false;
  }

  // 如果叶节点没有满,就直接插进去(array 最后留了一个空位),否则分裂叶节点并更新父节点
  auto size = leaf->Insert(key, value, comparator_);

  if (size == leaf_max_size_) {
    LeafPage *new_leaf = Split(leaf);
    InsertIntoParent(leaf, new_leaf->KeyAt(0), new_leaf, transaction);
    buffer_pool_manager_->UnpinPage(new_leaf->GetPageId(), true);
  }

  buffer_pool_manager_->UnpinPage(leaf->GetPageId(), true);
  return true;
}

/**
 * Split input page and return newly created page.
 * Using template N to represent either internal page or leaf page.
 * User needs to first ask for new page from buffer pool manager(NOTICE: throw
 * an "out of memory" exception if returned value is nullptr), then move half
 * of key & value pairs from input page to newly created page
 */
INDEX_TEMPLATE_ARGUMENTS
template <typename N>
N *BPLUSTREE_TYPE::Split(N *node) {
  // 创建新节点
  page_id_t new_page_id;
  auto new_page = NewPage(&new_page_id);
  N *new_node = reinterpret_cast<N *>(new_page->GetData());

  // 将右半边的 item 移动到新节点中
  auto max_size = node->IsLeafPage() ? leaf_max_size_ : internal_max_size_;
  new_node->Init(new_page_id, node->GetParentPageId(), max_size);
  node->MoveHalfTo(new_node, buffer_pool_manager_);

  return new_node;
}

叶节点使用 MoveHalfTo() 函数进行右半部分键值对的移动操作,这个函数的签名和初始代码不太一样,多加了一个 buffer_pool_manager 参数:

/*
 * Remove half of key & value pairs from this page to "recipient" page
 */
INDEX_TEMPLATE_ARGUMENTS
void B_PLUS_TREE_LEAF_PAGE_TYPE::MoveHalfTo(BPlusTreeLeafPage *recipient,
                                            __attribute__((unused)) BufferPoolManager *buffer_pool_manager) {
  int start = GetMinSize();
  int N = GetSize() - start;
  recipient->CopyNFrom(array + start, N);

  // 更新下一个页 id
  recipient->SetNextPageId(GetNextPageId());
  SetNextPageId(recipient->GetPageId());

  IncreaseSize(-N);
}

/*
 * Copy starting from items, and copy {size} number of elements into me.
 */
INDEX_TEMPLATE_ARGUMENTS
void B_PLUS_TREE_LEAF_PAGE_TYPE::CopyNFrom(MappingType *items, int size) {
  std::copy(items, items + size, array + GetSize());
  IncreaseSize(size);
}

父节点的插入算法较为复杂,如果父节点插入子节点传来的键值对之后也溢出了,需要进行分裂,并将新分裂出来的父节点的最左侧无效键和自己的指针插入父节点中。如果分裂的是根节点,需要新建根节点,之前的根节点变成内部节点,并将树的高度 +1。

/**
 * Insert key & value pair into internal page after split
 * @param   old_node      input page from split() method
 * @param   key
 * @param   new_node      returned page from split() method
 * User needs to first find the parent page of old_node, parent node must be
 * adjusted to take info of new_node into account. Remember to deal with split
 * recursively if necessary.
 */
INDEX_TEMPLATE_ARGUMENTS
void BPLUSTREE_TYPE::InsertIntoParent(BPlusTreePage *old_node, const KeyType &key, BPlusTreePage *new_node,
                                      Transaction *transaction) {
  // 根节点发生分裂需要新建一个根节点,B+树的高度 +1
  if (old_node->IsRootPage()) {
    auto root_page = NewPage(&root_page_id_);

    // 创建新节点并更新子节点指针
    InternalPage *root = ToInternalPage(root_page);
    root->Init(root_page_id_, INVALID_PAGE_ID, internal_max_size_);
    root->PopulateNewRoot(old_node->GetPageId(), key, new_node->GetPageId());

    // 更新父节点指针
    old_node->SetParentPageId(root_page_id_);
    new_node->SetParentPageId(root_page_id_);

    UpdateRootPageId(0);
    buffer_pool_manager_->UnpinPage(root_page_id_, true);
    return;
  }

  // 找到父节点并将新节点的最左侧 key 插入其中
  auto parent_id = old_node->GetParentPageId();
  InternalPage *parent = ToInternalPage(buffer_pool_manager_->FetchPage(parent_id));
  auto size = parent->InsertNodeAfter(old_node->GetPageId(), key, new_node->GetPageId());

  // 父节点溢出时需要再次分裂
  if (size == internal_max_size_) {
    InternalPage *new_page = Split(parent);
    InsertIntoParent(parent, new_page->KeyAt(0), new_page, transaction);
    buffer_pool_manager_->UnpinPage(new_page->GetPageId(), true);
  }

  buffer_pool_manager_->UnpinPage(parent_id, true);
}

内部节点分裂的代码为:

/*
 * Populate new root page with old_value + new_key & new_value
 * When the insertion cause overflow from leaf page all the way upto the root
 * page, you should create a new root page and populate its elements.
 * NOTE: This method is only called within InsertIntoParent()(b_plus_tree.cpp)
 */
INDEX_TEMPLATE_ARGUMENTS
void B_PLUS_TREE_INTERNAL_PAGE_TYPE::PopulateNewRoot(const ValueType &old_value, const KeyType &new_key,
                                                     const ValueType &new_value) {
  array[0].second = old_value;
  array[1] = {new_key, new_value};
  SetSize(2);
}

/*
 * Insert new_key & new_value pair right after the pair with its value == old_value
 * @return:  new size after insertion
 */
INDEX_TEMPLATE_ARGUMENTS
int B_PLUS_TREE_INTERNAL_PAGE_TYPE::InsertNodeAfter(const ValueType &old_value, const KeyType &new_key,
                                                    const ValueType &new_value) {
  // 找到 old_value 的位置并将后面的 item 后移一格
  auto index = ValueIndex(old_value) + 1;
  std::copy_backward(array + index, array + GetSize(), array + GetSize() + 1);

  array[index] = {new_key, new_value};
  IncreaseSize(1);
  return GetSize();
}

/*
 * Remove half of key & value pairs from this page to "recipient" page
 */
INDEX_TEMPLATE_ARGUMENTS
void B_PLUS_TREE_INTERNAL_PAGE_TYPE::MoveHalfTo(BPlusTreeInternalPage *recipient,
                                                BufferPoolManager *buffer_pool_manager) {
  // 第一个 key 本来就没有用,所以直接拷贝过来也没关系
  auto start = GetMinSize();
  int N = GetSize() - start;
  recipient->CopyNFrom(array + start, N, buffer_pool_manager);
  IncreaseSize(-N);
}

测试

在终端输入:

cd build
cmake ..
make

# 绘制 B+ 树的工具
make b_plus_tree_print_test
./test/b_plus_tree_print_test

# 从 grading scope 拔下来的测试代码
make b_plus_tree_checkpoint_1_test
./test/b_plus_tree_checkpoint_1_test

绘制 B+ 树的工具运行效果如下:

在 VSCode 中安装一个 Dot 文件查看器就能看到效果了:

grade scope 的测试结果如下,顺利通过了 4 个测试用例:

后记

这次实验实现了单线程 B+ 树的查找和插入操作,难度相比于实验一有了质的提升,写完之后成就感十足,可能这也是 Andy 将实验二拆成两部分的原因之一吧,下个实验就要实现二叉树的删除操作和并发了,期待一波~~