Skip to content

Commit

Permalink
Diesel Performance Fixes, Batching Improvements, New Allocator (#262)
Browse files Browse the repository at this point in the history
* refactor to diesel async

* remove 4 re-allocations by modifying DB and insertion layer

* implement better chunking logic, fix clippy

* switch to jemalloc

* fail gracefully on file insert, switch to hashset

hashset: about 0.3% CPU time improvement

* vec with capacity: 0.3% speed increase

* precompute duration

* bump deps, fxhashmap

fxhashmap -> 0.25% perf

* fmt

* fix cargo.lock

* bump axum, fix clippy

* integration tests fix

* spruce up comments, fix dependency

* the max

---------

Co-authored-by: wyattb <[email protected]>
  • Loading branch information
jr1221 and bracyw authored Jan 7, 2025
1 parent 2b4a982 commit 86cf64c
Show file tree
Hide file tree
Showing 24 changed files with 1,230 additions and 792 deletions.
1,510 changes: 913 additions & 597 deletions scylla-server/Cargo.lock

Large diffs are not rendered by default.

26 changes: 18 additions & 8 deletions scylla-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,35 @@ edition = "2021"
default-run = "scylla-server"

[dependencies]
diesel = { version = "2.2.4", features = ["postgres", "r2d2", "chrono"] }
diesel = { version = "2.2.4", features = ["postgres", "chrono"] }
pq-sys = { version = "0.6.3", features = ["bundled_without_openssl"] }
dotenvy = "0.15"
serde = "1.0.203"
protobuf-codegen = "3.7.1"
protobuf = { version = "3.7.1", features = ["with-bytes"] }
tokio = { version = "1.38.0", features = ["full", "tracing"] }
axum = { version = "0.7.5", features = ["multipart"] }
tower = { version = "0.4.13", features = ["timeout"] }
tower-http = { version = "0.5.2", features = ["cors", "trace"] }
socketioxide = { version = "0.14.0", features = ["tracing"] }
axum = { version = "0.8.1", features = ["multipart"] }
tower = { version = "0.5.2", features = ["timeout"] }
tower-http = { version = "0.6.2", features = ["cors", "trace"] }
socketioxide = { version = "0.15.1", features = ["tracing"] }
rumqttc = { git = "https://github.com/bytebeamio/rumqtt", branch = "main"}
tokio-util = { version= "0.7.11", features = ["full"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["ansi", "env-filter"] }
rand = "0.8.5"
console-subscriber = { version = "0.3.0", optional = true }
console-subscriber = { version = "0.4.1", optional = true }
ringbuffer = "0.15.0"
clap = { version = "4.5.11", features = ["derive", "env"] }
axum-extra = { version = "0.9.3", features = ["query"] }
axum-extra = { version = "0.10.0", features = ["query"] }
chrono = { version = "0.4.38", features = ["serde"] }
serde_json = "1.0.128"
diesel_migrations = { version = "2.2.0", features = ["postgres"] }
rangemap = "1.5.1"
axum-macros = "0.4.2"
axum-macros = "0.5.0"
diesel-async = { version = "0.5.2", features = ["postgres", "bb8", "async-connection-wrapper", "sync-connection-wrapper", "tokio"] }
rustc-hash = "2.1.0"
[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = "0.6"

[features]
top = ["dep:console-subscriber"]
Expand All @@ -43,6 +47,12 @@ codegen-units = 1
panic = "abort"
strip = true # Automatically strip symbols from the binary.

[profile.profiling]
inherits = "release"
debug = true
strip = false

[[bin]]
name = "scylla-server"
path = "src/main.rs"

44 changes: 33 additions & 11 deletions scylla-server/integration_test.sh
Original file line number Diff line number Diff line change
@@ -1,15 +1,37 @@
#!/bin/sh
echo "Starting db"
cd ../compose
docker compose up -d odyssey-timescale

cd ../scylla-server
echo "Migrating DB"
diesel migration run
# Navigate to the compose directory
echo "Navigating to compose directory..."
cd ../compose || { echo "Compose directory not found"; exit 1; }

echo "Running tests"
DATABASE_URL=postgresql://postgres:[email protected]:5432/postgres cargo test -- --test-threads=1
# Remove any existing odyssey-timescale container
echo "Stopping and removing any existing odyssey-timescale container..."
docker rm -f odyssey-timescale 2>/dev/null || echo "No existing container to remove."

echo "Exiting"
cd ../compose
docker compose down
# Start a new odyssey-timescale container
echo "Starting a new odyssey-timescale container..."
docker compose up -d odyssey-timescale || { echo "Failed to start odyssey-timescale"; exit 1; }

# Wait for the database to initialize
echo "Waiting for the database to initialize..."
sleep 3

# Navigate to the scylla-server directory
cd ../scylla-server || { echo "scylla-server directory not found"; exit 1; }

# Run database migrations
echo "Running database migrations..."
DATABASE_URL=postgresql://postgres:[email protected]:5432/postgres diesel migration run || { echo "Migration failed"; exit 1; }

# Run tests
echo "Running tests..."
DATABASE_URL=postgresql://postgres:[email protected]:5432/postgres cargo test -- --test-threads=1 || { echo "Tests failed"; exit 1; }

# Navigate back to the compose directory
cd ../compose || { echo "Compose directory not found"; exit 1; }

# Stop and clean up containers
echo "Stopping and cleaning up containers..."
docker compose down || { echo "Failed to clean up containers"; exit 1; }

echo "Script completed successfully!"
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ CREATE TABLE "run" (

-- CreateTable
CREATE TABLE "data" (
"values" DOUBLE PRECISION[],
"values" REAL [] NOT NULL check ("values" <> '{}' AND array_position("values", NULL) IS NULL),
"dataTypeName" TEXT NOT NULL,
"time" TIMESTAMPTZ NOT NULL,
"runId" INTEGER NOT NULL,
Expand Down
2 changes: 1 addition & 1 deletion scylla-server/src/controllers/data_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub async fn get_data(
State(pool): State<PoolHandle>,
Path((data_type_name, run_id)): Path<(String, i32)>,
) -> Result<Json<Vec<PublicData>>, ScyllaError> {
let mut db = pool.get()?;
let mut db = pool.get().await?;
let data = data_service::get_data(&mut db, data_type_name, run_id).await?;

// map data to frontend data types according to the From func of the client struct
Expand Down
2 changes: 1 addition & 1 deletion scylla-server/src/controllers/data_type_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
pub async fn get_all_data_types(
State(pool): State<PoolHandle>,
) -> Result<Json<Vec<PublicDataType>>, ScyllaError> {
let mut db = pool.get()?;
let mut db = pool.get().await?;
let data_types = data_type_service::get_all_data_types(&mut db).await?;

let transformed_data_types: Vec<PublicDataType> =
Expand Down
9 changes: 6 additions & 3 deletions scylla-server/src/controllers/file_insertion_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub async fn insert_file(
mut multipart: Multipart,
) -> Result<String, ScyllaError> {
// create a run ID cache
let mut db = pool.get()?;
let mut db = pool.get().await?;
debug!("Warming up run ID map!");
let mut run_iter = run_service::get_all_runs(&mut db)
.await?
Expand All @@ -45,9 +45,12 @@ pub async fn insert_file(

// iterate through all files
debug!("Converting file data to insertable data!");
while let Some(field) = multipart.next_field().await.unwrap() {
while let Ok(Some(field)) = multipart.next_field().await {
// round up all of the protobuf segments as a giant list
let data = field.bytes().await.unwrap();
let Ok(data) = field.bytes().await else {
warn!("Could not decode file insert, perhaps it was interrupted!");
continue;
};
let mut count_bad_run = 0usize;
let mut insertable_data: Vec<ClientData> = Vec::new();
{
Expand Down
6 changes: 3 additions & 3 deletions scylla-server/src/controllers/run_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
pub async fn get_all_runs(
State(pool): State<PoolHandle>,
) -> Result<Json<Vec<PublicRun>>, ScyllaError> {
let mut db = pool.get()?;
let mut db = pool.get().await?;
let run_data = run_service::get_all_runs(&mut db).await?;

let transformed_run_data: Vec<PublicRun> = run_data.into_iter().map(PublicRun::from).collect();
Expand All @@ -26,7 +26,7 @@ pub async fn get_run_by_id(
State(pool): State<PoolHandle>,
Path(run_id): Path<i32>,
) -> Result<Json<PublicRun>, ScyllaError> {
let mut db = pool.get()?;
let mut db = pool.get().await?;
let run_data = run_service::get_run_by_id(&mut db, run_id).await?;

if run_data.is_none() {
Expand All @@ -43,7 +43,7 @@ pub async fn get_run_by_id(
/// create a new run with an auto-incremented ID
/// note the new run must be updated so the channel passed in notifies the data processor to use the new run
pub async fn new_run(State(pool): State<PoolHandle>) -> Result<Json<PublicRun>, ScyllaError> {
let mut db = pool.get()?;
let mut db = pool.get().await?;
let run_data = run_service::create_run(&mut db, chrono::offset::Utc::now()).await?;

crate::RUN_ID.store(run_data.id, Ordering::Relaxed);
Expand Down
Loading

0 comments on commit 86cf64c

Please sign in to comment.