Skip to content

Commit

Permalink
feat(server): API and Event processing improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
seriousben committed Jan 11, 2025
1 parent 06d5af4 commit 031a3b7
Show file tree
Hide file tree
Showing 31 changed files with 2,580 additions and 1,776 deletions.
62 changes: 46 additions & 16 deletions server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

100 changes: 55 additions & 45 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ members = [
"indexify_ui",
"metrics",
"state_store",
"task_scheduler",
"processor",
"utils",
]

Expand All @@ -24,33 +24,37 @@ serde = { version = "1.0.217", features = ["derive"] }
anyhow = "1.0.95"
serde_json = "1.0.134"
# https://github.com/rust-rocksdb/rust-rocksdb/issues/881
rocksdb = { git = "https://github.com/rust-rocksdb/rust-rocksdb", rev = "87b6b2df89c1fafcfb53129f8c3304d636a94f2e", features=["multi-threaded-cf"]}
rocksdb = { git = "https://github.com/rust-rocksdb/rust-rocksdb", rev = "87b6b2df89c1fafcfb53129f8c3304d636a94f2e", features = [
"multi-threaded-cf",
] }
data_model = { path = "data_model" }
blob_store = { path = "blob_store" }
indexify_utils = { path = "utils" }
indexify_ui = {path = "indexify_ui"}
metrics = {path = "metrics"}
indexify_ui = { path = "indexify_ui" }
metrics = { path = "metrics" }
hyper = "1.5.2"
state_store = { path = "state_store" }
strum = { version = "0.26.3", features = ["derive"] }
tracing = "0.1.41"
rand = "0.8.5"
tokio = { version = "1.42.0", features = ["full"] }
once_cell = "1.20.2"
itertools = "0.14.0"
serde_yml = "0.0.12"
figment = {version="0.10.19",features=["yaml"]}
axum = {version = "0.7.9", features = ["multipart", "macros", "tokio"]}
figment = { version = "0.10.19", features = ["yaml"] }
axum = { version = "0.7.9", features = ["multipart", "macros", "tokio"] }
axum-server = "0.7.1"
tempfile = "3.14.0"
utoipa = { version = "5.3.0", features = ["axum_extras"] }
utoipa-swagger-ui = { version = "8.1.0", features = ["axum"] }
object_store = {version= "0.11.2", features = ["aws"]}
object_store = { version = "0.11.2", features = ["aws"] }
futures = "0.3.31"
bytes = "1.9.0"
pin-project-lite = "0.2.15"
async-trait = "0.1.83"
tokio-stream = "0.1.17"
slatedb = {git = "https://github.com/diptanu/slatedb"}
rust-embed = {version="8.5.0", features=["mime-guess"]}
slatedb = { git = "https://github.com/diptanu/slatedb" }
rust-embed = { version = "8.5.0", features = ["mime-guess"] }
reqwest = { version = "0.12.11", default-features = false, features = [
"json",
"rustls-tls",
Expand All @@ -66,67 +70,73 @@ pin-project = "1.1.7"
ciborium = "0.2.2"
uuid = { version = "1.11.0", features = ["v4"] }
url = "2.5.4"
opentelemetry = { version="0.24", features = ["metrics", "trace"] }
opentelemetry_sdk = { version = "0.24.1", features = ["rt-tokio", "metrics", "trace"] }
opentelemetry-otlp = { version = "0.17", features = ["tokio", "metrics", "trace"] }
opentelemetry = { version = "0.24", features = ["metrics", "trace"] }
opentelemetry_sdk = { version = "0.24.1", features = [
"rt-tokio",
"metrics",
"trace",
] }
opentelemetry-otlp = { version = "0.17", features = [
"tokio",
"metrics",
"trace",
] }
opentelemetry-prometheus = { version = "0.17" }
prometheus = {version = "0.13.4"}
prometheus = { version = "0.13.4" }
axum-otel-metrics = { version = "0.9.0-alpha.2" }
tracing-opentelemetry = "0.25"
dashmap = "6.1.0"

[dependencies]
async-stream = {workspace = true}
async-stream = { workspace = true }
data_model = { path = "data_model" }
state_store = { path = "state_store" }
task_scheduler = { path = "task_scheduler" }
processor = { path = "processor" }
blob_store = { path = "blob_store" }
serde={workspace = true}
serde_json={workspace = true}
anyhow = {workspace=true}
figment = {workspace=true}
serde = { workspace = true }
serde_json = { workspace = true }
anyhow = { workspace = true }
figment = { workspace = true }
clap = { version = "4.5.23", features = ["derive"] }
tracing ={workspace=true}
axum ={workspace=true}
tokio = {workspace=true}
axum-server={workspace=true}
tracing = { workspace = true }
axum = { workspace = true }
tokio = { workspace = true }
axum-server = { workspace = true }
futures = "0.3.30"
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "json" ] }
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "json"] }
utoipa = { workspace = true }
utoipa-swagger-ui = { workspace = true }
sha2={workspace=true}
nanoid={workspace=true}
sha2 = { workspace = true }
nanoid = { workspace = true }
object_store.workspace = true
uuid = {workspace=true}
indexify_utils = {workspace=true}
uuid = { workspace = true }
indexify_utils = { workspace = true }
tower-http = { workspace = true }
bytes.workspace = true
ciborium.workspace = true
rand.workspace = true
hex = "0.4.3"
indexify_ui = {workspace=true}
hyper = {workspace=true}
url = {workspace=true}
opentelemetry = {workspace=true}
opentelemetry-prometheus = {workspace=true}
indexify_ui = { workspace = true }
hyper = { workspace = true }
url = { workspace = true }
opentelemetry = { workspace = true }
opentelemetry-prometheus = { workspace = true }
opentelemetry_sdk.workspace = true
prometheus = {workspace=true}
axum-otel-metrics = { workspace=true }
metrics = {workspace=true}
tracing-opentelemetry = {workspace=true}
opentelemetry-otlp = {workspace=true}
axum-tracing-opentelemetry = { version = "0.19.0", features = ["tracing_level_info"] }
prometheus = { workspace = true }
axum-otel-metrics = { workspace = true }
metrics = { workspace = true }
tracing-opentelemetry = { workspace = true }
opentelemetry-otlp = { workspace = true }
axum-tracing-opentelemetry = { version = "0.19.0", features = [
"tracing_level_info",
] }

