0%

Day5: Read Path

Week 1 Day 5 的任务:

  1. 将 SST 集成到 LSM 的读路径中
  2. 实现带 SST 的 LSM 读路径的 getscan 方法

week1 day5

Tasks

Task 1: Two Merge Iterator

之前的章节中实现了将多个同种类型的迭代器合并为一个的方法。现在我们实现了在磁盘上的 SST 的结构,也有在内存中的 memtable 的结构。在扫描整个存储引擎时,需要将两者合并。因此,我们需要实现一个能将不同类型的迭代器合并为一个的通用方法。因为这里只需要合并两个迭代器,因此不需要保持「堆」的结构,用一个标记 choose_a 来指定当前是选择 a 还是 btrue 代表选择 a,否则选择 b。如果 a 或者 b 有一个是无效的,则选择另一个。都有效,就选择 key 更小的那个(优先级更高)。

1
2
3
4
5
6
7
8
9
10
11
fn choose_a(a: &A, b: &B) -> bool {
if !a.is_valid() {
return false;
}

if !b.is_valid() {
return true;
}

a.key() < b.key()
}

如果 ab 中存在相同的键,在二者都有效的情况下,跳过 b 的元素。为什么这么选择跳过 b 呢?在后面的实现中可以看到,type LsmIteratorInner =TwoMergeIterator<MergeIterator<MemTableIterator>, MergeIterator<SsTableIterator>>;MemTableIterator 对应的是内存中的 memtable,SsTableIterator 对应的是磁盘上的 SST。在 LSM 中,内存中的数据是优先级更高的,因此当二者有相同 key 时,选择 memtable 的数据。

1
2
3
4
5
6
fn skip_b(&mut self) -> Result<()> {
if self.a.is_valid() && self.b.is_valid() && self.a.key() == self.b.key() {
self.b.next()?;
}
Ok(())
}

这样,我们就可以创建一个能合并两个迭代器的迭代器了。

1
2
3
4
5
6
7
8
9
10
pub fn create(a: A, b: B) -> Result<Self> {
let mut iter = Self {
a,
b,
choose_a: false,
};
iter.skip_b()?;
iter.choose_a = Self::choose_a(&iter.a, &iter.b);
Ok(iter)
}

我们还需要实现基本的获取键/值、是否有效、移动到下一个元素的方法。键和值的获取、迭代器是否有效根据当前选择的是哪一个迭代器来决定。而移动到下一个元素时,首先需要将当前选择的迭代器移动到下一个元素,然后判断是否有相同的键需要跳过,最后更新选择哪一个迭代器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
fn key(&self) -> Self::KeyType<'_> {
if self.choose_a {
return self.a.key();
}
self.b.key()
}

fn value(&self) -> &[u8] {
if self.choose_a {
return self.a.value();
}
self.b.value()
}

fn is_valid(&self) -> bool {
if self.choose_a {
return self.a.is_valid();
}
self.b.is_valid()
}

fn next(&mut self) -> Result<()> {
if self.choose_a {
self.a.next()?;
} else {
self.b.next()?;
}
self.skip_b()?;
self.choose_a = Self::choose_a(&self.a, &self.b);
Ok(())
}

Task 2: Read Path - Scan

获取了合并后的迭代器后,原有的 scan 方法也需要做出响应的改变。目前,我们的 SST 迭代器还不支持有边界的扫描方式。为了解决这个问题,我们需要在 LsmIterator 的内部实现边界的检查,因此 LsmIterator::new 创建方法就需要新传入一个边界参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
pub struct LsmIterator {
inner: LsmIteratorInner,
end_bound: Bound<Bytes>, // new
is_valid: bool,
}

pub(crate) fn new(iter: LsmIteratorInner, end_bound: Bound<Bytes>) -> Result<Self> {
let mut iter = Self {
is_valid: iter.is_valid(),
inner: iter,
end_bound, // new
};

iter.move_to_non_delete()?;

Ok(iter)
}

