From 53aea54ca95a58f2508acfbb104eccf41c0517cc Mon Sep 17 00:00:00 2001 From: Jack Rubacha Date: Tue, 7 Jan 2025 11:00:43 -0500 Subject: [PATCH] mvp sync deadpool with copy_many --- scylla-server/Cargo.lock | 280 ++++-------------- scylla-server/Cargo.toml | 2 +- .../src/controllers/data_controller.rs | 4 +- .../src/controllers/data_type_controller.rs | 4 +- .../controllers/file_insertion_controller.rs | 4 +- .../src/controllers/run_controller.rs | 12 +- scylla-server/src/db_handler.rs | 64 +--- scylla-server/src/error.rs | 16 +- scylla-server/src/lib.rs | 5 +- scylla-server/src/main.rs | 52 ++-- scylla-server/src/models.rs | 1 + scylla-server/src/services/data_service.rs | 78 +++-- .../src/services/data_type_service.rs | 32 +- scylla-server/src/services/mod.rs | 18 ++ scylla-server/src/services/run_service.rs | 65 ++-- 15 files changed, 235 insertions(+), 402 deletions(-) diff --git a/scylla-server/Cargo.lock b/scylla-server/Cargo.lock index 81f6587d..8b9c464d 100755 --- a/scylla-server/Cargo.lock +++ b/scylla-server/Cargo.lock @@ -330,18 +330,6 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" -[[package]] -name = "bb8" -version = "0.8.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d89aabfae550a5c44b43ab941844ffcd2e993cb6900b342debf59e9ea74acdb8" -dependencies = [ - "async-trait", - "futures-util", - "parking_lot", - "tokio", -] - [[package]] name = "bindgen" version = "0.69.5" @@ -647,6 +635,47 @@ version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" +[[package]] +name = "deadpool" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6541a3916932fe57768d4be0b1ffb5ec7cbf74ca8c903fdfd5c0fe8aa958f0ed" +dependencies = [ + "deadpool-runtime", + "num_cpus", + "tokio", +] + +[[package]] +name = "deadpool-diesel" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "590573e9e29c5190a5ff782136f871e6e652e35d598a349888e028693601adf1" +dependencies = [ + "deadpool", + "deadpool-sync", + "diesel", +] + +[[package]] +name = "deadpool-runtime" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b" +dependencies = [ + "tokio", +] + +[[package]] +name = "deadpool-sync" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524bc3df0d57e98ecd022e21ba31166c2625e7d3e5bcc4510efaeeab4abcab04" +dependencies = [ + "deadpool-runtime", + "tracing", +] + [[package]] name = "diesel" version = "2.2.6" @@ -661,21 +690,6 @@ dependencies = [ "pq-sys", ] -[[package]] -name = "diesel-async" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51a307ac00f7c23f526a04a77761a0519b9f0eb2838ebf5b905a58580095bdcb" -dependencies = [ - "async-trait", - "bb8", - "diesel", - "futures-util", - "scoped-futures", - "tokio", - "tokio-postgres", -] - [[package]] name = "diesel_derives" version = "2.2.3" @@ -717,7 +731,6 @@ checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", "crypto-common", - "subtle", ] [[package]] @@ -805,12 +818,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "fallible-iterator" -version = "0.2.0" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" - [[package]] name = "fastrand" version = "2.3.0" @@ -872,7 +879,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", - "futures-sink", ] [[package]] @@ -1015,13 +1021,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] -name = "hmac" -version = "0.12.1" +name = "hermit-abi" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" -dependencies = [ - "digest", -] +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" [[package]] name = "home" @@ -1306,16 +1309,6 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" -[[package]] -name = "md-5" -version = "0.10.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" -dependencies = [ - "cfg-if", - "digest", -] - [[package]] name = "memchr" version = "2.7.4" @@ -1421,6 +1414,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "object" version = "0.36.7" @@ -1483,24 +1486,6 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" -[[package]] -name = "phf" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc" -dependencies = [ - "phf_shared", -] - -[[package]] -name = "phf_shared" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b" -dependencies = [ - "siphasher", -] - [[package]] name = "pin-project" version = "1.1.7" @@ -1533,35 +1518,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" -[[package]] -name = "postgres-protocol" -version = "0.6.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acda0ebdebc28befa84bee35e651e4c5f09073d668c7aed4cf7e23c3cda84b23" -dependencies = [ - "base64 0.22.1", - "byteorder", - "bytes", - "fallible-iterator", - "hmac", - "md-5", - "memchr", - "rand", - "sha2", - "stringprep", -] - -[[package]] -name = "postgres-types" -version = "0.2.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f66ea23a2d0e5734297357705193335e0a957696f34bed2f2faefacb2fec336f" -dependencies = [ - "bytes", - "fallible-iterator", - "postgres-protocol", -] - [[package]] name = "ppv-lite86" version = "0.2.20" @@ -1938,15 +1894,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "scoped-futures" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"1b24aae2d0636530f359e9d5ef0c04669d11c5e756699b27a6a6d845d8329091" -dependencies = [ - "pin-project-lite", -] - [[package]] name = "scopeguard" version = "1.2.0" @@ -1963,8 +1910,8 @@ dependencies = [ "chrono", "clap", "console-subscriber", + "deadpool-diesel", "diesel", - "diesel-async", "diesel_migrations", "dotenvy", "pq-sys", @@ -2097,17 +2044,6 @@ dependencies = [ "digest", ] -[[package]] -name = "sha2" -version = "0.10.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" -dependencies = [ - "cfg-if", - "cpufeatures", - "digest", -] - [[package]] name = "sharded-slab" version = "0.1.7" @@ -2132,12 +2068,6 @@ dependencies = [ "libc", ] -[[package]] -name = "siphasher" -version = "0.3.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" - [[package]] name = "slab" version = "0.4.9" @@ -2223,17 +2153,6 @@ dependencies = [ "lock_api", ] -[[package]] -name = "stringprep" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b4df3d392d81bd458a8a621b8bffbd2302a12ffe288a9d931670948749463b1" -dependencies = [ - "unicode-bidi", - "unicode-normalization", - "unicode-properties", -] - [[package]] name = "strsim" version = "0.11.1" @@ -2347,21 +2266,6 @@ dependencies = [ "tikv-jemalloc-sys", ] -[[package]] -name = "tinyvec" -version = "1.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "022db8904dfa342efe721985167e9fcd16c29b226db4397ed752a761cfce81e8" -dependencies = [ - "tinyvec_macros", -] - -[[package]] -name = "tinyvec_macros" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" - [[package]] name = "tokio" version = "1.42.0" @@ -2392,32 +2296,6 @@ dependencies = [ "syn", ] -[[package]] -name = "tokio-postgres" -version = "0.7.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b5d3742945bc7d7f210693b0c58ae542c6fd47b17adbbda0885f3dcb34a6bdb" -dependencies = [ - "async-trait", - "byteorder", - "bytes", - "fallible-iterator", - "futures-channel", - "futures-util", - "log", - "parking_lot", - "percent-encoding", - "phf", - "pin-project-lite", - "postgres-protocol", - "postgres-types", - "rand", - "socket2", - "tokio", - "tokio-util", - "whoami", -] - [[package]] name = "tokio-rustls" version = "0.26.1" @@ -2688,33 +2566,12 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" -[[package]] -name = "unicode-bidi" -version = "0.3.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c1cb5db39152898a79168971543b1cb5020dff7fe43c8dc468b0885f5e29df5" - [[package]] name = "unicode-ident" version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83" -[[package]] -name = "unicode-normalization" -version = "0.1.24" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5033c97c4262335cded6d6fc3e5c18ab755e1a3dc96376350f3d8e9f009ad956" -dependencies = [ - "tinyvec", -] - -[[package]] -name = "unicode-properties" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0" - [[package]] name = "untrusted" version = "0.9.0" @@ -2766,12 +2623,6 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" -[[package]] -name = "wasite" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" - [[package]] name = "wasm-bindgen" version = "0.2.99" @@ -2826,16 +2677,6 @@ version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "943aab3fdaaa029a6e0271b35ea10b72b943135afe9bffca82384098ad0e06a6" -[[package]] -name = "web-sys" -version = "0.3.76" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04dd7223427d52553d3702c004d3b2fe07c148165faa56313cb00211e31c12bc" -dependencies = [ - "js-sys", - "wasm-bindgen", -] - [[package]] name = "which" version = "4.4.2" @@ -2848,17 +2689,6 @@ dependencies = [ "rustix", ] -[[package]] -name = "whoami" -version = "1.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "372d5b87f58ec45c384ba03563b03544dc5fadc3983e434b286913f5b4a9bb6d" -dependencies = [ - "redox_syscall", - "wasite", - "web-sys", -] - [[package]] name = "winapi" version = "0.3.9" diff --git a/scylla-server/Cargo.toml b/scylla-server/Cargo.toml index 7881327c..4a37a335 100644 --- a/scylla-server/Cargo.toml +++ b/scylla-server/Cargo.toml @@ -30,8 +30,8 @@ serde_json = "1.0.128" diesel_migrations = { version = "2.2.0", features = ["postgres"] } rangemap = "1.5.1" axum-macros = "0.5.0" -diesel-async = { version = "0.5.2", features = ["postgres", "bb8", "async-connection-wrapper", "sync-connection-wrapper", "tokio"] } rustc-hash = "2.1.0" +deadpool-diesel = { version = "0.6.1", features = ["rt_tokio_1", "postgres", "tracing"] } [target.'cfg(not(target_env = "msvc"))'.dependencies] tikv-jemallocator = "0.6" diff --git a/scylla-server/src/controllers/data_controller.rs b/scylla-server/src/controllers/data_controller.rs index 837fcf1a..886942e1 100644 --- a/scylla-server/src/controllers/data_controller.rs +++ b/scylla-server/src/controllers/data_controller.rs @@ -13,8 +13,8 @@ pub async fn get_data( State(pool): State, Path((data_type_name, run_id)): Path<(String, i32)>, ) -> Result>, ScyllaError> { - let mut db = pool.get().await?; - let data = data_service::get_data(&mut db, data_type_name, run_id).await?; + let db = pool.get().await?; + let data = data_service::get_data(db, data_type_name, run_id).await?; // map data to frontend data types according to the From func of the client struct let mut transformed_data: Vec = data.into_iter().map(PublicData::from).collect(); diff --git a/scylla-server/src/controllers/data_type_controller.rs b/scylla-server/src/controllers/data_type_controller.rs index ce958131..84fa9b50 100644 --- a/scylla-server/src/controllers/data_type_controller.rs +++ b/scylla-server/src/controllers/data_type_controller.rs @@ -9,8 +9,8 @@ use crate::{ pub async fn get_all_data_types( State(pool): State, ) -> Result>, ScyllaError> { - let mut db = pool.get().await?; - let data_types = data_type_service::get_all_data_types(&mut db).await?; + let db = pool.get().await?; + let data_types = data_type_service::get_all_data_types(db).await?; let transformed_data_types: Vec = data_types.into_iter().map(PublicDataType::from).collect(); diff --git 
a/scylla-server/src/controllers/file_insertion_controller.rs b/scylla-server/src/controllers/file_insertion_controller.rs index 3896ebcb..c4e1c52d 100644 --- a/scylla-server/src/controllers/file_insertion_controller.rs +++ b/scylla-server/src/controllers/file_insertion_controller.rs @@ -21,9 +21,9 @@ pub async fn insert_file( mut multipart: Multipart, ) -> Result { // create a run ID cache - let mut db = pool.get().await?; + let db = pool.get().await?; debug!("Warming up run ID map!"); - let mut run_iter = run_service::get_all_runs(&mut db) + let mut run_iter = run_service::get_all_runs(db) .await? .into_iter() .map(|f| (f.id, f.time.timestamp_micros() as u64)) diff --git a/scylla-server/src/controllers/run_controller.rs b/scylla-server/src/controllers/run_controller.rs index bade8d37..eb2255d8 100644 --- a/scylla-server/src/controllers/run_controller.rs +++ b/scylla-server/src/controllers/run_controller.rs @@ -13,8 +13,8 @@ use crate::{ pub async fn get_all_runs( State(pool): State, ) -> Result>, ScyllaError> { - let mut db = pool.get().await?; - let run_data = run_service::get_all_runs(&mut db).await?; + let db = pool.get().await?; + let run_data = run_service::get_all_runs(db).await?; let transformed_run_data: Vec = run_data.into_iter().map(PublicRun::from).collect(); @@ -26,8 +26,8 @@ pub async fn get_run_by_id( State(pool): State, Path(run_id): Path, ) -> Result, ScyllaError> { - let mut db = pool.get().await?; - let run_data = run_service::get_run_by_id(&mut db, run_id).await?; + let db = pool.get().await?; + let run_data = run_service::get_run_by_id(db, run_id).await?; if run_data.is_none() { return Err(ScyllaError::EmptyResult); @@ -43,8 +43,8 @@ pub async fn get_run_by_id( /// create a new run with an auto-incremented ID /// note the new run must be updated so the channel passed in notifies the data processor to use the new run pub async fn new_run(State(pool): State) -> Result, ScyllaError> { - let mut db = pool.get().await?; - let run_data = run_service::create_run(&mut db, chrono::offset::Utc::now()).await?; + let db = pool.get().await?; + let run_data = run_service::create_run(db, chrono::offset::Utc::now()).await?; crate::RUN_ID.store(run_data.id, Ordering::Relaxed); tracing::info!( diff --git a/scylla-server/src/db_handler.rs b/scylla-server/src/db_handler.rs index 2056acbc..55e0bb80 100644 --- a/scylla-server/src/db_handler.rs +++ b/scylla-server/src/db_handler.rs @@ -24,35 +24,6 @@ pub struct DbHandler { upload_interval: u64, } -/// Chunks a vec into roughly equal vectors all under size `max_chunk_size` -/// This precomputes vec capacity but does however call to_vec(), reallocating the slices -fn chunk_vec(input: Vec, max_chunk_size: usize) -> Vec> { - if max_chunk_size == 0 { - panic!("Maximum chunk size must be greater than zero"); - } - - let len = input.len(); - if len == 0 { - return Vec::new(); - } - - // Calculate the number of chunks - let num_chunks = len.div_ceil(max_chunk_size); - - // Recompute a balanced chunk size - let chunk_size = usize::max(1, len.div_ceil(num_chunks)); - - let mut result = Vec::with_capacity(num_chunks); - let mut start = 0; - - while start < len { - let end = usize::min(start + chunk_size, len); - result.push(input[start..end].to_vec()); - start = end; - } - result -} - impl DbHandler { /// Make a new db handler /// * `recv` - the broadcast reciver of which clientdata will be sent @@ -81,10 +52,6 @@ impl DbHandler { loop { tokio::select! 
{ _ = cancel_token.cancelled() => { - let Ok(mut database) = pool.get().await else { - warn!("Could not get connection for cleanup"); - break; - }; // cleanup all remaining messages if batches start backing up while let Some(final_msgs) = batch_queue.recv().await { info!("{} batches remaining!", batch_queue.len()+1); @@ -93,15 +60,14 @@ impl DbHandler { debug!("A batch of zero messages was sent!"); continue; } - let chunk_size = final_msgs.len() / ((final_msgs.len() / 8190) + 1); - let chunks = chunk_vec(final_msgs, chunk_size); - debug!("Batch uploading {} chunks in sequence", chunks.len()); - for chunk in chunks { + let Ok(database) = pool.get().await else { + warn!("Could not get connection for cleanup"); + break; + }; info!( "A cleanup chunk uploaded: {:?}", - data_service::add_many(&mut database, chunk).await + data_service::copy_many(database, final_msgs).await.map_err(|_| "Error!") ); - } } info!("No more messages to cleanup."); break; @@ -115,12 +81,8 @@ impl DbHandler { continue; } let msg_len = msgs.len(); - let chunk_size = msg_len / ((msg_len / 8190) + 1); - let chunks = chunk_vec(msgs, chunk_size); - info!("Batch uploading {} chunks in parrallel, {} messages.", chunks.len(), msg_len); - for chunk in chunks { - tokio::spawn(DbHandler::batch_upload(chunk, pool.clone())); - } + info!("Batch uploading {} messages.", msg_len); + tokio::spawn(DbHandler::batch_upload(msgs, pool.clone())); debug!( "DB send: {} of {}", batch_queue.len(), @@ -151,11 +113,11 @@ impl DbHandler { #[instrument(level = Level::DEBUG, skip(msg, pool))] async fn batch_upload(msg: Vec, pool: PoolHandle) { - let Ok(mut database) = pool.get().await else { + let Ok(database) = pool.get().await else { warn!("Could not get connection for batch upload!"); return; }; - match data_service::add_many(&mut database, msg).await { + match data_service::copy_many(database, msg).await { Ok(count) => info!("Batch uploaded: {:?}", count), Err(err) => warn!("Error in batch upload: {:?}", err), } @@ -209,13 +171,13 @@ impl DbHandler { ); if !self.datatype_list.contains(&msg.name) { - let Ok(mut database) = self.pool.get().await else { + let Ok(database) = self.pool.get().await else { warn!("Could not get connection for dataType upsert"); return; }; info!("Upserting data type: {}", msg.name); if let Err(msg) = data_type_service::upsert_data_type( - &mut database, + database, msg.name.clone(), msg.unit.clone(), msg.node.clone(), @@ -230,7 +192,7 @@ impl DbHandler { // Check for GPS points, insert them into current run if available if msg.name == "TPU/GPS/Location" { debug!("Upserting run with location points!"); - let Ok(mut database) = self.pool.get().await else { + let Ok(database) = self.pool.get().await else { warn!("Could not get connection for db points update"); return; }; @@ -238,7 +200,7 @@ impl DbHandler { if msg.values.len() < 2 { warn!("GPS message found without both lat and long!"); } else if let Err(err) = run_service::update_run_with_coords( - &mut database, + database, RUN_ID.load(std::sync::atomic::Ordering::Relaxed), msg.values[0].into(), msg.values[1].into(), diff --git a/scylla-server/src/error.rs b/scylla-server/src/error.rs index e7bf009c..2ed3120d 100644 --- a/scylla-server/src/error.rs +++ b/scylla-server/src/error.rs @@ -4,11 +4,13 @@ use axum::{ }; use tracing::warn; +use crate::services; + pub enum ScyllaError { /// Deseil error - DbError(diesel::result::Error), + DbError(services::DbError), /// Diesel db connection error, - ConnError(diesel_async::pooled_connection::bb8::RunError), + 
ConnError(deadpool_diesel::PoolError), /// An instruction was not encodable InvalidEncoding(String), /// Could not communicate to car @@ -17,14 +19,14 @@ pub enum ScyllaError { EmptyResult, } -impl From for ScyllaError { - fn from(error: diesel::result::Error) -> Self { +impl From for ScyllaError { + fn from(error: services::DbError) -> Self { ScyllaError::DbError(error) } } -impl From for ScyllaError { - fn from(error: diesel_async::pooled_connection::bb8::RunError) -> Self { +impl From for ScyllaError { + fn from(error: deadpool_diesel::PoolError) -> Self { ScyllaError::ConnError(error) } } @@ -39,7 +41,7 @@ impl IntoResponse for ScyllaError { ), ScyllaError::DbError(error) => ( StatusCode::BAD_REQUEST, - format!("Misc query error: {}", error), + format!("Misc query error: {:?}", error), ), ScyllaError::InvalidEncoding(reason) => (StatusCode::UNPROCESSABLE_ENTITY, reason), ScyllaError::CommFailure(reason) => (StatusCode::BAD_GATEWAY, reason), diff --git a/scylla-server/src/lib.rs b/scylla-server/src/lib.rs index 47cff620..142924f4 100644 --- a/scylla-server/src/lib.rs +++ b/scylla-server/src/lib.rs @@ -19,10 +19,9 @@ pub mod serverdata; pub mod transformers; /// The type descriptor of the database passed to the middlelayer through axum state -pub type Database<'a> = - diesel_async::pooled_connection::bb8::PooledConnection<'a, diesel_async::AsyncPgConnection>; +pub type Database = deadpool_diesel::postgres::Connection; -pub type PoolHandle = diesel_async::pooled_connection::bb8::Pool; +pub type PoolHandle = deadpool_diesel::Pool>; #[derive(clap::ValueEnum, Debug, PartialEq, Copy, Clone, Default)] #[clap(rename_all = "kebab_case")] diff --git a/scylla-server/src/main.rs b/scylla-server/src/main.rs index 6d8a92d6..b2fb9765 100755 --- a/scylla-server/src/main.rs +++ b/scylla-server/src/main.rs @@ -10,11 +10,7 @@ use axum::{ Extension, Router, }; use clap::Parser; -use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; -use diesel_async::{ - pooled_connection::{bb8::Pool, AsyncDieselConnectionManager}, - AsyncConnection, AsyncPgConnection, -}; +use deadpool_diesel::postgres::{Manager, Pool}; use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; use dotenvy::dotenv; use rumqttc::v5::AsyncClient; @@ -30,7 +26,7 @@ use scylla_server::{ use scylla_server::{ db_handler, mqtt_processor::{MqttProcessor, MqttProcessorOptions}, - ClientData, RUN_ID, + ClientData, PoolHandle, RUN_ID, }; use socketioxide::{extract::SocketRef, SocketIo}; use tokio::{signal, sync::mpsc}; @@ -143,30 +139,25 @@ async fn main() -> Result<(), Box> { } dotenv().ok(); - let db_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be specified"); + info!("Beginning DB migration..."); + let manager: deadpool_diesel::Manager = Manager::new( + std::env::var("DATABASE_URL").unwrap(), + deadpool_diesel::Runtime::Tokio1, + ); + let pool: PoolHandle = Pool::builder(manager) + .build() + .expect("Could not build pool"); - info!("Beginning DB migration w/ temporary connection..."); - // it is best to create a temporary unmanaged connection to run the migrations - // a completely new set of connections is created by the pool manager because it cannot understand an already established connection - let conn: AsyncPgConnection = AsyncPgConnection::establish(&db_url).await?; - let mut async_wrapper: AsyncConnectionWrapper = - AsyncConnectionWrapper::from(conn); - tokio::task::spawn_blocking(move || { - async_wrapper.run_pending_migrations(MIGRATIONS).unwrap(); - }) - .await?; + let conn = 
pool.get().await.unwrap(); + let res = conn + .interact(|conn| conn.run_pending_migrations(MIGRATIONS).err()) + .await + .expect("Could not migrate DB!"); + if res.is_some() { + panic!("Could not migrate DB!") + } info!("Successfully migrated DB!"); - info!("Initializing database connections..."); - let manager = AsyncDieselConnectionManager::::new(db_url); - let pool: Pool = Pool::builder() - .max_size(10) - .min_idle(Some(2)) - .max_lifetime(Some(Duration::from_secs(60 * 60 * 24))) - .idle_timeout(Some(Duration::from_secs(60 * 2))) - .build(manager) - .await?; - // create the socket stuff let (socket_layer, io) = SocketIo::builder() .max_buffer_size(4096) // TODO tune values @@ -210,10 +201,9 @@ async fn main() -> Result<(), Box> { } // creates the initial run - let curr_run = - run_service::create_run(&mut pool.get().await.unwrap(), chrono::offset::Utc::now()) - .await - .expect("Could not create initial run!"); + let curr_run = run_service::create_run(pool.get().await.unwrap(), chrono::offset::Utc::now()) + .await + .unwrap(); debug!("Configuring current run: {:?}", curr_run); RUN_ID.store(curr_run.id, Ordering::Relaxed); diff --git a/scylla-server/src/models.rs b/scylla-server/src/models.rs index 0babfc30..3023f338 100644 --- a/scylla-server/src/models.rs +++ b/scylla-server/src/models.rs @@ -23,6 +23,7 @@ pub struct Data { #[derive(Insertable)] #[diesel(table_name = crate::schema::data)] #[diesel(belongs_to(DataType, foreign_key = dataTypeName))] +#[diesel(treat_none_as_default_value = false)] #[diesel(check_for_backend(diesel::pg::Pg))] #[diesel(primary_key(dataTypeName, time))] pub struct DataInsert { diff --git a/scylla-server/src/services/data_service.rs b/scylla-server/src/services/data_service.rs index 35428e17..ba6e6618 100644 --- a/scylla-server/src/services/data_service.rs +++ b/scylla-server/src/services/data_service.rs @@ -1,10 +1,12 @@ +use diesel::prelude::*; + use crate::{ models::{Data, DataInsert}, schema::data::dsl::*, ClientData, Database, }; -use diesel::prelude::*; -use diesel_async::RunQueryDsl; + +use super::DbError; /// Get datapoints that mach criteria /// * `db` - The database connection to use @@ -12,13 +14,16 @@ use diesel_async::RunQueryDsl; /// * `run_id` - The run id to filter the data /// returns: A result containing the data or the error propogated by the db pub async fn get_data( - db: &mut Database<'_>, + db: Database, data_type_name: String, run_id: i32, -) -> Result, diesel::result::Error> { - data.filter(runId.eq(run_id).and(dataTypeName.eq(data_type_name))) - .load(db) - .await +) -> Result, DbError> { + Ok(db + .interact(move |conn| { + data.filter(runId.eq(run_id).and(dataTypeName.eq(data_type_name))) + .load::(conn) + }) + .await??) } /// Adds a datapoint @@ -28,28 +33,43 @@ pub async fn get_data( /// * `data_type_name` - The name of the data type, note this data type must already exist! /// * `rin_id` - The run id to assign the data point to, note this run must already exist! /// returns: A result containing the data or the QueryError propogated by the db -pub async fn add_data( - db: &mut Database<'_>, - client_data: ClientData, -) -> Result { - diesel::insert_into(data) - .values(Into::::into(client_data)) - .get_result(db) - .await +pub async fn add_data(db: Database, client_data: ClientData) -> Result { + Ok(db + .interact(move |conn| { + diesel::insert_into(data) + .values(Into::::into(client_data)) + .get_result(conn) + }) + .await??) 
+} + +pub async fn add_many(db: Database, client_data: Vec) -> Result { + Ok(db + .interact(move |conn| { + diesel::insert_into(data) + .values( + client_data + .into_iter() + .map(Into::::into) + .collect::>(), + ) + .on_conflict_do_nothing() + .execute(conn) + }) + .await??) } -pub async fn add_many( - db: &mut Database<'_>, - client_data: Vec, -) -> Result { - diesel::insert_into(data) - .values( - client_data - .into_iter() - .map(Into::::into) - .collect::>(), - ) - .on_conflict_do_nothing() - .execute(db) - .await +pub async fn copy_many(db: Database, client_data: Vec) -> Result { + Ok(db + .interact(move |conn| { + diesel::copy_from(data) + .from_insertable( + client_data + .into_iter() + .map(Into::::into) + .collect::>(), + ) + .execute(conn) + }) + .await??) } diff --git a/scylla-server/src/services/data_type_service.rs b/scylla-server/src/services/data_type_service.rs index b9acd499..f36caa41 100644 --- a/scylla-server/src/services/data_type_service.rs +++ b/scylla-server/src/services/data_type_service.rs @@ -1,14 +1,13 @@ use crate::{models::DataType, schema::dataType::dsl::*, Database}; use diesel::prelude::*; -use diesel_async::RunQueryDsl; + +use super::DbError; /// Gets all datatypes /// * `d ` - The connection to the database /// returns: A result containing the data or the QueryError propogated by the db -pub async fn get_all_data_types( - db: &mut Database<'_>, -) -> Result, diesel::result::Error> { - dataType.load(db).await +pub async fn get_all_data_types(db: Database) -> Result, DbError> { + Ok(db.interact(move |conn| dataType.load(conn)).await??) } /// Upserts a datatype, either creating or updating one depending on its existence @@ -18,22 +17,25 @@ pub async fn get_all_data_types( /// * `node_name` - The name of the node linked to the data type, must already exist! /// returns: A result containing the data or the QueryError propogated by the db pub async fn upsert_data_type( - db: &mut Database<'_>, + db: Database, data_type_name: String, new_unit: String, node_name: String, -) -> Result { +) -> Result { let val = DataType { name: data_type_name, unit: new_unit, nodeName: node_name, }; - diesel::insert_into(dataType) - .values(&val) - .on_conflict(name) - .do_update() // actually allows for the upsert ability - .set(&val) - .returning(DataType::as_returning()) - .get_result(db) - .await + Ok(db + .interact(move |conn| { + diesel::insert_into(dataType) + .values(&val) + .on_conflict(name) + .do_update() // actually allows for the upsert ability + .set(&val) + .returning(DataType::as_returning()) + .get_result(conn) + }) + .await??) 
} diff --git a/scylla-server/src/services/mod.rs b/scylla-server/src/services/mod.rs index 2de58ca6..05ed354c 100644 --- a/scylla-server/src/services/mod.rs +++ b/scylla-server/src/services/mod.rs @@ -1,3 +1,21 @@ pub mod data_service; pub mod data_type_service; pub mod run_service; + +#[derive(Debug)] +pub enum DbError { + DieselError(diesel::result::Error), + ClosureError(deadpool_diesel::InteractError), +} + +impl From for DbError { + fn from(error: diesel::result::Error) -> Self { + DbError::DieselError(error) + } +} + +impl From for DbError { + fn from(error: deadpool_diesel::InteractError) -> Self { + DbError::ClosureError(error) + } +} diff --git a/scylla-server/src/services/run_service.rs b/scylla-server/src/services/run_service.rs index 4c9bdf9d..00b25a6a 100644 --- a/scylla-server/src/services/run_service.rs +++ b/scylla-server/src/services/run_service.rs @@ -1,38 +1,41 @@ use crate::{models::Run, schema::run::dsl::*, Database}; use chrono::{DateTime, Utc}; use diesel::prelude::*; -use diesel_async::RunQueryDsl; + +use super::DbError; /// Gets all runs /// * `db` - The prisma client to make the call to /// returns: A result containing the data or the QueryError propogated by the db -pub async fn get_all_runs(db: &mut Database<'_>) -> Result, diesel::result::Error> { - run.order(id.asc()).get_results(db).await +pub async fn get_all_runs(db: Database) -> Result, DbError> { + Ok(db + .interact(move |conn| run.order(id.asc()).get_results(conn)) + .await??) } /// Gets a single run by its id /// * `db` - The prisma client to make the call to /// * `run_id` - The id of the run to search for /// returns: A result containing the data (or None if the `run_id` was not a valid run) or the QueryError propogated by the db -pub async fn get_run_by_id( - db: &mut Database<'_>, - run_id: i32, -) -> Result, diesel::result::Error> { - run.find(run_id).first::(db).await.optional() +pub async fn get_run_by_id(db: Database, run_id: i32) -> Result, DbError> { + Ok(Ok(db + .interact(move |conn| run.find(run_id).first::(conn)) + .await??) + .optional()?) } /// Creates a run /// * `db` - The prisma client to make the call to /// * `timestamp` - time when the run starts /// returns: A result containing the data or the QueryError propogated by the db -pub async fn create_run( - db: &mut Database<'_>, - timestamp: DateTime, -) -> Result { - diesel::insert_into(run) - .values(time.eq(timestamp)) - .get_result(db) - .await +pub async fn create_run(db: Database, timestamp: DateTime) -> Result { + Ok(db + .interact(move |conn| { + diesel::insert_into(run) + .values(time.eq(timestamp)) + .get_result(conn) + }) + .await??) } /// Creates a run with a given id @@ -41,14 +44,17 @@ pub async fn create_run( /// * `run_id` - The id of the run to create, must not already be in use! /// returns: A result containing the data or the QueryError propogated by the db pub async fn create_run_with_id( - db: &mut Database<'_>, + db: Database, timestamp: DateTime, run_id: i32, -) -> Result { - diesel::insert_into(run) - .values((time.eq(timestamp), id.eq(run_id))) - .get_result(db) - .await +) -> Result { + Ok(db + .interact(move |conn| { + diesel::insert_into(run) + .values((time.eq(timestamp), id.eq(run_id))) + .get_result(conn) + }) + .await??) 
} /// Updates a run with GPS points @@ -57,13 +63,16 @@ pub async fn create_run_with_id( /// * `lat` - The latitude /// * `long` - The longitude pub async fn update_run_with_coords( - db: &mut Database<'_>, + db: Database, run_id: i32, lat: f64, long: f64, -) -> Result { - diesel::update(run.filter(id.eq(run_id))) - .set((latitude.eq(lat), longitude.eq(long))) - .get_result(db) - .await +) -> Result { + Ok(db + .interact(move |conn| { + diesel::update(run.filter(id.eq(run_id))) + .set((latitude.eq(lat), longitude.eq(long))) + .get_result(conn) + }) + .await??) }
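
Reviewer note on the new query path: every service call now takes an owned
deadpool-diesel connection and runs its blocking diesel work inside
interact(), and the hot batch-upload path moves from chunked INSERTs to
Postgres COPY via diesel::copy_from. The ~8190-row chunking removed from
db_handler was most likely there to stay under Postgres's bind-parameter
limit; COPY streams rows instead of binding parameters, so it is no longer
needed. Below is a minimal sketch of the end-to-end flow (not part of the
diff; it assumes the crate's existing ClientData/DataInsert types, the
schema::data table, and a DATABASE_URL env var):

use deadpool_diesel::postgres::{Manager, Pool};
use diesel::prelude::*;

use scylla_server::{models::DataInsert, schema::data::dsl::data, ClientData};

async fn copy_example(rows: Vec<ClientData>) -> Result<usize, Box<dyn std::error::Error>> {
    // Build the pool once at startup; Tokio1 matches the rt_tokio_1 cargo feature.
    let manager = Manager::new(
        std::env::var("DATABASE_URL")?,
        deadpool_diesel::Runtime::Tokio1,
    );
    let pool = Pool::builder(manager).build()?;

    // A pooled connection runs blocking diesel work on a worker thread via interact().
    let conn = pool.get().await?;
    let inserted = conn
        .interact(move |conn| {
            // COPY-based bulk load, mirroring data_service::copy_many above.
            diesel::copy_from(data)
                .from_insertable(
                    rows.into_iter()
                        .map(Into::<DataInsert>::into)
                        .collect::<Vec<DataInsert>>(),
                )
                .execute(conn)
        })
        .await??; // outer ? = deadpool InteractError, inner ? = diesel::result::Error

    Ok(inserted)
}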