From 63758abcca093a6f09f5ed21e39d3eba34e877ea Mon Sep 17 00:00:00 2001 From: Michael Constant Date: Fri, 1 Nov 2024 02:41:33 -0700 Subject: [PATCH] Fix several crashes in repair from corrupted pages During repair, it's normal to encounter a partially-written tree, which can contain arbitrary invalid data. We can't crash when that happens; we need to gracefully fail so the repair can try the other commit slot instead. This first batch of fixes only covers the crashes most likely to occur in practice: the ones that can happen on a 64-bit machine, without malicious input. --- src/tree_store/btree_base.rs | 54 +++++++++++++++++++++++++++--------- 1 file changed, 41 insertions(+), 13 deletions(-) diff --git a/src/tree_store/btree_base.rs b/src/tree_store/btree_base.rs index 0bd73d94..17fdcbb2 100644 --- a/src/tree_store/btree_base.rs +++ b/src/tree_store/btree_base.rs @@ -22,7 +22,19 @@ pub(super) fn leaf_checksum( fixed_value_size: Option, ) -> Result { let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size); - let end = accessor.value_end(accessor.num_pairs() - 1).unwrap(); + let last_pair = accessor.num_pairs().checked_sub(1).ok_or_else(|| { + StorageError::Corrupted(format!( + "Leaf page {:?} corrupted. Number of pairs is zero", + page.get_page_number() + )) + })?; + let end = accessor.value_end(last_pair).ok_or_else(|| { + StorageError::Corrupted(format!( + "Leaf page {:?} corrupted. Couldn't find offset for pair {}", + page.get_page_number(), + last_pair, + )) + })?; if end > page.memory().len() { Err(StorageError::Corrupted(format!( "Leaf page {:?} corrupted. Last offset {} beyond end of data {}", @@ -40,7 +52,19 @@ pub(super) fn branch_checksum( fixed_key_size: Option, ) -> Result { let accessor = BranchAccessor::new(page, fixed_key_size); - let end = accessor.key_end(accessor.num_keys() - 1); + let last_key = accessor.num_keys().checked_sub(1).ok_or_else(|| { + StorageError::Corrupted(format!( + "Branch page {:?} corrupted. Number of keys is zero", + page.get_page_number() + )) + })?; + let end = accessor.key_end(last_key).ok_or_else(|| { + StorageError::Corrupted(format!( + "Branch page {:?} corrupted. Can't find offset for key {}", + page.get_page_number(), + last_key + )) + })?; if end > page.memory().len() { Err(StorageError::Corrupted(format!( "Branch page {:?} corrupted. Last offset {} beyond end of data {}", @@ -370,7 +394,8 @@ impl<'a> LeafAccessor<'a> { } let offset = 4 + size_of::() * n; let end = u32::from_le_bytes( - self.page[offset..(offset + size_of::())] + self.page + .get(offset..(offset + size_of::()))? .try_into() .unwrap(), ) as usize; @@ -391,14 +416,15 @@ impl<'a> LeafAccessor<'a> { None } else { if let Some(fixed) = self.fixed_value_size { - return Some(self.key_end(self.num_pairs - 1).unwrap() + fixed * (n + 1)); + return Some(self.key_end(self.num_pairs.checked_sub(1)?)? + fixed * (n + 1)); } let mut offset = 4 + size_of::() * n; if self.fixed_key_size.is_none() { offset += size_of::() * self.num_pairs; } let end = u32::from_le_bytes( - self.page[offset..(offset + size_of::())] + self.page + .get(offset..(offset + size_of::()))? .try_into() .unwrap(), ) as usize; @@ -1141,7 +1167,7 @@ impl<'a: 'b, 'b, T: Page + 'a> BranchAccessor<'a, 'b, T> { pub(crate) fn total_length(&self) -> usize { // Keys are stored at the end - self.key_end(self.num_keys() - 1) + self.key_end(self.num_keys() - 1).unwrap() } pub(super) fn child_for_key(&self, query: &[u8]) -> (usize, PageNumber) { @@ -1179,22 +1205,24 @@ impl<'a: 'b, 'b, T: Page + 'a> BranchAccessor<'a, 'b, T> { if n == 0 { self.key_section_start() } else { - self.key_end(n - 1) + self.key_end(n - 1).unwrap() } } - fn key_end(&self, n: usize) -> usize { + fn key_end(&self, n: usize) -> Option { if let Some(fixed) = self.fixed_key_size { - return self.key_section_start() + fixed * (n + 1); + return Some(self.key_section_start() + fixed * (n + 1)); } let offset = 8 + (PageNumber::serialized_size() + size_of::()) * self.count_children() + size_of::() * n; - u32::from_le_bytes( - self.page.memory()[offset..(offset + size_of::())] + Some(u32::from_le_bytes( + self.page + .memory() + .get(offset..(offset + size_of::()))? .try_into() .unwrap(), - ) as usize + ) as usize) } pub(super) fn key(&self, n: usize) -> Option<&[u8]> { @@ -1202,7 +1230,7 @@ impl<'a: 'b, 'b, T: Page + 'a> BranchAccessor<'a, 'b, T> { return None; } let offset = self.key_offset(n); - let end = self.key_end(n); + let end = self.key_end(n)?; Some(&self.page.memory()[offset..end]) }