From 05b8f2c9a41b7847ce045440697707e377c25d84 Mon Sep 17 00:00:00 2001 From: Paul Scoropan <1paulscoropan@gmail.com> Date: Mon, 25 Nov 2024 12:23:45 -0500 Subject: [PATCH] Introduce tower service into clients Signed-off-by: Paul Scoropan <1paulscoropan@gmail.com> Co-authored-by: Evaline Ju <69598118+evaline-ju@users.noreply.github.com> --- Cargo.lock | 103 +++++++++++++++++++++++++++++++------------- Cargo.toml | 4 +- src/clients.rs | 9 +++- src/clients/http.rs | 52 +++++++++++----------- 4 files changed, 113 insertions(+), 55 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ffc74e27..fed55e3b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -100,7 +100,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.79", ] [[package]] @@ -111,7 +111,7 @@ checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.79", ] [[package]] @@ -310,7 +310,7 @@ dependencies = [ "regex", "rustc-hash 1.1.0", "shlex", - "syn", + "syn 2.0.79", "which", ] @@ -430,7 +430,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn", + "syn 2.0.79", ] [[package]] @@ -501,7 +501,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn", + "syn 2.0.79", ] [[package]] @@ -512,7 +512,7 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" dependencies = [ "darling_core", "quote", - "syn", + "syn 2.0.79", ] [[package]] @@ -566,7 +566,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn", + "syn 2.0.79", ] [[package]] @@ -642,7 +642,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn", + "syn 2.0.79", "uuid", ] @@ -698,8 +698,10 @@ dependencies = [ "tokio-stream", "tonic", "tonic-build", + "tower 0.5.1", "tower-http", "tower-service", + "tower-timeout", "tracing", "tracing-opentelemetry", "tracing-subscriber", @@ -816,7 +818,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.79", ] [[package]] @@ -1077,7 +1079,7 @@ dependencies = [ "httparse", "httpdate 0.3.2", "itoa 0.4.8", - "pin-project", + "pin-project 1.1.5", "socket2 0.3.19", "tokio 0.2.25", "tower-service", @@ -1614,7 +1616,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.79", ] [[package]] @@ -1765,13 +1767,33 @@ dependencies = [ "indexmap 2.5.0", ] +[[package]] +name = "pin-project" +version = "0.4.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ef0f924a5ee7ea9cbcea77529dba45f8a9ba9f622419fe3386ca581a3ae9d5a" +dependencies = [ + "pin-project-internal 0.4.30", +] + [[package]] name = "pin-project" version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" dependencies = [ - "pin-project-internal", + "pin-project-internal 1.1.5", +] + +[[package]] +name = "pin-project-internal" +version = "0.4.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "851c8d0ce9bebe43790dedfc86614c23494ac9f423dd618d3a61fc693eafe61e" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", ] [[package]] @@ -1782,7 +1804,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.79", ] [[package]] @@ -1847,7 +1869,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "479cf940fbbb3426c32c5d5176f62ad57549a0bb84773423ba8be9d089f5faba" dependencies = [ "proc-macro2", - "syn", + "syn 2.0.79", ] [[package]] @@ -1886,7 +1908,7 @@ dependencies = [ "prost", "prost-types", "regex", - "syn", + "syn 2.0.79", "tempfile", ] @@ -1900,7 +1922,7 @@ dependencies = [ "itertools 0.13.0", "proc-macro2", "quote", - "syn", + "syn 2.0.79", ] [[package]] @@ -2366,7 +2388,7 @@ checksum = "243902eda00fad750862fc144cea25caca5e20d615af0a81bee94ca738f1df1f" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.79", ] [[package]] @@ -2498,6 +2520,17 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "syn" version = "2.0.79" @@ -2575,7 +2608,7 @@ checksum = "08904e7672f5eb876eaaf87e0ce17857500934f4981c4a0ab2b4aa98baac7fc3" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.79", ] [[package]] @@ -2678,7 +2711,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.79", ] [[package]] @@ -2770,7 +2803,7 @@ dependencies = [ "hyper-timeout", "hyper-util", "percent-encoding", - "pin-project", + "pin-project 1.1.5", "prost", "rustls-native-certs", "rustls-pemfile", @@ -2796,7 +2829,7 @@ dependencies = [ "prost-build", "prost-types", "quote", - "syn", + "syn 2.0.79", ] [[package]] @@ -2808,7 +2841,7 @@ dependencies = [ "futures-core", "futures-util", "indexmap 1.9.3", - "pin-project", + "pin-project 1.1.5", "pin-project-lite 0.2.15", "rand", "slab", @@ -2864,6 +2897,18 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" +[[package]] +name = "tower-timeout" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "127b8924b357be938823eaaec0608c482d40add25609481027b96198b2e4b31e" +dependencies = [ + "pin-project 0.4.30", + "tokio 0.2.25", + "tower-layer", + "tower-service", +] + [[package]] name = "tracing" version = "0.1.40" @@ -2884,7 +2929,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.79", ] [[package]] @@ -2903,7 +2948,7 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" dependencies = [ - "pin-project", + "pin-project 1.1.5", "tracing", ] @@ -2985,7 +3030,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568" dependencies = [ "quote", - "syn", + "syn 2.0.79", ] [[package]] @@ -3160,7 +3205,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn", + "syn 2.0.79", "wasm-bindgen-shared", ] @@ -3194,7 +3239,7 @@ checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.79", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -3530,11 +3575,11 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.79", ] [[package]] name = "zeroize" version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" +checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index f188c175..74d579df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ http-body-util = "0.1.2" http-serde = "2.1.1" hyper = { version = "1.4.1", features = ["http1", "http2", "server"] } hyper-rustls = { version = "0.27.3", features = ["ring"]} +hyper-timeout = "0.5.2" hyper-util = { version = "0.1.7", features = ["server-auto", "server-graceful", "tokio"] } mime = "0.3.17" mio = "1.0.2" @@ -52,14 +53,15 @@ tokio = { version = "1.39.2", features = ["rt", "rt-multi-thread", "parking_lot" tokio-rustls = { version = "0.26.0", features = ["ring"]} tokio-stream = { version = "0.1.15", features = ["sync"] } tonic = { version = "0.12.1", features = ["tls", "tls-roots", "tls-webpki-roots"] } +tower = "0.5.1" tower-http = { version = "0.5.2", features = ["trace"] } tower-service = "0.3" +tower-timeout = "0.3.0" tracing = "0.1.40" tracing-opentelemetry = "0.25.0" tracing-subscriber = { version = "0.3.18", features = ["json", "env-filter"] } url = "2.5.2" uuid = { version = "1.10.0", features = ["v4", "fast-rng"] } -hyper-timeout = "0.5.2" [build-dependencies] tonic-build = "0.12.1" diff --git a/src/clients.rs b/src/clients.rs index becef10e..0e61c6f1 100644 --- a/src/clients.rs +++ b/src/clients.rs @@ -30,6 +30,8 @@ use ginepro::LoadBalancedChannel; use hyper_timeout::TimeoutConnector; use hyper_util::rt::TokioExecutor; use tonic::{metadata::MetadataMap, Request}; +use tower::ServiceBuilder; +use tower_timeout::TimeoutLayer; use tracing::{debug, instrument, Span}; use tracing_opentelemetry::OpenTelemetrySpanExt; use url::Url; @@ -244,7 +246,10 @@ pub async fn create_http_client( let client = hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build(timeout_conn); - Ok(HttpClient::new(base_url, request_timeout, client)) + let client = ServiceBuilder::new() + .layer(TimeoutLayer::new(request_timeout)) + .service(client); + Ok(HttpClient::new(base_url, client)) } #[instrument(skip_all, fields(hostname = service_config.hostname))] @@ -306,6 +311,8 @@ pub async fn create_grpc_client( .channel() .await .unwrap_or_else(|error| panic!("error creating grpc client: {error}")); + + let channel = ServiceBuilder::new().service(channel); new(channel) } diff --git a/src/clients/http.rs b/src/clients/http.rs index 30a108ca..cdc0ad69 100644 --- a/src/clients/http.rs +++ b/src/clients/http.rs @@ -15,18 +15,19 @@ */ -use std::{fmt::Debug, ops::Deref, time::Duration}; +use std::{fmt::Debug, ops::Deref}; use http_body_util::{combinators::BoxBody, BodyExt, Full}; use hyper::{ body::{Body, Bytes, Incoming}, - HeaderMap, Method, StatusCode, + HeaderMap, Method, Request, StatusCode, }; use hyper_rustls::HttpsConnector; use hyper_timeout::TimeoutConnector; use hyper_util::client::legacy::connect::HttpConnector; use serde::{de::DeserializeOwned, Serialize}; -use tokio::time::timeout; +use tower_service::Service; +use tower_timeout::Timeout; use tracing::{debug, error, instrument, Span}; use tracing_opentelemetry::OpenTelemetrySpanExt; use url::Url; @@ -81,9 +82,11 @@ impl From> for Response { } } -pub type HttpClientInner = hyper_util::client::legacy::Client< - TimeoutConnector>, - BoxBody, +pub type HttpClientInner = Timeout< + hyper_util::client::legacy::Client< + TimeoutConnector>, + BoxBody, + >, >; /// A trait implemented by all clients that use HTTP for their inner client. @@ -96,17 +99,15 @@ pub trait HttpClientExt: Client { pub struct HttpClient { base_url: Url, health_url: Url, - request_timeout: Duration, inner: HttpClientInner, } impl HttpClient { - pub fn new(base_url: Url, request_timeout: Duration, inner: HttpClientInner) -> Self { + pub fn new(base_url: Url, inner: HttpClientInner) -> Self { let health_url = base_url.join("health").unwrap(); Self { base_url, health_url, - request_timeout, inner, } } @@ -177,21 +178,21 @@ impl HttpClient { message: format!("client request serialization failed: {}", e) } })?; - let response_fut = self + let response = match self .inner - .request(request); - - let response = match timeout(self.request_timeout, response_fut).await { - Ok(response) => Ok(response.map_err(|e| { - Error::Http { - code: StatusCode::INTERNAL_SERVER_ERROR, - message: format!("sending client request failed: {}", e) - } - })?), - Err(e) => Err(Error::Http { - code: StatusCode::REQUEST_TIMEOUT, - message: format!("client request timeout: {}", e), - }), + .clone() + .call(request) + .await { + Ok(response) => Ok(response.map_err(|e| { + Error::Http { + code: StatusCode::INTERNAL_SERVER_ERROR, + message: format!("sending client request failed: {}", e) + } + }).into_inner()), + Err(e) => Err(Error::Http { + code: StatusCode::REQUEST_TIMEOUT, + message: format!("client request timeout: {}", e), + }), }?; debug!( @@ -278,7 +279,10 @@ impl HttpClient { } pub async fn health(&self) -> HealthCheckResult { - let res = self.inner.get(self.health_url.as_uri()).await; + let req = Request::get(self.health_url.as_uri()) + .body(BoxBody::default()) + .unwrap(); + let res = self.inner.clone().call(req).await; Self::http_response_to_health_check_result(res.map(Into::into).map_err(|e| Error::Http { code: StatusCode::INTERNAL_SERVER_ERROR, message: format!("sending client health request failed: {}", e),