[dev-dependencies]
tempfile = { workspace = true }

[build-dependencies]
# All features enabled
vergen = { version = "9.0.2", features = [
"build",
"cargo",
"rustc",
"si",
] }
vergen = { version = "9.0.2", features = ["build", "cargo", "rustc", "si"] }

[package.metadata.deb]
maintainer = "Diptanu Gon Choudhury <[email protected]>"
Expand Down
10 changes: 5 additions & 5 deletions server/data_model/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ derive_builder = "0.20.1"
anyhow = { workspace = true }
serde_json = { workspace = true }
indexify_utils = { workspace = true }
rand = {workspace=true}
uuid = {workspace=true}
sha2 = {workspace=true}
strum = {workspace=true}
tracing = { workspace = true }
rand = { workspace = true }
uuid = { workspace = true }
sha2 = { workspace = true }
strum = { workspace = true }
tracing = { workspace = true }
54 changes: 47 additions & 7 deletions server/data_model/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{
fmt::{self, Display},
hash::{DefaultHasher, Hash, Hasher},
time::{SystemTime, UNIX_EPOCH},
vec,
};

use anyhow::{anyhow, Result};
Expand Down Expand Up @@ -898,14 +899,14 @@ impl ExecutorMetadata {
}
}

#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq, Hash)]
pub struct InvokeComputeGraphEvent {
pub invocation_id: String,
pub namespace: String,
pub compute_graph: String,
}

#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq, Hash)]
pub struct TaskFinishedEvent {
pub namespace: String,
pub compute_graph: String,
Expand Down Expand Up @@ -958,13 +959,9 @@ impl StateChangeId {
}

/// Return key to store in k/v db
pub fn to_key(&self) -> [u8; 8] {
fn to_key(self) -> [u8; 8] {
self.0.to_be_bytes()
}

pub fn from_key(key: [u8; 8]) -> Self {
Self(u64::from_be_bytes(key))
}
}

impl From<StateChangeId> for u64 {
Expand All @@ -979,6 +976,41 @@ impl Display for StateChangeId {
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ProcessorId(ProcessorType);
impl ProcessorId {
pub fn new(processor_type: ProcessorType) -> Self {
Self(processor_type)
}

pub fn key_prefix(&self) -> String {
self.0.key_prefix().to_string()
}
}

impl Display for ProcessorId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0.as_ref())
}
}

#[derive(Debug, AsRefStr, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ProcessorType {
Namespace,
TaskAllocator,
// SystemTask, // TODO: move replay logic in namespace processor
// GarbageCollector, // TODO: move GC as a system task
}

impl ProcessorType {
pub fn key_prefix(&self) -> &'static str {
match self {
ProcessorType::Namespace => "ns",
ProcessorType::TaskAllocator => "ta",
}
}
}

#[derive(Clone, Serialize, Deserialize, Debug, Builder)]
pub struct StateChange {
pub id: StateChangeId,
Expand All @@ -987,6 +1019,14 @@ pub struct StateChange {
pub created_at: u64,
pub processed_at: Option<u64>,
}
impl StateChange {
pub fn key(&self, processor_id: &ProcessorId) -> Vec<u8> {
let mut key = processor_id.key_prefix().as_bytes().to_vec();
key.extend("|".as_bytes());
key.extend_from_slice(&self.id.to_key());
key
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Namespace {
Expand Down
Loading

0 comments on commit 031a3b7

Please sign in to comment.