From 7dc10614db83d53a66ebc217ff815c1f35e2c9d6 Mon Sep 17 00:00:00 2001 From: ssrlive <30760636+ssrlive@users.noreply.github.com> Date: Wed, 20 Dec 2023 22:49:49 +0800 Subject: [PATCH] refine code --- src/client.rs | 4 +- src/protocol/communication.rs | 18 +++--- src/protocol/results.rs | 4 +- src/server.rs | 101 +++++++++++++++++----------------- src/stream/mod.rs | 6 +- src/stream/tcp.rs | 35 ++++++------ src/stream/udp.rs | 34 +++++++----- 7 files changed, 101 insertions(+), 101 deletions(-) diff --git a/src/client.rs b/src/client.rs index f70f506..2b59d8b 100644 --- a/src/client.rs +++ b/src/client.rs @@ -100,8 +100,8 @@ pub fn execute(args: &args::Args) -> BoxResult<()> { let mut complete = false; //config-parsing and pre-connection setup - let mut tcp_port_pool = tcp::receiver::TcpPortPool::new(args.tcp_port_pool.to_string(), args.tcp6_port_pool.to_string()); - let mut udp_port_pool = udp::receiver::UdpPortPool::new(args.udp_port_pool.to_string(), args.udp6_port_pool.to_string()); + let mut tcp_port_pool = tcp::receiver::TcpPortPool::new(&args.tcp_port_pool, &args.tcp6_port_pool); + let mut udp_port_pool = udp::receiver::UdpPortPool::new(&args.udp_port_pool, &args.udp6_port_pool); let cpu_affinity_manager = Arc::new(Mutex::new(crate::utils::cpu_affinity::CpuAffinityManager::new(&args.affinity)?)); diff --git a/src/protocol/communication.rs b/src/protocol/communication.rs index ab90bfc..a59d498 100644 --- a/src/protocol/communication.rs +++ b/src/protocol/communication.rs @@ -42,10 +42,10 @@ pub fn send(stream: &mut TcpStream, message: &serde_json::Value) -> BoxResult<() let serialised_message = serde_json::to_vec(message)?; log::debug!( - "sending message of length {}, {:?}, to {}...", + "sending message to {}, length {}, {:?}...", + stream.peer_addr()?, serialised_message.len(), message, - stream.peer_addr()? ); let mut output_buffer = vec![0_u8; serialised_message.len() + 2]; output_buffer[..2].copy_from_slice(&(serialised_message.len() as u16).to_be_bytes()); @@ -71,21 +71,19 @@ pub fn send(stream: &mut TcpStream, message: &serde_json::Value) -> BoxResult<() } } } - Err(Box::new(simple_error::simple_error!( - "timed out while attempting to send status-message to {}", - stream.peer_addr()? - ))) + let err = simple_error::simple_error!("timed out while attempting to send status-message to {}", stream.peer_addr()?); + Err(Box::new(err)) } /// receives the length-count of a pending message over a client-server communications stream -fn receive_length(stream: &mut TcpStream, alive_check: fn() -> bool, results_handler: &mut dyn FnMut() -> BoxResult<()>) -> BoxResult { +fn receive_length(stream: &mut TcpStream, alive_check: fn() -> bool, handler: &mut dyn FnMut() -> BoxResult<()>) -> BoxResult { stream.set_read_timeout(Some(POLL_TIMEOUT)).expect("unable to set TCP read-timeout"); let mut length_bytes_read = 0; let mut length_spec: [u8; 2] = [0; 2]; while alive_check() { //waiting to find out how long the next message is - results_handler()?; //send any outstanding results between cycles + handler()?; //send any outstanding results between cycles let size = match stream.read(&mut length_spec[length_bytes_read..]) { Ok(size) => size, @@ -128,7 +126,7 @@ fn receive_payload( stream.set_read_timeout(Some(POLL_TIMEOUT)).expect("unable to set TCP read-timeout"); let mut bytes_read = 0; - let mut buffer = vec![0_u8; length.into()]; + let mut buffer = vec![0_u8; length as usize]; while alive_check() { //waiting to receive the payload results_handler()?; //send any outstanding results between cycles @@ -156,7 +154,7 @@ fn receive_payload( if bytes_read == length as usize { match serde_json::from_slice(&buffer) { Ok(v) => { - log::debug!("received {:?} from {}", v, stream.peer_addr()?); + log::debug!("received message from {}: {:?}", stream.peer_addr()?, v); return Ok(v); } Err(e) => { diff --git a/src/protocol/results.rs b/src/protocol/results.rs index 037e601..88db0cd 100644 --- a/src/protocol/results.rs +++ b/src/protocol/results.rs @@ -58,7 +58,7 @@ pub trait IntervalResult { fn to_string(&self, bit: bool) -> String; } -pub type IntervalResultBox = Box; +pub type IntervalResultBox = Box; pub struct ClientDoneResult { pub stream_idx: u8, @@ -477,7 +477,7 @@ impl IntervalResult for UdpSendResult { } } -pub fn interval_result_from_json(value: serde_json::Value) -> BoxResult> { +pub fn interval_result_from_json(value: serde_json::Value) -> BoxResult { match value.get("family") { Some(f) => match f.as_str() { Some(family) => match family { diff --git a/src/server.rs b/src/server.rs index 9bfeda0..e686c31 100644 --- a/src/server.rs +++ b/src/server.rs @@ -21,8 +21,7 @@ use std::io; use std::net::{Shutdown, SocketAddr}; use std::sync::atomic::{AtomicBool, AtomicU16, Ordering}; -use std::sync::mpsc::channel; -use std::sync::{Arc, Mutex}; +use std::sync::{mpsc, Arc, Mutex}; use std::thread; use std::time::Duration; @@ -31,7 +30,7 @@ use std::net::{TcpListener, TcpStream}; use crate::args::Args; use crate::protocol::communication::{receive, send}; use crate::protocol::messaging::{prepare_connect, prepare_connect_ready}; -use crate::protocol::results::ServerDoneResult; +use crate::protocol::results::{IntervalResultBox, ServerDoneResult}; use crate::stream::{tcp, udp, TestStream}; use crate::BoxResult; @@ -56,10 +55,7 @@ fn handle_client( let mut parallel_streams: Vec>> = Vec::new(); let mut parallel_streams_joinhandles = Vec::new(); - let (results_tx, results_rx): ( - std::sync::mpsc::Sender, - std::sync::mpsc::Receiver, - ) = channel(); + let (results_tx, results_rx) = mpsc::channel::(); //a closure used to pass results from stream-handlers to the client-communication stream let mut forwarding_send_stream = stream.try_clone()?; @@ -295,12 +291,12 @@ impl Drop for ClientThreadMonitor { pub fn serve(args: &Args) -> BoxResult<()> { //config-parsing and pre-connection setup let tcp_port_pool = Arc::new(Mutex::new(tcp::receiver::TcpPortPool::new( - args.tcp_port_pool.to_string(), - args.tcp6_port_pool.to_string(), + &args.tcp_port_pool, + &args.tcp6_port_pool, ))); let udp_port_pool = Arc::new(Mutex::new(udp::receiver::UdpPortPool::new( - args.udp_port_pool.to_string(), - args.udp6_port_pool.to_string(), + &args.udp_port_pool, + &args.udp6_port_pool, ))); let cpu_affinity_manager = Arc::new(Mutex::new(crate::utils::cpu_affinity::CpuAffinityManager::new(&args.affinity)?)); @@ -317,53 +313,54 @@ pub fn serve(args: &Args) -> BoxResult<()> { log::info!("server listening on {}", listener.local_addr()?); while is_alive() { - match listener.accept() { - Ok((mut stream, address)) => { - log::info!("connection from {}", address); - - stream.set_nodelay(true).expect("cannot disable Nagle's algorithm"); - - #[cfg(unix)] - { - use crate::protocol::communication::KEEPALIVE_DURATION; - let keepalive_parameters = socket2::TcpKeepalive::new().with_time(KEEPALIVE_DURATION); - let raw_socket = socket2::SockRef::from(&stream); - raw_socket.set_tcp_keepalive(&keepalive_parameters)?; - } - - let client_count = CLIENTS.fetch_add(1, Ordering::Relaxed) + 1; - if client_limit > 0 && client_count > client_limit { - log::warn!("client-limit ({}) reached; disconnecting {}...", client_limit, address.to_string()); - stream.shutdown(Shutdown::Both).unwrap_or_default(); - CLIENTS.fetch_sub(1, Ordering::Relaxed); - } else { - let c_cam = cpu_affinity_manager.clone(); - let c_tcp_port_pool = tcp_port_pool.clone(); - let c_udp_port_pool = udp_port_pool.clone(); - let thread_builder = thread::Builder::new().name(address.to_string()); - thread_builder.spawn(move || { - //ensure the client is accounted-for even if the handler panics - let _client_thread_monitor = ClientThreadMonitor { - client_address: address.to_string(), - }; - - match handle_client(&mut stream, c_cam, c_tcp_port_pool, c_udp_port_pool) { - Ok(_) => (), - Err(e) => log::error!("error in client-handler: {}", e), - } - - //in the event of panic, this will happen when the stream is dropped - stream.shutdown(Shutdown::Both).unwrap_or_default(); - })?; - } - } + let (mut stream, address) = match listener.accept() { + Ok((stream, address)) => (stream, address), Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - //no pending clients + // no pending clients thread::sleep(POLL_TIMEOUT); + continue; } Err(e) => { return Err(Box::new(e)); } + }; + + log::info!("connection from {}", address); + + stream.set_nodelay(true).expect("cannot disable Nagle's algorithm"); + + #[cfg(unix)] + { + use crate::protocol::communication::KEEPALIVE_DURATION; + let keepalive_parameters = socket2::TcpKeepalive::new().with_time(KEEPALIVE_DURATION); + let raw_socket = socket2::SockRef::from(&stream); + raw_socket.set_tcp_keepalive(&keepalive_parameters)?; + } + + let client_count = CLIENTS.fetch_add(1, Ordering::Relaxed) + 1; + if client_limit > 0 && client_count > client_limit { + log::warn!("client-limit ({}) reached; disconnecting {}...", client_limit, address.to_string()); + stream.shutdown(Shutdown::Both).unwrap_or_default(); + CLIENTS.fetch_sub(1, Ordering::Relaxed); + } else { + let c_cam = cpu_affinity_manager.clone(); + let c_tcp_port_pool = tcp_port_pool.clone(); + let c_udp_port_pool = udp_port_pool.clone(); + let thread_builder = thread::Builder::new().name(address.to_string()); + thread_builder.spawn(move || { + // ensure the client is accounted-for even if the handler panics + let _client_thread_monitor = ClientThreadMonitor { + client_address: address.to_string(), + }; + + match handle_client(&mut stream, c_cam, c_tcp_port_pool, c_udp_port_pool) { + Ok(_) => (), + Err(e) => log::error!("error in client-handler: {}", e), + } + + //in the event of panic, this will happen when the stream is dropped + stream.shutdown(Shutdown::Both).unwrap_or_default(); + })?; } } diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 49767d2..1cd0455 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -21,7 +21,7 @@ pub mod tcp; pub mod udp; -use crate::BoxResult; +use crate::{protocol::results::IntervalResultBox, BoxResult}; pub const INTERVAL: std::time::Duration = std::time::Duration::from_secs(1); @@ -30,7 +30,7 @@ pub const INTERVAL: std::time::Duration = std::time::Duration::from_secs(1); /// INTERVAL while gathering data. pub trait TestStream { /// gather data; returns None when the test is over - fn run_interval(&mut self) -> Option>; + fn run_interval(&mut self) -> Option>; /// return the port associated with the test-stream; this may vary over the test's lifetime fn get_port(&self) -> BoxResult; /// returns the index of the test, used to match client and server data @@ -39,7 +39,7 @@ pub trait TestStream { fn stop(&mut self); } -fn parse_port_spec(port_spec: String) -> Vec { +fn parse_port_spec(port_spec: &str) -> Vec { let mut ports = Vec::::new(); if !port_spec.is_empty() { for range in port_spec.split(',') { diff --git a/src/stream/tcp.rs b/src/stream/tcp.rs index 0d2ca85..fdae5b0 100644 --- a/src/stream/tcp.rs +++ b/src/stream/tcp.rs @@ -18,11 +18,10 @@ * along with rperf. If not, see . */ -use crate::protocol::results::{get_unix_timestamp, IntervalResult, TcpReceiveResult, TcpSendResult}; +use crate::protocol::results::{get_unix_timestamp, TcpReceiveResult, TcpSendResult}; +use crate::stream::{parse_port_spec, TestStream, INTERVAL}; use crate::BoxResult; -use super::{parse_port_spec, TestStream, INTERVAL}; - pub const TEST_HEADER_SIZE: usize = 16; #[cfg(unix)] @@ -38,7 +37,7 @@ pub struct TcpTestDefinition { pub length: usize, } impl TcpTestDefinition { - pub fn new(details: &serde_json::Value) -> super::BoxResult { + pub fn new(details: &serde_json::Value) -> BoxResult { let mut test_id_bytes = [0_u8; 16]; for (i, v) in details .get("test_id") @@ -76,13 +75,14 @@ impl TcpTestDefinition { } pub mod receiver { + use mio::net::{TcpListener, TcpStream}; use std::io::Read; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::atomic::{AtomicBool, Ordering::Relaxed}; use std::sync::Mutex; use std::time::{Duration, Instant}; - use mio::net::{TcpListener, TcpStream}; + use crate::{protocol::results::IntervalResultBox, BoxResult}; const POLL_TIMEOUT: Duration = Duration::from_millis(250); const CONNECTION_TIMEOUT: Duration = Duration::from_secs(1); @@ -97,7 +97,7 @@ pub mod receiver { lock_ip6: Mutex, } impl TcpPortPool { - pub fn new(port_spec: String, port_spec6: String) -> TcpPortPool { + pub fn new(port_spec: &str, port_spec6: &str) -> TcpPortPool { let ports = super::parse_port_spec(port_spec); if !ports.is_empty() { log::debug!("configured IPv4 TCP port pool: {:?}", ports); @@ -123,7 +123,7 @@ pub mod receiver { } } - pub fn bind(&mut self, peer_ip: &IpAddr) -> super::BoxResult { + pub fn bind(&mut self, peer_ip: &IpAddr) -> BoxResult { match peer_ip { IpAddr::V6(_) => { if self.ports_ip6.is_empty() { @@ -193,7 +193,6 @@ pub mod receiver { } } - #[allow(dead_code)] pub struct TcpReceiver { active: AtomicBool, test_definition: super::TcpTestDefinition, @@ -213,7 +212,7 @@ pub mod receiver { stream_idx: &u8, port_pool: &mut TcpPortPool, peer_ip: &IpAddr, - ) -> super::BoxResult { + ) -> BoxResult { log::debug!("binding TCP listener for stream {}...", stream_idx); let mut listener: TcpListener = port_pool.bind(peer_ip).expect("failed to bind TCP socket"); log::debug!("bound TCP listener for stream {}: {}", stream_idx, listener.local_addr()?); @@ -238,7 +237,7 @@ pub mod receiver { }) } - fn process_connection(&mut self) -> super::BoxResult<(TcpStream, u64, f32)> { + fn process_connection(&mut self) -> BoxResult<(TcpStream, u64, f32)> { log::debug!("preparing to receive TCP stream {} connection...", self.stream_idx); let listener = self.listener.as_mut().unwrap(); @@ -357,7 +356,7 @@ pub mod receiver { } impl super::TestStream for TcpReceiver { - fn run_interval(&mut self) -> Option>> { + fn run_interval(&mut self) -> Option> { let mut bytes_received: u64 = 0; let mut additional_time_elapsed: f32 = 0.0; @@ -450,7 +449,7 @@ pub mod receiver { } } - fn get_port(&self) -> super::BoxResult { + fn get_port(&self) -> BoxResult { match &self.listener { Some(listener) => Ok(listener.local_addr()?.port()), None => match &self.stream { @@ -476,6 +475,8 @@ pub mod sender { use std::thread::sleep; use std::time::{Duration, Instant}; + use crate::{protocol::results::IntervalResultBox, BoxResult}; + const CONNECT_TIMEOUT: Duration = Duration::from_secs(2); const WRITE_TIMEOUT: Duration = Duration::from_millis(50); const BUFFER_FULL_TIMEOUT: Duration = Duration::from_millis(1); @@ -509,7 +510,7 @@ pub mod sender { send_interval: &f32, send_buffer: &usize, no_delay: &bool, - ) -> super::BoxResult { + ) -> BoxResult { let mut staged_buffer = vec![0_u8; test_definition.length]; for (i, staged_buffer_i) in staged_buffer.iter_mut().enumerate().skip(super::TEST_HEADER_SIZE) { //fill the packet with a fixed sequence @@ -536,8 +537,8 @@ pub mod sender { }) } - fn process_connection(&mut self) -> super::BoxResult { - log::debug!("preparing to connect TCP stream {}...", self.stream_idx); + fn process_connection(&mut self) -> BoxResult { + log::debug!("preparing to connect TCP stream {} to {} ...", self.stream_idx, self.socket_addr); let stream = match TcpStream::connect_timeout(&self.socket_addr, CONNECT_TIMEOUT) { Ok(s) => s, @@ -575,7 +576,7 @@ pub mod sender { } } impl super::TestStream for TcpSender { - fn run_interval(&mut self) -> Option>> { + fn run_interval(&mut self) -> Option> { if self.stream.is_none() { //if still in the setup phase, connect to the receiver match self.process_connection() { @@ -704,7 +705,7 @@ pub mod sender { } } - fn get_port(&self) -> super::BoxResult { + fn get_port(&self) -> BoxResult { match &self.stream { Some(stream) => Ok(stream.local_addr()?.port()), None => Err(Box::new(simple_error::simple_error!("no stream currently exists"))), diff --git a/src/stream/udp.rs b/src/stream/udp.rs index 82c27c4..70fb450 100644 --- a/src/stream/udp.rs +++ b/src/stream/udp.rs @@ -18,8 +18,10 @@ * along with rperf. If not, see . */ -use crate::protocol::results::{get_unix_timestamp, IntervalResult, UdpReceiveResult, UdpSendResult}; -use crate::BoxResult; +use crate::{ + protocol::results::{get_unix_timestamp, UdpReceiveResult, UdpSendResult}, + BoxResult, +}; use super::{parse_port_spec, TestStream, INTERVAL}; @@ -36,7 +38,7 @@ pub struct UdpTestDefinition { pub length: u16, } impl UdpTestDefinition { - pub fn new(details: &serde_json::Value) -> super::BoxResult { + pub fn new(details: &serde_json::Value) -> BoxResult { let mut test_id_bytes = [0_u8; 16]; for (i, v) in details .get("test_id") @@ -74,6 +76,8 @@ impl UdpTestDefinition { } pub mod receiver { + use crate::protocol::results::IntervalResultBox; + use crate::BoxResult; use chrono::NaiveDateTime; use std::convert::TryInto; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; @@ -92,7 +96,7 @@ pub mod receiver { lock_ip6: Mutex, } impl UdpPortPool { - pub fn new(port_spec: String, port_spec6: String) -> UdpPortPool { + pub fn new(port_spec: &str, port_spec6: &str) -> UdpPortPool { let ports = super::parse_port_spec(port_spec); if !ports.is_empty() { log::debug!("configured IPv4 UDP port pool: {:?}", ports); @@ -118,7 +122,7 @@ pub mod receiver { } } - pub fn bind(&mut self, peer_ip: &IpAddr) -> super::BoxResult { + pub fn bind(&mut self, peer_ip: &IpAddr) -> BoxResult { match peer_ip { IpAddr::V6(_) => { if self.ports_ip6.is_empty() { @@ -218,7 +222,7 @@ pub mod receiver { port_pool: &mut UdpPortPool, peer_ip: &IpAddr, receive_buffer: &usize, - ) -> super::BoxResult { + ) -> BoxResult { log::debug!("binding UDP receive socket for stream {}...", stream_idx); let socket: UdpSocket = port_pool.bind(peer_ip).expect("failed to bind UDP socket"); socket.set_read_timeout(Some(READ_TIMEOUT))?; @@ -352,7 +356,7 @@ pub mod receiver { } } impl super::TestStream for UdpReceiver { - fn run_interval(&mut self) -> Option>> { + fn run_interval(&mut self) -> Option> { let mut buf = vec![0_u8; self.test_definition.length.into()]; let mut bytes_received: u64 = 0; @@ -448,7 +452,7 @@ pub mod receiver { } } - fn get_port(&self) -> super::BoxResult { + fn get_port(&self) -> BoxResult { let socket_addr = self.socket.local_addr()?; Ok(socket_addr.port()) } @@ -464,12 +468,12 @@ pub mod receiver { } pub mod sender { - use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; - use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; - + use crate::protocol::results::IntervalResultBox; + use crate::BoxResult; use std::net::UdpSocket; - + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::thread::sleep; + use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; const WRITE_TIMEOUT: Duration = Duration::from_millis(50); const BUFFER_FULL_TIMEOUT: Duration = Duration::from_millis(1); @@ -499,7 +503,7 @@ pub mod sender { send_duration: &f32, send_interval: &f32, send_buffer: &usize, - ) -> super::BoxResult { + ) -> BoxResult { log::debug!("preparing to connect UDP stream {}...", stream_idx); let socket_addr_receiver = SocketAddr::new(*receiver_ip, *receiver_port); let socket = match receiver_ip { @@ -554,7 +558,7 @@ pub mod sender { } } impl super::TestStream for UdpSender { - fn run_interval(&mut self) -> Option>> { + fn run_interval(&mut self) -> Option> { let interval_duration = Duration::from_secs_f32(self.send_interval); let mut interval_iteration = 0; let bytes_to_send = ((self.test_definition.bandwidth as f32) * super::INTERVAL.as_secs_f32()) as i64; @@ -681,7 +685,7 @@ pub mod sender { } } - fn get_port(&self) -> super::BoxResult { + fn get_port(&self) -> BoxResult { let socket_addr = self.socket.local_addr()?; Ok(socket_addr.port()) }