Skip to content

Commit

Permalink
remove transformers that areunecessary, remove mock, move db_handler
Browse files Browse the repository at this point in the history
  • Loading branch information
jr1221 committed Nov 10, 2024
1 parent eda7acf commit 1fd38ea
Show file tree
Hide file tree
Showing 11 changed files with 61 additions and 363 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ use tokio::{sync::mpsc::Sender, time::Duration};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, instrument, trace, warn, Level};

use crate::{ClientData, LocationData};
use crate::{
services::{data_service, data_type_service},
Database,
};

use super::{ClientData, LocationData};

/// A struct defining an in progress location packet
struct LocLock {
location_name: Option<String>,
Expand Down
37 changes: 36 additions & 1 deletion scylla-server/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use std::sync::atomic::AtomicI32;

use chrono::{DateTime, Utc, serde::ts_milliseconds};
use diesel::PgConnection;

pub mod controllers;
pub mod error;
pub mod processors;
pub mod services;

pub mod db_handler;
pub mod mqtt_processor;

pub mod models;
pub mod schema;

Expand All @@ -28,3 +31,35 @@ pub enum RateLimitMode {

// Atomic to keep track the current run id across EVERYTHING (very scary)
pub static RUN_ID: AtomicI32 = AtomicI32::new(-1);


/// Represents the client data
/// This has the dual purposes of
/// * - representing the packet sent over the socket for live data
/// * - representing the struct for the service layer to unpack for insertion
/// Note: node name is only considered for database storage and convenience, it is not serialized in a socket packet
#[derive(serde::Serialize, Clone, Debug)]
pub struct ClientData {
#[serde(rename = "runId")]
pub run_id: i32,
pub name: String,
pub unit: String,
pub values: Vec<f32>,
/// Client expects time in milliseconds, so serialize as such
#[serde(with = "ts_milliseconds")]
pub timestamp: DateTime<Utc>,

/// client doesnt parse node
#[serde(skip_serializing)]
pub node: String,
}

/// A final location packet
/// This has the purpose of representing the struct for the service layer to unpack for insertion, and therefore is not serialized
#[derive(Debug)]
struct LocationData {
location_name: String,
lat: f32,
long: f32,
radius: f32,
}
61 changes: 24 additions & 37 deletions scylla-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ use tracing_subscriber::{fmt::format::FmtSpan, EnvFilter};
#[derive(Parser, Debug)]
#[command(version)]
struct ScyllaArgs {
/// Whether to enable the Scylla production mode
#[arg(short = 'p', long, env = "SCYLLA_PROD")]
prod: bool,

/// Whether to enable batch saturation (parallel batching)
#[arg(short = 's', long, env = "SCYLLA_SATURATE_BATCH")]
saturate_batch: bool,
Expand Down Expand Up @@ -188,40 +184,31 @@ async fn main() {
));
}

// if PROD_SCYLLA=false, also procur a client for use in the config state
let client: Option<Arc<AsyncClient>> = if !cli.prod {
info!("Running processor in mock mode, no data will be stored");
let recv = MockProcessor::new(io);
tokio::spawn(recv.generate_mock());
None
} else {
// creates the initial run
let curr_run = run_service::create_run(db.clone(), chrono::offset::Utc::now())
.await
.expect("Could not create initial run!");
debug!("Configuring current run: {:?}", curr_run);
// creates the initial run
let curr_run = run_service::create_run(db.clone(), chrono::offset::Utc::now())
.await
.expect("Could not create initial run!");
debug!("Configuring current run: {:?}", curr_run);

RUN_ID.store(curr_run.id, Ordering::Relaxed);
// run prod if this isnt present
// create and spawn the mqtt processor
info!("Running processor in MQTT (production) mode");
let (recv, opts) = MqttProcessor::new(
mqtt_send,
io,
token.clone(),
MqttProcessorOptions {
mqtt_path: cli.siren_host_url,
initial_run: curr_run.id,
static_rate_limit_time: cli.static_rate_limit_value,
rate_limit_mode: cli.rate_limit_mode,
upload_ratio: cli.socketio_discard_percent,
},
);
let (client, eventloop) = AsyncClient::new(opts, 600);
let client_sharable: Arc<AsyncClient> = Arc::new(client);
task_tracker.spawn(recv.process_mqtt(client_sharable.clone(), eventloop));
Some(client_sharable)
};
RUN_ID.store(curr_run.id, Ordering::Relaxed);
// run prod if this isnt present
// create and spawn the mqtt processor
info!("Running processor in MQTT (production) mode");
let (recv, opts) = MqttProcessor::new(
mqtt_send,
io,
token.clone(),
MqttProcessorOptions {
mqtt_path: cli.siren_host_url,
initial_run: curr_run.id,
static_rate_limit_time: cli.static_rate_limit_value,
rate_limit_mode: cli.rate_limit_mode,
upload_ratio: cli.socketio_discard_percent,
},
);
let (client, eventloop) = AsyncClient::new(opts, 600);
let client_sharable: Arc<AsyncClient> = Arc::new(client);
task_tracker.spawn(recv.process_mqtt(client_sharable.clone(), eventloop));

let app = Router::new()
// DATA
Expand Down
File renamed without changes.
123 changes: 0 additions & 123 deletions scylla-server/src/processors/mock_data.rs

This file was deleted.

66 changes: 0 additions & 66 deletions scylla-server/src/processors/mock_processor.rs

This file was deleted.

38 changes: 0 additions & 38 deletions scylla-server/src/processors/mod.rs

This file was deleted.

21 changes: 0 additions & 21 deletions scylla-server/src/transformers/driver_transformer.rs

This file was deleted.

Loading

0 comments on commit 1fd38ea

Please sign in to comment.