0%

Day4: Sorted String Table (SST)

Week1 Day4 的任务:

  1. 实现 SST 编码和元数据解码
  2. 实现 SST 解码
  3. 实现 SST 的迭代器

Week1 Day4

Tasks

Task 1: SST Builder

SST 由存储在磁盘上的数据块和索引块组成。数据块是懒加载的,也就是说只有当用户使用的时候才会被加载到内存中。索引块也是可以按照需要加载的,但是在这里的实现是简单地将索引块全部加载到内存中。一个 SST 的组成如下:block 区存储的是 day3 实现的 block,meta 区存储的是每个 block 的第一个/最后一个键,以及每个 block 的偏移量。

text
1
2
3
4
5
-------------------------------------------------------------------------------------------
| Block Section | Meta Section | Extra |
-------------------------------------------------------------------------------------------
| data block | ... | data block | metadata | meta block offset (u32) |
-------------------------------------------------------------------------------------------

首先是根据 block 大小创建一个 SST builder。其余所有元素全部用默认初值:

1
2
3
4
5
6
7
8
9
10
11
/// Create a builder based on target block size.
pub fn new(block_size: usize) -> Self {
Self {
builder: BlockBuilder::new(block_size),
first_key: Vec::new(),
last_key: Vec::new(),
data: Vec::new(),
meta: Vec::new(),
block_size,
}
}

而整个预估的大小可以近似用 block 区的大小来表示(因为block 区的大小比 meta 区大得多):

1
2
3
pub fn estimated_size(&self) -> usize {
self.data.len()
}

SST builder 可以通过往 block 里面插入键值对的方式来加入数据。如果现有的 block 大小达到了设定的限制,就需要将一个 block 「分裂」成两个(新增一个 block)。加入的过程中,首先判断当前第一个键是否为空,如果为空,说明当前插入的是第一个键,将其作为第一个键。然后尝试往 block 里面插入数据,如果插入成功(BlockBuilder)返回 true,则更新当前键为最后一个键,并返回。如果插入失败,分裂当前的 block,并重新插入,更新第一个和最后一个键为当前键(因为当前键是新的 block 目前的唯一一个元素)。分裂的过程就是将当前的 block 编码,将编码完成的数据写入到 block 区,将偏移量、第一个/最后一个键写入到 meta 区。然后新建一个 block builder 替换掉旧的 block builder。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
fn split_block(&mut self) {
let builder = std::mem::replace(&mut self.builder, BlockBuilder::new(self.block_size));
let encoded_builder = builder.build().encode();

self.meta.push(BlockMeta {
offset: self.data.len(),
first_key: KeyBytes::from_bytes(self.first_key.clone().into()),
last_key: KeyBytes::from_bytes(self.last_key.clone().into()),
});
self.data.extend(encoded_builder);
}

pub fn add(&mut self, key: KeySlice, value: &[u8]) {
if self.first_key.is_empty() {
self.first_key = Bytes::copy_from_slice(key.raw_ref()).into();
}

if self.builder.add(key, value) {
self.last_key = Bytes::copy_from_slice(key.raw_ref()).into();
return;
}

self.split_block();

assert!(self.builder.add(key, value));
self.first_key = Bytes::copy_from_slice(key.raw_ref()).into();
self.last_key = Bytes::copy_from_slice(key.raw_ref()).into();
}

build 函数就是将当前的 SsTableBuilder 编码写入到磁盘中。首先通过分裂 block,将当前 block builder 的数据写入到 SST builder 中。buf = encode(data) + encode(meta) + meta_offset,将编码后到 meta 数据加入到 buf 中,最后增加一个 u32 的偏移量。通过 FileObject::create 方法写入文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
pub fn build(
mut self,
id: usize,
block_cache: Option<Arc<BlockCache>>,
path: impl AsRef<Path>,
) -> Result<SsTable> {
self.split_block();

let mut buf = self.data;
let meta_offset = buf.len();
BlockMeta::encode_block_meta(&self.meta, &mut buf);
buf.put_u32(meta_offset as u32);

let file = FileObject::create(path.as_ref(), buf)?;
Ok(SsTable {
file,
block_meta_offset: meta_offset,
id,
block_cache,
first_key: self.meta.first().unwrap().first_key.clone(),
last_key: self.meta.last().unwrap().last_key.clone(),
block_meta: self.meta,
bloom: None,
max_ts: 0,
})
}

