diff --git a/compose.tpu.yml b/compose.tpu.yml index a68135b5..88ea3216 100644 --- a/compose.tpu.yml +++ b/compose.tpu.yml @@ -2,6 +2,8 @@ services: scylla-server: environment: - SCYLLA_SIREN_HOST_URL=host.docker.internal:1883 + - SCYLLA_RATE_LIMIT_MODE=static + - SCYLLA_STATIC_RATE_LIMIT_VALUE=100 extra_hosts: - "host.docker.internal:host-gateway" # for external siren init: false # not supported on buildroot for some reason, further investigation needed diff --git a/scylla-server/src/processors/mqtt_processor.rs b/scylla-server/src/processors/mqtt_processor.rs index 44c0b6e2..056b2207 100644 --- a/scylla-server/src/processors/mqtt_processor.rs +++ b/scylla-server/src/processors/mqtt_processor.rs @@ -27,7 +27,14 @@ use crate::{ use super::ClientData; use std::borrow::Cow; -/// a mqtt processor +/// The chief processor of incoming mqtt data, this handles +/// - mqtt state +/// - reception via mqtt and subsequent parsing +/// - labeling of data with runs +/// - sending data over socket +/// - sending data over the channel to a db handler +/// +/// It also is the main form of rate limiting pub struct MqttProcessor { channel: Sender, new_run_channel: Receiver, @@ -44,7 +51,7 @@ pub struct MqttProcessor { rate_limit_mode: RateLimitMode, } -/// processor options +/// processor options, these are static immutable settings pub struct MqttProcessorOptions { /// URI of the mqtt server pub mqtt_path: String, @@ -215,15 +222,20 @@ impl MqttProcessor { return None; } + // handle static rate limiting mode if self.rate_limit_mode == RateLimitMode::Static { + // check if we have a previous time for a message based on its topic if let Some(old) = self.rate_limiter.get(topic) { + // if the message is less than the rate limit, skip it and do not update the map if old.elapsed() < Duration::from_millis(self.rate_limit_time) { trace!("Static rate limit skipping message with topic {}", topic); return None; } else { + // if the message is past the rate limit, continue with the parsing of it and mark the new time last received self.rate_limiter.insert(topic.to_string(), Instant::now()); } } else { + // here is the first insertion of the topic (the first time we receive the topic in scylla's lifetime) self.rate_limiter.insert(topic.to_string(), Instant::now()); } }