Diesel Performance Fixes, Batching Improvements, New Allocator #262

Merged: 16 commits, Jan 7, 2025 (changes shown from 12 commits)
1,510 changes: 913 additions & 597 deletions scylla-server/Cargo.lock

Large diffs are not rendered by default.

26 changes: 18 additions & 8 deletions scylla-server/Cargo.toml
@@ -5,31 +5,35 @@ edition = "2021"
default-run = "scylla-server"

[dependencies]
diesel = { version = "2.2.4", features = ["postgres", "r2d2", "chrono"] }
diesel = { version = "2.2.4", features = ["postgres", "chrono"] }
pq-sys = { version = "0.6.3", features = ["bundled_without_openssl"] }
dotenvy = "0.15"
serde = "1.0.203"
protobuf-codegen = "3.7.1"
protobuf = { version = "3.7.1", features = ["with-bytes"] }
tokio = { version = "1.38.0", features = ["full", "tracing"] }
axum = { version = "0.7.5", features = ["multipart"] }
tower = { version = "0.4.13", features = ["timeout"] }
tower-http = { version = "0.5.2", features = ["cors", "trace"] }
socketioxide = { version = "0.14.0", features = ["tracing"] }
axum = { version = "0.8.1", features = ["multipart"] }
tower = { version = "0.5.2", features = ["timeout"] }
tower-http = { version = "0.6.2", features = ["cors", "trace"] }
socketioxide = { version = "0.15.1", features = ["tracing"] }
rumqttc = { git = "https://github.com/bytebeamio/rumqtt", branch = "main"}
tokio-util = { version= "0.7.11", features = ["full"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["ansi", "env-filter"] }
rand = "0.8.5"
console-subscriber = { version = "0.3.0", optional = true }
console-subscriber = { version = "0.4.1", optional = true }
ringbuffer = "0.15.0"
clap = { version = "4.5.11", features = ["derive", "env"] }
axum-extra = { version = "0.9.3", features = ["query"] }
axum-extra = { version = "0.10.0", features = ["query"] }
chrono = { version = "0.4.38", features = ["serde"] }
serde_json = "1.0.128"
diesel_migrations = { version = "2.2.0", features = ["postgres"] }
rangemap = "1.5.1"
axum-macros = "0.4.2"
axum-macros = "0.5.0"
diesel-async = { version = "0.5.2", features = ["postgres", "bb8", "async-connection-wrapper", "sync-connection-wrapper"] }
rustc-hash = "2.1.0"
[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = "0.6"

[features]
top = ["dep:console-subscriber"]
@@ -43,6 +47,12 @@ codegen-units = 1
panic = "abort"
strip = true # Automatically strip symbols from the binary.

[profile.profiling]
inherits = "release"
debug = true
strip = false

[[bin]]
name = "scylla-server"
path = "src/main.rs"
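
The new `[target.'cfg(not(target_env = "msvc"))'.dependencies]` entry brings in tikv-jemallocator, the allocator referenced in the PR title. main.rs is not shown in this diff, but wiring it up typically follows the crate's documented `#[global_allocator]` pattern; a sketch, not the PR's actual code:

```rust
// Sketch only: the usual way tikv-jemallocator is registered as the global
// allocator on non-MSVC targets (the exact placement in main.rs is assumed).
#[cfg(not(target_env = "msvc"))]
use tikv_jemallocator::Jemalloc;

#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;
```

The added `[profile.profiling]` profile keeps debug info and symbols in an otherwise release-like build; it can be selected with `cargo build --profile profiling`.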

@@ -15,7 +15,7 @@ CREATE TABLE "run" (

-- CreateTable
CREATE TABLE "data" (
"values" DOUBLE PRECISION[],
"values" REAL [] NOT NULL check ("values" <> '{}' AND array_position("values", NULL) IS NULL),
"dataTypeName" TEXT NOT NULL,
"time" TIMESTAMPTZ NOT NULL,
"runId" INTEGER NOT NULL,
2 changes: 1 addition & 1 deletion scylla-server/src/controllers/data_controller.rs
@@ -13,7 +13,7 @@ pub async fn get_data(
State(pool): State<PoolHandle>,
Path((data_type_name, run_id)): Path<(String, i32)>,
) -> Result<Json<Vec<PublicData>>, ScyllaError> {
let mut db = pool.get()?;
let mut db = pool.get().await?;
let data = data_service::get_data(&mut db, data_type_name, run_id).await?;

// map data to frontend data types according to the From func of the client struct
2 changes: 1 addition & 1 deletion scylla-server/src/controllers/data_type_controller.rs
@@ -9,7 +9,7 @@ use crate::{
pub async fn get_all_data_types(
State(pool): State<PoolHandle>,
) -> Result<Json<Vec<PublicDataType>>, ScyllaError> {
let mut db = pool.get()?;
let mut db = pool.get().await?;
let data_types = data_type_service::get_all_data_types(&mut db).await?;

let transformed_data_types: Vec<PublicDataType> =
9 changes: 6 additions & 3 deletions scylla-server/src/controllers/file_insertion_controller.rs
@@ -21,7 +21,7 @@ pub async fn insert_file(
mut multipart: Multipart,
) -> Result<String, ScyllaError> {
// create a run ID cache
let mut db = pool.get()?;
let mut db = pool.get().await?;
debug!("Warming up run ID map!");
let mut run_iter = run_service::get_all_runs(&mut db)
.await?
@@ -45,9 +45,12 @@

// iterate through all files
debug!("Converting file data to insertable data!");
while let Some(field) = multipart.next_field().await.unwrap() {
while let Ok(Some(field)) = multipart.next_field().await {
// round up all of the protobuf segments as a giant list
let data = field.bytes().await.unwrap();
let Ok(data) = field.bytes().await else {
warn!("Could not decode file insert, perhaps it was interrupted!");
continue;
};
let mut count_bad_run = 0usize;
let mut insertable_data: Vec<ClientData> = Vec::new();
{
6 changes: 3 additions & 3 deletions scylla-server/src/controllers/run_controller.rs
@@ -13,7 +13,7 @@ use crate::{
pub async fn get_all_runs(
State(pool): State<PoolHandle>,
) -> Result<Json<Vec<PublicRun>>, ScyllaError> {
let mut db = pool.get()?;
let mut db = pool.get().await?;
let run_data = run_service::get_all_runs(&mut db).await?;

let transformed_run_data: Vec<PublicRun> = run_data.into_iter().map(PublicRun::from).collect();
@@ -26,7 +26,7 @@ pub async fn get_run_by_id(
State(pool): State<PoolHandle>,
Path(run_id): Path<i32>,
) -> Result<Json<PublicRun>, ScyllaError> {
let mut db = pool.get()?;
let mut db = pool.get().await?;
let run_data = run_service::get_run_by_id(&mut db, run_id).await?;

if run_data.is_none() {
@@ -43,7 +43,7 @@ pub async fn get_run_by_id(
/// create a new run with an auto-incremented ID
/// note the new run must be updated so the channel passed in notifies the data processor to use the new run
pub async fn new_run(State(pool): State<PoolHandle>) -> Result<Json<PublicRun>, ScyllaError> {
let mut db = pool.get()?;
let mut db = pool.get().await?;
let run_data = run_service::create_run(&mut db, chrono::offset::Utc::now()).await?;

crate::RUN_ID.store(run_data.id, Ordering::Relaxed);
113 changes: 71 additions & 42 deletions scylla-server/src/db_handler.rs
@@ -1,3 +1,4 @@
use rustc_hash::FxHashSet;
use tokio::sync::mpsc::Receiver;

use tokio::{sync::mpsc::Sender, time::Duration};
@@ -12,19 +13,46 @@ use crate::{ClientData, PoolHandle, RUN_ID};
/// upserting of metadata for data, and batch uploading the database
pub struct DbHandler {
/// The list of data types seen by this instance, used for when to upsert
datatype_list: Vec<String>,
datatype_list: FxHashSet<String>,
/// The broadcast channel which provides serial datapoints for processing
receiver: Receiver<ClientData>,
/// The database pool handle
pool: PoolHandle,
/// the queue of data
data_queue: Vec<ClientData>,
/// the time since last batch
last_time: tokio::time::Instant,
/// upload interval
upload_interval: u64,
}

/// Chunks a vec into roughly equal vectors all under size `max_chunk_size`
/// This precomputes the vec capacity, but it does call to_vec(), reallocating each slice
fn chunk_vec<T: Clone>(input: Vec<T>, max_chunk_size: usize) -> Vec<Vec<T>> {
if max_chunk_size == 0 {
panic!("Maximum chunk size must be greater than zero");
}

let len = input.len();
if len == 0 {
return Vec::new();
}

// Calculate the number of chunks
let num_chunks = len.div_ceil(max_chunk_size);

// Recompute a balanced chunk size
let chunk_size = usize::max(1, len.div_ceil(num_chunks));

let mut result = Vec::with_capacity(num_chunks);
let mut start = 0;

while start < len {
let end = usize::min(start + chunk_size, len);
result.push(input[start..end].to_vec());
start = end;
}
result
}
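
For illustration only (not part of the diff): with 20 items and a cap of 8, `chunk_vec` first computes 3 chunks and then rebalances the chunk size to 7, so the result is chunks of 7, 7 and 6 items rather than 8, 8 and 4. A minimal test sketch of that behaviour:

```rust
#[cfg(test)]
mod chunk_vec_tests {
    use super::chunk_vec;

    #[test]
    fn splits_into_roughly_equal_chunks() {
        // 20 items with a cap of 8 per chunk -> 3 balanced chunks of 7, 7 and 6.
        let chunks = chunk_vec((0..20).collect::<Vec<i32>>(), 8);
        let sizes: Vec<usize> = chunks.iter().map(Vec::len).collect();
        assert_eq!(sizes, vec![7, 7, 6]);
    }
}
```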

impl DbHandler {
/// Make a new db handler
/// * `recv` - the broadcast receiver on which client data will be sent
@@ -34,11 +62,10 @@ impl DbHandler {
upload_interval: u64,
) -> DbHandler {
DbHandler {
datatype_list: vec![],
datatype_list: FxHashSet::default(),
receiver,
pool,
data_queue: vec![],
last_time: tokio::time::Instant::now(),
upload_interval,
}
}
@@ -54,23 +81,25 @@ impl DbHandler {
loop {
tokio::select! {
_ = cancel_token.cancelled() => {
let Ok(mut database) = pool.get() else {
let Ok(mut database) = pool.get().await else {
warn!("Could not get connection for cleanup");
break;
};
// cleanup all remaining messages if batches start backing up
while let Some(final_msgs) = batch_queue.recv().await {
info!("{} batches remaining!", batch_queue.len()+1);
// do not spawn new tasks in this mode, see below comment for chunk_size math
let chunk_size = final_msgs.len() / ((final_msgs.len() / 16380) + 1);
if chunk_size == 0 {
warn!("Could not insert {} messages, chunk size zero!", final_msgs.len());
if final_msgs.is_empty() {
debug!("A batch of zero messages was sent!");
continue;
}
for chunk in final_msgs.chunks(chunk_size).collect::<Vec<_>>() {
let chunk_size = final_msgs.len() / ((final_msgs.len() / 8190) + 1);
let chunks = chunk_vec(final_msgs, chunk_size);
debug!("Batch uploading {} chunks in sequence", chunks.len());
for chunk in chunks {
info!(
"A cleanup chunk uploaded: {:?}",
data_service::add_many(&mut database, chunk.to_vec())
data_service::add_many(&mut database, chunk).await
);
}
}
@@ -81,17 +110,16 @@ impl DbHandler {
// libpq has max 65535 params, therefore batch
Contributor: I actually never understood this fully. Why is the batching logic needed for diesel? Is it because it lets you try to insert more data than libpq can handle? Does Prisma just manage this itself?

Contributor (author): I guess Prisma splits the queries as needed. I doubt they work around libpq, as it's kinda the best way to communicate with Postgres. It's a very annoying limit, but kinda inherent to Postgres.

// max for batch is 65535/4 params per message, hence the below, rounded down with a margin for safety
// TODO avoid this code batch uploading the remainder messages as a new batch, combine it with another safely
let chunk_size = msgs.len() / ((msgs.len() / 16380) + 1);
if chunk_size == 0 {
warn!("Could not insert {} messages, chunk size zero!", msgs.len());
if msgs.is_empty() {
debug!("A batch of zero messages was sent!");
continue;
}
debug!("Batch uploading {} chunks in parrallel", msgs.len() / chunk_size);
for chunk in msgs.chunks(chunk_size).collect::<Vec<_>>() {
let owned = chunk.to_vec();
let pool = pool.clone();
tokio::task::spawn_blocking(move || {
DbHandler::batch_upload(owned, pool)});
let msg_len = msgs.len();
let chunk_size = msg_len / ((msg_len / 8190) + 1);
Contributor: Why are you now dividing by 8190 instead of 16380? Is there something special about dividing the max params by 8 instead of 4?

Contributor (author): Oh yeah, I forgot to investigate that. The switch to diesel-async doubled the number of instructions per insert. I'll do some investigating there. It's a very annoying limit.
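
A rough sanity check of the figures in this thread: libpq's 65535-parameter cap divided by the 4 columns per `data` row gives the old ~16380 batch cap, and halving that again matches the new ~8190 figure. Treating the new constant as a 2x safety margin is one reading of the change, not something confirmed above; a small sketch of the arithmetic:

```rust
// Back-of-the-envelope check of the batch-size math discussed above.
// 65535 and the 4 params per row come from the comments in this diff;
// the 2x safety factor is an assumed interpretation of the 8190 constant.
const LIBPQ_MAX_PARAMS: usize = 65_535;
const PARAMS_PER_ROW: usize = 4;

fn max_rows_per_insert(safety_factor: usize) -> usize {
    LIBPQ_MAX_PARAMS / (PARAMS_PER_ROW * safety_factor)
}

fn main() {
    assert_eq!(max_rows_per_insert(1), 16_383); // just above the old 16380 cap
    assert_eq!(max_rows_per_insert(2), 8_191); // just above the new 8190 cap
}
```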

let chunks = chunk_vec(msgs, chunk_size);
info!("Batch uploading {} chunks in parrallel, {} messages.", chunks.len(), msg_len);
for chunk in chunks {
tokio::spawn(DbHandler::batch_upload(chunk, pool.clone()));
}
debug!(
"DB send: {} of {}",
@@ -121,13 +149,13 @@
}
}

//#[instrument(level = Level::DEBUG, skip(msg, pool))]
fn batch_upload(msg: Vec<ClientData>, pool: PoolHandle) {
let Ok(mut database) = pool.get() else {
#[instrument(level = Level::DEBUG, skip(msg, pool))]
async fn batch_upload(msg: Vec<ClientData>, pool: PoolHandle) {
let Ok(mut database) = pool.get().await else {
warn!("Could not get connection for batch upload!");
return;
};
match data_service::add_many(&mut database, msg) {
match data_service::add_many(&mut database, msg).await {
Ok(count) => info!("Batch uploaded: {:?}", count),
Err(err) => warn!("Error in batch upload: {:?}", err),
}
@@ -143,17 +171,31 @@ impl DbHandler {
data_channel: Sender<Vec<ClientData>>,
cancel_token: CancellationToken,
) {
let mut batch_interval = tokio::time::interval(Duration::from_millis(self.upload_interval));
// the max batch size to reasonably expect
let mut max_batch_size = 2usize;
loop {
tokio::select! {
_ = cancel_token.cancelled() => {
debug!("Pushing final messages to queue");
data_channel.send(self.data_queue.clone()).await.expect("Could not comm data to db thread, shutdown");
self.data_queue.clear();
data_channel.send(self.data_queue).await.expect("Could not comm data to db thread, shutdown");
break;
},
Some(msg) = self.receiver.recv() => {
self.handle_msg(msg, &data_channel).await;
}
_ = batch_interval.tick() => {
if !self.data_queue.is_empty() {
// set a new max if this batch is larger
max_batch_size = usize::max(max_batch_size, self.data_queue.len());
// mem::take allows us to assign the value of data queue vec::new() while maintaining the memory for data_channel ownership
data_channel
.send(self.data_queue)
.await
.expect("Could not comm data to db thread");
self.data_queue = Vec::with_capacity((max_batch_size as f32 * 1.05) as usize);
}
}
}
}
}
@@ -166,21 +208,8 @@ impl DbHandler {
self.receiver.max_capacity()
);

// If the time is greater than upload interval, push to batch upload thread and clear queue
if tokio::time::Instant::now().duration_since(self.last_time)
> Duration::from_millis(self.upload_interval)
&& !self.data_queue.is_empty()
{
data_channel
.send(self.data_queue.clone())
.await
.expect("Could not comm data to db thread");
self.data_queue.clear();
self.last_time = tokio::time::Instant::now();
}

if !self.datatype_list.contains(&msg.name) {
let Ok(mut database) = self.pool.get() else {
let Ok(mut database) = self.pool.get().await else {
warn!("Could not get connection for dataType upsert");
return;
};
@@ -195,13 +224,13 @@
{
warn!("DB error datatype upsert: {:?}", msg);
}
self.datatype_list.push(msg.name.clone());
self.datatype_list.insert(msg.name.clone());
}

// Check for GPS points, insert them into current run if available
if msg.name == "TPU/GPS/Location" {
debug!("Upserting run with location points!");
let Ok(mut database) = self.pool.get() else {
let Ok(mut database) = self.pool.get().await else {
warn!("Could not get connection for db points update");
return;
};
6 changes: 3 additions & 3 deletions scylla-server/src/error.rs
@@ -8,7 +8,7 @@ pub enum ScyllaError {
/// Diesel error
DbError(diesel::result::Error),
/// Diesel db connection error,
ConnError(diesel::r2d2::PoolError),
ConnError(diesel_async::pooled_connection::bb8::RunError),
/// An instruction was not encodable
InvalidEncoding(String),
/// Could not communicate to car
@@ -23,8 +23,8 @@ impl From<diesel::result::Error> for ScyllaError {
}
}

impl From<diesel::r2d2::PoolError> for ScyllaError {
fn from(error: diesel::r2d2::PoolError) -> Self {
impl From<diesel_async::pooled_connection::bb8::RunError> for ScyllaError {
fn from(error: diesel_async::pooled_connection::bb8::RunError) -> Self {
ScyllaError::ConnError(error)
}
}
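
The controllers' change from `pool.get()?` to `pool.get().await?` and the new `bb8::RunError` variant both follow from the move to diesel-async's bb8 pool. The pool construction itself is outside this diff; a minimal sketch of how such a `PoolHandle` is typically built (the alias and function here are assumptions for illustration, not the PR's code):

```rust
use diesel_async::pooled_connection::bb8::Pool;
use diesel_async::pooled_connection::AsyncDieselConnectionManager;
use diesel_async::AsyncPgConnection;

// Hypothetical alias; the real PoolHandle definition lives outside this diff.
pub type PoolHandle = Pool<AsyncPgConnection>;

pub async fn init_pool(database_url: &str) -> PoolHandle {
    // bb8's get() is async, which is why the controllers now call pool.get().await
    let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new(database_url);
    Pool::builder()
        .build(manager)
        .await
        .expect("could not build bb8 connection pool")
}
```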