这其中就涉及到 meta 区的编码和解码。二者是相反的操作。encode 依次将每个 block 的偏移量、第一个键的长度和第一个键、最后一个键的长度和最后一个键写入到 buf 中。decode 则是从 buf 中依次读取每个 block 的偏移量、第一个键的长度和第一个键、最后一个键的长度和最后一个键,组成 BlockMeta 结构体。第 5~19 行是提前预分配空间,避免多次扩容(详见问题 4)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
pub fn encode_block_meta(
block_meta: &[BlockMeta],
buf: &mut Vec<u8>,
) {
let mut estimate_size = std::mem::size_of::<u32>();

for meta in block_meta {
// sizeof offset
estimate_size += std::mem::size_of::<u32>();
// sizeof keylen
estimate_size += std::mem::size_of::<u16>();
// sizeof first key
estimate_size += meta.first_key.len();
// sizeof keylen
estimate_size += std::mem::size_of::<u16>();
// sizeof last key
estimate_size += meta.last_key.len();
}
buf.reserve(estimate_size);

buf.put_u32(block_meta.len() as u32);
for meta in block_meta {
buf.put_u32(meta.offset as u32);
buf.put_u16(meta.first_key.len() as u16);
buf.put_slice(meta.first_key.raw_ref());
buf.put_u16(meta.last_key.len() as u16);
buf.put_slice(meta.last_key.raw_ref());
}
}

/// Decode block meta from a buffer.
pub fn decode_block_meta(mut buf: impl Buf) -> Vec<BlockMeta> {
let mut block_metas = Vec::new();
let num = buf.get_u32() as usize;
for _ in 0..num {
let offset = buf.get_u32() as usize;
let first_key_len = buf.get_u16() as usize;
let first_key = KeyBytes::from_bytes(buf.copy_to_bytes(first_key_len));
let last_key_len = buf.get_u16() as usize;
let last_key = KeyBytes::from_bytes(buf.copy_to_bytes(last_key_len));
block_metas.push(BlockMeta {
offset,
first_key,
last_key,
});
}
block_metas
}

最后是从文件中读取 SST 的方法。首先读取最后 4Bytes,也就是 block 区的偏移量。然后通过偏移量确定 block 区和 meta 区的位置。将 meta 区的数据解码,和 block 区数据构成 SsTable 结构体。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
pub fn open(id: usize, block_cache: Option<Arc<BlockCache>>, file: FileObject) -> Result<Self> {
let len = file.size();

let raw_meta_offset = file.read(len - 4, 4)?;
let block_meta_offset = (&raw_meta_offset[..]).get_u32() as u64;
let raw_meta = file.read(block_meta_offset, len - block_meta_offset - 4)?;
let block_meta = BlockMeta::decode_block_meta(&raw_meta[..]);

Ok(Self {
file,
block_meta_offset: block_meta_offset as usize,
id,
block_cache,
first_key: block_meta.first().unwrap().first_key.clone(),
last_key: block_meta.last().unwrap().last_key.clone(),
block_meta,
bloom: None,
max_ts: 0,
})
}

Task 2: SST Iterator

实现 SST 的迭代器。Block 是懒加载的,因此只需要使用当前 block 的迭代器,直到进行到下一个 block。首先是创建指向第一个 block 的迭代器,与将迭代器指向第一个 block 的方法。索引数为 0,从第 0 个 block 创建其对应的 block 迭代器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/// Create a new iterator and seek to the first key-value pair in the first data block.
pub fn create_and_seek_to_first(table: Arc<SsTable>) -> Result<Self> {
let blk_idx = 0;
let blk_iter = BlockIterator::create_and_seek_to_first(table.read_block(0)?);

Ok(Self {
table,
blk_iter,
blk_idx,
})
}

/// Seek to the first key-value pair in the first data block.
pub fn seek_to_first(&mut self) -> Result<()> {
self.blk_idx = 0;
self.blk_iter = BlockIterator::create_and_seek_to_first(self.table.read_block(0)?);

Ok(())
}

然后是指向特定键的迭代器。需要考虑这样一个问题:如果当前查找的键在表中不存在,这时候迭代器应该指向哪里?应该指向大于该键的第一个键。首先根据 key 找到对应的 block,如果当前 block 里的元素都比这个键小,就将 block 索引加一,指向下一个 block 的第一个键。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
fn seek_to_key_inner(table: Arc<SsTable>, key: KeySlice) -> Result<(usize, BlockIterator)> {
let mut blk_idx = table.find_block_idx(key);
let mut blk_iter =
BlockIterator::create_and_seek_to_key(table.read_block(blk_idx)?, key);

if !blk_iter.is_valid() {
blk_idx += 1;
if blk_idx < table.num_of_blocks() {
blk_iter =
BlockIterator::create_and_seek_to_first(table.read_block(blk_idx)?);
}
}
Ok((blk_idx, blk_iter))
}

