From 350accbe843d4e807b6089cf331c94124896e1ca Mon Sep 17 00:00:00 2001 From: Lee Danilek Date: Thu, 9 Jan 2025 21:16:03 -0500 Subject: [PATCH] [ENG-8173] share query cache for queries that don't check auth (#32821) Detect in the isolate whether `ctx.auth.getUserIdentity` is ever called. If not, store the query result with `identity: None` in the query cache. When checking whether a cache entry matches, we first check for the precise identity, and then check if there is an entry with `identity: None`. When updating a cache entry, we add a waiting entry for the precise identity, and when the query completes we remove the waiting entry and insert an entry based on whether the query read the identity. NOTE the part i'm worried about is whether this screws with assumptions, since the waiting entry might not be at the same cache key as the eventual result. I think it's okay because if multiple requests are waiting, they already have to handle the executing request not populating the cache. Added tests which confirm desired behavior. Open to suggestions for more tests. GitOrigin-RevId: f6a2c9cf8607077d62b3e0a89057fff2f9615330 --- crates/application/src/cache/mod.rs | 174 +++++++++++++----- crates/application/src/tests/query_cache.rs | 72 +++++++- crates/common/src/types/functions.rs | 2 +- .../src/environment/udf/async_syscall.rs | 6 + crates/isolate/src/environment/udf/mod.rs | 2 + crates/isolate/src/environment/udf/outcome.rs | 10 + crates/isolate/src/environment/udf/phase.rs | 24 +++ crates/isolate/src/isolate2/environment.rs | 1 + crates/isolate/src/isolate2/runner.rs | 16 +- crates/isolate/src/tests/auth.rs | 74 ++++---- crates/pb/protos/outcome.proto | 2 + 11 files changed, 287 insertions(+), 96 deletions(-) diff --git a/crates/application/src/cache/mod.rs b/crates/application/src/cache/mod.rs index 46cafea7..bb347e38 100644 --- a/crates/application/src/cache/mod.rs +++ b/crates/application/src/cache/mod.rs @@ -117,8 +117,13 @@ impl InstanceId { } } +/// A cache key representing a specific query request, before it runs. +/// It may have more specific information than a `CacheKey`, because multiple +/// query requests may be served by the same cache entry. +/// e.g. if a query does not check `ctx.auth`, then `TargetCacheKey` +/// contains the identity, but `CacheKey` does not. #[derive(Clone, Eq, PartialEq, Hash, Debug)] -pub struct CacheKey { +pub struct RequestedCacheKey { instance: InstanceId, path: PublicFunctionPath, args: ConvexArray, @@ -127,7 +132,76 @@ pub struct CacheKey { allowed_visibility: AllowedVisibility, } -impl CacheKey { +impl RequestedCacheKey { + // In order from most specific to least specific. + fn _possible_cache_keys(&self) -> Vec { + vec![ + self.precise_cache_key(), + StoredCacheKey { + instance: self.instance, + path: self.path.clone(), + args: self.args.clone(), + // Include queries that did not read `ctx.auth`. + identity: None, + journal: self.journal.clone(), + allowed_visibility: self.allowed_visibility, + }, + ] + } + + fn precise_cache_key(&self) -> StoredCacheKey { + StoredCacheKey { + instance: self.instance, + path: self.path.clone(), + args: self.args.clone(), + identity: Some(self.identity.clone()), + journal: self.journal.clone(), + allowed_visibility: self.allowed_visibility, + } + } + + fn get_cache_entry<'a>( + &'a self, + cache: &'a mut LruCache, + ) -> (Option<&'a CacheEntry>, StoredCacheKey) { + for key in self._possible_cache_keys() { + if cache.contains(&key) { + return (Some(cache.get(&key).unwrap()), key); + } + } + (None, self.precise_cache_key()) + } + + fn cache_key_after_execution(&self, outcome: &UdfOutcome) -> StoredCacheKey { + let identity = if outcome.observed_identity { + Some(self.identity.clone()) + } else { + None + }; + StoredCacheKey { + instance: self.instance, + path: self.path.clone(), + args: self.args.clone(), + identity, + journal: outcome.journal.clone(), + allowed_visibility: self.allowed_visibility, + } + } +} + +/// A cache key representing a persisted query result. +#[derive(Clone, Eq, PartialEq, Hash, Debug)] +pub struct StoredCacheKey { + instance: InstanceId, + path: PublicFunctionPath, + args: ConvexArray, + // None means that the query did not read `ctx.auth`. + identity: Option, + journal: QueryJournal, + allowed_visibility: AllowedVisibility, +} + +impl StoredCacheKey { /// Approximate size in-memory of the CacheEntry structure, including stack /// and heap allocated memory. fn size(&self) -> usize { @@ -253,7 +327,7 @@ impl CacheManager { ) -> anyhow::Result<(QueryReturn, bool)> { let start = self.rt.monotonic_now(); let identity_cache_key = identity.cache_key(); - let key = CacheKey { + let requested_key = RequestedCacheKey { instance: self.instance_id, path, args, @@ -279,11 +353,16 @@ impl CacheManager { // Step 1: Decide what we're going to do this iteration: use a cached value, // wait on someone else to run a UDF, or run the UDF ourselves. - let maybe_op = - self.cache - .plan_cache_op(&key, start, now, &identity, ts, context.clone()); - let op: CacheOp = match maybe_op { - Some(op) => op, + let maybe_op = self.cache.plan_cache_op( + &requested_key, + start, + now, + &identity, + ts, + context.clone(), + ); + let (op, stored_key) = match maybe_op { + Some(op_key) => op_key, None => continue 'top, }; @@ -296,7 +375,7 @@ impl CacheManager { _ => None, }; let mut waiting_entry_guard = - WaitingEntryGuard::new(waiting_entry_id, &key, self.cache.clone()); + WaitingEntryGuard::new(waiting_entry_id, &stored_key, self.cache.clone()); // Step 2: Perform our cache operation, potentially running the UDF. let is_cache_hit = match op { @@ -309,7 +388,7 @@ impl CacheManager { CacheOp::Go { .. } => false, }; let (result, table_stats) = match self - .perform_cache_op(&key, op, usage_tracker.clone()) + .perform_cache_op(&stored_key, op, usage_tracker.clone()) .await? { Some(r) => r, @@ -319,7 +398,7 @@ impl CacheManager { // Step 3: Validate that the cache result we got is good enough. Is our desired // timestamp in its validity interval? If it looked at system time, is it not // too old? - let cache_result = match self.validate_cache_result(&key, ts, result).await? { + let cache_result = match self.validate_cache_result(&stored_key, ts, result).await? { Some(r) => r, None => continue 'top, }; @@ -328,8 +407,10 @@ impl CacheManager { // bump the cache result's token. This method will discard the new value if the // UDF failed or if a newer (i.e. higher `original_ts`) value is in the cache. if cache_result.outcome.result.is_ok() { + let actual_stored_key = + requested_key.cache_key_after_execution(&cache_result.outcome); // We do not cache JSErrors - waiting_entry_guard.complete(cache_result.clone()); + waiting_entry_guard.complete(actual_stored_key, cache_result.clone()); } else { drop(waiting_entry_guard); } @@ -364,7 +445,7 @@ impl CacheManager { #[minitrace::trace] async fn perform_cache_op( &self, - key: &CacheKey, + key: &StoredCacheKey, op: CacheOp<'_>, usage_tracker: FunctionUsageTracker, ) -> anyhow::Result)>> { @@ -503,7 +584,7 @@ impl CacheManager { #[minitrace::trace] async fn validate_cache_result( &self, - key: &CacheKey, + key: &StoredCacheKey, ts: Timestamp, mut result: CacheResult, ) -> anyhow::Result> { @@ -562,12 +643,12 @@ impl CacheManager { // canceled. struct WaitingEntryGuard<'a> { entry_id: Option, - key: &'a CacheKey, + key: &'a StoredCacheKey, cache: QueryCache, } impl<'a> WaitingEntryGuard<'a> { - fn new(entry_id: Option, key: &'a CacheKey, cache: QueryCache) -> Self { + fn new(entry_id: Option, key: &'a StoredCacheKey, cache: QueryCache) -> Self { Self { entry_id, key, @@ -576,11 +657,11 @@ impl<'a> WaitingEntryGuard<'a> { } // Marks the waiting entry as removed, so we don't have to remove it on Drop - fn complete(&mut self, result: CacheResult) { - self.cache.put_ready(self.key.clone(), result); - // We just performed put_ready so there is no need to perform remove_waiting - // on Drop. - self.entry_id.take(); + fn complete(&mut self, actual_stored_key: StoredCacheKey, result: CacheResult) { + if let Some(entry_id) = self.entry_id.take() { + self.cache.remove_waiting(self.key, entry_id); + self.cache.put_ready(actual_stored_key, result); + } } } @@ -594,7 +675,7 @@ impl Drop for WaitingEntryGuard<'_> { } struct Inner { - cache: LruCache, + cache: LruCache, size: usize, size_limit: usize, @@ -621,13 +702,13 @@ impl QueryCache { fn plan_cache_op<'a>( &self, - key: &'a CacheKey, + key: &'a RequestedCacheKey, start: tokio::time::Instant, now: tokio::time::Instant, identity: &'a Identity, ts: Timestamp, context: ExecutionContext, - ) -> Option> { + ) -> Option<(CacheOp<'a>, StoredCacheKey)> { let go = |sender: Option<(Sender<_>, u64)>| { let (sender, waiting_entry_id) = match sender { Some((sender, waiting_entry_id)) => (sender, Some(waiting_entry_id)), @@ -646,22 +727,23 @@ impl QueryCache { identity, ts, journal: &key.journal, - allowed_visibility: key.allowed_visibility.clone(), + allowed_visibility: key.allowed_visibility, context, } }; let mut inner = self.inner.lock(); - let op = match inner.cache.get(key) { + let (entry, stored_key) = key.get_cache_entry(&mut inner.cache); + let op = match entry { Some(CacheEntry::Ready(r)) => { if ts < r.original_ts { // If another request has already executed this UDF at a // newer timestamp, we can't use the cache. Re-execute // in this case. - tracing::debug!("Cache value too new for {:?}", key); + tracing::debug!("Cache value too new for {:?}", stored_key); log_plan_go(GoReason::PeerTimestampTooNew); go(None) } else { - tracing::debug!("Cache value ready for {:?}", key); + tracing::debug!("Cache value ready for {:?}", stored_key); log_plan_ready(); CacheOp::Ready { result: r.clone() } } @@ -675,7 +757,7 @@ impl QueryCache { let entry_id = *id; if *peer_ts > ts { log_plan_go(GoReason::PeerTimestampTooNew); - return Some(go(None)); + return Some((go(None), stored_key)); } // We don't serialize sampling `now` under the cache lock, and since it can // occur on different threads, we're not guaranteed that @@ -688,13 +770,13 @@ impl QueryCache { "Peer timed out ({:?}), removing cache entry and retrying", peer_elapsed ); - inner.remove_waiting(key, entry_id); + inner.remove_waiting(&stored_key, entry_id); log_plan_peer_timeout(); return None; } let get_elapsed = now - start; let remaining = *TOTAL_QUERY_TIMEOUT - cmp::max(peer_elapsed, get_elapsed); - tracing::debug!("Waiting for peer to compute {:?}", key); + tracing::debug!("Waiting for peer to compute {:?}", stored_key); log_plan_wait(); CacheOp::Wait { waiting_entry_id: *id, @@ -703,24 +785,24 @@ impl QueryCache { } }, None => { - tracing::debug!("No cache value for {:?}, executing UDF...", key); - let (sender, executor_id) = inner.put_waiting(key.clone(), now, ts); + tracing::debug!("No cache value for {:?}, executing UDF...", stored_key); + let (sender, executor_id) = inner.put_waiting(stored_key.clone(), now, ts); log_plan_go(GoReason::NoCacheResult); go(Some((sender, executor_id))) }, }; - Some(op) + Some((op, stored_key)) } - fn remove_waiting(&self, key: &CacheKey, entry_id: u64) { + fn remove_waiting(&self, key: &StoredCacheKey, entry_id: u64) { self.inner.lock().remove_waiting(key, entry_id) } - fn remove_ready(&self, key: &CacheKey, original_ts: Timestamp) { + fn remove_ready(&self, key: &StoredCacheKey, original_ts: Timestamp) { self.inner.lock().remove_ready(key, original_ts) } - fn put_ready(&self, key: CacheKey, result: CacheResult) { + fn put_ready(&self, key: StoredCacheKey, result: CacheResult) { self.inner.lock().put_ready(key, result) } } @@ -728,7 +810,7 @@ impl QueryCache { impl Inner { // Remove only a `CacheEntry::Ready` from the cache, predicated on its // `executor_id` matching. - fn remove_waiting(&mut self, key: &CacheKey, entry_id: u64) { + fn remove_waiting(&mut self, key: &StoredCacheKey, entry_id: u64) { match self.cache.get(key) { Some(CacheEntry::Waiting { id, .. }) if *id == entry_id => { let (actual_key, entry) = self.cache.pop_entry(key).unwrap(); @@ -741,7 +823,7 @@ impl Inner { // Remove only a `CacheEntry::Ready` from the cache, predicated on its // `original_ts` matching. - fn remove_ready(&mut self, key: &CacheKey, original_ts: Timestamp) { + fn remove_ready(&mut self, key: &StoredCacheKey, original_ts: Timestamp) { match self.cache.get(key) { Some(CacheEntry::Ready(ref result)) if result.original_ts == original_ts => { let (actual_key, entry) = self.cache.pop_entry(key).unwrap(); @@ -754,7 +836,7 @@ impl Inner { fn put_waiting( &mut self, - key: CacheKey, + key: StoredCacheKey, now: tokio::time::Instant, ts: Timestamp, ) -> (Sender, u64) { @@ -786,7 +868,7 @@ impl Inner { // Put a `CacheEntry::Ready` into the cache, potentially dropping it if there's // already a value with a higher `original_ts`. - fn put_ready(&mut self, key: CacheKey, result: CacheResult) { + fn put_ready(&mut self, key: StoredCacheKey, result: CacheResult) { match self.cache.get_mut(&key) { Some(entry @ CacheEntry::Waiting { .. }) => { let new_entry = CacheEntry::Ready(result); @@ -899,15 +981,15 @@ mod tests { }; use super::{ - CacheKey, CacheResult, InstanceId, QueryCache, + StoredCacheKey, }; // Construct a cache key where as many fields as possible have extra capacity in // them - fn make_cache_key() -> CacheKey { + fn make_cache_key() -> StoredCacheKey { macro_rules! with_extra_capacity { ($e:expr) => {{ let mut r = $e; @@ -915,7 +997,7 @@ mod tests { r }}; } - CacheKey { + StoredCacheKey { instance: InstanceId(0), path: PublicFunctionPath::RootExport(ExportPath::from(CanonicalizedUdfPath::new( CanonicalizedModulePath::new( @@ -929,7 +1011,9 @@ mod tests { ))), args: ConvexArray::try_from(with_extra_capacity!(vec![ConvexValue::from(100.)])) .unwrap(), - identity: IdentityCacheKey::InstanceAdmin(with_extra_capacity!("admin".to_string())), + identity: Some(IdentityCacheKey::InstanceAdmin(with_extra_capacity!( + "admin".to_string() + ))), journal: QueryJournal { end_cursor: Some(Cursor { position: CursorPosition::After(IndexKeyBytes(with_extra_capacity!( diff --git a/crates/application/src/tests/query_cache.rs b/crates/application/src/tests/query_cache.rs index 3b6fe6d0..4a18bd8a 100644 --- a/crates/application/src/tests/query_cache.rs +++ b/crates/application/src/tests/query_cache.rs @@ -101,6 +101,10 @@ async fn test_query_cache(rt: TestRuntime) -> anyhow::Result<()> { // The query gets the current time, but the result is cached so the results // should match. + // It's a bit weird to be using Date.now() to test this, since we just want + // to know that the query was cached. The purpose is to assert that the + // function is not re-executing. If it were re-executing, the Date.now() + // would be different. assert_eq!(result1, result2); Ok(()) @@ -133,6 +137,57 @@ async fn test_query_cache_data_invalidation(rt: TestRuntime) -> anyhow::Result<( Ok(()) } +#[convex_macro::test_runtime] +async fn test_query_cache_time_invalidation(rt: TestRuntime) -> anyhow::Result<()> { + let application = Application::new_for_tests(&rt).await?; + application.load_udf_tests_modules().await?; + + let time_result1 = run_query( + &application, + "basic:readTimeMs", + json!({}), + Identity::system(), + false, + ) + .await?; + let do_nothing_result1 = run_query( + &application, + "basic:doNothing", + json!({}), + Identity::system(), + false, + ) + .await?; + rt.advance_time(Duration::from_mins(20)).await; + + // Write a new object to bump timestamps. + // It doesn't have to succeed; it'll still bump the timestamp. + let _ = insert_object(&application).await; + + // After 20 minutes, the time query is not cached anymore. + let time_result2 = run_query( + &application, + "basic:readTimeMs", + json!({}), + Identity::system(), + false, + ) + .await?; + assert_ne!(time_result1, time_result2); + // The doNothing query is still cached. + let do_nothing_result2 = run_query( + &application, + "basic:doNothing", + json!({}), + Identity::system(), + true, + ) + .await?; + assert_eq!(do_nothing_result1, do_nothing_result2); + + Ok(()) +} + #[convex_macro::test_runtime] async fn test_query_cache_precise_data_invalidation(rt: TestRuntime) -> anyhow::Result<()> { let application = Application::new_for_tests(&rt).await?; @@ -245,17 +300,18 @@ async fn test_query_cache_without_checking_auth(rt: TestRuntime) -> anyhow::Resu ) .await?; rt.advance_time(Duration::from_secs(1)).await; - // TODO(lee) The query doesn't read ctx.auth, so should be cached even across - // identities, but it's not. + // The query doesn't read ctx.auth, so it's cached across identities. let result2 = run_query( &application, "basic:readTimeMs", json!({}), Identity::system(), - false, + true, ) .await?; - assert_ne!(result1, result2); + // Result is the same because the query doesn't re-execute and get a new + // Date.now(). + assert_eq!(result1, result2); Ok(()) } @@ -275,19 +331,20 @@ async fn test_query_cache_with_conditional_auth_check(rt: TestRuntime) -> anyhow ) .await?; rt.advance_time(Duration::from_secs(1)).await; - // TODO(lee) The query should be cached across identities, but it's not. + // The query is cached across identities. let result2 = run_query( &application, "auth:conditionallyCheckAuth", json!({}), Identity::system(), - false, + true, ) .await?; - assert_ne!(result1, result2); + assert_eq!(result1, result2); insert_object(&application).await?; + // Now that there's an object, the query checks auth. let result3 = run_query( &application, "auth:conditionallyCheckAuth", @@ -306,6 +363,7 @@ async fn test_query_cache_with_conditional_auth_check(rt: TestRuntime) -> anyhow .await?; assert_ne!(result1, result3); assert_ne!(result1, result4); + assert_ne!(result3, result4); // different auth Ok(()) } diff --git a/crates/common/src/types/functions.rs b/crates/common/src/types/functions.rs index f48b8ff0..c28d0d49 100644 --- a/crates/common/src/types/functions.rs +++ b/crates/common/src/types/functions.rs @@ -133,7 +133,7 @@ impl fmt::Display for UdfIdentifier { } } -#[derive(PartialEq, Eq, Clone, Debug, Hash)] +#[derive(PartialEq, Eq, Clone, Copy, Debug, Hash)] pub enum AllowedVisibility { PublicOnly, All, diff --git a/crates/isolate/src/environment/udf/async_syscall.rs b/crates/isolate/src/environment/udf/async_syscall.rs index 5b12f389..1b01e6c4 100644 --- a/crates/isolate/src/environment/udf/async_syscall.rs +++ b/crates/isolate/src/environment/udf/async_syscall.rs @@ -272,6 +272,7 @@ pub trait AsyncSyscallProvider { fn context(&self) -> &ExecutionContext; fn unix_timestamp(&self) -> anyhow::Result; + fn observe_identity(&self) -> anyhow::Result<()>; fn persistence_version(&self) -> PersistenceVersion; fn is_system(&self) -> bool; @@ -349,6 +350,10 @@ impl AsyncSyscallProvider for DatabaseUdfEnvironment { self.phase.unix_timestamp() } + fn observe_identity(&self) -> anyhow::Result<()> { + self.phase.observe_identity() + } + fn persistence_version(&self) -> PersistenceVersion { self.persistence_version } @@ -734,6 +739,7 @@ impl> DatabaseSyscallsV1 { #[convex_macro::instrument_future] async fn get_user_identity(provider: &mut P, _args: JsonValue) -> anyhow::Result { + provider.observe_identity()?; // TODO: Somehow make the Transaction aware of the dependency on the user. let tx = provider.tx()?; let user_identity = tx.user_identity(); diff --git a/crates/isolate/src/environment/udf/mod.rs b/crates/isolate/src/environment/udf/mod.rs index ed260b71..6b954db6 100644 --- a/crates/isolate/src/environment/udf/mod.rs +++ b/crates/isolate/src/environment/udf/mod.rs @@ -451,6 +451,7 @@ impl DatabaseUdfEnvironment { path: self.path.for_logging(), arguments: self.arguments, identity: self.identity, + observed_identity: self.phase.observed_identity(), rng_seed, observed_rng: self.phase.observed_rng(), unix_timestamp, @@ -470,6 +471,7 @@ impl DatabaseUdfEnvironment { path: self.path.for_logging(), arguments: self.arguments, identity: self.identity, + observed_identity: self.phase.observed_identity(), rng_seed, observed_rng: self.phase.observed_rng(), unix_timestamp, diff --git a/crates/isolate/src/environment/udf/outcome.rs b/crates/isolate/src/environment/udf/outcome.rs index f8b1eaa3..0fc451e6 100644 --- a/crates/isolate/src/environment/udf/outcome.rs +++ b/crates/isolate/src/environment/udf/outcome.rs @@ -46,6 +46,7 @@ pub struct UdfOutcome { pub path: CanonicalizedComponentFunctionPath, pub arguments: ConvexArray, pub identity: InertIdentity, + pub observed_identity: bool, pub rng_seed: [u8; 32], pub observed_rng: bool, @@ -81,6 +82,7 @@ impl Arbitrary for UdfOutcome { any::(), any::(), any::(), + any::(), any::(), any::(), any::>(), @@ -95,6 +97,7 @@ impl Arbitrary for UdfOutcome { observed_rng, unix_timestamp, observed_time, + observed_identity, log_lines, journal, result, @@ -107,6 +110,7 @@ impl Arbitrary for UdfOutcome { observed_rng, unix_timestamp, observed_time, + observed_identity, log_lines, journal, result, @@ -138,6 +142,7 @@ impl TryFrom for UdfOutcomeProto { path: _, arguments: _, identity: _, + observed_identity, rng_seed, observed_rng, unix_timestamp, @@ -164,6 +169,7 @@ impl TryFrom for UdfOutcomeProto { result: Some(result), }), syscall_trace: Some(syscall_trace.try_into()?), + observed_identity: Some(observed_identity), }) } } @@ -192,6 +198,7 @@ impl UdfOutcome { result: Err(js_error), syscall_trace: SyscallTrace::new(), udf_server_version, + observed_identity: false, }) } @@ -205,6 +212,7 @@ impl UdfOutcome { journal, result, syscall_trace, + observed_identity, }: UdfOutcomeProto, path_and_args: ValidatedPathAndArgs, identity: InertIdentity, @@ -245,6 +253,8 @@ impl UdfOutcome { .ok_or_else(|| anyhow::anyhow!("Missing syscall_trace"))? .try_into()?, udf_server_version, + // TODO(lee): Remove the default once we've pushed all services. + observed_identity: observed_identity.unwrap_or(true), }) } } diff --git a/crates/isolate/src/environment/udf/phase.rs b/crates/isolate/src/environment/udf/phase.rs index 1765a9d6..92fa65dd 100644 --- a/crates/isolate/src/environment/udf/phase.rs +++ b/crates/isolate/src/environment/udf/phase.rs @@ -94,6 +94,7 @@ enum UdfPreloaded { observed_rng_during_execution: bool, unix_timestamp: Option, observed_time_during_execution: AtomicBool, + observed_identity_during_execution: AtomicBool, env_vars: Option, component: ComponentId, component_arguments: Option>, @@ -175,6 +176,7 @@ impl UdfPhase { observed_rng_during_execution: false, unix_timestamp, observed_time_during_execution: AtomicBool::new(false), + observed_identity_during_execution: AtomicBool::new(false), env_vars, component, component_arguments: component_args, @@ -405,6 +407,18 @@ impl UdfPhase { Ok(unix_timestamp) } + pub fn observe_identity(&self) -> anyhow::Result<()> { + let UdfPreloaded::Ready { + ref observed_identity_during_execution, + .. + } = self.preloaded + else { + anyhow::bail!("Phase not initialized"); + }; + observed_identity_during_execution.store(true, Ordering::SeqCst); + Ok(()) + } + pub fn observed_rng(&self) -> bool { match self.preloaded { UdfPreloaded::Ready { @@ -425,6 +439,16 @@ impl UdfPhase { } } + pub fn observed_identity(&self) -> bool { + match self.preloaded { + UdfPreloaded::Ready { + ref observed_identity_during_execution, + .. + } => observed_identity_during_execution.load(Ordering::SeqCst), + UdfPreloaded::Created => false, + } + } + pub fn module_loader(&self) -> &Arc> { &self.module_loader } diff --git a/crates/isolate/src/isolate2/environment.rs b/crates/isolate/src/isolate2/environment.rs index e2b96d8c..5aa63573 100644 --- a/crates/isolate/src/isolate2/environment.rs +++ b/crates/isolate/src/isolate2/environment.rs @@ -20,6 +20,7 @@ use value::{ pub struct EnvironmentOutcome { pub observed_rng: bool, pub observed_time: bool, + pub observed_identity: bool, } pub trait Environment { diff --git a/crates/isolate/src/isolate2/runner.rs b/crates/isolate/src/isolate2/runner.rs index 60db2c4d..e9262db4 100644 --- a/crates/isolate/src/isolate2/runner.rs +++ b/crates/isolate/src/isolate2/runner.rs @@ -223,6 +223,7 @@ enum UdfPhase { rng: ChaCha12Rng, observed_time: bool, observed_rng: bool, + observed_identity: bool, }, Finalized, } @@ -442,18 +443,20 @@ impl Environment for UdfEnvironment { rng: ChaCha12Rng::from_seed(self.execution_time_seed.rng_seed), observed_time: false, observed_rng: false, + observed_identity: false, }; Ok(()) } fn finish_execution(&mut self) -> anyhow::Result { - let (observed_time, observed_rng) = match self.phase { - UdfPhase::Importing { .. } => (false, false), + let (observed_time, observed_rng, observed_identity) = match self.phase { + UdfPhase::Importing { .. } => (false, false, false), UdfPhase::Executing { observed_time, observed_rng, + observed_identity, .. - } => (observed_time, observed_rng), + } => (observed_time, observed_rng, observed_identity), UdfPhase::Finalized => { anyhow::bail!("Phase was already finalized") }, @@ -463,6 +466,7 @@ impl Environment for UdfEnvironment { Ok(EnvironmentOutcome { observed_rng, observed_time, + observed_identity, }) } @@ -548,6 +552,7 @@ async fn run_request( path: path.for_logging(), arguments, identity: tx.inert_identity(), + observed_identity: false, rng_seed: execution_time_seed.rng_seed, observed_rng: false, unix_timestamp: execution_time_seed.unix_timestamp, @@ -690,6 +695,7 @@ async fn run_request( path: path.for_logging(), arguments, identity: provider.tx.inert_identity(), + observed_identity: outcome.observed_identity, rng_seed: execution_time_seed.rng_seed, observed_rng: outcome.observed_rng, unix_timestamp: execution_time_seed.unix_timestamp, @@ -868,6 +874,10 @@ impl AsyncSyscallProvider for Isolate2SyscallProvider<'_, RT> { Ok(self.unix_timestamp) } + fn observe_identity(&self) -> anyhow::Result<()> { + todo!() + } + fn persistence_version(&self) -> PersistenceVersion { self.tx.persistence_version() } diff --git a/crates/isolate/src/tests/auth.rs b/crates/isolate/src/tests/auth.rs index 6998e9e1..b84f51e7 100644 --- a/crates/isolate/src/tests/auth.rs +++ b/crates/isolate/src/tests/auth.rs @@ -13,55 +13,49 @@ use must_let::must_let; use runtime::testing::TestRuntime; use sync_types::UserIdentityAttributes; -use crate::test_helpers::{ - UdfTest, - UdfTestType, -}; +use crate::test_helpers::UdfTest; #[convex_macro::test_runtime] async fn test_auth_basic(rt: TestRuntime) -> anyhow::Result<()> { - UdfTest::run_test_with_isolate2(rt, async move |t: UdfTestType| { - // UDF with no identity should return `null` - must_let!(let ConvexValue::Null = t.query("auth:getName", assert_obj!()).await?); - // With an identity, it should return the user's name - let identity = Identity::user(UserIdentity::test()); - must_let!(let ConvexValue::String(s) = t.query_with_identity("auth:getName", assert_obj!(), identity.clone()).await?); - assert_eq!(*s, UserIdentity::test().attributes.name.unwrap()); - must_let!(let ConvexValue::String(s) = t.query_with_identity("auth:getIdentifier", assert_obj!(), identity).await?); - assert_eq!(&*s, &*UserIdentity::test().attributes.token_identifier); - Ok(()) - }).await + let t = UdfTest::default(rt).await?; + // UDF with no identity should return `null` + must_let!(let ConvexValue::Null = t.query("auth:getName", assert_obj!()).await?); + // With an identity, it should return the user's name + let identity = Identity::user(UserIdentity::test()); + must_let!(let ConvexValue::String(s) = t.query_with_identity("auth:getName", assert_obj!(), identity.clone()).await?); + assert_eq!(*s, UserIdentity::test().attributes.name.unwrap()); + must_let!(let ConvexValue::String(s) = t.query_with_identity("auth:getIdentifier", assert_obj!(), identity).await?); + assert_eq!(&*s, &*UserIdentity::test().attributes.token_identifier); + Ok(()) } #[convex_macro::test_runtime] async fn test_auth_identity_for_acting_user(rt: TestRuntime) -> anyhow::Result<()> { - UdfTest::run_test_with_isolate2(rt, async move |t: UdfTestType| { - // UDF with no identity should return `null` - must_let!(let ConvexValue::Null = t.query("auth:getName", assert_obj!()).await?); - // With an identity, it should return the user's name - let identity = Identity::ActingUser( - AdminIdentity::new_for_test_only("chocolate-charlie-420".to_string(), MemberId(0)), - UserIdentityAttributes::test(), - ); - must_let!(let ConvexValue::String(s) = t.query_with_identity("auth:getName", assert_obj!(), identity.clone()).await?); - assert_eq!(*s, UserIdentityAttributes::test().name.unwrap()); - must_let!(let ConvexValue::String(s) = t.query_with_identity("auth:getIdentifier", assert_obj!(), identity).await?); - assert_eq!(&*s, &*UserIdentityAttributes::test().token_identifier); - Ok(()) - }).await + let t = UdfTest::default(rt).await?; + // UDF with no identity should return `null` + must_let!(let ConvexValue::Null = t.query("auth:getName", assert_obj!()).await?); + // With an identity, it should return the user's name + let identity = Identity::ActingUser( + AdminIdentity::new_for_test_only("chocolate-charlie-420".to_string(), MemberId(0)), + UserIdentityAttributes::test(), + ); + must_let!(let ConvexValue::String(s) = t.query_with_identity("auth:getName", assert_obj!(), identity.clone()).await?); + assert_eq!(*s, UserIdentityAttributes::test().name.unwrap()); + must_let!(let ConvexValue::String(s) = t.query_with_identity("auth:getIdentifier", assert_obj!(), identity).await?); + assert_eq!(&*s, &*UserIdentityAttributes::test().token_identifier); + Ok(()) } #[convex_macro::test_runtime] async fn test_auth_identity_for_admin(rt: TestRuntime) -> anyhow::Result<()> { - UdfTest::run_test_with_isolate2(rt, async move |t: UdfTestType| { - // UDF with no identity should return `null` - must_let!(let ConvexValue::Null = t.query("auth:getName", assert_obj!()).await?); - // With an identity, it should return the user's name - let identity = Identity::InstanceAdmin(AdminIdentity::new_for_test_only( - "bozotown".to_string(), - MemberId(77), - )); - must_let!(let ConvexValue::Null = t.query_with_identity("auth:getName", assert_obj!(), identity.clone()).await?); - Ok(()) - }).await + let t = UdfTest::default(rt).await?; + // UDF with no identity should return `null` + must_let!(let ConvexValue::Null = t.query("auth:getName", assert_obj!()).await?); + // With an identity, it should return the user's name + let identity = Identity::InstanceAdmin(AdminIdentity::new_for_test_only( + "bozotown".to_string(), + MemberId(77), + )); + must_let!(let ConvexValue::Null = t.query_with_identity("auth:getName", assert_obj!(), identity.clone()).await?); + Ok(()) } diff --git a/crates/pb/protos/outcome.proto b/crates/pb/protos/outcome.proto index fe71349a..dcf0c0f9 100644 --- a/crates/pb/protos/outcome.proto +++ b/crates/pb/protos/outcome.proto @@ -29,6 +29,8 @@ message UdfOutcome { common.FunctionResult result = 7; SyscallTrace syscall_trace = 8; + + optional bool observed_identity = 10; } message ActionOutcome {