diff --git a/Cargo.lock b/Cargo.lock index 72d0545709..4b848c78f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -379,6 +379,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-utils" +version = "0.8.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" + [[package]] name = "crypto-common" version = "0.1.6" @@ -682,6 +688,12 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + [[package]] name = "futures-util" version = "0.3.31" @@ -733,6 +745,25 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "governor" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68a7f542ee6b35af73b06abc0dad1c1bae89964e4e253bc4b587b91c9637867b" +dependencies = [ + "cfg-if", + "futures", + "futures-timer", + "no-std-compat", + "nonzero_ext", + "parking_lot", + "portable-atomic", + "quanta", + "rand", + "smallvec", + "spinning_top", +] + [[package]] name = "gzip-header" version = "1.0.0" @@ -2012,6 +2043,7 @@ dependencies = [ name = "linkerd-proxy-server-policy" version = "0.1.0" dependencies = [ + "governor", "http", "ipnet", "linkerd-http-route", @@ -2385,8 +2417,7 @@ dependencies = [ [[package]] name = "linkerd2-proxy-api" version = "0.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26c72fb98d969e1e94e95d52a6fcdf4693764702c369e577934256e72fb5bc61" +source = "git+https://github.com/linkerd/linkerd2-proxy-api.git?branch=alpeb/v0.14.0-rate-limiting#903aeafd8c2e60790b5de0de7aee28f31964fce2" dependencies = [ "h2", "http", @@ -2530,6 +2561,12 @@ dependencies = [ "libc", ] +[[package]] +name = "no-std-compat" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" + [[package]] name = "nom" version = "7.1.3" @@ -2540,6 +2577,12 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nonzero_ext" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -2764,6 +2807,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "portable-atomic" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc9c68a3f6da06753e9335d63e27f6b9754dd1920d941135b7ea8224f141adb2" + [[package]] name = "powerfmt" version = "0.2.0" @@ -2920,6 +2969,21 @@ dependencies = [ "prost", ] +[[package]] +name = "quanta" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi", + "web-sys", + "winapi", +] + [[package]] name = "quick-error" version = "1.2.3" @@ -2980,6 +3044,15 @@ version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f60fcc7d6849342eff22c4350c8b9a989ee8ceabc4b481253e8946b9fe83d684" +[[package]] +name = "raw-cpuid" +version = "11.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ab240315c661615f2ee9f0f2cd32d5a7343a84d5ebcccb99d46e6637565e7b0" +dependencies = [ + "bitflags 2.4.2", +] + [[package]] name = "rcgen" version = "0.12.1" @@ -3290,6 +3363,15 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "spinning_top" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300" +dependencies = [ + "lock_api", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -3863,6 +3945,16 @@ version = "0.2.93" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c62a0a307cb4a311d3a07867860911ca130c3494e8c2719593806c08bc5d0484" +[[package]] +name = "web-sys" +version = "0.3.70" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26fdeaafd9bd129f65e7c031593c24d62186301e0c72c8978fa1678be7d532c0" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "widestring" version = "1.1.0" diff --git a/Cargo.toml b/Cargo.toml index 29f9651998..8e249afbb1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,3 +90,6 @@ lto = true [workspace.dependencies] linkerd2-proxy-api = "0.14.0" + +[patch.crates-io] +linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api.git", branch = "alpeb/v0.14.0-rate-limiting" } diff --git a/linkerd/app/gateway/src/http/tests.rs b/linkerd/app/gateway/src/http/tests.rs index e545d7f530..442c4ff316 100644 --- a/linkerd/app/gateway/src/http/tests.rs +++ b/linkerd/app/gateway/src/http/tests.rs @@ -129,6 +129,7 @@ async fn upgraded_request_remains_relative_form() { }), }]))]), }, + local_rate_limit: Arc::new(Default::default()), }; let (policy, tx) = inbound::policy::AllowPolicy::for_test(self.param(), policy); tokio::spawn(async move { diff --git a/linkerd/app/inbound/src/accept.rs b/linkerd/app/inbound/src/accept.rs index 04dc57db9b..d3c1516f75 100644 --- a/linkerd/app/inbound/src/accept.rs +++ b/linkerd/app/inbound/src/accept.rs @@ -138,6 +138,7 @@ mod tests { kind: "server".into(), name: "testsrv".into(), }), + local_rate_limit: Default::default(), }, None, ); diff --git a/linkerd/app/inbound/src/detect/tests.rs b/linkerd/app/inbound/src/detect/tests.rs index 0657ec733a..002717824b 100644 --- a/linkerd/app/inbound/src/detect/tests.rs +++ b/linkerd/app/inbound/src/detect/tests.rs @@ -35,6 +35,7 @@ fn allow(protocol: Protocol) -> AllowPolicy { kind: "server".into(), name: "testsrv".into(), }), + local_rate_limit: Arc::new(Default::default()), }, ); allow diff --git a/linkerd/app/inbound/src/http/tests.rs b/linkerd/app/inbound/src/http/tests.rs index f5c9351240..637cd73240 100644 --- a/linkerd/app/inbound/src/http/tests.rs +++ b/linkerd/app/inbound/src/http/tests.rs @@ -748,6 +748,7 @@ impl svc::Param for Target { kind: "server".into(), name: "testsrv".into(), }), + local_rate_limit: Default::default(), }, ); policy diff --git a/linkerd/app/inbound/src/policy.rs b/linkerd/app/inbound/src/policy.rs index 176c92551d..d320bbe601 100644 --- a/linkerd/app/inbound/src/policy.rs +++ b/linkerd/app/inbound/src/policy.rs @@ -44,7 +44,7 @@ pub trait GetPolicy { fn get_policy(&self, dst: OrigDstAddr) -> AllowPolicy; } -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug)] pub enum DefaultPolicy { Allow(ServerPolicy), Deny, @@ -90,6 +90,7 @@ impl From for ServerPolicy { DefaultPolicy::Allow(p) => p, DefaultPolicy::Deny => ServerPolicy { protocol: Protocol::Opaque(Arc::new([])), + local_rate_limit: Default::default(), meta: Meta::new_default("deny"), }, } diff --git a/linkerd/app/inbound/src/policy/defaults.rs b/linkerd/app/inbound/src/policy/defaults.rs index ad23cae462..29a5dd01da 100644 --- a/linkerd/app/inbound/src/policy/defaults.rs +++ b/linkerd/app/inbound/src/policy/defaults.rs @@ -88,5 +88,6 @@ fn mk( ServerPolicy { meta: Meta::new_default(name), protocol, + local_rate_limit: Default::default(), } } diff --git a/linkerd/app/inbound/src/policy/http/tests.rs b/linkerd/app/inbound/src/policy/http/tests.rs index 0369207938..22ed859c4a 100644 --- a/linkerd/app/inbound/src/policy/http/tests.rs +++ b/linkerd/app/inbound/src/policy/http/tests.rs @@ -29,6 +29,7 @@ macro_rules! new_svc { kind: "Server".into(), name: "testsrv".into(), }), + local_rate_limit: Arc::new(Default::default()), }, ); let svc = HttpPolicyService { @@ -197,6 +198,7 @@ async fn http_route() { }, ], }])), + local_rate_limit: Arc::new(Default::default()), }) .expect("must send"); diff --git a/linkerd/app/inbound/src/policy/tcp/tests.rs b/linkerd/app/inbound/src/policy/tcp/tests.rs index 957c51d736..53b10fc592 100644 --- a/linkerd/app/inbound/src/policy/tcp/tests.rs +++ b/linkerd/app/inbound/src/policy/tcp/tests.rs @@ -26,6 +26,7 @@ async fn unauthenticated_allowed() { kind: "server".into(), name: "test".into(), }), + local_rate_limit: Arc::new(Default::default()), }; let tls = tls::ConditionalServerTls::None(tls::NoServerTls::NoClientHello); @@ -75,6 +76,7 @@ async fn authenticated_identity() { kind: "server".into(), name: "test".into(), }), + local_rate_limit: Arc::new(Default::default()), }; let tls = tls::ConditionalServerTls::Some(tls::ServerTls::Established { @@ -138,6 +140,7 @@ async fn authenticated_suffix() { kind: "server".into(), name: "test".into(), }), + local_rate_limit: Arc::new(Default::default()), }; let tls = tls::ConditionalServerTls::Some(tls::ServerTls::Established { @@ -197,6 +200,7 @@ async fn tls_unauthenticated() { kind: "server".into(), name: "test".into(), }), + local_rate_limit: Arc::new(Default::default()), }; let tls = tls::ConditionalServerTls::Some(tls::ServerTls::Established { diff --git a/linkerd/app/inbound/src/test_util.rs b/linkerd/app/inbound/src/test_util.rs index fb28be8c24..09997f4787 100644 --- a/linkerd/app/inbound/src/test_util.rs +++ b/linkerd/app/inbound/src/test_util.rs @@ -46,6 +46,7 @@ pub fn default_config() -> Config { kind: "server".into(), name: "testsrv".into(), }), + local_rate_limit: Arc::new(Default::default()), } .into(), ports: Default::default(), diff --git a/linkerd/app/integration/src/policy.rs b/linkerd/app/integration/src/policy.rs index b8cf4b9cef..fd1053eb27 100644 --- a/linkerd/app/integration/src/policy.rs +++ b/linkerd/app/integration/src/policy.rs @@ -45,6 +45,7 @@ pub fn all_unauthenticated() -> inbound::Server { inbound::proxy_protocol::Detect { timeout: Some(Duration::from_secs(10).try_into().unwrap()), http_routes: vec![], + http_local_rate_limit: None, }, )), }), diff --git a/linkerd/proxy/server-policy/Cargo.toml b/linkerd/proxy/server-policy/Cargo.toml index b6b9c2c2bf..b8cc881a38 100644 --- a/linkerd/proxy/server-policy/Cargo.toml +++ b/linkerd/proxy/server-policy/Cargo.toml @@ -10,6 +10,7 @@ publish = false proto = ["linkerd-http-route/proto", "linkerd2-proxy-api", "prost-types"] [dependencies] +governor = { version = "0.6", default-features = false, features = ["std", "jitter", "quanta"] } ipnet = "2" http = "0.2" prost-types = { version = "0.12", optional = true } diff --git a/linkerd/proxy/server-policy/src/lib.rs b/linkerd/proxy/server-policy/src/lib.rs index bee142475b..9f0e098712 100644 --- a/linkerd/proxy/server-policy/src/lib.rs +++ b/linkerd/proxy/server-policy/src/lib.rs @@ -1,11 +1,13 @@ #![deny(rust_2018_idioms, clippy::disallowed_methods, clippy::disallowed_types)] #![forbid(unsafe_code)] +use local_rate_limit::HttpLocalRateLimit; use std::{hash::Hash, sync::Arc, time}; pub mod authz; pub mod grpc; pub mod http; +pub mod local_rate_limit; pub mod meta; pub use self::{ @@ -14,10 +16,11 @@ pub use self::{ }; pub use linkerd_http_route as route; -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug)] pub struct ServerPolicy { pub protocol: Protocol, pub meta: Arc, + pub local_rate_limit: Arc, } #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -65,6 +68,7 @@ impl ServerPolicy { }]), tcp_authorizations: Arc::new([]), }, + local_rate_limit: Arc::new(HttpLocalRateLimit::default()), } } } @@ -73,7 +77,7 @@ impl ServerPolicy { pub mod proto { use super::*; use crate::meta::proto::InvalidMeta; - use linkerd2_proxy_api::inbound as api; + use linkerd2_proxy_api::inbound::{self as api}; #[derive(Debug, thiserror::Error)] pub enum InvalidServer { @@ -131,6 +135,28 @@ pub mod proto { server_ips: _, } = proto; + let local_rate_limit = { + match protocol + .clone() + .and_then(|api::ProxyProtocol { kind }| kind) + .ok_or(InvalidServer::MissingProxyProtocol)? + { + api::proxy_protocol::Kind::Detect(api::proxy_protocol::Detect { + http_local_rate_limit, + .. + }) => http_local_rate_limit.unwrap_or_default().into(), + api::proxy_protocol::Kind::Http1(api::proxy_protocol::Http1 { + local_rate_limit, + .. + }) + | api::proxy_protocol::Kind::Http2(api::proxy_protocol::Http2 { + local_rate_limit, + .. + }) => local_rate_limit.unwrap_or_default().into(), + _ => Default::default(), + } + }; + let authorizations = { // Always permit traffic from localhost. let localhost = Authorization { @@ -154,6 +180,7 @@ pub mod proto { api::proxy_protocol::Kind::Detect(api::proxy_protocol::Detect { http_routes, timeout, + .. }) => Protocol::Detect { http: mk_routes!(http, http_routes, authorizations.clone())?, timeout: timeout @@ -162,11 +189,11 @@ pub mod proto { tcp_authorizations: authorizations, }, - api::proxy_protocol::Kind::Http1(api::proxy_protocol::Http1 { routes }) => { + api::proxy_protocol::Kind::Http1(api::proxy_protocol::Http1 { routes, .. }) => { Protocol::Http1(mk_routes!(http, routes, authorizations)?) } - api::proxy_protocol::Kind::Http2(api::proxy_protocol::Http2 { routes }) => { + api::proxy_protocol::Kind::Http2(api::proxy_protocol::Http2 { routes, .. }) => { Protocol::Http2(mk_routes!(http, routes, authorizations)?) } @@ -182,7 +209,11 @@ pub mod proto { // avoid label inference. let meta = Meta::try_new_with_default(labels, "policy.linkerd.io", "server")?; - Ok(ServerPolicy { protocol, meta }) + Ok(ServerPolicy { + protocol, + meta, + local_rate_limit: Arc::new(local_rate_limit), + }) } } } diff --git a/linkerd/proxy/server-policy/src/local_rate_limit.rs b/linkerd/proxy/server-policy/src/local_rate_limit.rs new file mode 100644 index 0000000000..7da897b2c0 --- /dev/null +++ b/linkerd/proxy/server-policy/src/local_rate_limit.rs @@ -0,0 +1,72 @@ +use governor::{ + clock::DefaultClock, + state::{keyed::DefaultKeyedStateStore, InMemoryState, NotKeyed, RateLimiter}, + Quota, +}; +use std::num::NonZeroU32; + +#[derive(Debug, Default)] +pub struct HttpLocalRateLimit { + pub total: Option>, + pub identity: Option, DefaultClock>>, + pub overrides: Vec, +} + +#[derive(Debug)] +pub struct HttpLocalRateLimitOverride { + pub ids: Vec, + pub rate_limit: RateLimiter, DefaultKeyedStateStore>, DefaultClock>, +} + +impl Default for HttpLocalRateLimitOverride { + fn default() -> Self { + Self { + ids: vec![], + rate_limit: RateLimiter::keyed(Quota::per_second(NonZeroU32::new(1).unwrap())), + } + } +} + +#[cfg(feature = "proto")] +pub mod proto { + use super::*; + use linkerd2_proxy_api::inbound::{self as api}; + + impl From for HttpLocalRateLimit { + fn from(proto: api::HttpLocalRateLimit) -> Self { + let total = proto.total.map(|lim| { + let quota = Quota::per_second(NonZeroU32::new(lim.requests_per_second).unwrap()); + RateLimiter::direct(quota) + }); + + let identity = proto.identity.map(|lim| { + let quota = Quota::per_second(NonZeroU32::new(lim.requests_per_second).unwrap()); + RateLimiter::keyed(quota) + }); + + let overrides = proto + .overrides + .into_iter() + .flat_map(|ovr| { + ovr.limit.map(|lim| { + let ids = ovr + .clients + .into_iter() + .flat_map(|cl| cl.identities.into_iter().map(|id| id.name)) + .collect(); + let quota = + Quota::per_second(NonZeroU32::new(lim.requests_per_second).unwrap()); + let rate_limit = RateLimiter::keyed(quota); + HttpLocalRateLimitOverride { ids, rate_limit } + }) + }) + .collect(); + + Self { + total, + identity, + overrides, + } + } + } +}