Skip to content

Commit

Permalink
refine code
Browse files Browse the repository at this point in the history
  • Loading branch information
ssrlive committed Dec 21, 2023
1 parent e988f5d commit 7704a6f
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 104 deletions.
4 changes: 2 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ pub fn execute(args: &args::Args) -> BoxResult<()> {
let mut complete = false;

//config-parsing and pre-connection setup
let mut tcp_port_pool = tcp::receiver::TcpPortPool::new(args.tcp_port_pool.to_string(), args.tcp6_port_pool.to_string());
let mut udp_port_pool = udp::receiver::UdpPortPool::new(args.udp_port_pool.to_string(), args.udp6_port_pool.to_string());
let mut tcp_port_pool = tcp::receiver::TcpPortPool::new(&args.tcp_port_pool, &args.tcp6_port_pool);
let mut udp_port_pool = udp::receiver::UdpPortPool::new(&args.udp_port_pool, &args.udp6_port_pool);

let cpu_affinity_manager = Arc::new(Mutex::new(crate::utils::cpu_affinity::CpuAffinityManager::new(&args.affinity)?));

Expand Down
24 changes: 11 additions & 13 deletions src/protocol/communication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ pub fn send(stream: &mut TcpStream, message: &serde_json::Value) -> BoxResult<()
let serialised_message = serde_json::to_vec(message)?;

log::debug!(
"sending message of length {}, {:?}, to {}...",
"sending message to {} length {}, {:?}...",
stream.peer_addr()?,
serialised_message.len(),
message,
stream.peer_addr()?
);
let mut output_buffer = vec![0_u8; serialised_message.len() + 2];
output_buffer[..2].copy_from_slice(&(serialised_message.len() as u16).to_be_bytes());
Expand All @@ -71,21 +71,19 @@ pub fn send(stream: &mut TcpStream, message: &serde_json::Value) -> BoxResult<()
}
}
}
Err(Box::new(simple_error::simple_error!(
"timed out while attempting to send status-message to {}",
stream.peer_addr()?
)))
let err = simple_error::simple_error!("timed out while attempting to send status-message to {}", stream.peer_addr()?);
Err(Box::new(err))
}

