Skip to content

Commit

Permalink
Observability improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiimk committed Sep 13, 2024
1 parent d3bb067 commit b394911
Show file tree
Hide file tree
Showing 17 changed files with 318 additions and 928 deletions.
209 changes: 108 additions & 101 deletions Cargo.lock

Large diffs are not rendered by default.

87 changes: 43 additions & 44 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
members = [
# Utils
"src/utils/graceful-shutdown",
"src/utils/observability",
"src/utils/repo-tools",
# Apps
"src/app/api-server",
Expand All @@ -14,52 +13,52 @@ resolver = "2"
[workspace.dependencies]
# Utils
graceful-shutdown = { path = "src/utils/graceful-shutdown", version = "0.36.0", default-features = false }
observability = { path = "src/utils/observability", version = "0.36.0", default-features = false }
# Utils (core)
container-runtime = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
database-common = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
database-common-macros = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
http-common = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
internal-error = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
messaging-outbox = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
random-names = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
time-source = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
container-runtime = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
database-common = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
database-common-macros = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
http-common = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
internal-error = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
messaging-outbox = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
observability = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
random-names = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
time-source = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
# Domain
opendatafabric = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu-task-system = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu-task-system-services = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu-flow-system = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu-flow-system-services = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu-accounts = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu-datasets = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
opendatafabric = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
kamu-task-system = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
kamu-task-system-services = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
kamu-flow-system = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
kamu-flow-system-services = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
kamu-accounts = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
kamu-datasets = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
# Infra
kamu = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu-task-system-inmem = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu-task-system-postgres = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu-task-system-sqlite = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu-flow-system-inmem = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu-flow-system-postgres = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu-flow-system-sqlite = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu-adapter-graphql = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu-adapter-http = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu-adapter-oauth = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu-adapter-odata = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu-adapter-auth-oso = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu-adapter-flight-sql = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu-accounts-services = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu-accounts-inmem = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu-accounts-postgres = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu-accounts-sqlite = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu-datasets-services = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu-datasets-inmem = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu-datasets-postgres = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu-datasets-sqlite = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu-messaging-outbox-inmem = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu-messaging-outbox-postgres = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu-messaging-outbox-sqlite = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu-auth-rebac-inmem = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu-auth-rebac-services = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu-auth-rebac-sqlite = { git = "https://github.com/kamu-data/kamu-cli", tag = "v0.199.2", version = "0.199.2", default-features = false }
kamu = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
kamu-task-system-inmem = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
kamu-task-system-postgres = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
kamu-task-system-sqlite = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
kamu-flow-system-inmem = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
kamu-flow-system-postgres = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
kamu-flow-system-sqlite = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
kamu-adapter-graphql = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
kamu-adapter-http = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
kamu-adapter-oauth = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
kamu-adapter-odata = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
kamu-adapter-auth-oso = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
kamu-adapter-flight-sql = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
kamu-accounts-services = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
kamu-accounts-inmem = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
kamu-accounts-postgres = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
kamu-accounts-sqlite = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
kamu-datasets-services = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
kamu-datasets-inmem = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
kamu-datasets-postgres = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
kamu-datasets-sqlite = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
kamu-messaging-outbox-inmem = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
kamu-messaging-outbox-postgres = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
kamu-messaging-outbox-sqlite = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
kamu-auth-rebac-inmem = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
kamu-auth-rebac-services = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }
kamu-auth-rebac-sqlite = { git = "https://github.com/kamu-data/kamu-cli", branch = "feature/integrate-metrics", version = "0.199.3", default-features = false }


[workspace.package]
Expand Down
20 changes: 18 additions & 2 deletions src/app/api-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ edition = { workspace = true }
publish = { workspace = true }


[lib]
doctest = false


[dependencies]
dill = "0.9"
container-runtime = { workspace = true }
Expand Down Expand Up @@ -78,7 +82,8 @@ tonic = { version = "0.11", default-features = false }
tower = { version = "0.4" }
tower-http = { version = "0.4", features = ["trace", "cors"] }

