From 1aa74d860bb01324152afc2ceaea414aa852c3ee Mon Sep 17 00:00:00 2001
From: Evaline Ju <69598118+evaline-ju@users.noreply.github.com>
Date: Wed, 18 Dec 2024 14:31:03 -0800
Subject: [PATCH] :sparkles: Client metric trace layers (#260)

* Introduce tower service into clients

Signed-off-by: Paul Scoropan <1paulscoropan@gmail.com>
Co-authored-by: Evaline Ju <69598118+evaline-ju@users.noreply.github.com>

* :arrow_up: Update tower dependencies for tokio runtimes

Signed-off-by: Evaline Ju <69598118+evaline-ju@users.noreply.github.com>

* :heavy_minus_sign: Remove tower layer

Signed-off-by: Evaline Ju <69598118+evaline-ju@users.noreply.github.com>

* :sparkles::construction: Grpc trace layer

Signed-off-by: Paul Scoropan <1paulscoropan@gmail.com>
Co-authored-by: Evaline Ju <69598118+evaline-ju@users.noreply.github.com>

* :sparkles: Http trace layer

Signed-off-by: Paul Scoropan <1paulscoropan@gmail.com>
Co-authored-by: Evaline Ju <69598118+evaline-ju@users.noreply.github.com>

* :truck: Update client app error metric name

Signed-off-by: Evaline Ju <69598118+evaline-ju@users.noreply.github.com>

* :wrench: Use histogram

Signed-off-by: Evaline Ju <69598118+evaline-ju@users.noreply.github.com>

* :bug: Cast durations

Signed-off-by: Evaline Ju <69598118+evaline-ju@users.noreply.github.com>

* :recycle::fire: Refactor health and remove client side mock tests

Signed-off-by: Evaline Ju <69598118+evaline-ju@users.noreply.github.com>

* :recycle::loud_sound: Update trace messages

Signed-off-by: Evaline Ju <69598118+evaline-ju@users.noreply.github.com>

* :recycle::loud_sound: Update trace error messages

Signed-off-by: Evaline Ju <69598118+evaline-ju@users.noreply.github.com>

* :recycle: Consolidate logs

Signed-off-by: Evaline Ju <69598118+evaline-ju@users.noreply.github.com>

* :recycle: Use display

Signed-off-by: Evaline Ju <69598118+evaline-ju@users.noreply.github.com>

---------

Signed-off-by: Evaline Ju <69598118+evaline-ju@users.noreply.github.com>
Co-authored-by: Paul Scoropan <1paulscoropan@gmail.com>
---
 src/clients.rs         | 344 ++++++++++++++++++++++--------------------
 src/clients/chunker.rs |   6 +-
 src/clients/http.rs    | 204 +++++++++++++++++++++---
 src/clients/nlp.rs     |   6 +-
 src/clients/tgis.rs    |   4 +-
 src/utils/trace.rs     |  20 ++-
 6 files changed, 391 insertions(+), 193 deletions(-)

diff --git a/src/clients.rs b/src/clients.rs
index c191c7fb..907deb7f 100644
--- a/src/clients.rs
+++ b/src/clients.rs
@@ -19,6 +19,8 @@
 use std::{
     any::TypeId,
     collections::{hash_map, HashMap},
+    fmt::Debug,
+    ops::{Deref, DerefMut},
     pin::Pin,
     time::Duration,
 };
@@ -27,26 +29,38 @@
 use async_trait::async_trait;
 use axum::http::{Extensions, HeaderMap};
 use futures::Stream;
 use ginepro::LoadBalancedChannel;
+use http_body_util::combinators::BoxBody;
+use hyper::{
+    body::{Bytes, Incoming},
+    Response,
+};
 use hyper_timeout::TimeoutConnector;
 use hyper_util::rt::TokioExecutor;
 use tonic::{metadata::MetadataMap, Request};
 use tower::timeout::TimeoutLayer;
 use tower::ServiceBuilder;
-use tracing::{debug, instrument, Span};
+use tower_http::{
+    classify::{GrpcErrorsAsFailures, GrpcFailureClass, SharedClassifier},
+    trace::{
+        DefaultOnBodyChunk, GrpcMakeClassifier, MakeSpan, OnEos, OnFailure, OnRequest, OnResponse,
+        Trace, TraceLayer,
+    },
+};
+use tracing::{debug, error, info, info_span, instrument, Span};
 use tracing_opentelemetry::OpenTelemetrySpanExt;
 use url::Url;
 
 use crate::{
     config::{ServiceConfig, Tls},
     health::HealthCheckResult,
-    utils::{tls, trace::with_traceparent_header},
+    utils::{tls, trace::current_trace_id, trace::with_traceparent_header},
 };
 
 pub mod errors;
-pub use errors::Error;
+pub use errors::{grpc_to_http_code, Error};
 
 pub mod http;
-pub use http::HttpClient;
+pub use http::{http_trace_layer, HttpClient};
 
 pub mod chunker;
 
@@ -247,17 +261,18 @@ pub async fn create_http_client(
     let client =
         hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build(timeout_conn);
     let client = ServiceBuilder::new()
+        .layer(http_trace_layer())
         .layer(TimeoutLayer::new(request_timeout))
         .service(client);
     Ok(HttpClient::new(base_url, client))
 }
 
 #[instrument(skip_all, fields(hostname = service_config.hostname))]
-pub async fn create_grpc_client<C>(
+pub async fn create_grpc_client<C: Debug + Clone>(
     default_port: u16,
     service_config: &ServiceConfig,
     new: fn(LoadBalancedChannel) -> C,
-) -> C {
+) -> GrpcClient<C> {
     let port = service_config.port.unwrap_or(default_port);
     let protocol = match service_config.tls {
         Some(_) => "https",
@@ -312,9 +327,12 @@ pub async fn create_grpc_client(
         .await
         .unwrap_or_else(|error| panic!("error creating grpc client: {error}"));
 
+    let client = new(channel);
     // Adds tower::Service wrapper to allow for enable middleware layers to be added
-    let channel = ServiceBuilder::new().service(channel);
-    new(channel)
+    let channel = ServiceBuilder::new()
+        .layer(grpc_trace_layer())
+        .service(client);
+    GrpcClient(channel)
 }
 
 /// Returns `true` if hostname is valid according to [IETF RFC 1123](https://tools.ietf.org/html/rfc1123).
@@ -352,37 +370,179 @@ fn grpc_request_with_headers(request: T, headers: HeaderMap) -> Request {
     Request::from_parts(metadata, Extensions::new(), request)
 }
 
+pub type GrpcServiceRequest = hyper::Request<BoxBody<Bytes, hyper::Error>>;
+
+#[derive(Debug, Clone)]
+pub struct GrpcClient<C>(
+    Trace<
+        C,
+        SharedClassifier<GrpcErrorsAsFailures>,
+        ClientMakeSpan,
+        ClientOnRequest,
+        ClientOnResponse,
+        DefaultOnBodyChunk,
+        ClientOnEos,
+        ClientOnFailure,
+    >,
+);
+
+impl<C> Deref for GrpcClient<C> {
+    type Target = C;
+
+    fn deref(&self) -> &Self::Target {
+        self.0.get_ref()
+    }
+}
+
+impl<C> DerefMut for GrpcClient<C> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        self.0.get_mut()
+    }
+}
+
+pub type GrpcClientTraceLayer = TraceLayer<
+    GrpcMakeClassifier,
+    ClientMakeSpan,
+    ClientOnRequest,
+    ClientOnResponse,
+    DefaultOnBodyChunk, // no metrics currently per body chunk
+    ClientOnEos,
+    ClientOnFailure,
+>;
+
+pub fn grpc_trace_layer() -> GrpcClientTraceLayer {
+    TraceLayer::new_for_grpc()
+        .make_span_with(ClientMakeSpan)
+        .on_request(ClientOnRequest)
+        .on_response(ClientOnResponse)
+        .on_failure(ClientOnFailure)
+        .on_eos(ClientOnEos)
+}
+
+#[derive(Debug, Clone)]
+pub struct ClientMakeSpan;
+
+impl MakeSpan<BoxBody<Bytes, hyper::Error>> for ClientMakeSpan {
+    fn make_span(&mut self, request: &hyper::Request<BoxBody<Bytes, hyper::Error>>) -> Span {
+        info_span!(
+            "client gRPC request",
+            request_method = request.method().to_string(),
+            request_path = request.uri().path().to_string(),
+        )
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct ClientOnRequest;
+
+impl OnRequest<BoxBody<Bytes, hyper::Error>> for ClientOnRequest {
+    fn on_request(&mut self, request: &hyper::Request<BoxBody<Bytes, hyper::Error>>, span: &Span) {
+        let _guard = span.enter();
+        info!(
+            trace_id = %current_trace_id(),
+            method = %request.method(),
+            path = %request.uri().path(),
+            monotonic_counter.incoming_request_count = 1,
+            "started processing request",
+        );
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct ClientOnResponse;
+
+impl OnResponse<Incoming> for ClientOnResponse {
+    fn on_response(self, response: &Response<Incoming>, latency: Duration, span: &Span) {
+        let _guard = span.enter();
+        // On every response
+        info!(
+            trace_id = %current_trace_id(),
+            status = %response.status(),
+            duration_ms = %latency.as_millis(),
+            monotonic_counter.client_response_count = 1,
+            histogram.client_request_duration = latency.as_millis() as u64,
+            "finished processing request",
+        );
+
+        if response.status().is_server_error() {
+            // On every server error (HTTP 5xx) response
+            info!(monotonic_counter.client_5xx_response_count = 1);
+        } else if response.status().is_client_error() {
+            // On every client error (HTTP 4xx) response
+            info!(monotonic_counter.client_4xx_response_count = 1);
+        } else if response.status().is_success() {
+            // On every successful (HTTP 2xx) response
+            info!(monotonic_counter.client_success_response_count = 1);
+        } else {
+            error!(
+                "unexpected gRPC client response status code: {}",
+                response.status().as_u16()
+            );
+        }
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct ClientOnFailure;
+
+impl OnFailure<GrpcFailureClass> for ClientOnFailure {
+    fn on_failure(
+        &mut self,
+        failure_classification: GrpcFailureClass,
+        latency: Duration,
+        span: &Span,
+    ) {
+        let _guard = span.enter();
+        let trace_id = current_trace_id();
+        let latency_ms = latency.as_millis().to_string();
+
+        let (status_code, error) = match failure_classification {
+            GrpcFailureClass::Code(code) => {
+                error!(%trace_id, code, latency_ms, "failure handling request",);
+                (Some(grpc_to_http_code(tonic::Code::from(code.get()))), None)
+            }
+            GrpcFailureClass::Error(error) => {
+                error!(%trace_id, latency_ms, "failure handling request: {}", error,);
+                (None, Some(error))
+            }
+        };
+
+        info!(
+            monotonic_counter.client_request_failure_count = 1,
+            monotonic_counter.client_5xx_response_count = 1,
+            latency_ms,
+            ?status_code,
+            ?error
+        );
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct ClientOnEos;
+
+impl OnEos for ClientOnEos {
+    fn on_eos(self, _trailers: Option<&HeaderMap>, stream_duration: Duration, span: &Span) {
+        let _guard = span.enter();
+        info!(
+            trace_id = %current_trace_id(),
+            duration_ms = stream_duration.as_millis(),
+            monotonic_counter.client_stream_response_count = 1,
+            histogram.client_stream_response_duration = stream_duration.as_millis() as u64,
+            "end of stream",
+        );
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use errors::grpc_to_http_code;
-    use http_body_util::BodyExt;
-    use hyper::{http, StatusCode};
 
     use super::*;
     use crate::{
-        clients::http::Response,
         health::{HealthCheckResult, HealthStatus},
         pb::grpc::health::v1::{health_check_response::ServingStatus, HealthCheckResponse},
     };
 
-    async fn mock_http_response(status: StatusCode, body: &str) -> Result<Response, Error> {
-        Ok(Response(
-            http::Response::builder()
-                .status(status)
-                .body(
-                    body.to_string()
-                        .map_err(|e| {
-                            panic!(
-                                "infallible error parsing string body in test response: {}",
-                                e
-                            )
-                        })
-                        .boxed(),
-                )
-                .unwrap(),
-        ))
-    }
-
     async fn mock_grpc_response(
         health_status: Option<i32>,
         tonic_status: Option<tonic::Status>,
@@ -396,132 +556,6 @@ mod tests {
         }
     }
 
-    #[tokio::test]
-    async fn test_http_health_check_responses() {
-        // READY responses from HTTP 200 OK with or without reason
-        let response = [
-            (StatusCode::OK, r#"{}"#),
-            (StatusCode::OK, r#"{ "status": "HEALTHY" }"#),
-            (StatusCode::OK, r#"{ "status": "meaningless status" }"#),
-            (
-                StatusCode::OK,
-                r#"{ "status": "HEALTHY", "reason": "needless reason" }"#,
-            ),
-        ];
-        for (status, body) in response.iter() {
-            let response = mock_http_response(*status, body).await;
-            let result = HttpClient::http_response_to_health_check_result(response).await;
-            assert_eq!(result.status, HealthStatus::Healthy);
-            assert_eq!(result.code, StatusCode::OK);
-            assert_eq!(result.reason, None);
-            let serialized = serde_json::to_string(&result).unwrap();
-            assert_eq!(serialized, r#"{"status":"HEALTHY"}"#);
-        }
-
-        // NOT_READY response from HTTP 200 OK without reason
-        let response = mock_http_response(StatusCode::OK, r#"{ "status": "UNHEALTHY" }"#).await;
-        let result = HttpClient::http_response_to_health_check_result(response).await;
-        assert_eq!(result.status, HealthStatus::Unhealthy);
-        assert_eq!(result.code, StatusCode::OK);
-        assert_eq!(result.reason, None);
-        let serialized = serde_json::to_string(&result).unwrap();
-        assert_eq!(serialized, r#"{"status":"UNHEALTHY"}"#);
-
-        // UNKNOWN response from HTTP 200 OK without reason
-        let response = mock_http_response(StatusCode::OK, r#"{ "status": "UNKNOWN" }"#).await;
-        let result = HttpClient::http_response_to_health_check_result(response).await;
-        assert_eq!(result.status, HealthStatus::Unknown);
-        assert_eq!(result.code, StatusCode::OK);
-        assert_eq!(result.reason, None);
-        let serialized = serde_json::to_string(&result).unwrap();
-        assert_eq!(serialized, r#"{"status":"UNKNOWN"}"#);
-
-        // NOT_READY response from HTTP 200 OK with reason
-        let response = mock_http_response(
-            StatusCode::OK,
-            r#"{"status": "UNHEALTHY", "reason": "some reason" }"#,
-        )
-        .await;
-        let result = HttpClient::http_response_to_health_check_result(response).await;
-        assert_eq!(result.status, HealthStatus::Unhealthy);
-        assert_eq!(result.code, StatusCode::OK);
-        assert_eq!(result.reason, Some("some reason".to_string()));
-        let serialized = serde_json::to_string(&result).unwrap();
-        assert_eq!(
-            serialized,
-            r#"{"status":"UNHEALTHY","reason":"some reason"}"#
-        );
-
-        // UNKNOWN response from HTTP 200 OK with reason
-        let response = mock_http_response(
-            StatusCode::OK,
-            r#"{ "status": "UNKNOWN", "reason": "some reason" }"#,
-        )
-        .await;
-        let result = HttpClient::http_response_to_health_check_result(response).await;
-        assert_eq!(result.status, HealthStatus::Unknown);
-        assert_eq!(result.code, StatusCode::OK);
-        assert_eq!(result.reason, Some("some reason".to_string()));
-        let serialized = serde_json::to_string(&result).unwrap();
-        assert_eq!(serialized, r#"{"status":"UNKNOWN","reason":"some reason"}"#);
-
-        // NOT_READY response from HTTP 503 SERVICE UNAVAILABLE with reason
-        let response = mock_http_response(
-            StatusCode::SERVICE_UNAVAILABLE,
-            r#"{ "message": "some error message" }"#,
-        )
-        .await;
-        let result = HttpClient::http_response_to_health_check_result(response).await;
-        assert_eq!(result.status, HealthStatus::Unhealthy);
-        assert_eq!(result.code, StatusCode::SERVICE_UNAVAILABLE);
-        assert_eq!(result.reason, Some("Service Unavailable".to_string()));
-        let serialized = serde_json::to_string(&result).unwrap();
-        assert_eq!(
-            serialized,
-            r#"{"status":"UNHEALTHY","code":503,"reason":"Service Unavailable"}"#
-        );
-
-        // UNKNOWN response from HTTP 404 NOT FOUND with reason
-        let response = mock_http_response(
-            StatusCode::NOT_FOUND,
-            r#"{ "message": "service not found" }"#,
-        )
-        .await;
-        let result = HttpClient::http_response_to_health_check_result(response).await;
-        assert_eq!(result.status, HealthStatus::Unknown);
-        assert_eq!(result.code, StatusCode::NOT_FOUND);
-        assert_eq!(result.reason, Some("Not Found".to_string()));
-        let serialized = serde_json::to_string(&result).unwrap();
-        assert_eq!(
-            serialized,
-            r#"{"status":"UNKNOWN","code":404,"reason":"Not Found"}"#
-        );
-
-        // NOT_READY response from HTTP 500 INTERNAL SERVER ERROR without reason
-        let response = mock_http_response(StatusCode::INTERNAL_SERVER_ERROR, r#""#).await;
-        let result = HttpClient::http_response_to_health_check_result(response).await;
-        assert_eq!(result.status, HealthStatus::Unhealthy);
-        assert_eq!(result.code, StatusCode::INTERNAL_SERVER_ERROR);
-        assert_eq!(result.reason, Some("Internal Server Error".to_string()));
-        let serialized = serde_json::to_string(&result).unwrap();
-        assert_eq!(
-            serialized,
-            r#"{"status":"UNHEALTHY","code":500,"reason":"Internal Server Error"}"#
-        );
-
-        // UNKNOWN response from HTTP 400 BAD REQUEST without reason
-        let response = mock_http_response(StatusCode::BAD_REQUEST, r#""#).await;
-        let result = HttpClient::http_response_to_health_check_result(response).await;
-        assert_eq!(result.status, HealthStatus::Unknown);
-        assert_eq!(result.code, StatusCode::BAD_REQUEST);
-        assert_eq!(result.reason, Some("Bad Request".to_string()));
-        let serialized = serde_json::to_string(&result).unwrap();
-        assert_eq!(
-            serialized,
-            r#"{"status":"UNKNOWN","code":400,"reason":"Bad Request"}"#
-        );
-    }
-
     #[tokio::test]
     async fn test_grpc_health_check_responses() {
         // READY responses from gRPC 0 OK from serving status 1 SERVING
diff --git a/src/clients/chunker.rs b/src/clients/chunker.rs
index 25ef2ecd..796ba0fa 100644
--- a/src/clients/chunker.rs
+++ b/src/clients/chunker.rs
@@ -26,7 +26,7 @@ use tracing::{info, instrument};
 
 use super::{
     create_grpc_client, errors::grpc_to_http_code, grpc_request_with_headers, BoxStream, Client,
-    Error,
+    Error, GrpcClient,
 };
 use crate::{
     config::ServiceConfig,
@@ -53,8 +53,8 @@ type StreamingTokenizationResult =
 #[cfg_attr(test, faux::create)]
 #[derive(Clone)]
 pub struct ChunkerClient {
-    client: ChunkersServiceClient<LoadBalancedChannel>,
-    health_client: HealthClient<LoadBalancedChannel>,
+    client: GrpcClient<ChunkersServiceClient<LoadBalancedChannel>>,
+    health_client: GrpcClient<HealthClient<LoadBalancedChannel>>,
 }
 
 #[cfg_attr(test, faux::methods)]
diff --git a/src/clients/http.rs b/src/clients/http.rs
index 49dda5ff..daf8a9fc 100644
--- a/src/clients/http.rs
+++ b/src/clients/http.rs
@@ -15,7 +15,7 @@
 
 */
 
-use std::{fmt::Debug, ops::Deref};
+use std::{fmt::Debug, ops::Deref, time::Duration};
 
 use http_body_util::{combinators::BoxBody, BodyExt, Full};
 use hyper::{
@@ -28,7 +28,16 @@
 use hyper_util::client::legacy::connect::HttpConnector;
 use serde::{de::DeserializeOwned, Serialize};
 use tower::timeout::Timeout;
 use tower::Service;
-use tracing::{debug, error, instrument, Span};
+use tower_http::{
+    classify::{
+        NeverClassifyEos, ServerErrorsAsFailures, ServerErrorsFailureClass, SharedClassifier,
+    },
+    trace::{
+        DefaultOnBodyChunk, HttpMakeClassifier, MakeSpan, OnEos, OnFailure, OnRequest, OnResponse,
+        Trace, TraceLayer,
+    },
+};
+use tracing::{debug, error, info, info_span, instrument, Span};
 use tracing_opentelemetry::OpenTelemetrySpanExt;
 use url::Url;
@@ -76,17 +85,26 @@ impl Deref for Response {
     }
 }
 
-impl From<hyper::http::Response<Incoming>> for Response {
-    fn from(response: hyper::http::Response<Incoming>) -> Self {
+impl From<TracedResponse> for Response {
+    fn from(response: TracedResponse) -> Self {
         Response(response.map(|body| body.boxed()))
     }
 }
 
-pub type HttpClientInner = Timeout<
-    hyper_util::client::legacy::Client<
-        TimeoutConnector<HttpsConnector<HttpConnector>>,
-        BoxBody<Bytes, hyper::Error>,
+pub type HttpClientInner = Trace<
+    Timeout<
+        hyper_util::client::legacy::Client<
+            TimeoutConnector<HttpsConnector<HttpConnector>>,
+            BoxBody<Bytes, hyper::Error>,
+        >,
     >,
+    SharedClassifier<ServerErrorsAsFailures>,
+    ClientMakeSpan,
+    ClientOnRequest,
+    ClientOnResponse,
+    DefaultOnBodyChunk,
+    ClientOnEos,
+    ClientOnFailure,
 >;
 
 /// A trait implemented by all clients that use HTTP for their inner client.
@@ -215,13 +233,15 @@ impl HttpClient {
         }
     }
 
-    /// This is sectioned off to allow for testing.
-    pub(super) async fn http_response_to_health_check_result(
-        res: Result<Response, Error>,
-    ) -> HealthCheckResult {
+    pub async fn health(&self) -> HealthCheckResult {
+        let req = Request::get(self.health_url.as_uri())
+            .body(BoxBody::default())
+            .unwrap();
+        let res = self.inner.clone().call(req).await;
         match res {
             Ok(response) => {
-                if response.0.status() == StatusCode::OK {
+                let response = Response::from(response);
+                if response.status() == StatusCode::OK {
                     if let Ok(body) = response.json::<OptionalHealthCheckResponseBody>().await {
                         // If the service provided a body, we only anticipate a minimal health status and optional reason.
                         HealthCheckResult {
@@ -271,23 +291,159 @@ impl HttpClient {
                 error!("error checking health: {}", e);
                 HealthCheckResult {
                     status: HealthStatus::Unknown,
-                    code: e.status_code(),
+                    code: StatusCode::INTERNAL_SERVER_ERROR,
                     reason: Some(e.to_string()),
                 }
             }
         }
     }
+}
 
-    pub async fn health(&self) -> HealthCheckResult {
-        let req = Request::get(self.health_url.as_uri())
-            .body(BoxBody::default())
-            .unwrap();
-        let res = self.inner.clone().call(req).await;
-        Self::http_response_to_health_check_result(res.map(Into::into).map_err(|e| Error::Http {
-            code: StatusCode::INTERNAL_SERVER_ERROR,
-            message: format!("sending client health request failed: {}", e),
-        }))
-        .await
+pub type TracedResponse = hyper::Response<
+    tower_http::trace::ResponseBody<
+        Incoming,
+        NeverClassifyEos<ServerErrorsFailureClass>,
+        DefaultOnBodyChunk,
+        ClientOnEos,
+        ClientOnFailure,
+    >,
+>;
+
+pub type HttpClientTraceLayer = TraceLayer<
+    HttpMakeClassifier,
+    ClientMakeSpan,
+    ClientOnRequest,
+    ClientOnResponse,
+    DefaultOnBodyChunk, // no metrics currently per body chunk
+    ClientOnEos,
+    ClientOnFailure,
+>;
+
+pub fn http_trace_layer() -> HttpClientTraceLayer {
+    TraceLayer::new_for_http()
+        .make_span_with(ClientMakeSpan)
+        .on_request(ClientOnRequest)
+        .on_response(ClientOnResponse)
+        .on_failure(ClientOnFailure)
+        .on_eos(ClientOnEos)
+}
+
+#[derive(Debug, Clone)]
+pub struct ClientMakeSpan;
+
+impl MakeSpan<BoxBody<Bytes, hyper::Error>> for ClientMakeSpan {
+    fn make_span(&mut self, request: &Request<BoxBody<Bytes, hyper::Error>>) -> Span {
+        info_span!(
+            "client HTTP request",
+            request_method = request.method().to_string(),
+            request_path = request.uri().path().to_string(),
+        )
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct ClientOnRequest;
+
+impl OnRequest<BoxBody<Bytes, hyper::Error>> for ClientOnRequest {
+    fn on_request(&mut self, request: &Request<BoxBody<Bytes, hyper::Error>>, span: &Span) {
+        let _guard = span.enter();
+        info!(
+            trace_id = %trace::current_trace_id(),
+            method = %request.method(),
+            path = %request.uri().path(),
+            monotonic_counter.incoming_request_count = 1,
+            "started processing request",
+        );
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct ClientOnResponse;
+
+impl OnResponse<Incoming> for ClientOnResponse {
+    fn on_response(self, response: &hyper::Response<Incoming>, latency: Duration, span: &Span) {
+        let _guard = span.enter();
+        // On every response
+        info!(
+            trace_id = %trace::current_trace_id(),
+            status = %response.status(),
+            duration_ms = %latency.as_millis(),
+            monotonic_counter.client_response_count = 1,
+            histogram.client_request_duration = latency.as_millis() as u64,
+            "finished processing request"
+        );
+
+        if response.status().is_server_error() {
+            // On every server error (HTTP 5xx) response
+            info!(monotonic_counter.client_5xx_response_count = 1);
+        } else if response.status().is_client_error() {
+            // On every client error (HTTP 4xx) response
+            info!(monotonic_counter.client_4xx_response_count = 1);
+        } else if response.status().is_success() {
+            // On every successful (HTTP 2xx) response
+            info!(monotonic_counter.client_success_response_count = 1);
+        } else {
+            error!(
+                "unexpected HTTP client response status code: {}",
+                response.status().as_u16()
+            );
+        }
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct ClientOnFailure;
+
+impl OnFailure<ServerErrorsFailureClass> for ClientOnFailure {
+    fn on_failure(
+        &mut self,
+        failure_classification: ServerErrorsFailureClass,
+        latency: Duration,
+        span: &Span,
+    ) {
+        let _guard = span.enter();
+        let trace_id = trace::current_trace_id();
+        let latency_ms = latency.as_millis().to_string();
+
+        let (status_code, error) = match failure_classification {
+            ServerErrorsFailureClass::StatusCode(status_code) => {
+                error!(
+                    %trace_id,
+                    %status_code,
+                    latency_ms,
+                    "failure handling request",
+                );
+                (Some(status_code), None)
+            }
+            ServerErrorsFailureClass::Error(error) => {
+                error!(?trace_id, latency_ms, "failure handling request: {}", error,);
+                (None, Some(error))
+            }
+        };
+
+        info!(
+            monotonic_counter.client_request_failure_count = 1,
+            monotonic_counter.client_5xx_response_count = 1,
+            latency_ms,
+            ?status_code,
+            ?error
+        );
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct ClientOnEos;
+
+impl OnEos for ClientOnEos {
+    fn on_eos(self, _trailers: Option<&HeaderMap>, stream_duration: Duration, span: &Span) {
+        let _guard = span.enter();
+        info!(
+            trace_id = %trace::current_trace_id(),
+            duration_ms = stream_duration.as_millis(),
+            monotonic_counter.client_stream_response_count = 1,
+            histogram.client_stream_response_duration = stream_duration.as_millis() as u64,
+            "end of stream",
+        );
     }
 }
diff --git a/src/clients/nlp.rs b/src/clients/nlp.rs
index 50cdd925..8c86ffb4 100644
--- a/src/clients/nlp.rs
+++ b/src/clients/nlp.rs
@@ -24,7 +24,7 @@ use tracing::{info, instrument};
 
 use super::{
     create_grpc_client, errors::grpc_to_http_code, grpc_request_with_headers, BoxStream, Client,
-    Error,
+    Error, GrpcClient,
 };
 use crate::{
     config::ServiceConfig,
@@ -49,8 +49,8 @@ const MODEL_ID_HEADER_NAME: &str = "mm-model-id";
 #[cfg_attr(test, faux::create)]
 #[derive(Clone)]
 pub struct NlpClient {
-    client: NlpServiceClient<LoadBalancedChannel>,
-    health_client: HealthClient<LoadBalancedChannel>,
+    client: GrpcClient<NlpServiceClient<LoadBalancedChannel>>,
+    health_client: GrpcClient<HealthClient<LoadBalancedChannel>>,
 }
 
 #[cfg_attr(test, faux::methods)]
diff --git a/src/clients/tgis.rs b/src/clients/tgis.rs
index cac86f64..ed340aa3 100644
--- a/src/clients/tgis.rs
+++ b/src/clients/tgis.rs
@@ -24,7 +24,7 @@ use tracing::{info, instrument};
 
 use super::{
     create_grpc_client, errors::grpc_to_http_code, grpc_request_with_headers, BoxStream, Client,
-    Error,
+    Error, GrpcClient,
 };
 use crate::{
     config::ServiceConfig,
@@ -42,7 +42,7 @@ const DEFAULT_PORT: u16 = 8033;
 #[cfg_attr(test, faux::create)]
 #[derive(Clone)]
 pub struct TgisClient {
-    client: GenerationServiceClient<LoadBalancedChannel>,
+    client: GrpcClient<GenerationServiceClient<LoadBalancedChannel>>,
 }
 
 #[cfg_attr(test, faux::methods)]
diff --git a/src/utils/trace.rs b/src/utils/trace.rs
index 9454f8d7..f4817a79 100644
--- a/src/utils/trace.rs
+++ b/src/utils/trace.rs
@@ -18,11 +18,10 @@
 use std::time::Duration;
 
 use axum::{extract::Request, http::HeaderMap, response::Response};
-use hyper::body::Incoming;
 use opentelemetry::{
     global,
     metrics::MetricsError,
-    trace::{TraceContextExt, TraceError, TracerProvider},
+    trace::{TraceContextExt, TraceError, TraceId, TracerProvider},
     KeyValue,
 };
 use opentelemetry_http::{HeaderExtractor, HeaderInjector};
@@ -42,6 +41,7 @@ use tracing_opentelemetry::{MetricsLayer, OpenTelemetrySpanExt};
 use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Layer};
 
 use crate::args::{LogFormat, OtlpProtocol, TracingConfig};
+use crate::clients::http::TracedResponse;
 
 #[derive(Debug, thiserror::Error)]
 pub enum TracingError {
@@ -280,8 +280,9 @@ pub fn on_outgoing_response(response: &Response, latency: Duration, span: &Span)
         response_status = response.status().as_u16(),
         request_duration = latency.as_millis()
     );
+    // Note: tracing_opentelemetry expects u64/f64 for histograms but as_millis returns u128
     info!(
-        histogram.service_request_duration = latency.as_millis(),
+        histogram.service_request_duration = latency.as_millis() as u64,
         response_status = response.status().as_u16()
     );
 
@@ -294,8 +295,10 @@ pub fn on_outgoing_response(response: &Response, latency: Duration, span: &Span)
         );
     } else if response.status().is_client_error() {
         // On every client error (HTTP 4xx) response
+        // Named so that this does not get mixed up with orchestrator
+        // client response metrics
         info!(
-            monotonic_counter.client_error_response_count = 1,
+            monotonic_counter.client_app_error_response_count = 1,
             response_status = response.status().as_u16(),
             request_duration = latency.as_millis()
         );
@@ -330,7 +333,7 @@ pub fn on_outgoing_eos(trailers: Option<&HeaderMap>, stream_duration: Duration,
         monotonic_counter.service_stream_response_count = 1,
         stream_duration = stream_duration.as_millis()
     );
-    info!(monotonic_histogram.service_stream_response_duration = stream_duration.as_millis());
+    info!(histogram.service_stream_response_duration = stream_duration.as_millis() as u64);
 }
 
 /// Injects the `traceparent` header into the header map from the current tracing span context.
@@ -351,7 +354,7 @@ pub fn with_traceparent_header(ctx: &opentelemetry::Context, headers: HeaderMap)
 /// tracing span context (i.e. use `traceparent` as parent to the current span).
 /// Defaults to using the current context when no `traceparent` is found.
 /// See https://www.w3.org/TR/trace-context/#trace-context-http-headers-format.
-pub fn trace_context_from_http_response(span: &Span, response: &hyper::Response<Incoming>) {
+pub fn trace_context_from_http_response(span: &Span, response: &TracedResponse) {
     let curr_trace = span.context().span().span_context().trace_id();
     let ctx = global::get_text_map_propagator(|propagator| {
         // Returns the current context if no `traceparent` is found
@@ -374,3 +377,8 @@ pub fn trace_context_from_grpc_response(response: &tonic::Response) {
     });
     Span::current().set_parent(ctx);
 }
+
+/// Returns the `trace_id` of the current span according to the global tracing subscriber.
+pub fn current_trace_id() -> TraceId {
+    Span::current().context().span().span_context().trace_id()
+}
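
Usage sketch (illustrative, not part of the patch): the standalone program below shows how a tower-http TraceLayer with custom callbacks can wrap a client-side Service and emit the same style of dotted counter/histogram tracing fields that the ClientOnResponse callback above records. It is a minimal approximation, not the orchestrator's implementation: the closure-based callbacks, the dummy_client stand-in service, and the dependency list (tokio with rt/macros, tower with the "util" feature, tower-http with the "trace" feature, http, http-body-util, bytes, tracing, tracing-subscriber) are assumptions for the sketch; in the real service the dotted fields are exported as metrics by tracing-opentelemetry's MetricsLayer.

// Minimal sketch of the client trace-layer idea, under the assumptions above.
use std::{convert::Infallible, time::Duration};

use bytes::Bytes;
use http::{Request, Response};
use http_body_util::Full;
use tower::{service_fn, ServiceBuilder, ServiceExt};
use tower_http::trace::TraceLayer;
use tracing::{info, info_span, Span};

#[tokio::main]
async fn main() {
    // Print tracing events so the emitted fields are visible.
    tracing_subscriber::fmt::init();

    // Stand-in for the hyper legacy client: any `Service<Request<_>>` works here.
    let dummy_client = service_fn(|_req: Request<Full<Bytes>>| async {
        Ok::<_, Infallible>(Response::new(Full::new(Bytes::from_static(b"ok"))))
    });

    // Same layering shape as `create_http_client`: the trace layer wraps the client.
    let client = ServiceBuilder::new()
        .layer(
            TraceLayer::new_for_http()
                .make_span_with(|req: &Request<Full<Bytes>>| {
                    // Mirrors ClientMakeSpan: one span per outgoing client request.
                    info_span!(
                        "client HTTP request",
                        request_method = req.method().to_string(),
                        request_path = req.uri().path().to_string(),
                    )
                })
                .on_response(|res: &Response<Full<Bytes>>, latency: Duration, _span: &Span| {
                    // Mirrors ClientOnResponse: dotted fields become metrics when a
                    // metrics layer (e.g. tracing-opentelemetry) is installed.
                    info!(
                        status = %res.status(),
                        monotonic_counter.client_response_count = 1,
                        histogram.client_request_duration = latency.as_millis() as u64,
                        "finished processing request"
                    );
                }),
        )
        .service(dummy_client);

    let response = client
        .oneshot(Request::new(Full::new(Bytes::from_static(b"hello"))))
        .await
        .expect("request should succeed");
    println!("status: {}", response.status());
}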