Skip to content

Commit

Permalink
consolidate batching logic in db_handler
Browse files Browse the repository at this point in the history
  • Loading branch information
jr1221 committed Nov 30, 2024
1 parent db25af2 commit f0811ff
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 11 deletions.
24 changes: 14 additions & 10 deletions scylla-server/src/controllers/file_insertion_controller.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
use axum::extract::{Multipart, State};
use axum::{
extract::{Multipart, State},
Extension,
};
use axum_macros::debug_handler;
use chrono::DateTime;
use protobuf::CodedInputStream;
use rangemap::RangeInclusiveMap;
use tokio::sync::mpsc;
use tokio_util::bytes::Buf;
use tracing::{debug, info, trace};
use tracing::{debug, info, trace, warn};

use crate::{
error::ScyllaError,
playback_data,
services::{data_service, run_service},
ClientData, PoolHandle,
};
use crate::{error::ScyllaError, playback_data, services::run_service, ClientData, PoolHandle};

/// Inserts a file using http multipart
/// This file is parsed and clientdata values are extracted, the run ID of each variable is inferred, and then data is batch uploaded
// super cool: adding this tag tells you what variable is misbehaving in cases of axum Send+Sync Handler fails
#[debug_handler]
pub async fn insert_file(
State(pool): State<PoolHandle>,
Extension(batcher): Extension<mpsc::Sender<Vec<ClientData>>>,
mut multipart: Multipart,
) -> Result<String, ScyllaError> {
// create a run ID cache
Expand Down Expand Up @@ -95,7 +97,9 @@ pub async fn insert_file(
.collect::<Vec<_>>()
.len()
);
data_service::add_many(&mut db, new_data).await?;
if let Err(err) = batcher.send(new_data).await {
warn!("Error sending file insert data to batcher! {}", err);
};
}
Ok("Successfully inserted all!".to_string())
Ok("Successfully sent all to batcher!".to_string())
}
4 changes: 4 additions & 0 deletions scylla-server/src/db_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ impl DbHandler {
// max for batch is 65535/4 params per message, hence the below, rounded down with a margin for safety
// TODO avoid this code batch uploading the remainder messages as a new batch, combine it with another safely
let chunk_size = msgs.len() / ((msgs.len() / 16380) + 1);
if chunk_size == 0 {
warn!("Could not insert {} messages, chunk size zero!", msgs.len());
continue;
}
debug!("Batch uploading {} chunks in parallel", msgs.len() / chunk_size);
for chunk in msgs.chunks(chunk_size).collect::<Vec<_>>() {
tokio::spawn(DbHandler::batch_upload(chunk.to_vec(), pool.clone()));
Expand Down
3 changes: 2 additions & 1 deletion scylla-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ async fn main() {
// spawn the database handler
task_tracker.spawn(
db_handler::DbHandler::new(mqtt_receive, db.clone(), cli.batch_upsert_time * 1000)
.handling_loop(db_send, token.clone()),
.handling_loop(db_send.clone(), token.clone()),
);
// spawn the database inserter, if we have it enabled
if !cli.disable_data_upload {
Expand Down Expand Up @@ -236,6 +236,7 @@ async fn main() {
)
// FILE INSERT
.route("/insert/file", post(file_insertion_controller::insert_file))
.layer(Extension(db_send))
.layer(DefaultBodyLimit::disable())
// for CORS handling
.layer(
Expand Down

0 comments on commit f0811ff

Please sign in to comment.