[ENG-8173] share query cache for queries that don't check auth (#32821)
Detect in the isolate whether `ctx.auth.getUserIdentity` is ever called. If not, store the query result with `identity: None` in the query cache.
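
For illustration, a minimal sketch of the detection idea, with hypothetical names (the real isolate plumbing differs): the environment flips a flag the first time auth is read, and that flag surfaces as `observed_identity` on the query outcome.

```rust
// Hypothetical sketch, not the actual Convex isolate API: record
// whether the query ever read auth state during execution.
struct AuthTracker {
    observed_identity: bool,
}

impl AuthTracker {
    fn new() -> Self {
        Self {
            observed_identity: false,
        }
    }

    // Called whenever `ctx.auth.getUserIdentity` is invoked in the isolate.
    fn on_get_user_identity(&mut self) {
        self.observed_identity = true;
    }
}
```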

When checking whether a cache entry matches, we first look for an entry under the precise identity, then fall back to an entry with `identity: None`.
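
Conceptually, the lookup is a two-key probe (a simplified sketch with a plain `HashMap` standing in for the LRU cache; the real version is `RequestedCacheKey::get_cache_entry` in the diff below):

```rust
use std::collections::HashMap;

// Toy stand-ins: (path + args, identity). `None` marks an entry written
// by a query that never read `ctx.auth`.
type Key = (String, Option<String>);

// Probe the identity-specific key first, then fall back to the shared
// `identity: None` key.
fn lookup<'a>(
    cache: &'a HashMap<Key, String>,
    path_args: &str,
    identity: &str,
) -> Option<&'a String> {
    let precise = (path_args.to_string(), Some(identity.to_string()));
    let shared = (path_args.to_string(), None);
    cache.get(&precise).or_else(|| cache.get(&shared))
}
```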

When updating a cache entry, we add a waiting entry for the precise identity; when the query completes, we remove the waiting entry and insert a ready entry whose key reflects whether the query actually read the identity.
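
A sketch of that update path (toy types again; the real logic is `WaitingEntryGuard::complete` plus `Inner::put_ready` in the diff): the waiting entry always sits at the precise key, while the finished result may land at the identity-less key.

```rust
use std::collections::HashMap;

type Key = (String, Option<String>);

enum Entry {
    Waiting { id: u64 },
    Ready(String),
}

// On completion: drop the waiting entry at the precise key, then store
// the result under whichever key the execution actually warrants
// (`identity: None` if the query never read `ctx.auth`).
fn complete(
    cache: &mut HashMap<Key, Entry>,
    precise_key: &Key,
    actual_key: Key,
    entry_id: u64,
    result: String,
) {
    if matches!(cache.get(precise_key), Some(Entry::Waiting { id }) if *id == entry_id) {
        cache.remove(precise_key);
    }
    cache.insert(actual_key, Entry::Ready(result));
}
```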

NOTE: the part I'm worried about is whether this breaks any assumptions, since the waiting entry might not be at the same cache key as the eventual result. I think it's okay because if multiple requests are waiting, they already have to handle the executing request not populating the cache.

Added tests that confirm the desired behavior. Open to suggestions for more tests.
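
As a rough picture of the behavior being pinned down (illustrative only, not the commit's actual test code):

```rust
use std::collections::HashMap;

// Two different identities share a single cache entry when the query
// never reads `ctx.auth`.
fn shared_entry_demo() {
    let mut cache: HashMap<(String, Option<String>), String> = HashMap::new();

    // User A runs the query; it never read auth, so the result is
    // stored under `identity: None`.
    cache.insert(("q(1)".to_string(), None), "result".to_string());

    // User B misses the precise key but hits the shared one.
    let precise = ("q(1)".to_string(), Some("userB".to_string()));
    let shared = ("q(1)".to_string(), None);
    assert!(cache.get(&precise).or_else(|| cache.get(&shared)).is_some());
}
```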

GitOrigin-RevId: f6a2c9cf8607077d62b3e0a89057fff2f9615330
ldanilek authored and Convex, Inc. committed Jan 10, 2025
1 parent c2461f6 commit 350accb
Showing 11 changed files with 287 additions and 96 deletions.
174 changes: 129 additions & 45 deletions crates/application/src/cache/mod.rs
@@ -117,8 +117,13 @@ impl InstanceId {
}
}

/// A cache key representing a specific query request, before it runs.
/// It may have more specific information than a `StoredCacheKey`, because
/// multiple query requests may be served by the same cache entry.
/// e.g. if a query does not check `ctx.auth`, then the `RequestedCacheKey`
/// contains the identity, but the `StoredCacheKey` does not.
#[derive(Clone, Eq, PartialEq, Hash, Debug)]
pub struct CacheKey {
pub struct RequestedCacheKey {
instance: InstanceId,
path: PublicFunctionPath,
args: ConvexArray,
@@ -127,7 +132,76 @@ pub struct CacheKey {
allowed_visibility: AllowedVisibility,
}

impl CacheKey {
impl RequestedCacheKey {
// In order from most specific to least specific.
fn _possible_cache_keys(&self) -> Vec<StoredCacheKey> {
vec![
self.precise_cache_key(),
StoredCacheKey {
instance: self.instance,
path: self.path.clone(),
args: self.args.clone(),
// Include queries that did not read `ctx.auth`.
identity: None,
journal: self.journal.clone(),
allowed_visibility: self.allowed_visibility,
},
]
}

fn precise_cache_key(&self) -> StoredCacheKey {
StoredCacheKey {
instance: self.instance,
path: self.path.clone(),
args: self.args.clone(),
identity: Some(self.identity.clone()),
journal: self.journal.clone(),
allowed_visibility: self.allowed_visibility,
}
}

fn get_cache_entry<'a>(
&'a self,
cache: &'a mut LruCache<StoredCacheKey, CacheEntry>,
) -> (Option<&'a CacheEntry>, StoredCacheKey) {
for key in self._possible_cache_keys() {
if cache.contains(&key) {
return (Some(cache.get(&key).unwrap()), key);
}
}
(None, self.precise_cache_key())
}

fn cache_key_after_execution(&self, outcome: &UdfOutcome) -> StoredCacheKey {
let identity = if outcome.observed_identity {
Some(self.identity.clone())
} else {
None
};
StoredCacheKey {
instance: self.instance,
path: self.path.clone(),
args: self.args.clone(),
identity,
journal: outcome.journal.clone(),
allowed_visibility: self.allowed_visibility,
}
}
}

/// A cache key representing a persisted query result.
#[derive(Clone, Eq, PartialEq, Hash, Debug)]
pub struct StoredCacheKey {
instance: InstanceId,
path: PublicFunctionPath,
args: ConvexArray,
// None means that the query did not read `ctx.auth`.
identity: Option<IdentityCacheKey>,
journal: QueryJournal,
allowed_visibility: AllowedVisibility,
}

impl StoredCacheKey {
/// Approximate in-memory size of the `StoredCacheKey` structure, including
/// stack and heap allocated memory.
fn size(&self) -> usize {
@@ -253,7 +327,7 @@ impl<RT: Runtime> CacheManager<RT> {
) -> anyhow::Result<(QueryReturn, bool)> {
let start = self.rt.monotonic_now();
let identity_cache_key = identity.cache_key();
let key = CacheKey {
let requested_key = RequestedCacheKey {
instance: self.instance_id,
path,
args,
@@ -279,11 +353,16 @@ impl<RT: Runtime> CacheManager<RT> {

// Step 1: Decide what we're going to do this iteration: use a cached value,
// wait on someone else to run a UDF, or run the UDF ourselves.
let maybe_op =
self.cache
.plan_cache_op(&key, start, now, &identity, ts, context.clone());
let op: CacheOp = match maybe_op {
Some(op) => op,
let maybe_op = self.cache.plan_cache_op(
&requested_key,
start,
now,
&identity,
ts,
context.clone(),
);
let (op, stored_key) = match maybe_op {
Some(op_key) => op_key,
None => continue 'top,
};

@@ -296,7 +375,7 @@ impl<RT: Runtime> CacheManager<RT> {
_ => None,
};
let mut waiting_entry_guard =
WaitingEntryGuard::new(waiting_entry_id, &key, self.cache.clone());
WaitingEntryGuard::new(waiting_entry_id, &stored_key, self.cache.clone());

// Step 2: Perform our cache operation, potentially running the UDF.
let is_cache_hit = match op {
@@ -309,7 +388,7 @@ impl<RT: Runtime> CacheManager<RT> {
CacheOp::Go { .. } => false,
};
let (result, table_stats) = match self
.perform_cache_op(&key, op, usage_tracker.clone())
.perform_cache_op(&stored_key, op, usage_tracker.clone())
.await?
{
Some(r) => r,
@@ -319,7 +398,7 @@ impl<RT: Runtime> CacheManager<RT> {
// Step 3: Validate that the cache result we got is good enough. Is our desired
// timestamp in its validity interval? If it looked at system time, is it not
// too old?
let cache_result = match self.validate_cache_result(&key, ts, result).await? {
let cache_result = match self.validate_cache_result(&stored_key, ts, result).await? {
Some(r) => r,
None => continue 'top,
};
@@ -328,8 +407,10 @@ impl<RT: Runtime> CacheManager<RT> {
// bump the cache result's token. This method will discard the new value if the
// UDF failed or if a newer (i.e. higher `original_ts`) value is in the cache.
if cache_result.outcome.result.is_ok() {
let actual_stored_key =
requested_key.cache_key_after_execution(&cache_result.outcome);
// We do not cache JSErrors
waiting_entry_guard.complete(cache_result.clone());
waiting_entry_guard.complete(actual_stored_key, cache_result.clone());
} else {
drop(waiting_entry_guard);
}
@@ -364,7 +445,7 @@ impl<RT: Runtime> CacheManager<RT> {
#[minitrace::trace]
async fn perform_cache_op(
&self,
key: &CacheKey,
key: &StoredCacheKey,
op: CacheOp<'_>,
usage_tracker: FunctionUsageTracker,
) -> anyhow::Result<Option<(CacheResult, BTreeMap<TableName, TableStats>)>> {
@@ -503,7 +584,7 @@ impl<RT: Runtime> CacheManager<RT> {
#[minitrace::trace]
async fn validate_cache_result(
&self,
key: &CacheKey,
key: &StoredCacheKey,
ts: Timestamp,
mut result: CacheResult,
) -> anyhow::Result<Option<CacheResult>> {
@@ -562,12 +643,12 @@ impl<RT: Runtime> CacheManager<RT> {
// canceled.
struct WaitingEntryGuard<'a> {
entry_id: Option<u64>,
key: &'a CacheKey,
key: &'a StoredCacheKey,
cache: QueryCache,
}

impl<'a> WaitingEntryGuard<'a> {
fn new(entry_id: Option<u64>, key: &'a CacheKey, cache: QueryCache) -> Self {
fn new(entry_id: Option<u64>, key: &'a StoredCacheKey, cache: QueryCache) -> Self {
Self {
entry_id,
key,
@@ -576,11 +657,11 @@ impl<'a> WaitingEntryGuard<'a> {
}

// Marks the waiting entry as removed, so we don't have to remove it on Drop
fn complete(&mut self, result: CacheResult) {
self.cache.put_ready(self.key.clone(), result);
// We just performed put_ready so there is no need to perform remove_waiting
// on Drop.
self.entry_id.take();
fn complete(&mut self, actual_stored_key: StoredCacheKey, result: CacheResult) {
if let Some(entry_id) = self.entry_id.take() {
self.cache.remove_waiting(self.key, entry_id);
self.cache.put_ready(actual_stored_key, result);
}
}
}

@@ -594,7 +675,7 @@ impl Drop for WaitingEntryGuard<'_> {
}

struct Inner {
cache: LruCache<CacheKey, CacheEntry>,
cache: LruCache<StoredCacheKey, CacheEntry>,
size: usize,
size_limit: usize,

@@ -621,13 +702,13 @@ impl QueryCache {

fn plan_cache_op<'a>(
&self,
key: &'a CacheKey,
key: &'a RequestedCacheKey,
start: tokio::time::Instant,
now: tokio::time::Instant,
identity: &'a Identity,
ts: Timestamp,
context: ExecutionContext,
) -> Option<CacheOp<'a>> {
) -> Option<(CacheOp<'a>, StoredCacheKey)> {
let go = |sender: Option<(Sender<_>, u64)>| {
let (sender, waiting_entry_id) = match sender {
Some((sender, waiting_entry_id)) => (sender, Some(waiting_entry_id)),
@@ -646,22 +727,23 @@ impl QueryCache {
identity,
ts,
journal: &key.journal,
allowed_visibility: key.allowed_visibility.clone(),
allowed_visibility: key.allowed_visibility,
context,
}
};
let mut inner = self.inner.lock();
let op = match inner.cache.get(key) {
let (entry, stored_key) = key.get_cache_entry(&mut inner.cache);
let op = match entry {
Some(CacheEntry::Ready(r)) => {
if ts < r.original_ts {
// If another request has already executed this UDF at a
// newer timestamp, we can't use the cache. Re-execute
// in this case.
tracing::debug!("Cache value too new for {:?}", key);
tracing::debug!("Cache value too new for {:?}", stored_key);
log_plan_go(GoReason::PeerTimestampTooNew);
go(None)
} else {
tracing::debug!("Cache value ready for {:?}", key);
tracing::debug!("Cache value ready for {:?}", stored_key);
log_plan_ready();
CacheOp::Ready { result: r.clone() }
}
@@ -675,7 +757,7 @@ impl QueryCache {
let entry_id = *id;
if *peer_ts > ts {
log_plan_go(GoReason::PeerTimestampTooNew);
return Some(go(None));
return Some((go(None), stored_key));
}
// We don't serialize sampling `now` under the cache lock, and since it can
// occur on different threads, we're not guaranteed that
@@ -688,13 +770,13 @@ impl QueryCache {
"Peer timed out ({:?}), removing cache entry and retrying",
peer_elapsed
);
inner.remove_waiting(key, entry_id);
inner.remove_waiting(&stored_key, entry_id);
log_plan_peer_timeout();
return None;
}
let get_elapsed = now - start;
let remaining = *TOTAL_QUERY_TIMEOUT - cmp::max(peer_elapsed, get_elapsed);
tracing::debug!("Waiting for peer to compute {:?}", key);
tracing::debug!("Waiting for peer to compute {:?}", stored_key);
log_plan_wait();
CacheOp::Wait {
waiting_entry_id: *id,
@@ -703,32 +785,32 @@ impl QueryCache {
}
},
None => {
tracing::debug!("No cache value for {:?}, executing UDF...", key);
let (sender, executor_id) = inner.put_waiting(key.clone(), now, ts);
tracing::debug!("No cache value for {:?}, executing UDF...", stored_key);
let (sender, executor_id) = inner.put_waiting(stored_key.clone(), now, ts);
log_plan_go(GoReason::NoCacheResult);
go(Some((sender, executor_id)))
},
};
Some(op)
Some((op, stored_key))
}

fn remove_waiting(&self, key: &CacheKey, entry_id: u64) {
fn remove_waiting(&self, key: &StoredCacheKey, entry_id: u64) {
self.inner.lock().remove_waiting(key, entry_id)
}

fn remove_ready(&self, key: &CacheKey, original_ts: Timestamp) {
fn remove_ready(&self, key: &StoredCacheKey, original_ts: Timestamp) {
self.inner.lock().remove_ready(key, original_ts)
}

fn put_ready(&self, key: CacheKey, result: CacheResult) {
fn put_ready(&self, key: StoredCacheKey, result: CacheResult) {
self.inner.lock().put_ready(key, result)
}
}

impl Inner {
// Remove only a `CacheEntry::Waiting` from the cache, predicated on its
// `entry_id` matching.
fn remove_waiting(&mut self, key: &CacheKey, entry_id: u64) {
fn remove_waiting(&mut self, key: &StoredCacheKey, entry_id: u64) {
match self.cache.get(key) {
Some(CacheEntry::Waiting { id, .. }) if *id == entry_id => {
let (actual_key, entry) = self.cache.pop_entry(key).unwrap();
@@ -741,7 +823,7 @@ impl Inner {

// Remove only a `CacheEntry::Ready` from the cache, predicated on its
// `original_ts` matching.
fn remove_ready(&mut self, key: &CacheKey, original_ts: Timestamp) {
fn remove_ready(&mut self, key: &StoredCacheKey, original_ts: Timestamp) {
match self.cache.get(key) {
Some(CacheEntry::Ready(ref result)) if result.original_ts == original_ts => {
let (actual_key, entry) = self.cache.pop_entry(key).unwrap();
Expand All @@ -754,7 +836,7 @@ impl Inner {

fn put_waiting(
&mut self,
key: CacheKey,
key: StoredCacheKey,
now: tokio::time::Instant,
ts: Timestamp,
) -> (Sender<CacheResult>, u64) {
@@ -786,7 +868,7 @@ impl Inner {

// Put a `CacheEntry::Ready` into the cache, potentially dropping it if there's
// already a value with a higher `original_ts`.
fn put_ready(&mut self, key: CacheKey, result: CacheResult) {
fn put_ready(&mut self, key: StoredCacheKey, result: CacheResult) {
match self.cache.get_mut(&key) {
Some(entry @ CacheEntry::Waiting { .. }) => {
let new_entry = CacheEntry::Ready(result);
@@ -899,23 +981,23 @@ mod tests {
};

use super::{
CacheKey,
CacheResult,
InstanceId,
QueryCache,
StoredCacheKey,
};

// Construct a cache key where as many fields as possible have extra capacity in
// them
fn make_cache_key() -> CacheKey {
fn make_cache_key() -> StoredCacheKey {
macro_rules! with_extra_capacity {
($e:expr) => {{
let mut r = $e;
r.reserve(100);
r
}};
}
CacheKey {
StoredCacheKey {
instance: InstanceId(0),
path: PublicFunctionPath::RootExport(ExportPath::from(CanonicalizedUdfPath::new(
CanonicalizedModulePath::new(
@@ -929,7 +1011,9 @@ mod tests {
))),
args: ConvexArray::try_from(with_extra_capacity!(vec![ConvexValue::from(100.)]))
.unwrap(),
identity: IdentityCacheKey::InstanceAdmin(with_extra_capacity!("admin".to_string())),
identity: Some(IdentityCacheKey::InstanceAdmin(with_extra_capacity!(
"admin".to_string()
))),
journal: QueryJournal {
end_cursor: Some(Cursor {
position: CursorPosition::After(IndexKeyBytes(with_extra_capacity!(
