Skip to content

Commit

Permalink
Instrument code push more (#23536)
Browse files Browse the repository at this point in the history
Additional instrumentation for code pushes

GitOrigin-RevId: 3cdbbef5f3f753101db4237ef91cdc3d034ec672
  • Loading branch information
thomasballinger authored and Convex, Inc. committed Mar 16, 2024
1 parent e1062f9 commit 9df8476
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 17 deletions.
46 changes: 39 additions & 7 deletions crates/application/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ use database::{
FastForwardIndexWorker,
IndexModel,
IndexWorker,
OccRetryStats,
SearchIndexWorker,
ShortBoxFuture,
Snapshot,
Expand Down Expand Up @@ -1670,11 +1671,13 @@ impl<RT: Runtime> Application<RT> {
&self,
identity: Identity,
apply_config_args: ApplyConfigArgs,
) -> anyhow::Result<ConfigMetadataAndSchema> {
) -> anyhow::Result<(ConfigMetadataAndSchema, OccRetryStats)> {
let runner = self.runner.clone();
self.execute_with_audit_log_events_and_occ_retries(identity, "apply_config", |tx| {
Self::_apply_config(runner.clone(), tx, apply_config_args.clone()).into()
})
self.execute_with_audit_log_events_and_occ_retries_reporting_stats(
identity,
"apply_config",
|tx| Self::_apply_config(runner.clone(), tx, apply_config_args.clone()).into(),
)
.await
}

Expand Down Expand Up @@ -2387,6 +2390,34 @@ impl<RT: Runtime> Application<RT> {
write_source: impl Into<WriteSource>,
f: F,
) -> anyhow::Result<T>
where
F: Send + Sync,
T: Send + 'static,
F: for<'b> Fn(
&'b mut Transaction<RT>,
) -> ShortBoxFuture<
'_,
'a,
'b,
anyhow::Result<(T, Vec<DeploymentAuditLogEvent>)>,
>,
{
self.execute_with_audit_log_events_and_occ_retries_with_pause_client(
identity,
PauseClient::new(),
write_source,
f,
)
.await
.map(|(t, _)| t)
}

pub async fn execute_with_audit_log_events_and_occ_retries_reporting_stats<'a, F, T>(
&self,
identity: Identity,
write_source: impl Into<WriteSource>,
f: F,
) -> anyhow::Result<(T, OccRetryStats)>
where
F: Send + Sync,
T: Send + 'static,
Expand Down Expand Up @@ -2414,7 +2445,7 @@ impl<RT: Runtime> Application<RT> {
pause_client: PauseClient,
write_source: impl Into<WriteSource>,
f: F,
) -> anyhow::Result<T>
) -> anyhow::Result<(T, OccRetryStats)>
where
F: Send + Sync,
T: Send + 'static,
Expand All @@ -2428,7 +2459,7 @@ impl<RT: Runtime> Application<RT> {
>,
{
let db = self.database.clone();
let (ts, (t, events)) = db
let (ts, (t, events), stats) = db
.execute_with_occ_retries(
identity,
FunctionUsageTracker::new(),
Expand All @@ -2447,7 +2478,7 @@ impl<RT: Runtime> Application<RT> {
.try_collect()?;

self.log_sender.send_logs(logs);
Ok(t)
Ok((t, stats))
}

pub async fn execute_with_occ_retries<'a, T, F>(
Expand All @@ -2466,6 +2497,7 @@ impl<RT: Runtime> Application<RT> {
self.database
.execute_with_occ_retries(identity, usage, pause_client, write_source, f)
.await
.map(|(ts, t, _)| (ts, t))
}

pub async fn shutdown(&self) -> anyhow::Result<()> {
Expand Down
4 changes: 2 additions & 2 deletions crates/application/src/snapshot_import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1490,7 +1490,7 @@ async fn finalize_import<RT: Runtime>(
// If we inserted into an existing table, we're done because the table is
// now populated and active.
// If we inserted into an Hidden table, make it Active.
let (ts, documents_deleted) = database
let (ts, documents_deleted, _) = database
.execute_with_occ_retries(
identity,
FunctionUsageTracker::new(),
Expand Down Expand Up @@ -2078,7 +2078,7 @@ async fn prepare_table_for_import<RT: Runtime>(
insert_into_existing_table_id
} else {
let table_number = table_number.or(existing_table_id.map(|id| id.table_number));
let (_, table_id) = database
let (_, table_id, _) = database
.execute_with_occ_retries(
identity.clone(),
FunctionUsageTracker::new(),
Expand Down
32 changes: 28 additions & 4 deletions crates/database/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ use std::{
LazyLock,
OnceLock,
},
time::Duration,
time::{
Duration,
Instant,
},
};

use anyhow::{
Expand Down Expand Up @@ -1260,7 +1263,7 @@ impl<RT: Runtime> Database<RT> {
mut pause_client: PauseClient,
write_source: impl Into<WriteSource>,
f: F,
) -> anyhow::Result<(Timestamp, T)>
) -> anyhow::Result<(Timestamp, T, OccRetryStats)>
where
T: Send,
R: Fn(&Error) -> bool,
Expand All @@ -1274,6 +1277,7 @@ impl<RT: Runtime> Database<RT> {
.begin_with_usage(identity.clone(), usage.clone())
.await?;
pause_client.wait("retry_tx_loop_start").await;
let start = Instant::now();
let result = async {
let t = f(&mut tx).0.await?;
let ts = self
Expand All @@ -1282,6 +1286,7 @@ impl<RT: Runtime> Database<RT> {
Ok((ts, t))
}
.await;
let duration = Instant::now() - start;
match result {
Err(e) => {
if is_retriable(&e) {
Expand All @@ -1294,7 +1299,16 @@ impl<RT: Runtime> Database<RT> {
return Err(e);
}
},
Ok((ts, t)) => return Ok((ts, t)),
Ok((ts, t)) => {
return Ok((
ts,
t,
OccRetryStats {
retries: backoff.failures(),
duration,
},
))
},
}
}
let error =
Expand All @@ -1312,7 +1326,7 @@ impl<RT: Runtime> Database<RT> {
pause_client: PauseClient,
write_source: impl Into<WriteSource>,
f: F,
) -> anyhow::Result<(Timestamp, T)>
) -> anyhow::Result<(Timestamp, T, OccRetryStats)>
where
T: Send,
F: for<'b> Fn(&'b mut Transaction<RT>) -> ShortBoxFuture<'_, 'a, 'b, anyhow::Result<T>>,
Expand Down Expand Up @@ -1878,6 +1892,16 @@ impl<RT: Runtime> Database<RT> {
}
}

/// Transaction statistics reported for a retried transaction
#[derive(Debug, PartialEq, Eq)]
pub struct OccRetryStats {
/// Number of times the transaction was retried. 0 for a transaction that
/// succeeded the first time.
pub retries: u32,
/// The duration of the successful transaction
pub duration: Duration,
}

/// The read that conflicted as part of an OCC
#[derive(Debug, PartialEq, Eq)]
pub struct ConflictingRead {
Expand Down
1 change: 1 addition & 0 deletions crates/database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ pub use self::{
Database,
DatabaseSnapshot,
DocumentDeltas,
OccRetryStats,
ShortBoxFuture,
ShutdownSignal,
SnapshotPage,
Expand Down
50 changes: 46 additions & 4 deletions crates/local_backend/src/deploy_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use common::{
types::NodeDependency,
version::Version,
};
use database::OccRetryStats;
use errors::{
ErrorMetadata,
ErrorMetadataAnyhowExt,
Expand Down Expand Up @@ -139,6 +140,42 @@ pub struct ConfigJson {
pub bundled_module_infos: Option<Vec<BundledModuleInfoJson>>,
}

static NODE_ENVIRONMENT: &str = "node";
impl ConfigJson {
pub fn stats(&self) -> (usize, usize, usize, usize) {
let num_node_modules = self
.modules
.iter()
.filter(|module| module.environment.as_deref() == Some(NODE_ENVIRONMENT))
.count();
let size_node_modules = self
.modules
.iter()
.filter(|module| module.environment.as_deref() == Some(NODE_ENVIRONMENT))
.fold(0, |acc, e| {
acc + e.source.len() + e.source_map.as_ref().map_or(0, |sm| sm.len())
});
let size_v8_modules = self
.modules
.iter()
.filter(|module| module.environment.as_deref() != Some(NODE_ENVIRONMENT))
.fold(0, |acc, e| {
acc + e.source.len() + e.source_map.as_ref().map_or(0, |sm| sm.len())
});
let num_v8_modules = self
.modules
.iter()
.filter(|module| module.environment.as_deref() != Some(NODE_ENVIRONMENT))
.count();
(
num_v8_modules,
num_node_modules,
size_v8_modules,
size_node_modules,
)
}
}

/// API level structure for representing modules as Json
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -198,6 +235,7 @@ pub struct PushAnalytics {
pub udf_server_version: Version,
pub analyze_results: BTreeMap<CanonicalizedModulePath, AnalyzedModule>,
pub schema: Option<DatabaseSchema>,
pub occ_stats: OccRetryStats,
}

#[debug_handler]
Expand Down Expand Up @@ -298,10 +336,13 @@ pub async fn push_config_handler(
source_package.clone(),
)
.await?;
let ConfigMetadataAndSchema {
config_metadata,
schema,
} = application
let (
ConfigMetadataAndSchema {
config_metadata,
schema,
},
occ_stats,
) = application
.apply_config_with_retries(
identity.clone(),
ApplyConfigArgs {
Expand All @@ -324,6 +365,7 @@ pub async fn push_config_handler(
udf_server_version: udf_config.server_version,
analyze_results: analyze_result,
schema,
occ_stats,
},
))
}
Expand Down

0 comments on commit 9df8476

Please sign in to comment.