Skip to content

Commit

Permalink
refine code
Browse files Browse the repository at this point in the history
  • Loading branch information
ssrlive committed Dec 22, 2023
1 parent e988f5d commit bc6dbb7
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 100 deletions.
4 changes: 2 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ pub fn execute(args: &args::Args) -> BoxResult<()> {
let mut complete = false;

//config-parsing and pre-connection setup
let mut tcp_port_pool = tcp::receiver::TcpPortPool::new(args.tcp_port_pool.to_string(), args.tcp6_port_pool.to_string());
let mut udp_port_pool = udp::receiver::UdpPortPool::new(args.udp_port_pool.to_string(), args.udp6_port_pool.to_string());
let mut tcp_port_pool = tcp::receiver::TcpPortPool::new(&args.tcp_port_pool, &args.tcp6_port_pool);
let mut udp_port_pool = udp::receiver::UdpPortPool::new(&args.udp_port_pool, &args.udp6_port_pool);

let cpu_affinity_manager = Arc::new(Mutex::new(crate::utils::cpu_affinity::CpuAffinityManager::new(&args.affinity)?));

Expand Down
16 changes: 7 additions & 9 deletions src/protocol/communication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ pub fn send(stream: &mut TcpStream, message: &serde_json::Value) -> BoxResult<()
let serialised_message = serde_json::to_vec(message)?;

log::debug!(
"sending message of length {}, {:?}, to {}...",
"sending message to {}, length {}, {:?}...",
stream.peer_addr()?,
serialised_message.len(),
message,
stream.peer_addr()?
);
let mut output_buffer = vec![0_u8; serialised_message.len() + 2];
output_buffer[..2].copy_from_slice(&(serialised_message.len() as u16).to_be_bytes());
Expand All @@ -71,21 +71,19 @@ pub fn send(stream: &mut TcpStream, message: &serde_json::Value) -> BoxResult<()
}
}
}
Err(Box::new(simple_error::simple_error!(
"timed out while attempting to send status-message to {}",
stream.peer_addr()?
)))
let err = simple_error::simple_error!("timed out while attempting to send status-message to {}", stream.peer_addr()?);
Err(Box::new(err))
}