/// receives the length-count of a pending message over a client-server communications stream
fn receive_length(stream: &mut TcpStream, alive_check: fn() -> bool, results_handler: &mut dyn FnMut() -> BoxResult<()>) -> BoxResult<u16> {
fn receive_length(stream: &mut TcpStream, alive_check: fn() -> bool, handler: &mut dyn FnMut() -> BoxResult<()>) -> BoxResult<usize> {
stream.set_read_timeout(Some(POLL_TIMEOUT)).expect("unable to set TCP read-timeout");

let mut length_bytes_read = 0;
let mut length_spec: [u8; 2] = [0; 2];
while alive_check() {
//waiting to find out how long the next message is
results_handler()?; //send any outstanding results between cycles
handler()?; //send any outstanding results between cycles

let size = match stream.read(&mut length_spec[length_bytes_read..]) {
Ok(size) => size,
Expand All @@ -110,7 +108,7 @@ fn receive_length(stream: &mut TcpStream, alive_check: fn() -> bool, results_han
if length_bytes_read == 2 {
let length = u16::from_be_bytes(length_spec);
log::debug!("received length-spec of {} from {}", length, stream.peer_addr()?);
return Ok(length);
return Ok(length as usize);
} else {
log::debug!("received partial length-spec from {}", stream.peer_addr()?);
}
Expand All @@ -123,12 +121,12 @@ fn receive_payload(
stream: &mut TcpStream,
alive_check: fn() -> bool,
results_handler: &mut dyn FnMut() -> BoxResult<()>,
length: u16,
length: usize,
) -> BoxResult<serde_json::Value> {
stream.set_read_timeout(Some(POLL_TIMEOUT)).expect("unable to set TCP read-timeout");

let mut bytes_read = 0;
let mut buffer = vec![0_u8; length.into()];
let mut buffer = vec![0_u8; length];
while alive_check() {
//waiting to receive the payload
results_handler()?; //send any outstanding results between cycles
Expand All @@ -153,10 +151,10 @@ fn receive_payload(
}

bytes_read += size;
if bytes_read == length as usize {
if bytes_read == length {
match serde_json::from_slice(&buffer) {
Ok(v) => {
log::debug!("received {:?} from {}", v, stream.peer_addr()?);
log::debug!("received from {} {:?}", stream.peer_addr()?, v);
return Ok(v);
}
Err(e) => {
Expand Down
4 changes: 2 additions & 2 deletions src/protocol/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub trait IntervalResult {
fn to_string(&self, bit: bool) -> String;
}

pub type IntervalResultBox = Box<dyn IntervalResult + Sync + Send>;
pub type IntervalResultBox = Box<dyn IntervalResult + Sync + Send + 'static>;

pub struct ClientDoneResult {
pub stream_idx: u8,
Expand Down Expand Up @@ -477,7 +477,7 @@ impl IntervalResult for UdpSendResult {
}
}

pub fn interval_result_from_json(value: serde_json::Value) -> BoxResult<Box<dyn IntervalResult>> {
pub fn interval_result_from_json(value: serde_json::Value) -> BoxResult<IntervalResultBox> {
match value.get("family") {
Some(f) => match f.as_str() {
Some(family) => match family {
Expand Down
101 changes: 49 additions & 52 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
use std::io;
use std::net::{Shutdown, SocketAddr};
use std::sync::atomic::{AtomicBool, AtomicU16, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;

Expand All @@ -31,7 +30,7 @@ use std::net::{TcpListener, TcpStream};
use crate::args::Args;
use crate::protocol::communication::{receive, send};
use crate::protocol::messaging::{prepare_connect, prepare_connect_ready};
use crate::protocol::results::ServerDoneResult;
use crate::protocol::results::{IntervalResultBox, ServerDoneResult};
use crate::stream::{tcp, udp, TestStream};
use crate::BoxResult;

Expand All @@ -56,10 +55,7 @@ fn handle_client(
let mut parallel_streams: Vec<Arc<Mutex<(dyn TestStream + Sync + Send)>>> = Vec::new();
let mut parallel_streams_joinhandles = Vec::new();

let (results_tx, results_rx): (
std::sync::mpsc::Sender<crate::protocol::results::IntervalResultBox>,
std::sync::mpsc::Receiver<crate::protocol::results::IntervalResultBox>,
) = channel();
let (results_tx, results_rx) = mpsc::channel::<IntervalResultBox>();

//a closure used to pass results from stream-handlers to the client-communication stream
let mut forwarding_send_stream = stream.try_clone()?;
Expand Down Expand Up @@ -295,12 +291,12 @@ impl Drop for ClientThreadMonitor {
pub fn serve(args: &Args) -> BoxResult<()> {
//config-parsing and pre-connection setup
let tcp_port_pool = Arc::new(Mutex::new(tcp::receiver::TcpPortPool::new(
args.tcp_port_pool.to_string(),
args.tcp6_port_pool.to_string(),
&args.tcp_port_pool,
&args.tcp6_port_pool,
)));
let udp_port_pool = Arc::new(Mutex::new(udp::receiver::UdpPortPool::new(
args.udp_port_pool.to_string(),
args.udp6_port_pool.to_string(),
&args.udp_port_pool,
&args.udp6_port_pool,
)));

let cpu_affinity_manager = Arc::new(Mutex::new(crate::utils::cpu_affinity::CpuAffinityManager::new(&args.affinity)?));
Expand All @@ -317,53 +313,54 @@ pub fn serve(args: &Args) -> BoxResult<()> {
log::info!("server listening on {}", listener.local_addr()?);

while is_alive() {
match listener.accept() {
Ok((mut stream, address)) => {
log::info!("connection from {}", address);

stream.set_nodelay(true).expect("cannot disable Nagle's algorithm");

#[cfg(unix)]
{
use crate::protocol::communication::KEEPALIVE_DURATION;
let keepalive_parameters = socket2::TcpKeepalive::new().with_time(KEEPALIVE_DURATION);
let raw_socket = socket2::SockRef::from(&stream);
raw_socket.set_tcp_keepalive(&keepalive_parameters)?;
}

let client_count = CLIENTS.fetch_add(1, Ordering::Relaxed) + 1;
if client_limit > 0 && client_count > client_limit {
log::warn!("client-limit ({}) reached; disconnecting {}...", client_limit, address.to_string());
stream.shutdown(Shutdown::Both).unwrap_or_default();
CLIENTS.fetch_sub(1, Ordering::Relaxed);
} else {
let c_cam = cpu_affinity_manager.clone();
let c_tcp_port_pool = tcp_port_pool.clone();
let c_udp_port_pool = udp_port_pool.clone();
let thread_builder = thread::Builder::new().name(address.to_string());
thread_builder.spawn(move || {
//ensure the client is accounted-for even if the handler panics
let _client_thread_monitor = ClientThreadMonitor {
client_address: address.to_string(),
};

match handle_client(&mut stream, c_cam, c_tcp_port_pool, c_udp_port_pool) {
Ok(_) => (),
Err(e) => log::error!("error in client-handler: {}", e),
}

//in the event of panic, this will happen when the stream is dropped
stream.shutdown(Shutdown::Both).unwrap_or_default();
})?;
}
}
let (mut stream, address) = match listener.accept() {
Ok((stream, address)) => (stream, address),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
//no pending clients
// no pending clients
thread::sleep(POLL_TIMEOUT);
continue;
}
Err(e) => {
return Err(Box::new(e));
}
};

log::info!("connection from {}", address);

stream.set_nodelay(true).expect("cannot disable Nagle's algorithm");

#[cfg(unix)]
{
use crate::protocol::communication::KEEPALIVE_DURATION;
let keepalive_parameters = socket2::TcpKeepalive::new().with_time(KEEPALIVE_DURATION);
let raw_socket = socket2::SockRef::from(&stream);
raw_socket.set_tcp_keepalive(&keepalive_parameters)?;
}

let client_count = CLIENTS.fetch_add(1, Ordering::Relaxed) + 1;
if client_limit > 0 && client_count > client_limit {
log::warn!("client-limit ({}) reached; disconnecting {}...", client_limit, address.to_string());
stream.shutdown(Shutdown::Both).unwrap_or_default();
CLIENTS.fetch_sub(1, Ordering::Relaxed);
} else {
let c_cam = cpu_affinity_manager.clone();
let c_tcp_port_pool = tcp_port_pool.clone();
let c_udp_port_pool = udp_port_pool.clone();
let thread_builder = thread::Builder::new().name(address.to_string());
thread_builder.spawn(move || {
// ensure the client is accounted-for even if the handler panics
let _client_thread_monitor = ClientThreadMonitor {
client_address: address.to_string(),
};

match handle_client(&mut stream, c_cam, c_tcp_port_pool, c_udp_port_pool) {
Ok(_) => (),
Err(e) => log::error!("error in client-handler: {}", e),
}

//in the event of panic, this will happen when the stream is dropped
stream.shutdown(Shutdown::Both).unwrap_or_default();
})?;
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
pub mod tcp;
pub mod udp;

use crate::BoxResult;
use crate::{protocol::results::IntervalResultBox, BoxResult};

pub const INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);

Expand All @@ -30,7 +30,7 @@ pub const INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);
/// INTERVAL while gathering data.
pub trait TestStream {
/// gather data; returns None when the test is over
fn run_interval(&mut self) -> Option<BoxResult<crate::protocol::results::IntervalResultBox>>;
fn run_interval(&mut self) -> Option<BoxResult<IntervalResultBox>>;
/// return the port associated with the test-stream; this may vary over the test's lifetime
fn get_port(&self) -> BoxResult<u16>;
/// returns the index of the test, used to match client and server data
Expand All @@ -39,7 +39,7 @@ pub trait TestStream {
fn stop(&mut self);
}

fn parse_port_spec(port_spec: String) -> Vec<u16> {
fn parse_port_spec(port_spec: &str) -> Vec<u16> {
let mut ports = Vec::<u16>::new();
if !port_spec.is_empty() {
for range in port_spec.split(',') {
Expand Down
Loading

0 comments on commit 7704a6f

Please sign in to comment.