Skip to content

Commit

Permalink
scmstore: use "metrics" crate to manage fetch metrics
Browse files Browse the repository at this point in the history
Summary:
Completely rework how we handle counters during the "fetch" codepaths of the scmstore FileStore and TreeStore. These are the goals:
1. Avoid having to allocate Strings related to metrics more than once. Currently we allocate a bunch of metrics strings (multiple times) whenever we flush metrics to Sapling's metrics store or ODS.
2. Consolidate ODS counter propagation into a reusable crate: the "metrics" crate. Now there is no special code needed to send counters to ODS (via EdenFS) - just make a Counter and increment it.
3. Transition to more of a "normal" counter approach where counters are essentially global atomic counters that get incremented immediately (and then collected from some outside process intermittently).

I achieved the goals, but I had to use some macros to bridge the gap between static global counters and the current heavily factored metrics collection of scmstore. Basically, I re-used the existing metrics structs, but made them `'static`, containing shared `'static` counters.

This also addresses the "gotcha" I introduced in D66731805 where scmstore counters in EdenFS aren't up-to-date until the scmstores are flushed (now they are immediately up to date).

Note that I did not update the "write" and "api" sets of metrics for FileStore. These are less important and are fine to leave as-is for now.

Reviewed By: quark-zju

Differential Revision: D66848951

fbshipit-source-id: 289b8d39eb74a4f06a7abaf0b74fa10d2cb75665
  • Loading branch information
