Skip to content

Commit

Permalink
add simple socket discarding
Browse files Browse the repository at this point in the history
  • Loading branch information
jr1221 committed Sep 3, 2024
1 parent d4d2cab commit 9de14a8
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 20 deletions.
10 changes: 10 additions & 0 deletions scylla-server-rust/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ struct ScyllaArgs {
default_value = "10"
)]
batch_upsert_time: u64,

/// The
#[arg(
short = 'd',
long,
env = "SCYLLA_SOCKET_DISCARD_PERCENT",
default_value = "0"
)]
socketio_discard_percent: u8,
}

#[tokio::main]
Expand Down Expand Up @@ -180,6 +189,7 @@ async fn main() {
curr_run.id,
io,
token.clone(),
((cli.socketio_discard_percent as f32 / 100.0) * 255.0) as u8,
);
let (client, eventloop) = AsyncClient::new(opts, 600);
let client_sharable: Arc<AsyncClient> = Arc::new(client);
Expand Down
51 changes: 31 additions & 20 deletions scylla-server-rust/src/processors/mqtt_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub struct MqttProcessor {
curr_run: i32,
io: SocketIo,
cancel_token: CancellationToken,
/// Upload ratio, below is not uploaded above is uploaded
upload_ratio: u8,
}

impl MqttProcessor {
Expand All @@ -44,6 +46,7 @@ impl MqttProcessor {
initial_run: i32,
io: SocketIo,
cancel_token: CancellationToken,
upload_ratio: u8,
) -> (MqttProcessor, MqttOptions) {
// create the mqtt client and configure it
let mut mqtt_opts = MqttOptions::new(
Expand Down Expand Up @@ -79,6 +82,7 @@ impl MqttProcessor {
curr_run: initial_run,
io,
cancel_token,
upload_ratio,
},
mqtt_opts,
)
Expand All @@ -93,6 +97,8 @@ impl MqttProcessor {
let mut latency_interval = tokio::time::interval(Duration::from_millis(250));
let mut latency_ringbuffer = ringbuffer::AllocRingBuffer::<i64>::new(20);

let mut upload_counter: u8 = 0;

debug!("Subscribing to siren");
client
.subscribe("#", rumqttc::v5::mqttbytes::QoS::ExactlyOnce)
Expand All @@ -119,7 +125,7 @@ impl MqttProcessor {
};
latency_ringbuffer.push(chrono::offset::Utc::now().timestamp_millis() - msg.timestamp);
self.send_db_msg(msg.clone()).await;
self.send_socket_msg(msg);
self.send_socket_msg(msg, &mut upload_counter);
},
Err(msg) => trace!("Received mqtt error: {:?}", msg),
_ => trace!("Received misc mqtt: {:?}", msg),
Expand All @@ -135,7 +141,7 @@ impl MqttProcessor {
timestamp: chrono::offset::Utc::now().timestamp_millis(),
values: vec![sockets.len().to_string()]
};
self.send_socket_msg(client_data);
self.send_socket_msg(client_data, &mut upload_counter);
} else {
warn!("Could not fetch socket count");
}
Expand All @@ -157,7 +163,7 @@ impl MqttProcessor {
values: vec![avg_latency.to_string()]
};
trace!("Latency update sending: {}", client_data.values.first().unwrap_or(&"n/a".to_string()));
self.send_socket_msg(client_data);
self.send_socket_msg(client_data, &mut upload_counter);
}
Some(new_run) = self.new_run_channel.recv() => {
trace!("New run: {:?}", new_run);
Expand Down Expand Up @@ -253,23 +259,28 @@ impl MqttProcessor {

/// Sends a message to the socket, printing and IGNORING any error that may occur
/// * `client_data` - The client data to send over the broadcast
fn send_socket_msg(&self, client_data: ClientData) {
match self.io.emit(
"message",
serde_json::to_string(&client_data).expect("Could not serialize ClientData"),
) {
Ok(_) => (),
Err(err) => match err {
socketioxide::BroadcastError::Socket(e) => {
trace!("Socket: Transmit error: {:?}", e);
}
socketioxide::BroadcastError::Serialize(_) => {
warn!("Socket: Serialize error: {}", err)
}
socketioxide::BroadcastError::Adapter(_) => {
warn!("Socket: Adapter error: {}", err)
}
},
fn send_socket_msg(&self, client_data: ClientData, upload_counter: &mut u8) {
*upload_counter = upload_counter.wrapping_add(1);
if *upload_counter >= self.upload_ratio {
match self.io.emit(
"message",
serde_json::to_string(&client_data).expect("Could not serialize ClientData"),
) {
Ok(_) => (),
Err(err) => match err {
socketioxide::BroadcastError::Socket(e) => {
trace!("Socket: Transmit error: {:?}", e);
}
socketioxide::BroadcastError::Serialize(_) => {
warn!("Socket: Serialize error: {}", err)
}
socketioxide::BroadcastError::Adapter(_) => {
warn!("Socket: Adapter error: {}", err)
}
},
}
} else {
trace!("Discarding message with topic {}", client_data.name);
}
}
}

0 comments on commit 9de14a8

Please sign in to comment.