diff --git a/Cargo.lock b/Cargo.lock index 2e0478df..6cab8c6d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1492,7 +1492,6 @@ dependencies = [ "quick-xml", "regex", "serde", - "test-case", "thiserror", "tokio", "tokio-postgres", @@ -1514,7 +1513,6 @@ dependencies = [ "rove", "rove_connector", "serde", - "test-case", "tokio", "tokio-postgres", ] @@ -2772,39 +2770,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "test-case" -version = "3.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb2550dd13afcd286853192af8601920d959b14c401fcece38071d53bf0768a8" -dependencies = [ - "test-case-macros", -] - -[[package]] -name = "test-case-core" -version = "3.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adcb7fd841cd518e279be3d5a3eb0636409487998a4aff22f3de87b81e88384f" -dependencies = [ - "cfg-if", - "proc-macro2", - "quote", - "syn 2.0.90", -] - -[[package]] -name = "test-case-macros" -version = "3.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c89e72a01ed4c579669add59014b9a524d609c0c88c6a585ce37485879f6ffb" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.90", - "test-case-core", -] - [[package]] name = "thiserror" version = "1.0.69" diff --git a/Cargo.toml b/Cargo.toml index 2744d5df..7760d813 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,7 +29,6 @@ rand_distr = "0.4.3" regex = "1.11.1" rove = { git = "https://github.com/metno/rove.git" } serde = { version = "1.0.215", features = ["derive"] } -test-case = "3.3.1" thiserror = "1.0.69" tokio = { version = "1.41.1", features = ["rt-multi-thread", "macros"] } tokio-postgres = { version = "0.7.12", features = ["with-chrono-0_4"] } diff --git a/ingestion/Cargo.toml b/ingestion/Cargo.toml index c98585b5..238bd47b 100644 --- a/ingestion/Cargo.toml +++ b/ingestion/Cargo.toml @@ -24,6 +24,3 @@ serde.workspace = true thiserror.workspace = true tokio.workspace = true tokio-postgres.workspace = true - -[dev-dependencies] -test-case.workspace = true diff --git a/ingestion/src/kldata.rs b/ingestion/src/kldata.rs index 827a536b..19480f86 100644 --- a/ingestion/src/kldata.rs +++ b/ingestion/src/kldata.rs @@ -378,203 +378,353 @@ pub async fn filter_and_label_kldata<'a>( mod tests { use crate::get_conversions; use chrono::TimeZone; - use test_case::test_case; use super::ObsType::{NonScalar, Scalar}; use super::*; - #[test_case( - "Test message that fails." => Err(Error::Parse("kldata indicator missing or out of order".to_string())); - "missing kldata indicator" - )] - // TODO: missing messageid defaults to 0 - #[test_case( - "kldata/nationalnr=100/type=504" => Ok((100, 504, 0)); - "valid header 1" - )] - #[test_case( - "kldata/type=504/nationalnr=100/messageid=25" => Ok((100, 504, 25)); - "valid header 2" - )] - #[test_case( - "kldata/messageid=23/nationalnr=99993/type=508/add" => Ok((99993, 508, 23)); - "valid header 3" - )] - #[test_case( - "kldata/received_time=\"2024-07-05 08:27:40+00\"/nationalnr=297000/type=70051" => Ok((297000, 70051, 0)); - "valid header 4" - )] - #[test_case( - "kldata/nationalnr=93140/type=501/unexpected" => Err(Error::Parse("unexpected field in kldata header format: unexpected".to_string())); - "unexpected field" - )] - #[test_case( - "kldata/messageid=10/type=501" => Err(Error::Parse("missing field `nationalnr` in kldata header".to_string())); - "missing nationlnr" - )] - #[test_case( - "kldata/messageid=10/nationalnr=93140" => Err(Error::Parse("missing field `type` in kldata header".to_string())); - "missing type" - )] - fn test_parse_meta(msg: &str) -> Result<(i32, i32, usize), Error> { - let header = ObsinnHeader::parse(msg)?; - - Ok((header.station_id, header.type_id, header.message_id)) + #[test] + fn test_parse_meta() { + let cases = vec![ + ( + "Test message that fails.", + Err(Error::Parse( + "kldata indicator missing or out of order".to_string(), + )), + "missing kldata indicator", + ), + // TODO: missing messageid defaults to 0 + ( + "kldata/nationalnr=100/type=504", + Ok((100, 504, 0)), + "valid header 1", + ), + ( + "kldata/type=504/nationalnr=100/messageid=25", + Ok((100, 504, 25)), + "valid header 2", + ), + ( + "kldata/messageid=23/nationalnr=99993/type=508/add", + Ok((99993, 508, 23)), + "valid header 3", + ), + ( + "kldata/received_time=\"2024-07-05 08:27:40+00\"/nationalnr=297000/type=70051", + Ok((297000, 70051, 0)), + "valid header 4", + ), + ( + "kldata/nationalnr=93140/type=501/unexpected", + Err(Error::Parse( + "unexpected field in kldata header format: unexpected".to_string(), + )), + "unexpected field", + ), + ( + "kldata/messageid=10/type=501", + Err(Error::Parse( + "missing field `nationalnr` in kldata header".to_string(), + )), + "missing nationlnr", + ), + ( + "kldata/messageid=10/nationalnr=93140", + Err(Error::Parse( + "missing field `type` in kldata header".to_string(), + )), + "missing type", + ), + ]; + for (msg, expected, case_description) in cases { + let output = ObsinnHeader::parse(msg) + .map(|header| (header.station_id, header.type_id, header.message_id)); + assert_eq!(output, expected, "{}", case_description); + } } - #[test_case( - "KLOBS,QSI_01(0,0)" => Ok(vec![ - ObsinnId{param_code: "KLOBS".to_string(), sensor_and_level: None}, - ObsinnId{param_code: "QSI_01".to_string(), sensor_and_level: Some((0,0))} - ]); - "match 1" - )] - #[test_case( - "param_1,param_2,QSI_01(0,0)" => Ok(vec![ - ObsinnId{param_code: "param_1".to_string(), sensor_and_level: None}, - ObsinnId{param_code: "param_2".to_string(), sensor_and_level: None}, - ObsinnId{param_code: "QSI_01".to_string(), sensor_and_level: Some((0,0))} - ]); - "match 2" - )] - #[test_case( - "param_1(0,0),param_2,param_3(0,0)" => Ok(vec![ - ObsinnId{param_code: "param_1".to_string(), sensor_and_level: Some((0,0))}, - ObsinnId{param_code: "param_2".to_string(), sensor_and_level: None}, - ObsinnId{param_code: "param_3".to_string(), sensor_and_level: Some((0,0))} - ]); - "match 3" - )] // NOTE: cases not taken into account here // - "()" => Vec::new() // - "param(0.1,0)" => vec[param, 0.1, 0] // - "param(0,0.1)" => vec[param, 0.1, 0] - fn test_parse_columns(cols: &str) -> Result, Error> { - parse_columns(cols) + #[test] + fn test_parse_columns() { + let cases = vec![ + ( + "KLOBS,QSI_01(0,0)", + Ok(vec![ + ObsinnId { + param_code: "KLOBS".to_string(), + sensor_and_level: None, + }, + ObsinnId { + param_code: "QSI_01".to_string(), + sensor_and_level: Some((0, 0)), + }, + ]), + "match 1", + ), + ( + "param_1,param_2,QSI_01(0,0)", + Ok(vec![ + ObsinnId { + param_code: "param_1".to_string(), + sensor_and_level: None, + }, + ObsinnId { + param_code: "param_2".to_string(), + sensor_and_level: None, + }, + ObsinnId { + param_code: "QSI_01".to_string(), + sensor_and_level: Some((0, 0)), + }, + ]), + "match 2", + ), + ( + "param_1(0,0),param_2,param_3(0,0)", + Ok(vec![ + ObsinnId { + param_code: "param_1".to_string(), + sensor_and_level: Some((0, 0)), + }, + ObsinnId { + param_code: "param_2".to_string(), + sensor_and_level: None, + }, + ObsinnId { + param_code: "param_3".to_string(), + sensor_and_level: Some((0, 0)), + }, + ]), + "match 3", + ), + ]; + + for (cols, expected, case_description) in cases { + let output = parse_columns(cols); + assert_eq!(output, expected, "{}", case_description); + } } - #[test_case( - "20160201054100,-1.1,0,2.80", - &[ - ObsinnId{param_code: "TA".to_string(), sensor_and_level: None}, - ObsinnId{param_code: "CI".to_string(), sensor_and_level: None}, - ObsinnId{param_code: "IR".to_string(), sensor_and_level: None}, - ] => Ok(vec![ - ObsinnObs{ - timestamp: Utc.with_ymd_and_hms(2016,2, 1, 5, 41, 0).unwrap(), - id: ObsinnId{param_code: "TA".to_string(), sensor_and_level: None}, - value: Scalar(-1.1) - }, - ObsinnObs{ - timestamp: Utc.with_ymd_and_hms(2016,2, 1, 5, 41, 0).unwrap(), - id: ObsinnId{param_code: "CI".to_string(), sensor_and_level: None}, - value: Scalar(0.0) - }, - ObsinnObs{ - timestamp: Utc.with_ymd_and_hms(2016,2, 1, 5, 41, 0).unwrap(), - id: ObsinnId{param_code: "IR".to_string(), sensor_and_level: None}, - value: Scalar(2.8) - }, - ]); - "single line" - )] - #[test_case( - "20160201054100,-1.1,0,2.80\n20160201055100,-1.5,1,2.90", - &[ - ObsinnId{param_code: "TA".to_string(), sensor_and_level: None}, - ObsinnId{param_code: "CI".to_string(), sensor_and_level: None}, - ObsinnId{param_code: "IR".to_string(), sensor_and_level: None}, - ] => Ok(vec![ - ObsinnObs{ - timestamp: Utc.with_ymd_and_hms(2016,2, 1, 5, 41, 0).unwrap(), - id: ObsinnId{param_code: "TA".to_string(), sensor_and_level: None}, - value: Scalar(-1.1) - }, - ObsinnObs{ - timestamp: Utc.with_ymd_and_hms(2016,2, 1, 5, 41, 0).unwrap(), - id: ObsinnId{param_code: "CI".to_string(), sensor_and_level: None}, - value: Scalar(0.0) - }, - ObsinnObs{ - timestamp: Utc.with_ymd_and_hms(2016,2, 1, 5, 41, 0).unwrap(), - id: ObsinnId{param_code: "IR".to_string(), sensor_and_level: None}, - value: Scalar(2.8) - }, - ObsinnObs{ - timestamp: Utc.with_ymd_and_hms(2016,2, 1, 5, 51, 0).unwrap(), - id: ObsinnId{param_code: "TA".to_string(), sensor_and_level: None}, - value: Scalar(-1.5) - }, - ObsinnObs{ - timestamp: Utc.with_ymd_and_hms(2016,2, 1, 5, 51, 0).unwrap(), - id: ObsinnId{param_code: "CI".to_string(), sensor_and_level: None}, - value: Scalar(1.0) - }, - ObsinnObs{ - timestamp: Utc.with_ymd_and_hms(2016,2, 1, 5, 51, 0).unwrap(), - id: ObsinnId{param_code: "IR".to_string(), sensor_and_level: None}, - value: Scalar(2.9) - }, - ]); - "multiple lines" - )] - #[test_case("20240910000000,20240910000000,10.1", - &[ - ObsinnId{param_code: "KLOBS".to_string(), sensor_and_level: None}, - ObsinnId{param_code: "TA".to_string(), sensor_and_level: None}, - ] => Ok(vec![ - ObsinnObs{ - timestamp: Utc.with_ymd_and_hms(2024, 9, 10, 0, 0, 0).unwrap(), - id: ObsinnId{param_code: "KLOBS".to_string(), sensor_and_level: None}, - value: NonScalar("20240910000000") - }, - ObsinnObs{ - timestamp: Utc.with_ymd_and_hms(2024, 9, 10, 0, 0, 0).unwrap(), - id: ObsinnId{param_code: "TA".to_string(), sensor_and_level: None}, - value: Scalar(10.1) - }] - ); - "non scalar parameter" - )] - #[test_case("20240910000000,20240910000000,10.1", - &[ - ObsinnId{param_code: "unknown".to_string(), sensor_and_level: None}, - ObsinnId{param_code: "TA".to_string(), sensor_and_level: None}, - ] => Ok(vec![ - ObsinnObs{ - timestamp: Utc.with_ymd_and_hms(2024, 9, 10, 0, 0, 0).unwrap(), - id: ObsinnId{param_code: "unknown".to_string(), sensor_and_level: None}, - value: NonScalar("20240910000000") - }, - ObsinnObs{ - timestamp: Utc.with_ymd_and_hms(2024, 9, 10, 0, 0, 0).unwrap(), - id: ObsinnId{param_code: "TA".to_string(), sensor_and_level: None}, - value: Scalar(10.1) - }, - ]); - "unrecognised param code" - )] - fn test_parse_obs<'a>(data: &'a str, cols: &[ObsinnId]) -> Result>, Error> { + #[test] + fn test_parse_obs<'a>() { + let cases = vec![ + ( + "20160201054100,-1.1,0,2.80", + vec![ + ObsinnId { + param_code: "TA".to_string(), + sensor_and_level: None, + }, + ObsinnId { + param_code: "CI".to_string(), + sensor_and_level: None, + }, + ObsinnId { + param_code: "IR".to_string(), + sensor_and_level: None, + }, + ], + Ok(vec![ + ObsinnObs { + timestamp: Utc.with_ymd_and_hms(2016, 2, 1, 5, 41, 0).unwrap(), + id: ObsinnId { + param_code: "TA".to_string(), + sensor_and_level: None, + }, + value: Scalar(-1.1), + }, + ObsinnObs { + timestamp: Utc.with_ymd_and_hms(2016, 2, 1, 5, 41, 0).unwrap(), + id: ObsinnId { + param_code: "CI".to_string(), + sensor_and_level: None, + }, + value: Scalar(0.0), + }, + ObsinnObs { + timestamp: Utc.with_ymd_and_hms(2016, 2, 1, 5, 41, 0).unwrap(), + id: ObsinnId { + param_code: "IR".to_string(), + sensor_and_level: None, + }, + value: Scalar(2.8), + }, + ]), + "single line", + ), + ( + "20160201054100,-1.1,0,2.80\n20160201055100,-1.5,1,2.90", + vec![ + ObsinnId { + param_code: "TA".to_string(), + sensor_and_level: None, + }, + ObsinnId { + param_code: "CI".to_string(), + sensor_and_level: None, + }, + ObsinnId { + param_code: "IR".to_string(), + sensor_and_level: None, + }, + ], + Ok(vec![ + ObsinnObs { + timestamp: Utc.with_ymd_and_hms(2016, 2, 1, 5, 41, 0).unwrap(), + id: ObsinnId { + param_code: "TA".to_string(), + sensor_and_level: None, + }, + value: Scalar(-1.1), + }, + ObsinnObs { + timestamp: Utc.with_ymd_and_hms(2016, 2, 1, 5, 41, 0).unwrap(), + id: ObsinnId { + param_code: "CI".to_string(), + sensor_and_level: None, + }, + value: Scalar(0.0), + }, + ObsinnObs { + timestamp: Utc.with_ymd_and_hms(2016, 2, 1, 5, 41, 0).unwrap(), + id: ObsinnId { + param_code: "IR".to_string(), + sensor_and_level: None, + }, + value: Scalar(2.8), + }, + ObsinnObs { + timestamp: Utc.with_ymd_and_hms(2016, 2, 1, 5, 51, 0).unwrap(), + id: ObsinnId { + param_code: "TA".to_string(), + sensor_and_level: None, + }, + value: Scalar(-1.5), + }, + ObsinnObs { + timestamp: Utc.with_ymd_and_hms(2016, 2, 1, 5, 51, 0).unwrap(), + id: ObsinnId { + param_code: "CI".to_string(), + sensor_and_level: None, + }, + value: Scalar(1.0), + }, + ObsinnObs { + timestamp: Utc.with_ymd_and_hms(2016, 2, 1, 5, 51, 0).unwrap(), + id: ObsinnId { + param_code: "IR".to_string(), + sensor_and_level: None, + }, + value: Scalar(2.9), + }, + ]), + "multiple lines", + ), + ( + "20240910000000,20240910000000,10.1", + vec![ + ObsinnId { + param_code: "KLOBS".to_string(), + sensor_and_level: None, + }, + ObsinnId { + param_code: "TA".to_string(), + sensor_and_level: None, + }, + ], + Ok(vec![ + ObsinnObs { + timestamp: Utc.with_ymd_and_hms(2024, 9, 10, 0, 0, 0).unwrap(), + id: ObsinnId { + param_code: "KLOBS".to_string(), + sensor_and_level: None, + }, + value: NonScalar("20240910000000"), + }, + ObsinnObs { + timestamp: Utc.with_ymd_and_hms(2024, 9, 10, 0, 0, 0).unwrap(), + id: ObsinnId { + param_code: "TA".to_string(), + sensor_and_level: None, + }, + value: Scalar(10.1), + }, + ]), + "non scalar parameter", + ), + ( + "20240910000000,20240910000000,10.1", + vec![ + ObsinnId { + param_code: "unknown".to_string(), + sensor_and_level: None, + }, + ObsinnId { + param_code: "TA".to_string(), + sensor_and_level: None, + }, + ], + Ok(vec![ + ObsinnObs { + timestamp: Utc.with_ymd_and_hms(2024, 9, 10, 0, 0, 0).unwrap(), + id: ObsinnId { + param_code: "unknown".to_string(), + sensor_and_level: None, + }, + value: NonScalar("20240910000000"), + }, + ObsinnObs { + timestamp: Utc.with_ymd_and_hms(2024, 9, 10, 0, 0, 0).unwrap(), + id: ObsinnId { + param_code: "TA".to_string(), + sensor_and_level: None, + }, + value: Scalar(10.1), + }, + ]), + "unrecognised param code", + ), + ]; + let param_conversions = get_conversions("resources/paramconversions.csv").unwrap(); - parse_obs(data.lines(), cols, param_conversions) + for (data, cols, expected, case_description) in cases { + let output = parse_obs(data.lines(), &cols, param_conversions.clone()); + assert_eq!(output, expected, "{}", case_description); + } } // NOTE: just test for basic failures, the happy path should already be captured by the other tests - #[test_case( - "" => Err(Error::Parse("kldata message contained too few lines".to_string())); - "empty line" - )] - #[test_case( - "kldata/nationalnr=99993/type=508/messageid=23" => Err(Error::Parse("kldata message contained too few lines".to_string())); - "header only" - )] - #[test_case( - "kldata/nationalnr=93140/type=501/messageid=23 -DD(0,0),FF(0,0),DG_1(0,0),FG_1(0,0),KLFG_1(0,0),FX_1(0,0)" => Err(Error::Parse("empty row in kldata csv".to_string())); - "missing data" - )] - fn test_parse_kldata(body: &str) -> Result<(usize, ObsinnChunk), Error> { + #[test] + fn test_parse_kldata() { + let cases = vec![ + ( + "", + Err(Error::Parse( + "kldata message contained too few lines".to_string(), + )), + "empty line", + ), + ( + "kldata/nationalnr=99993/type=508/messageid=23", + Err(Error::Parse( + "kldata message contained too few lines".to_string(), + )), + "header only", + ), + ( + "kldata/nationalnr=93140/type=501/messageid=23 + DD(0,0),FF(0,0),DG_1(0,0),FG_1(0,0),KLFG_1(0,0),FX_1(0,0)", + Err(Error::Parse("empty row in kldata csv".to_string())), + "missing data", + ), + ]; let param_conversions = get_conversions("resources/paramconversions.csv").unwrap(); - parse_kldata(body, param_conversions) + + for (body, expected, case_description) in cases { + let output = parse_kldata(body, param_conversions.clone()); + assert_eq!(output, expected, "{}", case_description); + } } } diff --git a/integration_tests/Cargo.toml b/integration_tests/Cargo.toml index d2d96ab4..0865299c 100644 --- a/integration_tests/Cargo.toml +++ b/integration_tests/Cargo.toml @@ -18,7 +18,6 @@ bb8-postgres.workspace = true rove.workspace = true rove_connector = { path = "../rove_connector" } serde.workspace = true -test-case.workspace = true futures.workspace = true csv.workspace = true reqwest = {version = "0.12.9", features = ["json"]} diff --git a/integration_tests/tests/end_to_end.rs b/integration_tests/tests/end_to_end.rs index 430366fa..7abc92e4 100644 --- a/integration_tests/tests/end_to_end.rs +++ b/integration_tests/tests/end_to_end.rs @@ -10,7 +10,6 @@ use chrono::{DateTime, Duration, DurationRound, TimeDelta, TimeZone, Utc}; use chronoutil::RelativeDuration; use futures::{Future, FutureExt}; use rove::data_switch::{DataConnector, SpaceSpec, TimeSpec, Timestamp}; -use test_case::test_case; use tokio::sync::mpsc; use tokio_postgres::NoTls; @@ -92,7 +91,7 @@ impl<'a> Param<'a> { struct TestData<'a> { station_id: i32, type_id: i32, - params: &'a [Param<'a>], + params: Vec>, start_time: DateTime, period: Duration, len: usize, @@ -175,14 +174,52 @@ fn mock_permit_tables() -> Arc> { Arc::new(RwLock::new((param_permit, station_permit))) } -#[test_case(0, 0, 0 => false; "stationid not in permit_tables")] -#[test_case(10000, 0, 0 => false; "stationid in ParamPermitTable, timeseries closed")] -#[test_case(10001, 0, 0 => true; "stationid in ParamPermitTable, timeseries open")] -#[test_case(20000, 0, 0 => false; "stationid in StationPermitTable, timeseries closed")] -#[test_case(20001, 0, 1 => true; "stationid in StationPermitTable, timeseries open")] -fn test_timeseries_is_open(station_id: i32, type_id: i32, permit_id: i32) -> bool { +#[test] +fn test_timeseries_is_open() { + let cases = vec![ + (0, 0, 0, false, "stationid not in permit_tables"), + ( + 10000, + 0, + 0, + false, + "stationid in ParamPermitTable, timeseries closed", + ), + ( + 10001, + 0, + 0, + true, + "stationid in ParamPermitTable, timeseries open", + ), + ( + 20000, + 0, + 0, + false, + "stationid in StationPermitTable, timeseries closed", + ), + ( + 20001, + 0, + 1, + true, + "stationid in StationPermitTable, timeseries open", + ), + ]; + let permit_tables = mock_permit_tables(); - timeseries_is_open(permit_tables, station_id, type_id, permit_id).unwrap() + for case in cases { + let station_id = case.0; + let type_id = case.1; + let permit_id = case.2; + let expected = case.3; + let test_case = case.4; + + let output = + timeseries_is_open(permit_tables.clone(), station_id, type_id, permit_id).unwrap(); + assert_eq!(output, expected, "{}", test_case); + } } async fn cleanup(client: &tokio_postgres::Client) { @@ -199,12 +236,36 @@ async fn e2e_test_wrapper>(test: T) { let manager = PostgresConnectionManager::new_from_stringlike(CONNECT_STRING, NoTls).unwrap(); let db_pool = bb8::Pool::builder().build(manager).await.unwrap(); - let api_server = tokio::spawn(lard_api::run(db_pool.clone())); - let ingestor = tokio::spawn(lard_ingestion::run( - db_pool.clone(), + let (init_shutdown_tx, mut init_shutdown_rx1) = tokio::sync::broadcast::channel(1); + let mut init_shutdown_rx2 = init_shutdown_tx.subscribe(); + + let (api_shutdown_tx, api_shutdown_rx) = tokio::sync::oneshot::channel(); + let (ingestor_shutdown_tx, ingestor_shutdown_rx) = tokio::sync::oneshot::channel(); + + let api_pool = db_pool.clone(); + let ingestion_pool = db_pool.clone(); + + let api_server = tokio::spawn(async move { + tokio::select! { + output = lard_api::run(api_pool) => output, + _ = init_shutdown_rx1.recv() => { + api_shutdown_tx.send(()).unwrap(); + () + }, + } + }); + let ingestor = tokio::spawn(async move { + tokio::select! { + output = lard_ingestion::run(ingestion_pool, PARAMCONV_CSV, mock_permit_tables(), - )); + ) => output, + _ = init_shutdown_rx2.recv() => { + ingestor_shutdown_tx.send(()).unwrap(); + Ok(()) + }, + } + }); tokio::select! { _ = api_server => panic!("API server task terminated first"), @@ -220,6 +281,10 @@ async fn e2e_test_wrapper>(test: T) { assert!(test_result.is_ok()) } } + + init_shutdown_tx.send(()).unwrap(); + api_shutdown_rx.await.unwrap(); + ingestor_shutdown_rx.await.unwrap(); } async fn ingest_data(client: &reqwest::Client, obsinn_msg: String) -> KldataResp { @@ -238,7 +303,7 @@ async fn test_stations_endpoint_irregular() { e2e_test_wrapper(async { let ts = TestData { station_id: 20001, - params: &[Param::new("TGM"), Param::new("TGX")], + params: vec![Param::new("TGM"), Param::new("TGX")], start_time: Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap(), period: Duration::hours(1), type_id: 501, @@ -270,140 +335,155 @@ async fn test_stations_endpoint_irregular() { .await } -#[test_case( - TestData { - station_id: 20001, - params: &[Param::new("TA"), Param::new("TGX")], - start_time: Utc::now().duration_trunc(TimeDelta::hours(1)).unwrap() - - Duration::hours(11), - period: Duration::hours(1), - type_id: 501, - len: 12, - }; "Scalar params") -] -// TODO: probably write a separate test, so we can check actual sensor and level -#[test_case( - TestData { - station_id: 20001, - params: &[Param::with_sensor_level("TA", (1, 1)), Param::new("TGX")], - start_time: Utc::now().duration_trunc(TimeDelta::hours(1)).unwrap() - - Duration::hours(11), - period: Duration::hours(1), - type_id: 501, - len: 12, - }; "With sensor and level") -] -#[test_case( - TestData { - station_id: 20001, - params: &[Param::new("KLOBS"), Param::new("TA")], - start_time: Utc::now().duration_trunc(TimeDelta::hours(1)).unwrap() - - Duration::hours(11), - period: Duration::hours(1), - type_id: 501, - len: 12, - }; "Scalar and non-scalar") -] #[tokio::test] -async fn test_stations_endpoint_regular(ts: TestData<'_>) { - e2e_test_wrapper(async { - let client = reqwest::Client::new(); - let ingestor_resp = ingest_data(&client, ts.obsinn_message()).await; - assert_eq!(ingestor_resp.res, 0); - - let resolution = "PT1H"; - for param in ts.params { - let url = format!( - "http://localhost:3000/stations/{}/params/{}?time_resolution={}", - ts.station_id, param.id, resolution - ); - let resp = reqwest::get(url).await.unwrap(); - assert!(resp.status().is_success()); - - let json: TimeseriesResp = resp.json().await.unwrap(); - assert_eq!(json.tseries.len(), 1); - - let Timeseries::Regular(series) = &json.tseries[0] else { - panic!("Expected regular timeseries") - }; - assert_eq!(series.data.len(), ts.len); - } - }) - .await -} - -#[test_case(99999, 211; "missing station")] -#[test_case(20001, 999; "missing param")] -#[tokio::test] -async fn test_stations_endpoint_errors(station_id: i32, param_id: i32) { - e2e_test_wrapper(async { - let ts = TestData { +async fn test_stations_endpoint_regular() { + let cases = vec![ + // Scalar params + TestData { station_id: 20001, - params: &[Param::new("TA")], - start_time: Utc.with_ymd_and_hms(2024, 1, 1, 00, 00, 00).unwrap(), + params: vec![Param::new("TA"), Param::new("TGX")], + start_time: Utc::now().duration_trunc(TimeDelta::hours(1)).unwrap() + - Duration::hours(11), period: Duration::hours(1), type_id: 501, - len: 48, - }; + len: 12, + }, + // TODO: probably write a separate test, so we can check actual sensor and level + // With sensor and level + TestData { + station_id: 20001, + params: vec![Param::with_sensor_level("TA", (1, 1)), Param::new("TGX")], + start_time: Utc::now().duration_trunc(TimeDelta::hours(1)).unwrap() + - Duration::hours(11), + period: Duration::hours(1), + type_id: 501, + len: 12, + }, + // Scalar and non-scalar + TestData { + station_id: 20001, + params: vec![Param::new("KLOBS"), Param::new("TA")], + start_time: Utc::now().duration_trunc(TimeDelta::hours(1)).unwrap() + - Duration::hours(11), + period: Duration::hours(1), + type_id: 501, + len: 12, + }, + ]; - let client = reqwest::Client::new(); - let ingestor_resp = ingest_data(&client, ts.obsinn_message()).await; - assert_eq!(ingestor_resp.res, 0); + for ts in cases { + e2e_test_wrapper(async { + let client = reqwest::Client::new(); + let ingestor_resp = ingest_data(&client, ts.obsinn_message()).await; + assert_eq!(ingestor_resp.res, 0); - for _ in ts.params { - let url = format!( - "http://localhost:3000/stations/{}/params/{}", - station_id, param_id - ); - let resp = reqwest::get(url).await.unwrap(); - // TODO: resp.status() returns 500, maybe it should return 404? - assert!(!resp.status().is_success()); - } - }) - .await + let resolution = "PT1H"; + for param in ts.params { + let url = format!( + "http://localhost:3000/stations/{}/params/{}?time_resolution={}", + ts.station_id, param.id, resolution + ); + let resp = reqwest::get(url).await.unwrap(); + assert!(resp.status().is_success()); + + let json: TimeseriesResp = resp.json().await.unwrap(); + assert_eq!(json.tseries.len(), 1); + + let Timeseries::Regular(series) = &json.tseries[0] else { + panic!("Expected regular timeseries") + }; + assert_eq!(series.data.len(), ts.len); + } + }) + .await + } } -// We insert 4 timeseries, 2 with new data (UTC::now()) and 2 with old data (2020) -#[test_case("", 2; "without query")] -#[test_case("?latest_max_age=2021-01-01T00:00:00Z", 2; "latest max age 1")] -#[test_case("?latest_max_age=2019-01-01T00:00:00Z", 4; "latest max age 2")] #[tokio::test] -async fn test_latest_endpoint(query: &str, n_timeseries_found: usize) { - e2e_test_wrapper(async { - let test_data = [ - TestData { +async fn test_stations_endpoint_errors() { + let cases = vec![ + //missing station + (99999, 211), + //missing param + (20001, 999), + ]; + for (station_id, param_id) in cases { + e2e_test_wrapper(async { + let ts = TestData { station_id: 20001, - params: &[Param::new("TA"), Param::new("TGX")], - start_time: Utc::now().duration_trunc(TimeDelta::minutes(1)).unwrap() - - Duration::hours(3), - period: Duration::minutes(1), - type_id: 508, - len: 180, - }, - TestData { - station_id: 20002, - params: &[Param::new("TA"), Param::new("TGX")], - start_time: Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 0).unwrap(), - period: Duration::minutes(1), - type_id: 508, - len: 180, - }, - ]; + params: vec![Param::new("TA")], + start_time: Utc.with_ymd_and_hms(2024, 1, 1, 00, 00, 00).unwrap(), + period: Duration::hours(1), + type_id: 501, + len: 48, + }; - let client = reqwest::Client::new(); - for ts in test_data { + let client = reqwest::Client::new(); let ingestor_resp = ingest_data(&client, ts.obsinn_message()).await; assert_eq!(ingestor_resp.res, 0); - } - let url = format!("http://localhost:3000/latest{}", query); - let resp = reqwest::get(url).await.unwrap(); - assert!(resp.status().is_success()); + for _ in ts.params { + let url = format!( + "http://localhost:3000/stations/{}/params/{}", + station_id, param_id + ); + let resp = reqwest::get(url).await.unwrap(); + // TODO: resp.status() returns 500, maybe it should return 404? + assert!(!resp.status().is_success()); + } + }) + .await + } +} - let json: LatestResp = resp.json().await.unwrap(); - assert_eq!(json.data.len(), n_timeseries_found); - }) - .await +// We insert 4 timeseries, 2 with new data (UTC::now()) and 2 with old data (2020) +#[tokio::test] +async fn test_latest_endpoint() { + let cases = vec![ + // without query + ("", 2), + // latest max age 1 + ("?latest_max_age=2021-01-01T00:00:00Z", 2), + // latest max age 2 + ("?latest_max_age=2019-01-01T00:00:00Z", 4), + ]; + for (query, n_timeseries_found) in cases { + e2e_test_wrapper(async { + let test_data = [ + TestData { + station_id: 20001, + params: vec![Param::new("TA"), Param::new("TGX")], + start_time: Utc::now().duration_trunc(TimeDelta::minutes(1)).unwrap() + - Duration::hours(3), + period: Duration::minutes(1), + type_id: 508, + len: 180, + }, + TestData { + station_id: 20002, + params: vec![Param::new("TA"), Param::new("TGX")], + start_time: Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 0).unwrap(), + period: Duration::minutes(1), + type_id: 508, + len: 180, + }, + ]; + + let client = reqwest::Client::new(); + for ts in test_data { + let ingestor_resp = ingest_data(&client, ts.obsinn_message()).await; + assert_eq!(ingestor_resp.res, 0); + } + + let url = format!("http://localhost:3000/latest{}", query); + let resp = reqwest::get(url).await.unwrap(); + assert!(resp.status().is_success()); + + let json: LatestResp = resp.json().await.unwrap(); + assert_eq!(json.data.len(), n_timeseries_found); + }) + .await + } } #[tokio::test] @@ -415,7 +495,7 @@ async fn test_timeslice_endpoint() { let test_data = [ TestData { station_id: 20001, - params: ¶ms.clone(), + params: params.clone(), start_time: timestamp - Duration::hours(1), period: Duration::hours(1), type_id: 501, @@ -423,7 +503,7 @@ async fn test_timeslice_endpoint() { }, TestData { station_id: 20002, - params: ¶ms.clone(), + params: params.clone(), start_time: timestamp - Duration::hours(1), period: Duration::minutes(1), type_id: 508, @@ -481,7 +561,7 @@ async fn test_kafka() { tokio::spawn(async move { let ts = TestData { station_id: 20001, - params: &[Param::new("RR_1")], // sum(precipitation_amount PT1H) + params: vec![Param::new("RR_1")], // sum(precipitation_amount PT1H) start_time: Utc.with_ymd_and_hms(2024, 6, 5, 12, 0, 0).unwrap(), period: chrono::Duration::hours(1), type_id: -4, @@ -535,19 +615,17 @@ async fn test_kafka() { .await } -#[test_case( - TestData { +#[tokio::test] +async fn test_rove_connector() { + let ts = TestData { station_id: 20001, - params: &[Param::new("TA"), Param::new("TGX")], - start_time: Utc::now().duration_trunc(TimeDelta::hours(1)).unwrap() - - Duration::hours(11), + params: vec![Param::new("TA"), Param::new("TGX")], + start_time: Utc::now().duration_trunc(TimeDelta::hours(1)).unwrap() - Duration::hours(11), period: Duration::hours(1), type_id: 501, len: 12, - }; "Scalar params") -] -#[tokio::test] -async fn test_rove_connector(ts: TestData<'_>) { + }; + e2e_test_wrapper(async { let client = reqwest::Client::new();