muirdm authored and facebook-github-bot committed Dec 6, 2024
1 parent cba4249 commit 79c30a5
Show file tree
Hide file tree
Showing 13 changed files with 225 additions and 342 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ cmdutil = { version = "0.1.0", path = "../../cmdutil" }
sapling-async-runtime = { version = "0.1.0", path = "../../../async-runtime" }
sapling-clidispatch = { version = "0.1.0", path = "../../../clidispatch" }
sapling-manifest = { version = "0.1.0", path = "../../../manifest" }
sapling-metrics = { version = "0.1.0", path = "../../../metrics" }
sapling-repo = { version = "0.1.0", path = "../../../repo" }
sapling-revisionstore = { version = "0.1.0", path = "../../../revisionstore" }
sapling-types = { version = "0.1.0", path = "../../../types" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ pub fn run(ctx: ReqCtx<DebugScmStoreOpts>, repo: &Repo) -> Result<u8> {
// We downloaded trees above when handling args. Let's make a
// fresh repo to recreate the cache state before we were invoked.
let fresh_repo = Repo::load_with_config(repo.path(), ConfigSet::wrap(repo.config().clone()))?;
// And reset counters so tests don't see counters from above arg handling.
metrics::Registry::global().reset();

let fetch_mode = FetchMode::deserialize(StringDeserializer::<value::Error>::new(
ctx.opts
Expand Down
2 changes: 1 addition & 1 deletion eden/scm/lib/commands/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,7 @@ fn log_metrics(io: &IO, config: &dyn Config) -> Result<()> {
.counters()
.into_iter()
.filter_map(|(n, c)| {
if c.is_gauge() {
if c.is_gauge() || c.value() == 0 {
None
} else {
Some((n.to_string(), c.value() as u64))
Expand Down
5 changes: 2 additions & 3 deletions eden/scm/lib/revisionstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ async-trait = "0.1.71"
byteorder = "1.3"
crossbeam = "0.8"
curl = { version = "0.4.41", features = ["http2"] }
fbinit = { version = "0.2.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main", optional = true }
fn-error-context = "0.2"
fs-err = { version = "2.6.0", features = ["tokio"] }
futures = { version = "0.3.30", features = ["async-await", "compat"] }
Expand All @@ -27,6 +26,7 @@ itertools = "0.13.0"
lfs_protocol = { version = "0.1.0", path = "../../../mononoke/lfs_protocol" }
once_cell = "1.12"
parking_lot = { version = "0.12.1", features = ["send_guard"] }
paste = "1.0.14"
quickcheck = "1.0"
quickcheck_arbitrary_derive = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
rand = { version = "0.8", features = ["small_rng"] }
Expand All @@ -45,6 +45,7 @@ sapling-http-client = { version = "0.1.0", path = "../http-client" }
sapling-indexedlog = { version = "0.1.0", path = "../indexedlog" }
sapling-lz4-pyframe = { version = "0.1.0", path = "../lz4-pyframe" }
sapling-manifest-tree = { version = "0.1.0", path = "../manifest-tree" }
sapling-metrics = { version = "0.1.0", path = "../metrics" }
sapling-mincode = { version = "0.1.0", path = "../mincode" }
sapling-minibytes = { version = "0.1.0", path = "../minibytes", features = ["frombytes"] }
sapling-progress-model = { version = "0.1.0", path = "../progress/model" }
Expand All @@ -60,7 +61,6 @@ serde_derive = "1.0.185"
serde_json = { version = "1.0.132", features = ["float_roundtrip", "unbounded_depth"] }
sha1 = "0.10.5"
sha2 = "0.10.6"
stats = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main", optional = true }
thiserror = "2"
tokio = { version = "1.41.0", features = ["full", "test-util", "tracing"] }
tokio-stream = { version = "0.1.16", features = ["fs", "io-util", "net", "signal", "sync", "time"] }
Expand All @@ -77,4 +77,3 @@ tempfile = "3.8"
default = []
fb = []
for-tests = []
ods = ["fbinit", "stats"]
1 change: 0 additions & 1 deletion eden/scm/lib/revisionstore/src/scmstore/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,6 @@ impl<'a> TreeStoreBuilder<'a> {
tree_metadata_mode,
fetch_tree_aux_data,
flush_on_drop: true,
metrics: Default::default(),
format,
})
}
Expand Down
20 changes: 14 additions & 6 deletions eden/scm/lib/revisionstore/src/scmstore/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ mod metrics;
mod types;

use std::path::PathBuf;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Instant;

use ::metrics::Counter;
use ::types::fetch_mode::FetchMode;
use ::types::HgId;
use ::types::Key;
Expand All @@ -24,6 +26,8 @@ use cas_client::CasClient;
use clientinfo::get_client_request_info_thread_local;
use clientinfo::set_client_request_info_thread_local;
use crossbeam::channel::unbounded;
use indexedlog::log::AUTO_SYNC_COUNT;
use indexedlog::log::SYNC_COUNT;
use itertools::Itertools;
use minibytes::Bytes;
use parking_lot::Mutex;
Expand Down Expand Up @@ -133,6 +137,10 @@ macro_rules! try_local_content {
};
}

// Global atomic counters (see the `metrics` crate) incremented directly at the
// point of use; an outside process collects them intermittently.
static FILESTORE_FLUSH_COUNT: Counter = Counter::new_counter("scmstore.file.flush");
// NOTE(review): the indexedlog counters aren't filestore specific — they are
// drained here (via swap(0) after each fetch) because this is a convenient spot.
static INDEXEDLOG_SYNC_COUNT: Counter = Counter::new_counter("scmstore.indexedlog.sync");
static INDEXEDLOG_AUTO_SYNC_COUNT: Counter = Counter::new_counter("scmstore.indexedlog.auto_sync");

impl FileStore {
/// Get the "local content" without going through the heavyweight "fetch" API.
pub(crate) fn get_local_content_direct(&self, id: &HgId) -> Result<Option<Bytes>> {
Expand Down Expand Up @@ -199,7 +207,6 @@ impl FileStore {
let edenapi = self.edenapi.clone();
let cas_client = self.cas_client.clone();
let lfs_remote = self.lfs_remote.clone();
let metrics = self.metrics.clone();
let activity_logger = self.activity_logger.clone();
let format = self.format();

Expand Down Expand Up @@ -317,9 +324,12 @@ impl FileStore {

state.derive_computable(aux_cache.as_ref().map(|s| s.as_ref()));

metrics.write().fetch += state.metrics().clone();
state.finish();

// These aren't technically filestore specific, but this will keep them updated.
INDEXEDLOG_SYNC_COUNT.add(SYNC_COUNT.swap(0, Ordering::Relaxed) as usize);
INDEXEDLOG_AUTO_SYNC_COUNT.add(AUTO_SYNC_COUNT.swap(0, Ordering::Relaxed) as usize);

if let Some(activity_logger) = activity_logger {
if let Err(err) = activity_logger.lock().log_file_fetch(
activity_logger_keys,
Expand Down Expand Up @@ -463,10 +473,8 @@ impl FileStore {
for (k, v) in metrics.metrics() {
hg_metrics::increment_counter(k, v as u64);
}
hg_metrics::increment_counter("scmstore.file.flush", 1);
if let Err(err) = metrics.fetch.update_ods() {
tracing::error!("Error updating ods fetch metrics: {}", err);
}

FILESTORE_FLUSH_COUNT.increment();

result
}
Expand Down
5 changes: 3 additions & 2 deletions eden/scm/lib/revisionstore/src/scmstore/file/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use types::CasFetchedStats;
use types::Key;
use types::Sha256;

use super::metrics;
use crate::error::ClonableError;
use crate::indexedlogauxstore::AuxStore;
use crate::indexedlogdatastore::Entry;
Expand Down Expand Up @@ -75,7 +76,7 @@ pub struct FetchState {
lfs_progress: Arc<AggregatingProgressBar>,

/// Track fetch metrics.
metrics: FileStoreFetchMetrics,
metrics: &'static FileStoreFetchMetrics,

// Config
compute_aux_data: bool,
Expand All @@ -102,7 +103,7 @@ impl FetchState {
FetchState {
common: CommonFetchState::new(keys, attrs, found_tx, fetch_mode),
errors: FetchErrors::new(),
metrics: FileStoreFetchMetrics::default(),
metrics: &metrics::FILE_STORE_FETCH_METRICS,

lfs_pointers: HashMap::new(),

Expand Down
106 changes: 26 additions & 80 deletions eden/scm/lib/revisionstore/src/scmstore/file/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,83 +9,41 @@ use std::ops::AddAssign;
use std::sync::Arc;

use parking_lot::RwLock;
#[cfg(feature = "ods")]
use stats::prelude::*;

use crate::scmstore::metrics::namespaced;
use crate::scmstore::metrics::static_cas_backend_metrics;
use crate::scmstore::metrics::static_fetch_metrics;
use crate::scmstore::metrics::static_local_cache_fetch_metrics;
use crate::scmstore::metrics::ApiMetrics;
use crate::scmstore::metrics::CasBackendMetrics;
use crate::scmstore::metrics::FetchMetrics;
use crate::scmstore::metrics::LocalAndCacheFetchMetrics;
use crate::scmstore::metrics::WriteMetrics;

#[derive(Clone, Debug, Default)]
pub struct FileStoreFetchMetrics {
pub(crate) indexedlog: LocalAndCacheFetchMetrics,
pub(crate) lfs: LocalAndCacheFetchMetrics,
pub(crate) aux: LocalAndCacheFetchMetrics,
pub(crate) edenapi: FetchMetrics,
pub(crate) cas: FetchMetrics,
pub(crate) cas_backend: CasBackendMetrics,
}
static_local_cache_fetch_metrics!(INDEXEDLOG, "scmstore.file.fetch.indexedlog");
static_local_cache_fetch_metrics!(LFS, "scmstore.file.fetch.lfs");
static_local_cache_fetch_metrics!(AUX, "scmstore.file.fetch.aux");
static_fetch_metrics!(EDENAPI, "scmstore.file.fetch.edenapi");
static_fetch_metrics!(CAS, "scmstore.file.fetch.cas");

impl AddAssign for FileStoreFetchMetrics {
fn add_assign(&mut self, rhs: Self) {
self.indexedlog += rhs.indexedlog;
self.lfs += rhs.lfs;
self.aux += rhs.aux;
self.edenapi += rhs.edenapi;
self.cas += rhs.cas;
self.cas_backend += rhs.cas_backend;
}
}
static_cas_backend_metrics!(CAS_BACKEND, "scmstore.file.fetch.cas");

impl FileStoreFetchMetrics {
fn metrics(&self) -> impl Iterator<Item = (String, usize)> {
namespaced("indexedlog", self.indexedlog.metrics())
.chain(namespaced("lfs", self.lfs.metrics()))
.chain(namespaced("aux", self.aux.metrics()))
.chain(namespaced("edenapi", self.edenapi.metrics()))
.chain(namespaced("cas", self.cas.metrics()))
.chain(namespaced("cas", self.cas_backend.metrics()))
}
/// Update ODS stats.
/// This assumes that fbinit was called higher up the stack.
/// It is meant to be used when called from eden which uses the `revisionstore` with
/// the `ods` feature flag.
#[cfg(feature = "ods")]
pub(crate) fn update_ods(&self) -> anyhow::Result<()> {
use std::sync::atomic::Ordering;

use indexedlog::log::AUTO_SYNC_COUNT;
use indexedlog::log::SYNC_COUNT;

// Just give up if fbinit hasn't been called (e.g. in tests or from sl).
if !fbinit::was_performed() {
return Ok(());
}

let fb = fbinit::expect_init();

for (metric, value) in self.metrics() {
STATS::fetch.increment_value(fb, value.try_into()?, (metric,));
}

// Assume we are called from flush()
STATS::flush.increment_value(fb, 1);

// These aren't technically filestore specific, but we don't have a convenient generic ODS logging spot.
STATS::indexedlog_sync
.increment_value(fb, SYNC_COUNT.swap(0, Ordering::Relaxed).try_into()?);
STATS::indexedlog_auto_sync
.increment_value(fb, AUTO_SYNC_COUNT.swap(0, Ordering::Relaxed).try_into()?);

Ok(())
}
#[cfg(not(feature = "ods"))]
pub(crate) fn update_ods(&self) -> anyhow::Result<()> {
Ok(())
}
/// Single shared instance of the FileStore fetch metrics, wiring each field to
/// the corresponding `'static` counter group declared by the macros above.
pub(crate) static FILE_STORE_FETCH_METRICS: FileStoreFetchMetrics = FileStoreFetchMetrics {
    indexedlog: &INDEXEDLOG,
    lfs: &LFS,
    aux: &AUX,
    edenapi: &EDENAPI,
    cas: &CAS,
    cas_backend: &CAS_BACKEND,
};

/// Groups references to the `'static` fetch counters for each FileStore fetch
/// source. Fields are `&'static` so the struct itself can be a `static`
/// (see `FILE_STORE_FETCH_METRICS`) and counters increment globally.
pub struct FileStoreFetchMetrics {
    pub(crate) indexedlog: &'static LocalAndCacheFetchMetrics,
    pub(crate) lfs: &'static LocalAndCacheFetchMetrics,
    pub(crate) aux: &'static LocalAndCacheFetchMetrics,
    pub(crate) edenapi: &'static FetchMetrics,
    pub(crate) cas: &'static FetchMetrics,
    pub(crate) cas_backend: &'static CasBackendMetrics,
}

#[derive(Clone, Debug, Default)]
Expand Down Expand Up @@ -179,7 +137,6 @@ impl FileStoreApiMetrics {

#[derive(Debug, Default, Clone)]
pub struct FileStoreMetrics {
pub(crate) fetch: FileStoreFetchMetrics,
pub(crate) write: FileStoreWriteMetrics,
pub(crate) api: FileStoreApiMetrics,
}
Expand All @@ -192,18 +149,7 @@ impl FileStoreMetrics {
pub fn metrics(&self) -> impl Iterator<Item = (String, usize)> {
namespaced(
"scmstore.file",
namespaced("fetch", self.fetch.metrics())
.chain(namespaced("write", self.write.metrics()))
.chain(namespaced("api", self.api.metrics())),
namespaced("write", self.write.metrics()).chain(namespaced("api", self.api.metrics())),
)
}
}

#[cfg(feature = "ods")]
define_stats! {
prefix = "scmstore";
fetch: dynamic_singleton_counter("file.fetch.{}", (specific_counter: String)),
flush: singleton_counter("file"),
indexedlog_sync: singleton_counter("indexedlog.sync"),
indexedlog_auto_sync: singleton_counter("indexedlog.auto_sync"),
}
Loading

0 comments on commit 79c30a5

Please sign in to comment.