在扫描过程中,首先去扫描存储在内存中的可变和不可变的 memtabl(之前实现的内容),然后去扫描 SST。目前的 SST 都存在 l0_sstables 里。在扫描 SST 时,需要判断当前的数据段与 SST 的数据是否有交叉,只有有交叉的情况下才需要将 SST 的数据段加入到迭代器中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
fn range_bound(
begin: Bound<&[u8]>,
end: Bound<&[u8]>,
table_begin: KeySlice,
table_end: KeySlice,
) -> bool {
match end {
Bound::Excluded(key) if key <= table_begin.raw_ref() => {
return false;
}
Bound::Included(key) if key < table_begin.raw_ref() => {
return false;
}
_ => {}
}
match begin {
Bound::Included(key) if key > table_end.raw_ref() => {
return false;
}
Bound::Excluded(key) if key >= table_end.raw_ref() => {
return false;
}
_ => {}
}

true
}
因此在扫描时,首先根据快照 l0_sstables 的 id,从 sstables 获取对应的 SST 的克隆(直接从磁盘中读 I/O 速率比较慢)。然后判断当前 SST 与数据段是否有交叉。如果有交叉,获取对应位置的迭代器,并加入到 sstable 的迭代器中。最后将 mem_table 的迭代器和 sstable 的合并。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
...
let mut sstable_iter = Vec::with_capacity(snapshot.l0_sstables.len());
for table_id in snapshot.l0_sstables.iter() {
let table = snapshot.sstables[table_id].clone();
if Self::range_bound(
_lower,
_upper,
table.first_key().as_key_slice(),
table.last_key().as_key_slice(),
) {
let iter = match _lower {
Bound::Included(key) => {
SsTableIterator::create_and_seek_to_key(table, KeySlice::from_slice(key))?
}
Bound::Excluded(key) => {
let mut iter = SsTableIterator::create_and_seek_to_key(
table,
KeySlice::from_slice(key),
)?;
if iter.is_valid() && iter.key().raw_ref() == key {
iter.next()?;
}
iter
}
Bound::Unbounded => SsTableIterator::create_and_seek_to_first(table)?,
};
sstable_iter.push(Box::new(iter));
}
}
let l0_table_iter = MergeIterator::create(sstable_iter);
let iter = TwoMergeIterator::create(memtable_iter, l0_table_iter)?;

Ok(FusedIterator::new(LsmIterator::new(
iter,
map_bound(_upper),
)?))

Task 3: Read Path - Get

最后是 get 方法的实现。get 方法首先从 memtable 开始查找,如果找不到,则从 SST 中查找。与 scan 类似,可以获取 SST 合并的迭代器,每个迭代器指向需要查找的 key。再判断迭代器是否有效、当前 key 是否等于查找的 key(因为如果找不到 key 的话会指向大于 key 的第一个位置)、对应的 value 是否有效。

1
2
3
4
5
6
7
8
9
10
11
12
let mut l0_iters = Vec::with_capacity(state.l0_sstables.len());
for table_id in state.l0_sstables.iter() {
let table = state.sstables[table_id].clone();
l0_iters.push(Box::new(SsTableIterator::create_and_seek_to_key(
table,
KeySlice::from_slice(_key),
)?));
}
let l0_iter = MergeIterator::create(l0_iters);
if l0_iter.is_valid() && l0_iter.key().raw_ref() == _key && !l0_iter.value().is_empty() {
return Ok(Some(Bytes::copy_from_slice(l0_iter.value())));
}

Result

result

Test Your Understanding

  1. Consider the case that a user has an iterator that iterates the whole storage engine, and the storage engine is 1TB large, so that it takes ~1 hour to scan all the data. What would be the problems if the user does so? (This is a good question and we will ask it several times at different points of the course…)
    除去显而易见的用户等待时间长、资源消耗大的问题,还有可能因为扫描过程中数据被修改,导致扫描结果不准确等问题。

  2. Another popular interface provided by some LSM-tree storage engines is multi-get (or vectored get). The user can pass a list of keys that they want to retrieve. The interface returns the value of each of the key. For example, multi_get(vec![“a”, “b”, “c”, “d”]) -> a=1,b=2,c=3,d=4. Obviously, an easy implementation is to simply doing a single get for each of the key. How will you implement the multi-get interface, and what optimizations you can do to make it more efficient? (Hint: some operations during the get process will only need to be done once for all keys, and besides that, you can think of an improved disk I/O interface to better support this multi-get interface).
    首先对需要查找的键进行排序预处理,然后顺序查找,将多个键的读取合并为单次的顺序扫描。