diff --git a/Cargo.toml b/Cargo.toml index 4f6f4846..cb1ccd22 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,8 +42,11 @@ hex = "0.4" if_chain = "1.0" lazy_static = "1.3" libc = "0.2" -log = { version = "0.4", features = ["max_level_trace", "release_max_level_debug"] } -lz4-sys = "1.9" +log = { version = "0.4", features = [ + "max_level_trace", + "release_max_level_debug", +] } +lz4-sys = { version = "=1.9.5" } memmap2 = { version = "0.9", optional = true } nix = "0.26" num-derive = "0.4" @@ -55,7 +58,7 @@ protobuf = "2" rayon = "1.5" rhai = { version = "1.7", features = ["sync"], optional = true } scopeguard = "1.1" -serde = { version = "1.0", features = ["derive"] } +serde = { version = "=1.0.194", features = ["derive"] } serde_repr = "0.1" strum = { version = "0.25.0", features = ["derive"] } thiserror = "1.0" @@ -64,8 +67,12 @@ thiserror = "1.0" criterion = "0.4" ctor = "0.2" env_logger = "0.10" -kvproto = { git = "https://github.com/pingcap/kvproto.git", default-features = false, features = ["protobuf-codec"] } -raft = { git = "https://github.com/tikv/raft-rs", branch = "master", default-features = false, features = ["protobuf-codec"] } +kvproto = { git = "https://github.com/pingcap/kvproto.git", default-features = false, features = [ + "protobuf-codec", +] } +raft = { git = "https://github.com/tikv/raft-rs", branch = "master", default-features = false, features = [ + "protobuf-codec", +] } rand = "0.8" rand_distr = "0.4" tempfile = "3.6" @@ -74,19 +81,10 @@ toml = "0.8" [features] default = ["internals", "scripting"] internals = [] -nightly = [ - "prometheus/nightly", -] -failpoints = [ - "fail/failpoints", -] -scripting = [ - "rhai", -] -swap = [ - "nightly", - "memmap2", -] +nightly = ["prometheus/nightly"] +failpoints = ["fail/failpoints"] +scripting = ["rhai"] +swap = ["nightly", "memmap2"] std_fs = [] nightly_group = ["nightly", "swap"] @@ -95,6 +93,8 @@ nightly_group = ["nightly", "swap"] raft-proto = { git = "https://github.com/tikv/raft-rs", branch = "master" } protobuf = { git = "https://github.com/pingcap/rust-protobuf", branch = "v2.8" } protobuf-codegen = { git = "https://github.com/pingcap/rust-protobuf", branch = "v2.8" } +# Lock the version of cc-rs to avoid build failure on MacOS, ref https://github.com/rust-lang/cc-rs/issues/984. +cc = { git = "https://github.com/rust-lang/cc-rs", tag = "1.0.98" } [workspace] members = ["stress", "ctl"] diff --git a/src/engine.rs b/src/engine.rs index 1a09b397..2c5e0c74 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -333,9 +333,12 @@ where let _t = StopWatch::new(&*ENGINE_READ_ENTRY_DURATION_HISTOGRAM); if let Some(memtable) = self.memtables.get(region_id) { let mut ents_idx: Vec = Vec::with_capacity((end - begin) as usize); - memtable - .read() - .fetch_entries_to(begin, end, max_size, &mut ents_idx)?; + // Ensure that the corresponding memtable is locked with a read lock before + // completing the fetching of entries from the raft logs. This + // prevents the scenario where the index could become stale while + // being concurrently updated by the `rewrite` operation. + let immutable = memtable.read(); + immutable.fetch_entries_to(begin, end, max_size, &mut ents_idx)?; for i in ents_idx.iter() { vec.push(read_entry_from_file::(self.pipe_log.as_ref(), i)?); } @@ -634,9 +637,11 @@ pub(crate) mod tests { use crate::util::ReadableSize; use kvproto::raft_serverpb::RaftLocalState; use raft::eraftpb::Entry; + use rand::{thread_rng, Rng}; use std::collections::{BTreeSet, HashSet}; use std::fs::OpenOptions; use std::path::PathBuf; + use std::sync::atomic::{AtomicBool, Ordering}; pub(crate) type RaftLogEngine = Engine; impl RaftLogEngine { @@ -1928,8 +1933,6 @@ pub(crate) mod tests { #[cfg(feature = "nightly")] #[bench] fn bench_engine_fetch_entries(b: &mut test::Bencher) { - use rand::{thread_rng, Rng}; - let dir = tempfile::Builder::new() .prefix("bench_engine_fetch_entries") .tempdir() @@ -2586,6 +2589,53 @@ pub(crate) mod tests { assert!(data.is_empty(), "data loss {:?}", data); } + #[test] + fn test_fetch_with_concurrently_rewrite() { + let dir = tempfile::Builder::new() + .prefix("test_fetch_with_concurrently_rewrite") + .tempdir() + .unwrap(); + let cfg = Config { + dir: dir.path().to_str().unwrap().to_owned(), + target_file_size: ReadableSize(2048), + ..Default::default() + }; + let fs = Arc::new(DeleteMonitoredFileSystem::new()); + let engine = Arc::new(RaftLogEngine::open_with_file_system(cfg, fs.clone()).unwrap()); + let entry_data = vec![b'x'; 128]; + // Set up a concurrent write with purge, and fetch. + let mut vec: Vec = Vec::new(); + let fetch_engine = engine.clone(); + let flag = Arc::new(AtomicBool::new(false)); + let start_flag = flag.clone(); + let th = std::thread::spawn(move || { + while !start_flag.load(Ordering::Acquire) { + std::thread::sleep(Duration::from_millis(10)); + } + for _ in 0..10 { + let region_id = thread_rng().gen_range(1..=10); + // Should not return file seqno out of range error. + let _ = fetch_engine + .fetch_entries_to::(region_id, 1, 101, None, &mut vec) + .map_err(|e| { + assert!(!format!("{e}").contains("file seqno out of")); + }); + vec.clear(); + } + }); + for i in 0..10 { + for rid in 1..=10 { + engine.append(rid, 1 + i * 10, 1 + i * 10 + 10, Some(&entry_data)); + } + flag.store(true, Ordering::Release); + for rid in 1..=10 { + engine.clean(rid); + } + engine.purge_expired_files().unwrap(); + } + th.join().unwrap(); + } + #[test] fn test_internal_key_filter() { let dir = tempfile::Builder::new() diff --git a/src/env/log_fd/unix.rs b/src/env/log_fd/unix.rs index 608cca70..e9b75542 100644 --- a/src/env/log_fd/unix.rs +++ b/src/env/log_fd/unix.rs @@ -83,7 +83,7 @@ impl LogFd { while readed < buf.len() { let bytes = match pread(self.0, &mut buf[readed..], offset as i64) { Ok(bytes) => bytes, - Err(e) if e == Errno::EINTR => continue, + Err(Errno::EINTR) => continue, Err(e) => return Err(from_nix_error(e, "pread")), }; // EOF @@ -106,7 +106,7 @@ impl LogFd { while written < content.len() { let bytes = match pwrite(self.0, &content[written..], offset as i64) { Ok(bytes) => bytes, - Err(e) if e == Errno::EINTR => continue, + Err(Errno::EINTR) => continue, Err(e) if e == Errno::ENOSPC => return Err(from_nix_error(e, "nospace")), Err(e) => return Err(from_nix_error(e, "pwrite")), }; diff --git a/src/file_pipe_log/pipe.rs b/src/file_pipe_log/pipe.rs index 43b3483a..27ea8267 100644 --- a/src/file_pipe_log/pipe.rs +++ b/src/file_pipe_log/pipe.rs @@ -713,7 +713,7 @@ mod tests { // Retire files. assert_eq!(pipe_log.purge_to(last).unwrap() as u64, last - first); // Try to read recycled file. - for (_, handle) in handles.into_iter().enumerate() { + for handle in handles.into_iter() { assert!(pipe_log.read_bytes(handle).is_err()); } // Try to reuse.