/// receives the length-count of a pending message over a client-server communications stream
fn receive_length(stream: &mut TcpStream, alive_check: fn() -> bool, results_handler: &mut dyn FnMut() -> BoxResult<()>) -> BoxResult<u16> {
fn receive_length(stream: &mut TcpStream, alive_check: fn() -> bool, handler: &mut dyn FnMut() -> BoxResult<()>) -> BoxResult<u16> {
stream.set_read_timeout(Some(POLL_TIMEOUT)).expect("unable to set TCP read-timeout");

let mut length_bytes_read = 0;
let mut length_spec: [u8; 2] = [0; 2];
while alive_check() {
//waiting to find out how long the next message is
results_handler()?; //send any outstanding results between cycles
handler()?; //send any outstanding results between cycles

let size = match stream.read(&mut length_spec[length_bytes_read..]) {
Ok(size) => size,
Expand Down Expand Up @@ -156,7 +154,7 @@ fn receive_payload(
if bytes_read == length as usize {
match serde_json::from_slice(&buffer) {
Ok(v) => {
log::debug!("received {:?} from {}", v, stream.peer_addr()?);
log::debug!("received message from {}: {:?}", stream.peer_addr()?, v);
return Ok(v);
}
Err(e) => {
Expand Down
4 changes: 2 additions & 2 deletions src/protocol/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub trait IntervalResult {
fn to_string(&self, bit: bool) -> String;
}

pub type IntervalResultBox = Box<dyn IntervalResult + Sync + Send>;
pub type IntervalResultBox = Box<dyn IntervalResult + Sync + Send + 'static>;

pub struct ClientDoneResult {
pub stream_idx: u8,
Expand Down Expand Up @@ -477,7 +477,7 @@ impl IntervalResult for UdpSendResult {
}
}

pub fn interval_result_from_json(value: serde_json::Value) -> BoxResult<Box<dyn IntervalResult>> {
pub fn interval_result_from_json(value: serde_json::Value) -> BoxResult<IntervalResultBox> {
match value.get("family") {
Some(f) => match f.as_str() {
Some(family) => match family {
Expand Down
101 changes: 49 additions & 52 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
use std::io;
use std::net::{Shutdown, SocketAddr};
use std::sync::atomic::{AtomicBool, AtomicU16, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;

Expand All @@ -31,7 +30,7 @@ use std::net::{TcpListener, TcpStream};
use crate::args::Args;
use crate::protocol::communication::{receive, send};
use crate::protocol::messaging::{prepare_connect, prepare_connect_ready};
use crate::protocol::results::ServerDoneResult;
use crate::protocol::results::{IntervalResultBox, ServerDoneResult};
use crate::stream::{tcp, udp, TestStream};
use crate::BoxResult;

Expand All @@ -56,10 +55,7 @@ fn handle_client(
let mut parallel_streams: Vec<Arc<Mutex<(dyn TestStream + Sync + Send)>>> = Vec::new();
let mut parallel_streams_joinhandles = Vec::new();

let (results_tx, results_rx): (
std::sync::mpsc::Sender<crate::protocol::results::IntervalResultBox>,
std::sync::mpsc::Receiver<crate::protocol::results::IntervalResultBox>,
) = channel();
let (results_tx, results_rx) = mpsc::channel::<IntervalResultBox>();

//a closure used to pass results from stream-handlers to the client-communication stream
let mut forwarding_send_stream = stream.try_clone()?;
Expand Down Expand Up @@ -295,12 +291,12 @@ impl Drop for ClientThreadMonitor {
pub fn serve(args: &Args) -> BoxResult<()> {
//config-parsing and pre-connection setup
let tcp_port_pool = Arc::new(Mutex::new(tcp::receiver::TcpPortPool::new(
args.tcp_port_pool.to_string(),
args.tcp6_port_pool.to_string(),
&args.tcp_port_pool,
&args.tcp6_port_pool,
)));
let udp_port_pool = Arc::new(Mutex::new(udp::receiver::UdpPortPool::new(
args.udp_port_pool.to_string(),
args.udp6_port_pool.to_string(),
&args.udp_port_pool,
&args.udp6_port_pool,
)));

let cpu_affinity_manager = Arc::new(Mutex::new(crate::utils::cpu_affinity::CpuAffinityManager::new(&args.affinity)?));
Expand All @@ -317,53 +313,54 @@ pub fn serve(args: &Args) -> BoxResult<()> {
log::info!("server listening on {}", listener.local_addr()?);

while is_alive() {
match listener.accept() {
Ok((mut stream, address)) => {
log::info!("connection from {}", address);

stream.set_nodelay(true).expect("cannot disable Nagle's algorithm");

#[cfg(unix)]
{
use crate::protocol::communication::KEEPALIVE_DURATION;
let keepalive_parameters = socket2::TcpKeepalive::new().with_time(KEEPALIVE_DURATION);
let raw_socket = socket2::SockRef::from(&stream);
raw_socket.set_tcp_keepalive(&keepalive_parameters)?;
}

let client_count = CLIENTS.fetch_add(1, Ordering::Relaxed) + 1;
if client_limit > 0 && client_count > client_limit {
log::warn!("client-limit ({}) reached; disconnecting {}...", client_limit, address.to_string());
stream.shutdown(Shutdown::Both).unwrap_or_default();
CLIENTS.fetch_sub(1, Ordering::Relaxed);
} else {
let c_cam = cpu_affinity_manager.clone();
let c_tcp_port_pool = tcp_port_pool.clone();
let c_udp_port_pool = udp_port_pool.clone();
let thread_builder = thread::Builder::new().name(address.to_string());
thread_builder.spawn(move || {
//ensure the client is accounted-for even if the handler panics
let _client_thread_monitor = ClientThreadMonitor {
client_address: address.to_string(),
};

match handle_client(&mut stream, c_cam, c_tcp_port_pool, c_udp_port_pool) {
Ok(_) => (),
Err(e) => log::error!("error in client-handler: {}", e),
}

//in the event of panic, this will happen when the stream is dropped
stream.shutdown(Shutdown::Both).unwrap_or_default();
})?;
}
}
let (mut stream, address) = match listener.accept() {
Ok((stream, address)) => (stream, address),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
//no pending clients
// no pending clients
thread::sleep(POLL_TIMEOUT);
continue;
}
Err(e) => {
return Err(Box::new(e));
}
};

log::info!("connection from {}", address);

stream.set_nodelay(true).expect("cannot disable Nagle's algorithm");

#[cfg(unix)]
{
use crate::protocol::communication::KEEPALIVE_DURATION;
let keepalive_parameters = socket2::TcpKeepalive::new().with_time(KEEPALIVE_DURATION);
let raw_socket = socket2::SockRef::from(&stream);
raw_socket.set_tcp_keepalive(&keepalive_parameters)?;
}

let client_count = CLIENTS.fetch_add(1, Ordering::Relaxed) + 1;
if client_limit > 0 && client_count > client_limit {
log::warn!("client-limit ({}) reached; disconnecting {}...", client_limit, address.to_string());
stream.shutdown(Shutdown::Both).unwrap_or_default();
CLIENTS.fetch_sub(1, Ordering::Relaxed);
} else {
let c_cam = cpu_affinity_manager.clone();
let c_tcp_port_pool = tcp_port_pool.clone();
let c_udp_port_pool = udp_port_pool.clone();
let thread_builder = thread::Builder::new().name(address.to_string());
thread_builder.spawn(move || {
// ensure the client is accounted-for even if the handler panics
let _client_thread_monitor = ClientThreadMonitor {
client_address: address.to_string(),
};

match handle_client(&mut stream, c_cam, c_tcp_port_pool, c_udp_port_pool) {
Ok(_) => (),
Err(e) => log::error!("error in client-handler: {}", e),
}

//in the event of panic, this will happen when the stream is dropped
stream.shutdown(Shutdown::Both).unwrap_or_default();
})?;
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
pub mod tcp;
pub mod udp;

use crate::BoxResult;
use crate::{protocol::results::IntervalResultBox, BoxResult};

pub const INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);

Expand All @@ -30,7 +30,7 @@ pub const INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);
/// INTERVAL while gathering data.
pub trait TestStream {
/// gather data; returns None when the test is over
fn run_interval(&mut self) -> Option<BoxResult<crate::protocol::results::IntervalResultBox>>;
fn run_interval(&mut self) -> Option<BoxResult<IntervalResultBox>>;
/// return the port associated with the test-stream; this may vary over the test's lifetime
fn get_port(&self) -> BoxResult<u16>;
/// returns the index of the test, used to match client and server data
Expand All @@ -39,7 +39,7 @@ pub trait TestStream {
fn stop(&mut self);
}

fn parse_port_spec(port_spec: String) -> Vec<u16> {
fn parse_port_spec(port_spec: &str) -> Vec<u16> {
let mut ports = Vec::<u16>::new();
if !port_spec.is_empty() {
for range in port_spec.split(',') {
Expand Down
35 changes: 18 additions & 17 deletions src/stream/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
* along with rperf. If not, see <https://www.gnu.org/licenses/>.
*/

use crate::protocol::results::{get_unix_timestamp, IntervalResult, TcpReceiveResult, TcpSendResult};
use crate::protocol::results::{get_unix_timestamp, TcpReceiveResult, TcpSendResult};
use crate::stream::{parse_port_spec, TestStream, INTERVAL};
use crate::BoxResult;

use super::{parse_port_spec, TestStream, INTERVAL};

pub const TEST_HEADER_SIZE: usize = 16;

#[cfg(unix)]
Expand All @@ -38,7 +37,7 @@ pub struct TcpTestDefinition {
pub length: usize,
}
impl TcpTestDefinition {
pub fn new(details: &serde_json::Value) -> super::BoxResult<TcpTestDefinition> {
pub fn new(details: &serde_json::Value) -> BoxResult<TcpTestDefinition> {
let mut test_id_bytes = [0_u8; 16];
for (i, v) in details
.get("test_id")
Expand Down Expand Up @@ -76,13 +75,14 @@ impl TcpTestDefinition {
}

pub mod receiver {
use mio::net::{TcpListener, TcpStream};
use std::io::Read;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
use std::sync::Mutex;
use std::time::{Duration, Instant};

use mio::net::{TcpListener, TcpStream};
use crate::{protocol::results::IntervalResultBox, BoxResult};

const POLL_TIMEOUT: Duration = Duration::from_millis(250);
const CONNECTION_TIMEOUT: Duration = Duration::from_secs(1);
Expand All @@ -97,7 +97,7 @@ pub mod receiver {
lock_ip6: Mutex<u8>,
}
impl TcpPortPool {
pub fn new(port_spec: String, port_spec6: String) -> TcpPortPool {
pub fn new(port_spec: &str, port_spec6: &str) -> TcpPortPool {
let ports = super::parse_port_spec(port_spec);
if !ports.is_empty() {
log::debug!("configured IPv4 TCP port pool: {:?}", ports);
Expand All @@ -123,7 +123,7 @@ pub mod receiver {
}
}

pub fn bind(&mut self, peer_ip: &IpAddr) -> super::BoxResult<TcpListener> {
pub fn bind(&mut self, peer_ip: &IpAddr) -> BoxResult<TcpListener> {
match peer_ip {
IpAddr::V6(_) => {
if self.ports_ip6.is_empty() {
Expand Down Expand Up @@ -193,7 +193,6 @@ pub mod receiver {
}
}

#[allow(dead_code)]
pub struct TcpReceiver {
active: AtomicBool,
test_definition: super::TcpTestDefinition,
Expand All @@ -213,7 +212,7 @@ pub mod receiver {
stream_idx: &u8,
port_pool: &mut TcpPortPool,
peer_ip: &IpAddr,
) -> super::BoxResult<TcpReceiver> {
) -> BoxResult<TcpReceiver> {
log::debug!("binding TCP listener for stream {}...", stream_idx);
let mut listener: TcpListener = port_pool.bind(peer_ip).expect("failed to bind TCP socket");
log::debug!("bound TCP listener for stream {}: {}", stream_idx, listener.local_addr()?);
Expand All @@ -238,7 +237,7 @@ pub mod receiver {
})
}

fn process_connection(&mut self) -> super::BoxResult<(TcpStream, u64, f32)> {
fn process_connection(&mut self) -> BoxResult<(TcpStream, u64, f32)> {
log::debug!("preparing to receive TCP stream {} connection...", self.stream_idx);

let listener = self.listener.as_mut().unwrap();
Expand Down Expand Up @@ -357,7 +356,7 @@ pub mod receiver {
}

impl super::TestStream for TcpReceiver {
fn run_interval(&mut self) -> Option<super::BoxResult<Box<dyn super::IntervalResult + Sync + Send>>> {
fn run_interval(&mut self) -> Option<BoxResult<IntervalResultBox>> {
let mut bytes_received: u64 = 0;

let mut additional_time_elapsed: f32 = 0.0;
Expand Down Expand Up @@ -450,7 +449,7 @@ pub mod receiver {
}
}

fn get_port(&self) -> super::BoxResult<u16> {
fn get_port(&self) -> BoxResult<u16> {
match &self.listener {
Some(listener) => Ok(listener.local_addr()?.port()),
None => match &self.stream {
Expand All @@ -476,6 +475,8 @@ pub mod sender {
use std::thread::sleep;
use std::time::{Duration, Instant};

use crate::{protocol::results::IntervalResultBox, BoxResult};

const CONNECT_TIMEOUT: Duration = Duration::from_secs(2);
const WRITE_TIMEOUT: Duration = Duration::from_millis(50);
const BUFFER_FULL_TIMEOUT: Duration = Duration::from_millis(1);
Expand Down Expand Up @@ -509,7 +510,7 @@ pub mod sender {
send_interval: &f32,
send_buffer: &usize,
no_delay: &bool,
) -> super::BoxResult<TcpSender> {
) -> BoxResult<TcpSender> {
let mut staged_buffer = vec![0_u8; test_definition.length];
for (i, staged_buffer_i) in staged_buffer.iter_mut().enumerate().skip(super::TEST_HEADER_SIZE) {
//fill the packet with a fixed sequence
Expand All @@ -536,8 +537,8 @@ pub mod sender {
})
}

fn process_connection(&mut self) -> super::BoxResult<TcpStream> {
log::debug!("preparing to connect TCP stream {}...", self.stream_idx);
fn process_connection(&mut self) -> BoxResult<TcpStream> {
log::debug!("preparing to connect TCP stream {} to {} ...", self.stream_idx, self.socket_addr);

let stream = match TcpStream::connect_timeout(&self.socket_addr, CONNECT_TIMEOUT) {
Ok(s) => s,
Expand Down Expand Up @@ -575,7 +576,7 @@ pub mod sender {
}
}
impl super::TestStream for TcpSender {
fn run_interval(&mut self) -> Option<super::BoxResult<Box<dyn super::IntervalResult + Sync + Send>>> {
fn run_interval(&mut self) -> Option<BoxResult<IntervalResultBox>> {
if self.stream.is_none() {
//if still in the setup phase, connect to the receiver
match self.process_connection() {
Expand Down Expand Up @@ -704,7 +705,7 @@ pub mod sender {
}
}

fn get_port(&self) -> super::BoxResult<u16> {
fn get_port(&self) -> BoxResult<u16> {
match &self.stream {
Some(stream) => Ok(stream.local_addr()?.port()),
None => Err(Box::new(simple_error::simple_error!("no stream currently exists"))),
Expand Down
Loading

0 comments on commit bc6dbb7

Please sign in to comment.