Skip to content

Commit

Permalink
fix up run so gps points upsert, resolve clippy warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
jr1221 committed Nov 21, 2024
1 parent 1d7908d commit 47097b9
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ CREATE TABLE "run" (
"locationName" TEXT,
"latitude" DOUBLE PRECISION,
"longitude" DOUBLE PRECISION,
"radius" DOUBLE PRECISION,
"driverName" TEXT,
"notes" TEXT NOT NULL,
"time" TIMESTAMPTZ NOT NULL,
Expand Down
85 changes: 24 additions & 61 deletions scylla-server/src/db_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,61 +5,8 @@ use tokio::{sync::mpsc::Sender, time::Duration};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, instrument, trace, warn, Level};

use crate::services::{data_service, data_type_service};
use crate::{ClientData, LocationData, PoolHandle};

/// A struct defining an in progress location packet
struct LocLock {
location_name: Option<String>,
points: Option<(f32, f32)>,
radius: Option<f32>,
}

impl LocLock {
pub fn new() -> LocLock {
LocLock {
location_name: None,
points: None,
radius: None,
}
}

/// Add the location name to the packet
pub fn add_loc_name(&mut self, loc_name: String) {
self.location_name = Some(loc_name);
}

/// Add points to the packet
pub fn add_points(&mut self, lat: f32, long: f32) {
self.points = Some((lat, long));
}

/// Add a radius to the packet
pub fn add_radius(&mut self, radius: f32) {
self.radius = Some(radius);
}

/// Attempt to finalize the packet, returning a location data and clearing this object or None if still in progress
pub fn finalize(&mut self) -> Option<LocationData> {
if self.location_name.is_some() && self.points.is_some() && self.radius.is_some() {
self.clear();
return Some(LocationData {
location_name: self.location_name.clone().unwrap(),
lat: self.points.unwrap().0,
long: self.points.unwrap().1,
radius: self.radius.unwrap(),
});
}
None
}

/// Clear the internal state
fn clear(&mut self) {
self.location_name = None;
self.points = None;
self.radius = None;
}
}
use crate::services::{data_service, data_type_service, run_service};
use crate::{ClientData, PoolHandle, RUN_ID};

/// A few threads to manage the processing and inserting of special types,
/// upserting of metadata for data, and batch uploading the database
Expand All @@ -70,10 +17,6 @@ pub struct DbHandler {
receiver: Receiver<ClientData>,
/// The database pool handle
pool: PoolHandle,
/// An internal state of an in progress location packet
location_lock: LocLock,
/// Whether the location has been modified this loop
is_location: bool,
/// the queue of data
data_queue: Vec<ClientData>,
/// the time since last batch
Expand All @@ -94,8 +37,6 @@ impl DbHandler {
datatype_list: vec![],
receiver,
pool,
location_lock: LocLock::new(),
is_location: false,
data_queue: vec![],
last_time: tokio::time::Instant::now(),
upload_interval,
Expand Down Expand Up @@ -246,6 +187,28 @@ impl DbHandler {
self.datatype_list.push(msg.name.clone());
}

// Check for GPS points, insert them into current run if available
if msg.name == "TPU/GPS/Location" {
debug!("Upserting run with location points!");
let Ok(mut database) = self.pool.get() else {
warn!("Could not get connection for db points update");
return;
};
// ensure lat AND long present in message, just a sanity check
if msg.values.len() < 2 {
warn!("GPS message found without both lat and long!");
} else if let Err(err) = run_service::update_run_with_coords(
&mut database,
RUN_ID.load(std::sync::atomic::Ordering::Relaxed),
msg.values[0].into(),
msg.values[1].into(),
)
.await
{
warn!("DB error run gps points upsert: {:?}", err);
}
}

// no matter what, batch upload the message
trace!("Pushing msg to queue: {:?}", msg);
self.data_queue.push(msg);
Expand Down
12 changes: 2 additions & 10 deletions scylla-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ pub mod services;
pub mod db_handler;
pub mod mqtt_processor;

#[allow(non_snake_case)]
pub mod models;
#[allow(non_snake_case)]
pub mod schema;

pub mod command_data;
Expand Down Expand Up @@ -53,13 +55,3 @@ pub struct ClientData {
#[serde(skip_serializing)]
pub node: String,
}

/// A final location packet
/// This has the purpose of representing the struct for the service layer to unpack for insertion, and therefore is not serialized
#[derive(Debug)]
struct LocationData {
location_name: String,
lat: f32,
long: f32,
radius: f32,
}
1 change: 0 additions & 1 deletion scylla-server/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ pub struct Run {
pub locationName: Option<String>,
pub latitude: Option<f64>,
pub longitude: Option<f64>,
pub radius: Option<f64>,
pub driverName: Option<String>,
pub notes: String,
pub time: DateTime<Utc>,
Expand Down
1 change: 0 additions & 1 deletion scylla-server/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ diesel::table! {
locationName -> Nullable<Text>,
latitude -> Nullable<Float8>,
longitude -> Nullable<Float8>,
radius -> Nullable<Float8>,
driverName -> Nullable<Text>,
notes -> Text,
time -> Timestamptz,
Expand Down
18 changes: 17 additions & 1 deletion scylla-server/src/services/run_service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{models::Run, schema::run::dsl::*, Database, LocationData};
use crate::{models::Run, schema::run::dsl::*, Database};
use chrono::{DateTime, Utc};
use diesel::prelude::*;

Expand Down Expand Up @@ -47,3 +47,19 @@ pub async fn create_run_with_id(
.values((time.eq(timestamp), id.eq(run_id), notes.eq("A")))
.get_result(db)
}

/// Updates a run with GPS points
/// * `db` - The prisma client to make the call to
/// * `run_id` - The run id to upsert
/// * `lat` - The latitude
/// * `long` - The longitude
pub async fn update_run_with_coords(
db: &mut Database,
run_id: i32,
lat: f64,
long: f64,
) -> Result<Run, diesel::result::Error> {
diesel::update(run.filter(id.eq(run_id)))
.set((latitude.eq(lat), longitude.eq(long)))
.get_result(db)
}
2 changes: 1 addition & 1 deletion scylla-server/src/transformers/data_transformer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl From<crate::models::Data> for PublicData {
PublicData {
values: value
.values
.unwrap_or(vec![])
.unwrap_or_default()
.into_iter()
.flatten()
.collect(),
Expand Down
2 changes: 0 additions & 2 deletions scylla-server/tests/data_type_service_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ use scylla_server::{
};
use test_utils::cleanup_and_prepare;

const TEST_KEYWORD: &str = "test";

#[tokio::test]
async fn test_get_all_datatypes() -> Result<(), diesel::result::Error> {
let mut db = cleanup_and_prepare().await?;
Expand Down

0 comments on commit 47097b9

Please sign in to comment.