Skip to content

Commit

Permalink
add docs
Browse files Browse the repository at this point in the history
  • Loading branch information
jr1221 committed Sep 20, 2024
1 parent b1622b9 commit 3e62501
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 2 deletions.
2 changes: 2 additions & 0 deletions compose.tpu.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ services:
scylla-server:
environment:
- SCYLLA_SIREN_HOST_URL=host.docker.internal:1883
- SCYLLA_RATE_LIMIT_MODE=static
- SCYLLA_STATIC_RATE_LIMIT_VALUE=100
extra_hosts:
- "host.docker.internal:host-gateway" # for external siren
init: false # not supported on buildroot for some reason, further investigation needed
16 changes: 14 additions & 2 deletions scylla-server/src/processors/mqtt_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,14 @@ use crate::{
use super::ClientData;
use std::borrow::Cow;

/// a mqtt processor
/// The chief processor of incoming mqtt data, this handles
/// - mqtt state
/// - reception via mqtt and subsequent parsing
/// - labeling of data with runs
/// - sending data over socket
/// - sending data over the channel to a db handler
///
/// It also is the main form of rate limiting
pub struct MqttProcessor {
channel: Sender<ClientData>,
new_run_channel: Receiver<run_service::public_run::Data>,
Expand All @@ -44,7 +51,7 @@ pub struct MqttProcessor {
rate_limit_mode: RateLimitMode,
}

/// processor options
/// processor options, these are static immutable settings
pub struct MqttProcessorOptions {
/// URI of the mqtt server
pub mqtt_path: String,
Expand Down Expand Up @@ -215,15 +222,20 @@ impl MqttProcessor {
return None;
}

// handle static rate limiting mode
if self.rate_limit_mode == RateLimitMode::Static {
// check if we have a previous time for a message based on its topic
if let Some(old) = self.rate_limiter.get(topic) {
// if the message is less than the rate limit, skip it and do not update the map
if old.elapsed() < Duration::from_millis(self.rate_limit_time) {
trace!("Static rate limit skipping message with topic {}", topic);
return None;
} else {
// if the message is past the rate limit, continue with the parsing of it and mark the new time last received
self.rate_limiter.insert(topic.to_string(), Instant::now());
}
} else {
// here is the first insertion of the topic (the first time we receive the topic in scylla's lifetime)
self.rate_limiter.insert(topic.to_string(), Instant::now());
}
}
Expand Down

0 comments on commit 3e62501

Please sign in to comment.