pub fn create_and_seek_to_key(table: Arc<SsTable>, key: KeySlice) -> Result<Self> {
let (blk_idx, blk_iter) = SsTableIterator::seek_to_key_inner(table.clone(), key)?;

Ok(Self {
table,
blk_iter,
blk_idx,
})
}

pub fn seek_to_key(&mut self, key: KeySlice) -> Result<()> {
let (blk_idx, blk_iter) = SsTableIterator::seek_to_key_inner(self.table.clone(), key)?;
self.blk_idx = blk_idx;
self.blk_iter = blk_iter;
Ok(())
}

我们还需要根据 block 索引/键寻找对应 block 的方法。根据键查找 block 索引采用的是二分搜索法(因为 block 数据是有序的)。根据索引取 block,首先根据 meta 区数据获取 block 的偏移量(起始地址),根据下一个 block 的偏移量获取当前 block 的大小,根据 block 的起始地址以及长度从文件中读取 block 的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
pub fn read_block(&self, block_idx: usize) -> Result<Arc<Block>> {
let offset = self.block_meta[block_idx].offset;
let offset_end = self
.block_meta
.get(block_idx + 1)
.map_or(self.block_meta_offset, |x| x.offset);
let block_len = offset_end - offset;
let block_data = self.file.read(offset as u64, block_len as u64)?;
Ok(Arc::new(Block::decode(&block_data[..block_len])))
}

pub fn find_block_idx(&self, key: KeySlice) -> usize {
self.block_meta
.partition_point(|meta| meta.first_key.as_key_slice() <= key)
.saturating_sub(1)
}
至此,day4 的所有检查点就可以通过了。

Task 3: Block Cache

从 cache 中根据 (sst_id, block_idx) 读取 block。如果 cache 中存在,直接返回;如果不存在,从文件中读取,并放入 cache。然后可以将之前所有的 read_block 替换为 read_block_cached

1
2
3
4
5
6
7
8
9
10
/// Read a block from disk, with block cache.
pub fn read_block_cached(&self, block_idx: usize) -> Result<Arc<Block>> {
if let Some(ref cache) = self.block_cache {
let blk = cache
.try_get_with((self.id, block_idx), || self.read_block(block_idx))
.map_err(|e| anyhow!("{}", e))?;
return Ok(blk);
}
self.read_block(block_idx)
}

Result

result

Test Your Understanding

  1. What is the time complexity of seeking a key in the SST?
    二分查找,\(O(\log N)\)

  2. Where does the cursor stop when you seek a non-existent key in your implementation?
    停留在大于该键的第一个键。

  3. Is it possible (or necessary) to do in-place updates of SST files?
    不需要且不能原地更新。LSM 将随机写转换为顺序写,所有写入都通过追加新记录完成,避免磁盘的随机 I/O。如果原地写可能会导致顺序被破坏,查找等操作效率大大降低。

  4. An SST is usually large (i.e., 256MB). In this case, the cost of copying/expanding the Vec would be significant. Does your implementation allocate enough space for your SST builder in advance? How did you implement it?
    提前分配了空间。通过计算偏移量、每个块的第一个/最后一个键的长度和偏移量预估大小,提前分配空间。

  5. Looking at the moka block cache, why does it return Arc<Error> instead of the original Error?
    主要是为了保证线程安全,异步友好。

  6. Does the usage of a block cache guarantee that there will be at most a fixed number of blocks in memory? For example, if you have a moka block cache of 4GB and block size of 4KB, will there be more than 4GB/4KB number of blocks in memory at the same time? Is it possible to store columnar data (i.e., a table of 100 integer columns) in an LSM engine? Is the current SST format still a good choice?
    这里的实现没有保证,只是确保了 block 的大小不会超过限制,但是没有限制 block 的数量。如果 block 数量过多,可能会占用过多内存。

  7. Consider the case that the LSM engine is built on object store services (i.e., S3). How would you optimize/change the SST format/parameters and the block cache to make it suitable for such services?
    可以考虑以下方法:增大块的大小、将 SST 的元数据前置一次性读完、调整 cache 的策略等。

  8. For now, we load the index of all SSTs into the memory. Assume you have a 16GB memory reserved for the indexes, can you estimate the maximum size of the database your LSM system can support? (That’s why you need an index cache!) 一个索引的长度为 64bits,因此可以存储 16GB/8B = 2G 个索引,也就是 2G 个块。而一个块最多包含 4KB 数据,因此总共可以存储 2G × 4KB = 8192GB 的数据。