From af11bb91e1f182484202712ebbd7136dc9dcd63a Mon Sep 17 00:00:00 2001 From: Benjamin Boudreau Date: Thu, 9 Jan 2025 15:35:10 -0500 Subject: [PATCH] fix tests --- server/data_model/src/lib.rs | 2 +- server/processor/src/namespace.rs | 61 ++++++++++++++----------- server/processor/src/task_allocator.rs | 5 +- server/src/integration_test.rs | 34 +++++++++----- server/src/testing.rs | 2 +- server/state_store/src/state_machine.rs | 3 +- 6 files changed, 65 insertions(+), 42 deletions(-) diff --git a/server/data_model/src/lib.rs b/server/data_model/src/lib.rs index 11c9a4840..a42a34d6e 100644 --- a/server/data_model/src/lib.rs +++ b/server/data_model/src/lib.rs @@ -984,7 +984,7 @@ impl StateChangeId { } /// Return key to store in k/v db - pub fn to_key(&self) -> [u8; 8] { + fn to_key(&self) -> [u8; 8] { self.0.to_be_bytes() } } diff --git a/server/processor/src/namespace.rs b/server/processor/src/namespace.rs index 8264af627..6a553ba15 100644 --- a/server/processor/src/namespace.rs +++ b/server/processor/src/namespace.rs @@ -82,7 +82,10 @@ impl ProcessorLogic for NamespaceProcessor { "running namespace processor, requests_len={}", requests.len() ); - let timer_kvs = &[KeyValue::new("processor", "namespace")]; + let timer_kvs = &[KeyValue::new( + "processor", + ProcessorType::Namespace.as_ref().to_string(), + )]; let _timer = Timer::start_with_labels(&self.metrics.processors_duration, timer_kvs); for request in requests { @@ -98,17 +101,19 @@ impl ProcessorLogic for NamespaceProcessor { }; } - let mut create_task_requests = vec![]; - let mut processed_state_changes = vec![]; - let mut new_reduction_tasks = vec![]; - let mut processed_reduction_tasks = vec![]; - let state_changes = self .indexify_state .reader() .get_unprocessed_state_changes(ProcessorId::new(ProcessorType::Namespace))?; for state_change in &state_changes { + let mut create_task_requests = vec![]; + let mut processed_state_changes = vec![]; + let mut new_reduction_tasks = vec![]; + let mut processed_reduction_tasks = vec![]; + + trace!("processing state change: {:?}", state_change); + match self.process_state_change(state_change).await { Ok(result) => { processed_state_changes.push(state_change.clone()); @@ -130,29 +135,31 @@ impl ProcessorLogic for NamespaceProcessor { continue; } } - } - // Do not write an update request if there are no state changes to mark as - // processed since we did no work. - if processed_state_changes.is_empty() { - return Ok(()); - } + // Do not write an update request if there are no state changes to mark as + // processed since we did no work. + if processed_state_changes.is_empty() { + return Ok(()); + } - let scheduler_update_request = StateMachineUpdateRequest { - payload: RequestPayload::NamespaceProcessorUpdate(NamespaceProcessorUpdateRequest { - task_requests: create_task_requests, - reduction_tasks: ReductionTasks { - new_reduction_tasks, - processed_reduction_tasks, - }, - }), - process_state_change: Some(ProcessedStateChange { - processor_id: ProcessorId::new(ProcessorType::Namespace), - state_changes: processed_state_changes, - }), - }; - if let Err(err) = self.indexify_state.write(scheduler_update_request).await { - error!("error writing namespace update request: {:?}", err); + let scheduler_update_request = StateMachineUpdateRequest { + payload: RequestPayload::NamespaceProcessorUpdate( + NamespaceProcessorUpdateRequest { + task_requests: create_task_requests, + reduction_tasks: ReductionTasks { + new_reduction_tasks, + processed_reduction_tasks, + }, + }, + ), + process_state_change: Some(ProcessedStateChange { + state_changes: processed_state_changes, + processor_id: ProcessorId::new(ProcessorType::Namespace), + }), + }; + if let Err(err) = self.indexify_state.write(scheduler_update_request).await { + error!("error writing namespace update request: {:?}", err); + } } Ok(()) diff --git a/server/processor/src/task_allocator.rs b/server/processor/src/task_allocator.rs index 7c6d70eee..1813842ec 100644 --- a/server/processor/src/task_allocator.rs +++ b/server/processor/src/task_allocator.rs @@ -72,7 +72,10 @@ impl ProcessorLogic for TaskAllocationProcessor { requests.len() ); - let timer_kvs = &[KeyValue::new("processor", "task_allocator")]; + let timer_kvs = &[KeyValue::new( + "processor", + ProcessorType::TaskAllocator.as_ref().to_string(), + )]; let _timer = Timer::start_with_labels(&self.metrics.processors_duration, timer_kvs); for request in requests { diff --git a/server/src/integration_test.rs b/server/src/integration_test.rs index d587ec35c..85c3ba652 100644 --- a/server/src/integration_test.rs +++ b/server/src/integration_test.rs @@ -25,6 +25,8 @@ mod tests { InvocationPayloadBuilder, InvokeComputeGraphRequest, Node, + ProcessorId, + ProcessorType, RegisterExecutorRequest, RequestPayload, RuntimeInformation, @@ -39,6 +41,7 @@ mod tests { state_machine::IndexifyObjectsColumns, test_state_store, }; + use tracing::error; use crate::{service::Service, testing}; @@ -346,7 +349,9 @@ mod tests { let Service { indexify_state, .. } = test_srv.service.clone(); let invocation_id = test_state_store::with_router_graph(&indexify_state).await; + test_srv.process_all().await?; + let tasks = indexify_state .reader() .list_tasks_by_compute_graph(TEST_NAMESPACE, "graph_B", &invocation_id, None, None)? @@ -362,7 +367,9 @@ mod tests { ) .await .unwrap(); + test_srv.process_all().await?; + let tasks = indexify_state .reader() .list_tasks_by_compute_graph(TEST_NAMESPACE, "graph_B", &invocation_id, None, None) @@ -373,8 +380,14 @@ mod tests { .reader() .get_unprocessed_state_changes_all_processors() .unwrap(); - // has task crated state change in it. - assert_eq!(unprocessed_state_changes.len(), 1); + + // has task created state change in it. + assert_eq!( + unprocessed_state_changes.len(), + 0, + "{:?}", + unprocessed_state_changes + ); // Now finish the router task and we should have 3 tasks // The last one would be for the edge which the router picks @@ -386,20 +399,19 @@ mod tests { ) .await .unwrap(); + test_srv.process_all().await?; + let tasks = indexify_state .reader() .list_tasks_by_compute_graph(TEST_NAMESPACE, "graph_B", &invocation_id, None, None) .unwrap() .0; - assert_eq!(tasks.len(), 3); + assert_eq!(tasks.len(), 3, "tasks: {:?}", tasks); Ok(()) } - // TODO write edge case test case when all fn_map finish state changes are - // handled in the same runloop of the executor! - // test a simple reducer graph // // Tasks: @@ -521,9 +533,9 @@ mod tests { .collect(); assert_eq!( - pending_tasks.len(), expected_num, - "pending tasks: {:?}", + pending_tasks.len(), + "pending tasks: {:#?}", pending_tasks ); pending_tasks.iter().for_each(|t| { @@ -634,7 +646,7 @@ mod tests { .invocation_ctx(&graph.namespace, &graph.name, &invocation_payload.id)? .unwrap(); assert_eq!(graph_ctx.outstanding_tasks, 0); - assert_eq!(graph_ctx.completed, true); + assert!(graph_ctx.completed); } Ok(()) @@ -1479,7 +1491,7 @@ mod tests { for state_change in all_unprocessed_state_changes_reduce.clone() { indexify_state.db.delete_cf( &IndexifyObjectsColumns::UnprocessedStateChanges.cf_db(&indexify_state.db), - &state_change.id.to_key(), + state_change.key(&ProcessorId::new(ProcessorType::Namespace)), )?; } @@ -1546,7 +1558,7 @@ mod tests { let serialized_state_change = JsonEncoder::encode(&state_change)?; indexify_state.db.put_cf( &IndexifyObjectsColumns::UnprocessedStateChanges.cf_db(&indexify_state.db), - &state_change.id.to_key(), + state_change.key(&ProcessorId::new(ProcessorType::Namespace)), serialized_state_change, )?; } diff --git a/server/src/testing.rs b/server/src/testing.rs index 337b1751f..a63f6318e 100644 --- a/server/src/testing.rs +++ b/server/src/testing.rs @@ -12,7 +12,7 @@ pub struct TestService { impl TestService { pub async fn new() -> Result { let env_filter = tracing_subscriber::EnvFilter::try_from_default_env() - .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")); + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("trace")); let _ = subscriber::set_global_default( tracing_subscriber::registry() .with(tracing_subscriber::fmt::layer().with_filter(env_filter)), diff --git a/server/state_store/src/state_machine.rs b/server/state_store/src/state_machine.rs index c2d917421..bc927e98c 100644 --- a/server/state_store/src/state_machine.rs +++ b/server/state_store/src/state_machine.rs @@ -46,7 +46,7 @@ use rocksdb::{ TransactionDB, }; use strum::AsRefStr; -use tracing::{debug, error, info, instrument}; +use tracing::{debug, error, info, instrument, trace}; use super::serializer::{JsonEncode, JsonEncoder}; use crate::StateChangeDispatcher; @@ -943,6 +943,7 @@ pub(crate) fn mark_state_changes_processed( process: &ProcessedStateChange, ) -> Result<()> { for state_change in process.state_changes.iter() { + trace!("Marking state change processed: {:?}", state_change); let key = &state_change.key(&process.processor_id); txn.delete_cf( &IndexifyObjectsColumns::UnprocessedStateChanges.cf_db(&db),