# Tracing / logging / telemetry
# Tracing / logging / telemetry / metrics
prometheus = { version = "0.13", default-features = false }
tracing = "0.1"
tracing-subscriber = { version = "0.3", default-features = false, features = [
"std",
Expand All @@ -91,7 +96,18 @@ tracing-log = "0.2"
# Utils
async-trait = { version = "0.1", default-features = false }
chrono = "0.4"
clap = "4"
clap = { version = "4", default-features = false, features = [
"std",
"color",
"help",
"usage",
"error-context",
"suggestions",
"derive",
"cargo",
# "env",
"wrap_help",
] }
datafusion = { version = "41", default-features = false, features = [
"crypto_expressions",
"encoding_expressions",
Expand Down
46 changes: 26 additions & 20 deletions src/app/api-server/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::config::{
ACCOUNT_KAMU,
};
use crate::{
cli,
configure_database_components,
configure_in_memory_components,
connect_database_initially,
Expand All @@ -41,7 +42,7 @@ const DEFAULT_RUST_LOG: &str = "debug,";

/////////////////////////////////////////////////////////////////////////////////////////

pub async fn run(matches: clap::ArgMatches, config: ApiServerConfig) -> Result<(), InternalError> {
pub async fn run(args: cli::Cli, config: ApiServerConfig) -> Result<(), InternalError> {
let repo_url = if let Some(repo_url) = config.repo.repo_url.as_ref().cloned() {
repo_url
} else {
Expand All @@ -55,7 +56,7 @@ pub async fn run(matches: clap::ArgMatches, config: ApiServerConfig) -> Result<(
Url::from_directory_path(workspace_dir.join("datasets")).unwrap()
};

let multi_tenant = matches.get_flag("multi-tenant") || repo_url.scheme() != "file";
let multi_tenant = args.multi_tenant || repo_url.scheme() != "file";

let local_dir = tempfile::tempdir().unwrap();

Expand Down Expand Up @@ -90,6 +91,9 @@ pub async fn run(matches: clap::ArgMatches, config: ApiServerConfig) -> Result<(
.bind::<dyn kamu::domain::DependencyGraphRepository, kamu::DependencyGraphRepositoryInMemory>()
.build();

// Register metrics
let metrics_registry = observability::metrics::register_all(&catalog);

// Database requires extra actions:
let final_catalog = if db_config.needs_database() {
// Connect database and obtain a connection pool
Expand All @@ -105,30 +109,29 @@ pub async fn run(matches: clap::ArgMatches, config: ApiServerConfig) -> Result<(

initialize_components(&final_catalog, server_account_subject.clone()).await;

match matches.subcommand() {
Some(("gql", sub)) => match sub.subcommand() {
Some(("schema", _)) => {
match args.command {
cli::Command::Gql(c) => match c.subcommand {
cli::Gql::Schema(_) => {
println!("{}", kamu_adapter_graphql::schema().sdl());
Ok(())
}
Some(("query", qsub)) => {
let result = crate::gql_server::gql_query(
qsub.get_one("query").map(String::as_str).unwrap(),
qsub.get_flag("full"),
final_catalog,
)
.await;
cli::Gql::Query(sc) => {
let result = crate::gql_server::gql_query(&sc.query, sc.full, final_catalog).await;
print!("{}", result);
Ok(())
}
_ => unimplemented!(),
},
Some(("run", sub)) => {
cli::Command::Metrics(_) => {
// TODO: Proper implementation is blocked by https://github.com/tikv/rust-prometheus/issues/526
let metric_families = metrics_registry.gather();
println!("{metric_families:#?}");
Ok(())
}
cli::Command::Run(c) => {
let shutdown_requested = graceful_shutdown::trap_signals();

let address = sub
.get_one::<std::net::IpAddr>("address")
.copied()
let address = c
.address
.unwrap_or(std::net::Ipv4Addr::new(127, 0, 0, 1).into());

// API servers are built from the regular catalog
Expand All @@ -137,14 +140,14 @@ pub async fn run(matches: clap::ArgMatches, config: ApiServerConfig) -> Result<(
// all processing in the user context.
let http_server = crate::http_server::build_server(
address,
sub.get_one("http-port").copied(),
c.http_port,
final_catalog.clone(),
multi_tenant,
);

let flightsql_server = crate::flightsql_server::FlightSqlServer::new(
address,
sub.get_one("flightsql-port").copied(),
c.flightsql_port,
final_catalog.clone(),
)
.await;
Expand Down Expand Up @@ -203,7 +206,6 @@ pub async fn run(matches: clap::ArgMatches, config: ApiServerConfig) -> Result<(
res = outbox_processor.run() => { res.int_err() },
}
}
_ => unimplemented!(),
}
}

Expand Down Expand Up @@ -293,6 +295,9 @@ pub async fn init_dependencies(

b.add::<time_source::SystemTimeSourceDefault>();

// TODO: Add Kubernetes labels
b.add_value(prometheus::Registry::new());

b.add::<container_runtime::ContainerRuntime>();
b.add_value(container_runtime::ContainerRuntimeConfig {
runtime: config.engine.runtime,
Expand Down Expand Up @@ -369,6 +374,7 @@ pub async fn init_dependencies(
b.add::<messaging_outbox::OutboxDispatchingImpl>();
b.bind::<dyn messaging_outbox::Outbox, messaging_outbox::OutboxDispatchingImpl>();
b.add::<messaging_outbox::OutboxTransactionalProcessor>();
b.add::<messaging_outbox::OutboxTransactionalProcessorMetrics>();

messaging_outbox::register_message_dispatcher::<kamu::domain::DatasetLifecycleMessage>(
&mut b,
Expand Down
Loading

0 comments on commit b394911

Please sign in to comment.