Skip to content

Commit

Permalink
revisionstore: auto-sync indexedlog stores when they change on disk
Browse files Browse the repository at this point in the history
Summary:
Use the new super-lightweight is_changed_on_disk() indexedlog check to aggressively refresh indexedlogs to pick up new data if another process has updated the logs.

Various things in revisionstore use indexedlog to store/cache data (for example, tree or file data fetched from EdenApi). The logs are typically flushed at the end of Sapling commands. This works okay because most Sapling commands are short-lived, so there isn't much risk of duplicated work due to stale reads. However, eden is a long-lived daemon that uses the same stores. This means that if you ran "sl prefetch ...", for example, eden previously would not benefit, since its stores would not automatically refresh from disk to see the new entries.

Reviewed By: zzl0

Differential Revision: D55160209

fbshipit-source-id: 4301bc60ffbe26ed4b5e0a3319d4f650d5f56385
  • Loading branch information
muirdm authored and facebook-github-bot committed Mar 21, 2024
1 parent ab1dcc2 commit 3d85abb
Showing 1 changed file with 78 additions and 3 deletions.
81 changes: 78 additions & 3 deletions eden/scm/lib/revisionstore/src/indexedlogutil.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
use std::io::ErrorKind;
use std::path::Path;
use std::path::PathBuf;
use std::sync::atomic;
use std::sync::atomic::AtomicU64;

use anyhow::Result;
use indexedlog::log;
Expand All @@ -23,6 +25,7 @@ use indexedlog::Result as IndexedlogResult;
use minibytes::Bytes;
use parking_lot::RwLock;
use parking_lot::RwLockReadGuard;
use parking_lot::RwLockUpgradableReadGuard;
use parking_lot::RwLockWriteGuard;
use tracing::debug;

Expand All @@ -31,6 +34,7 @@ use tracing::debug;
/// with the subtle differences.
pub struct Store {
// The wrapped log (either the `Local` or the `Shared` flavor of `Inner`),
// kept behind a reader/writer lock.
inner: RwLock<Inner>,
// Incremented each time a read triggers an automatic sync because the log
// changed on disk. In the code visible here it is only read back by tests.
auto_sync_count: AtomicU64,
}

pub enum Inner {
Expand All @@ -46,7 +50,7 @@ pub enum StoreType {

impl Store {
/// Acquires a shared read guard on the underlying log.
///
/// Before the guard is handed out, the store checks whether the log has
/// changed on disk and, if it has, syncs it so that entries written through
/// another handle become visible to this reader. When a sync is needed the
/// lock is briefly taken exclusively, so this call can wait on other readers.
pub fn read(&self) -> RwLockReadGuard<'_, Inner> {
    // Only the auto-syncing path is kept; a bare `self.inner.read()` would
    // hand out a potentially stale view of the log.
    self.sync_if_changed_on_disk()
}

pub fn write(&self) -> RwLockWriteGuard<'_, Inner> {
Expand All @@ -70,6 +74,29 @@ impl Store {
/// Flushes the underlying log while holding the write lock.
///
/// Returns whatever error the inner log's `flush` reports.
pub fn flush(&self) -> Result<()> {
    let mut inner = self.write();
    inner.flush()
}

/// Returns a read guard on the inner log, first syncing the log if it has
/// changed on disk.
///
/// The on-disk check is made once under a plain read lock and, only if that
/// reports a change, again under an upgradable read lock, so that a thread
/// which lost the race to sync does not sync a second time. Sync failures
/// are logged and otherwise ignored; the caller still gets a guard.
fn sync_if_changed_on_disk(&self) -> RwLockReadGuard<'_, Inner> {
    // Common case: nothing changed, hand back the plain read guard.
    let shared = self.inner.read();
    if !shared.is_changed_on_disk() {
        return shared;
    }

    // Release the plain read lock before asking for the upgradable one.
    drop(shared);

    let mut upgradable = self.inner.upgradable_read();

    // Re-check now that we hold the upgradable lock: another thread may have
    // synced between the `drop` above and this point.
    if upgradable.is_changed_on_disk() {
        tracing::debug!("auto-syncing indexedlog because it changed on disk");
        self.auto_sync_count.fetch_add(1, atomic::Ordering::Relaxed);

        // Temporarily take exclusive access just for the sync itself.
        upgradable.with_upgraded(|inner| {
            if let Err(err) = inner.flush() {
                tracing::warn!(?err, "error auto-syncing indexedlog store");
            }
        });
    }

    RwLockUpgradableReadGuard::downgrade(upgradable)
}
}

impl Inner {
Expand Down Expand Up @@ -118,10 +145,10 @@ impl Inner {
pub fn flush(&mut self) -> Result<()> {
match self {
Self::Local(log) => {
log.flush()?;
log.sync()?;
}
Self::Shared(log) => {
if let Err(err) = log.flush() {
if let Err(err) = log.sync() {
if !err.is_corruption() && err.io_error_kind() == ErrorKind::NotFound {
// File-not-found errors can happen when the hg cache
// was blown away during command execution. Ignore the
Expand All @@ -136,6 +163,13 @@ impl Inner {
};
Ok(())
}

/// Reports whether the wrapped log has changed on disk, by delegating to the
/// `is_changed_on_disk` check of whichever log flavor this `Inner` holds.
fn is_changed_on_disk(&self) -> bool {
    match self {
        Self::Shared(rotate_log) => rotate_log.is_changed_on_disk(),
        Self::Local(local_log) => local_log.is_changed_on_disk(),
    }
}
}

/// Iterator returned from `Self::lookup`.
Expand Down Expand Up @@ -221,6 +255,7 @@ impl StoreOpenOptions {
self.into_local_open_options()
.open_with_repair(path.as_ref())?,
)),
auto_sync_count: AtomicU64::new(0),
})
}

Expand Down Expand Up @@ -259,6 +294,7 @@ impl StoreOpenOptions {
}
Ok(Store {
inner: RwLock::new(Inner::Shared(rotate_log)),
auto_sync_count: AtomicU64::new(0),
})
}

Expand Down Expand Up @@ -365,4 +401,43 @@ mod tests {
assert_eq!(store.read().lookup(0, b"aa")?.count(), 0);
Ok(())
}

/// A second handle on the same on-disk log picks up data flushed by the first
/// handle without an explicit sync, and does so with exactly one auto-sync.
#[test]
fn test_transparent_sync() -> Result<()> {
    let dir = TempDir::new()?;

    // Both handles are opened with identical options against the same directory.
    let open_handle = || {
        StoreOpenOptions::new()
            .index("hex", |_| vec![IndexOutput::Reference(0..2)])
            .local(&dir)
    };
    let writer = open_handle()?;
    let reader = open_handle()?;

    writer.append(b"aabcd")?;
    writer.flush()?;

    // The reader sees the write immediately.
    let flushed_by_writer = reader
        .read()
        .lookup(0, b"aa")?
        .collect::<Result<Vec<_>>>()?;
    assert_eq!(flushed_by_writer, vec![b"aabcd"]);

    // The reader's own unflushed append is visible to itself as well.
    reader.append(b"abcd")?;
    let appended_by_reader = reader
        .read()
        .lookup(0, b"ab")?
        .collect::<Result<Vec<_>>>()?;
    assert_eq!(appended_by_reader, vec![b"abcd"]);

    // Make sure we only synced once:
    let sync_count = reader.auto_sync_count.load(atomic::Ordering::Relaxed);
    assert_eq!(sync_count, 1);

    Ok(())
}
}

0 comments on commit 3d85abb

Please sign in to comment.