From 271343130156585d13ff2b1c6d2b579421c81a4b Mon Sep 17 00:00:00 2001 From: Jonas Ohland Date: Tue, 26 Dec 2023 14:50:34 +0100 Subject: [PATCH] wip --- Cargo.toml | 1 + crates/rist-rs-bits/src/gre/mod.rs | 6 +- crates/rist-rs-bits/src/ip/v4/test.rs | 4 +- crates/rist-rs-bits/src/rtp/mod.rs | 9 +- crates/rist-rs-std/Cargo.toml | 6 +- crates/rist-rs-std/examples/runtime_simple.rs | 68 +++ crates/rist-rs-std/src/lib.rs | 188 ++---- crates/rist-rs-std/src/net.rs | 552 +++++------------- .../rist-rs-std/src/transport/socket/mod.rs | 43 -- crates/rist-rs-test/examples/test-rt-std.rs | 8 +- crates/rist-rs-test/src/proto/mod.rs | 43 +- crates/rist-rs-tokio/Cargo.toml | 1 + crates/rist-rs-tokio/src/lib.rs | 2 - crates/rist-rs-types/Cargo.toml | 1 + crates/rist-rs-types/src/time/rate.rs | 2 +- .../rist-rs-types/src/traits/protocol/mod.rs | 82 ++- .../rist-rs-types/src/traits/runtime/mod.rs | 115 +++- crates/rist-rs-util/src/reorder/ring/test.rs | 4 +- tools/netzwerker/src/ctl.rs | 12 +- tools/netzwerker/src/util/mod.rs | 2 +- 20 files changed, 455 insertions(+), 694 deletions(-) create mode 100644 crates/rist-rs-std/examples/runtime_simple.rs diff --git a/Cargo.toml b/Cargo.toml index f46f099..db7887e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,3 +13,4 @@ members = [ "tools/netzwerker", "tools/rist-io", ] +resolver = "2" diff --git a/crates/rist-rs-bits/src/gre/mod.rs b/crates/rist-rs-bits/src/gre/mod.rs index 64d41b8..8058905 100644 --- a/crates/rist-rs-bits/src/gre/mod.rs +++ b/crates/rist-rs-bits/src/gre/mod.rs @@ -221,9 +221,9 @@ mod test { assert!(!gre.has_key()); assert!(!gre.has_sequence()); assert_eq!(gre.version(), 0); - assert!(matches!(gre.checksum(), None)); - assert!(matches!(gre.sequence_number(), None)); - assert!(matches!(gre.key(), None)); + assert!(gre.checksum().is_none()); + assert!(gre.sequence_number().is_none()); + assert!(gre.key().is_none()); } #[test] diff --git a/crates/rist-rs-bits/src/ip/v4/test.rs b/crates/rist-rs-bits/src/ip/v4/test.rs index f8ee1b7..77654fc 100644 --- a/crates/rist-rs-bits/src/ip/v4/test.rs +++ b/crates/rist-rs-bits/src/ip/v4/test.rs @@ -150,7 +150,7 @@ fn broken_total_len() { ]); // broken total length -> no payload - assert!(matches!(ip.payload(), Err(_))); + assert!(ip.payload().is_err()); // options still valid because header is valid - assert!(matches!(ip.options(), Ok(_))); + assert!(ip.options().is_ok()); } diff --git a/crates/rist-rs-bits/src/rtp/mod.rs b/crates/rist-rs-bits/src/rtp/mod.rs index 9d29f41..fb85ed9 100644 --- a/crates/rist-rs-bits/src/rtp/mod.rs +++ b/crates/rist-rs-bits/src/rtp/mod.rs @@ -280,14 +280,11 @@ mod test { #[test] fn invalid() { // conversion from invalid length should fail - assert!(matches!( - RTPView::try_from(RTP_INVALID_LEN.as_slice()), - Err(_) - )); + assert!(RTPView::try_from(RTP_INVALID_LEN.as_slice()).is_err()); let broken_padding = packet(&RTP_BROKEN_PADDING); assert!(broken_padding.has_padding()); - assert!(matches!(broken_padding.payload(), Err(_))); - assert!(matches!(broken_padding.padding_len().unwrap(), Err(_))) + assert!(broken_padding.payload().is_err()); + assert!(broken_padding.padding_len().unwrap().is_err()) } } diff --git a/crates/rist-rs-std/Cargo.toml b/crates/rist-rs-std/Cargo.toml index 61b8c0b..cdce360 100644 --- a/crates/rist-rs-std/Cargo.toml +++ b/crates/rist-rs-std/Cargo.toml @@ -6,16 +6,16 @@ version = "0.0.1" [dependencies] rand = "0.8" rist-rs-core = { path = "../rist-rs-core", features = ["std"] } -rist-rs-transport-dtls-openssl = { path = "../rist-rs-transport-dtls-openssl", optional = true } rist-rs-types = { path = "../rist-rs-types", features = ["std"] } rist-rs-util = { path = "../rist-rs-util", features = ["std"] } socket2 = { version = "0.4" } +slab = { version = "0.4" } +mio = { version = "0.8", features = ["net", "os-poll"] } tracing = { version = "0.1", default-features = false } [dev-dependencies] tracing-subscriber = { version = "0.3" } [features] -default = ["openssl"] +default = [] log = ["tracing/log"] -openssl = ["rist-rs-transport-dtls-openssl"] diff --git a/crates/rist-rs-std/examples/runtime_simple.rs b/crates/rist-rs-std/examples/runtime_simple.rs new file mode 100644 index 0000000..2100fbe --- /dev/null +++ b/crates/rist-rs-std/examples/runtime_simple.rs @@ -0,0 +1,68 @@ +use std::io; + +use rist_rs_std::StdRuntime; +use rist_rs_types::traits::{ + protocol::{Ctl, Protocol, ProtocolEvent, IOV}, + runtime::Runtime, +}; + +#[derive(Clone, Debug)] +struct SimpleCtl; + +impl Ctl for SimpleCtl { + type Error = io::Error; + + type Output = (); + + fn start() -> Self { + todo!() + } + + fn shutdown() -> Self { + todo!() + } +} + +struct Server {} + +impl Server { + fn new() -> Self { + Self {} + } +} + +impl Protocol for Server +where + R: Runtime, +{ + type Ctl = SimpleCtl; + + fn run(&mut self, rt: &mut R, iov: &[IOV]) -> ProtocolEvent { + let mut buf = [0u8; 1500]; + for ev in iov { + match ev { + IOV::Readable(socket) => match rt.recv_from(*socket, &mut buf) { + Err(err) => tracing::error!(?err, "recv_from"), + Ok((len, addr)) => { + let packet = buf.split_at(len).0; + if let Err(err) = rt.send_to(*socket, packet, addr) { + tracing::error!(?err, "send_to") + } + } + }, + IOV::Writeable(_) => {} + IOV::Error(socket, err) => { + tracing::error!(?socket, ?err, "error"); + } + IOV::Ctl(_) => todo!(), + IOV::Empty => todo!(), + } + } + ProtocolEvent::asap(&rt.get_default_clock()) + } +} + +fn main() { + let rt = StdRuntime::new(); + rt.run(Server::new()); +} diff --git a/crates/rist-rs-std/src/lib.rs b/crates/rist-rs-std/src/lib.rs index af81649..a340ff1 100644 --- a/crates/rist-rs-std/src/lib.rs +++ b/crates/rist-rs-std/src/lib.rs @@ -1,59 +1,18 @@ -// #![allow(unused)] +#![allow(unused)] use std::fmt::{Debug, Display}; use std::hash::Hash; -use std::io; use std::net::SocketAddr as StdSocketAddr; -use std::time::Duration; -use rist_rs_types::traits::protocol::{Ctl, Protocol}; -use rist_rs_types::traits::runtime::{self, Runtime, RuntimeError}; +use rist_rs_types::traits::protocol::Protocol; +use rist_rs_types::traits::runtime::{self, Runtime}; use rist_rs_types::traits::time::clock::StdSystemClock; -use rist_rs_util::collections::static_vec::StaticVec; mod net; pub mod testing; pub mod transport; -use net::Sockets as NetworkSockets; - -#[derive(Debug)] -pub enum StdRuntimeError { - UnknownSocket(u32), - IOE(std::io::Error), - Panic(&'static str), -} - -impl Display for StdRuntimeError { - fn fmt(&self, _: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - todo!() - } -} - -impl RuntimeError for StdRuntimeError { - fn is_not_ready(&self) -> bool { - if let StdRuntimeError::IOE(err) = self { - return err.kind() == std::io::ErrorKind::WouldBlock; - } - false - } - - fn io_error(&self) -> Option<&io::Error> { - match self { - StdRuntimeError::IOE(e) => Some(e), - _ => None, - } - } - - fn into_io_error(self) -> Option { - match self { - StdRuntimeError::IOE(e) => Some(e), - _ => None, - } - } -} - #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] pub enum SocketAddr { NetworkAddress(StdSocketAddr), @@ -71,8 +30,8 @@ impl runtime::SocketAddr for SocketAddr { #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum Socket { - NetworkSocket(net::SocketId), Empty, + Net(usize), } impl runtime::Socket for Socket {} @@ -84,27 +43,24 @@ impl Socket { } pub enum IoEventKind { - Accept(SocketAddr, Socket), Readable(Socket), Writable(Socket), - Error(StdRuntimeError), + Error(runtime::Error), None, } pub struct IoEvent { pub socket: Socket, pub kind: IoEventKind, - pub buf: StaticVec, pub len: usize, } impl IoEvent { - pub fn allocate(num: usize, buf_len: usize) -> Vec { + pub fn allocate(num: usize) -> Vec { (0..num) .map(|_| IoEvent { kind: IoEventKind::None, len: 0, - buf: StaticVec::new(buf_len), socket: Socket::empty(), }) .collect() @@ -120,13 +76,9 @@ impl IoEvent { } } -pub struct StdRuntime { - network_sockets: NetworkSockets, -} +pub struct StdRuntime {} impl Runtime for StdRuntime { - type Error = StdRuntimeError; - type Clock = StdSystemClock; type SocketAddr = SocketAddr; @@ -137,105 +89,61 @@ impl Runtime for StdRuntime { StdSystemClock } - fn bind(&mut self, address: Self::SocketAddr) -> Result { - match address { - SocketAddr::NetworkAddress(address) => self - .network_sockets - .bind(address) - .map_err(StdRuntimeError::IOE) - .map(Into::into), - } + fn bind( + &mut self, + address: Self::SocketAddr, + flags: runtime::SocketFlags, + ) -> Result { + Err(runtime::Error::AddrInUse) } fn connect( &mut self, local_sock_id: Self::Socket, address: Self::SocketAddr, - ) -> Result { - match address { - SocketAddr::NetworkAddress(address) => { - if let Socket::NetworkSocket(socket) = local_sock_id { - self.network_sockets - .connect(socket, address) - .map_err(StdRuntimeError::IOE) - .map(Into::into) - } else { - Err(StdRuntimeError::UnknownSocket(8)) - } - } - } + ) -> Result { + Ok(local_sock_id) } - fn send(&mut self, socket: Self::Socket, buf: &[u8]) -> Result<(), Self::Error> { - match socket { - Socket::NetworkSocket(socket) => self - .network_sockets - .send_non_blocking(socket, buf) - .map_err(StdRuntimeError::IOE), - Socket::Empty => Ok(()), - } + fn send(&mut self, _socket: Self::Socket, _buf: &[u8]) -> Result<(), runtime::Error> { + Ok(()) } - fn get_remote_address(&self, _: Self::Socket) -> Result { + fn get_remote_address(&self, _: Self::Socket) -> Result { todo!() } - fn close(&mut self, socket: Self::Socket) { - match socket { - Socket::NetworkSocket(socket) => self.network_sockets.close(socket), - Socket::Empty => {} - } + fn close(&mut self, _socket: Self::Socket) {} + + fn send_to( + &mut self, + _socket: Self::Socket, + _buf: &[u8], + _address: Self::SocketAddr, + ) -> Result<(), runtime::Error> { + todo!() + } + + fn recv(&mut self, _socket: Self::Socket, buf: &mut [u8]) -> Result { + todo!() + } + + fn recv_from( + &mut self, + _socket: Self::Socket, + _buf: &mut [u8], + ) -> Result<(usize, Self::SocketAddr), runtime::Error> { + todo!() } } impl StdRuntime { #[allow(clippy::new_without_default)] pub fn new() -> Self { - Self { - network_sockets: NetworkSockets::new(), - } + Self {} } - pub fn run_protocol>(mut self, mut protocol: P) { - let mut events = IoEvent::allocate(24, 1500); - protocol.ctl(&mut self, ::start()).unwrap(); - loop { - self.network_sockets.poll_events(&mut events); - for event in events - .iter_mut() - .take_while(|e| !matches!(e.kind, IoEventKind::None)) - { - match &event.kind { - IoEventKind::None => unreachable!(), - IoEventKind::Accept(remote_address, remote_socket_id) => { - protocol.accept( - &mut self, - event.socket, - *remote_socket_id, - *remote_address, - ); - } - IoEventKind::Readable(remote_socket_id) => { - protocol.receive( - &mut self, - *remote_socket_id, - event.buf.split_at(event.len).0, - ); - } - IoEventKind::Writable(socket) => { - protocol.writeable(&mut self, *socket); - } - IoEventKind::Error(error) => { - tracing::error!(?error, "socket error"); - if let Socket::NetworkSocket(socket) = event.socket { - self.network_sockets.close(socket) - } - } - } - } - std::thread::sleep(Duration::from_millis(5)); - } - } + pub fn run>(mut self, mut protocol: P) {} } impl Display for SocketAddr { @@ -249,26 +157,14 @@ impl Display for SocketAddr { impl Display for Socket { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Socket::NetworkSocket(net) => Display::fmt(net, f), Socket::Empty => write!(f, ""), + Socket::Net(_) => todo!(), } } } -impl From for StdRuntimeError { - fn from(value: std::io::Error) -> Self { - StdRuntimeError::IOE(value) - } -} - impl From for SocketAddr { fn from(value: StdSocketAddr) -> Self { Self::NetworkAddress(value) } } - -impl From for Socket { - fn from(value: net::SocketId) -> Self { - Self::NetworkSocket(value) - } -} diff --git a/crates/rist-rs-std/src/net.rs b/crates/rist-rs-std/src/net.rs index d4c5ace..805a607 100644 --- a/crates/rist-rs-std/src/net.rs +++ b/crates/rist-rs-std/src/net.rs @@ -1,472 +1,190 @@ #![allow(unused)] +use mio::{Poll, Token}; +use rist_rs_types::traits::protocol::{Ctl, Events, IOV}; use std::collections::hash_map::{DefaultHasher, Entry}; use std::collections::HashMap; use std::fmt::{Debug, Display}; use std::hash::{Hash, Hasher}; use std::io; use std::net::{SocketAddr, UdpSocket}; +use std::os::fd::{AsRawFd, FromRawFd}; -use crate::{IoEvent, IoEventKind}; +use rist_rs_types::traits::runtime; -#[derive(Clone, Copy, Hash, PartialEq, Eq)] -pub struct SocketId(pub(crate) usize); +use crate::{IoEvent, IoEventKind, StdRuntime}; -impl SocketId { - pub fn empty() -> SocketId { - SocketId(usize::MAX) - } - - pub fn hash(&self) -> u64 { - let mut hasher = DefaultHasher::new(); - self.0.hash(&mut hasher); - hasher.finish() - } +struct NetIo { + poll: mio::Poll, + socks: slab::Slab, + evs: mio::Events, } -impl Debug for SocketId { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:x}", self.hash()) +impl NetIo { + fn try_new() -> Result { + Ok(Self { + poll: mio::Poll::new()?, + socks: slab::Slab::new(), + evs: mio::Events::with_capacity(32), + }) } -} -impl Display for SocketId { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:x}", self.hash()) - } -} + fn bind(&mut self, addr: SocketAddr) -> Result { + // bind the socket + let mut sock = mio::net::UdpSocket::bind(addr)?; -struct LocalSocket { - socket: UdpSocket, - remotes: HashMap, -} - -struct RemoteSocket { - local_socket_id: usize, - remote_address: SocketAddr, -} - -pub struct Sockets { - num_local_sockets: usize, - num_remote_sockets: usize, - sockets: Vec>, - remote_sockets: Vec>, -} + // add to slab + let sock_id = self.socks.insert(sock); -impl Sockets { - const SOCK_INDEX_PIVOT: usize = usize::MAX / 2; + // register with poller + self.poll.registry().register( + self.socks.get_mut(sock_id).unwrap(), + Token(sock_id), + mio::Interest::WRITABLE | mio::Interest::READABLE, + ); - pub fn new() -> Self { - Self { - num_local_sockets: 0, - num_remote_sockets: 0, - sockets: Default::default(), - remote_sockets: Default::default(), - } + let sock = crate::Socket::Net(sock_id); + tracing::trace!(%addr, ?sock, "udp bound"); + Ok(sock) } - fn bind_non_blocking_socket(address: SocketAddr) -> Result { - let socket = socket2::Socket::from(UdpSocket::bind(address)?); - socket.set_nonblocking(true)?; - Ok(socket.into()) + fn connect(&mut self, socket: usize, addr: SocketAddr) -> Result<(), runtime::Error> { + let sock = self + .socks + .get_mut(socket) + .ok_or(runtime::Error::InvalidInput)?; + sock.connect(addr).map_err(From::from) } - fn update_active_socket_count(&mut self) { - self.num_remote_sockets = self.remote_sockets.iter().filter(|s| s.is_some()).count(); - self.num_local_sockets = self.sockets.iter().filter(|s| s.is_some()).count() + fn send(&mut self, socket: usize, buf: &[u8]) -> Result<(), runtime::Error> { + let sock = self + .socks + .get_mut(socket) + .ok_or(runtime::Error::InvalidInput)?; + sock.send(buf).map_err(From::from).map(|_| ()) } - fn close_remote_socket(&mut self, socket: usize) { - let idx = socket - Self::SOCK_INDEX_PIVOT; - if idx <= self.remote_sockets.len() { - match self.remote_sockets[idx].take() { - Some(remote_socket) => match &mut self.sockets[remote_socket.local_socket_id] { - Some(sock) => { - tracing::trace!(remote_socket_address = %remote_socket.remote_address, remote_socket_index = idx, "removing remote socket entry"); - sock.remotes.remove(&remote_socket.remote_address); - } - None => { - tracing::warn!(socket, "remote socket leaked"); - } - }, - None => { - tracing::warn!(socket, "orphaned socket closed"); + fn send_to( + &mut self, + socket: usize, + buf: &[u8], + addr: SocketAddr, + ) -> Result<(), runtime::Error> { + let sock = self + .socks + .get_mut(socket) + .ok_or(runtime::Error::InvalidInput)?; + sock.send_to(buf, addr).map_err(From::from).map(|_| ()) + } + + fn recv(&mut self, socket: usize, buf: &mut [u8]) -> Result { + let sock = self + .socks + .get_mut(socket) + .ok_or(runtime::Error::InvalidInput)?; + sock.recv(buf).map_err(From::from) + } + + fn recv_from( + &mut self, + socket: usize, + buf: &mut [u8], + ) -> Result<(usize, SocketAddr), runtime::Error> { + let sock = self + .socks + .get_mut(socket) + .ok_or(runtime::Error::InvalidInput)?; + sock.recv_from(buf).map_err(From::from) + } + + fn poll(&mut self, events: &mut Events) { + // system poll + if let Err(err) = self.poll.poll(&mut self.evs, None) { + tracing::error!(%err, "poll") + } + + // copy events + for ev in &self.evs { + match ev { + ev if ev.is_readable() => { + events.push(IOV::Readable(crate::Socket::Net(ev.token().0))) } - } - } - } - - fn close_local_socket(&mut self, socket: usize) { - if socket <= self.sockets.len() { - match self.sockets[socket].take() { - Some(local_socket) => { - for (_, remote_socket) in local_socket.remotes { - self.remote_sockets[remote_socket - Self::SOCK_INDEX_PIVOT].take(); - } + ev if ev.is_writable() => { + events.push(IOV::Writeable(crate::Socket::Net(ev.token().0))) } - None => todo!(), - } - } - } - - fn reserve_socket(sockets: &mut Vec>) -> usize { - let mut idx = 0usize; - loop { - if idx == sockets.len() { - sockets.push(None); - break; - } - if sockets[idx].is_none() { - break; - } - idx += 1 - } - idx - } - - pub fn add(&mut self, socket: UdpSocket) -> io::Result { - let socket = socket2::Socket::from(socket); - socket.set_nonblocking(true)?; - let idx = Self::reserve_socket(&mut self.sockets); - self.sockets[idx] = Some(LocalSocket { - socket: socket.into(), - remotes: Default::default(), - }); - self.update_active_socket_count(); - Ok(SocketId(idx)) - } - - pub fn bind(&mut self, address: SocketAddr) -> io::Result { - let socket = Self::bind_non_blocking_socket(address)?; - let mut idx = Self::reserve_socket(&mut self.sockets); - self.sockets[idx] = Some(LocalSocket { - socket, - remotes: Default::default(), - }); - self.update_active_socket_count(); - Ok(SocketId(idx)) - } - - pub fn close(&mut self, socket: SocketId) { - if socket.0 >= Self::SOCK_INDEX_PIVOT { - self.close_remote_socket(socket.0) - } else { - self.close_local_socket(socket.0) - } - self.update_active_socket_count() - } - - pub fn connect( - &mut self, - local_socket_id: SocketId, - remote_address: SocketAddr, - ) -> io::Result { - let local_socket_id = local_socket_id.0; - match &mut self.sockets[local_socket_id] { - None => Err(io::Error::from(io::ErrorKind::InvalidInput)), - Some(socket_entry) => { - let idx = Self::reserve_socket(&mut self.remote_sockets); - let remote_socket_id = idx + Self::SOCK_INDEX_PIVOT; - tracing::trace!(%remote_address, remote_socket_index = idx, "insert new remote socket entry"); - self.remote_sockets[idx] = Some(RemoteSocket { - local_socket_id, - remote_address, - }); - socket_entry - .remotes - .insert(remote_address, remote_socket_id); - self.update_active_socket_count(); - Ok(SocketId(remote_socket_id)) - } - } - } - - pub fn send_non_blocking(&self, remote_socket_id: SocketId, buf: &[u8]) -> io::Result<()> { - let remote_socket_id = remote_socket_id.0; - match self.remote_sockets[remote_socket_id - Self::SOCK_INDEX_PIVOT] - .as_ref() - .and_then(|remote| { - self.sockets[remote.local_socket_id] - .as_ref() - .map(|socket| (remote.remote_address, &socket.socket)) - }) { - Some((addr, sock)) => sock.send_to(buf, addr).map(|_| ()), - None => Err(io::Error::from(io::ErrorKind::InvalidInput)), - } - } - - pub fn poll_events(&mut self, events: &mut [IoEvent]) { - let reads_per_sock = - 1.max((events.len() / self.num_local_sockets.max(1)) - self.num_remote_sockets); - let mut events_index = 0; - let remote_sockets = &mut self.remote_sockets; - for (local_socket_id, opt_socket_entry) in self.sockets.iter_mut().enumerate() { - if let Some(socket_entry) = opt_socket_entry { - for _ in 0..reads_per_sock { - if events_index == events.len() { - // event buffer already filled - return; - } - let event = &mut events[events_index]; - match socket_entry.socket.recv_from(&mut event.buf) { - Ok((len, remote_address)) => { - event.len = len; - event.socket = SocketId(local_socket_id).into(); - match socket_entry.remotes.entry(remote_address) { - Entry::Occupied(entry) => { - event.kind = - IoEventKind::Readable(SocketId(*entry.get()).into()) - } - Entry::Vacant(entry) => { - let remote_socket_index = Self::reserve_socket(remote_sockets); - tracing::trace!(%remote_address, remote_socket_index, "insert new remote socket entry"); - let remote_socket_id = - remote_socket_index + Self::SOCK_INDEX_PIVOT; - entry.insert(remote_socket_id); - remote_sockets[remote_socket_index] = Some(RemoteSocket { - local_socket_id, - remote_address, - }); - event.kind = IoEventKind::Accept( - remote_address.into(), - SocketId(remote_socket_id).into(), - ) - } - } - } - Err(error) => { - if error.kind() == io::ErrorKind::WouldBlock { - break; - } else { - event.len = 0; - event.socket = SocketId(local_socket_id).into(); - event.kind = IoEventKind::Error(error.into()) - } + ev if ev.is_error() => { + if let Some(sock) = self.socks.get_mut(ev.token().0) { + if let Ok(Some(err)) = sock.take_error() { + events.push(IOV::Error(crate::Socket::Net(ev.token().0), err.into())) } } - events_index += 1; } + ev => tracing::error!(?ev, "unexpected event"), } } - for (i, socket) in self - .remote_sockets - .iter() - .enumerate() - .filter_map(|(i, remote_socket)| remote_socket.as_ref().map(|s| (i, s))) - { - if events_index == events.len() { - // event buffer already filled - return; - } - events[events_index].len = 0; - events[events_index].socket = SocketId(socket.local_socket_id).into(); - events[events_index].kind = - IoEventKind::Writable(SocketId(Self::SOCK_INDEX_PIVOT + i).into()); - events_index += 1; - } - for event in events.iter_mut().skip(events_index) { - event.reset(); - } } } #[allow(unused)] mod test { + use std::{ + net::{Ipv4Addr, SocketAddr, SocketAddrV4, UdpSocket}, + str::{from_utf8, FromStr}, + }; - use super::{SocketId, Sockets}; - use crate::testing::{self, BusyLoopTimeout}; - use crate::{IoEvent, IoEventKind, Socket}; - use std::net::{SocketAddr, SocketAddrV4, UdpSocket}; - use std::time::Duration; + use rist_rs_types::traits::{ + protocol::{Ctl, Events, IOV}, + runtime, + }; + use socket2::SockAddr; - pub fn expect_accept_event(events: &[IoEvent]) -> Option<&IoEvent> { - for event in events { - if let IoEventKind::Accept(remote_address, remote_socket_id) = event.kind { - return Some(event); - } - } - None - } + use crate::{IoEvent, Socket, StdRuntime}; - #[test] - fn test_bind_connect_close() { - let mut sockets = Sockets::new(); - let (port, socket) = testing::get_localhost_bound_socket(); - drop(socket); - let socket = sockets - .bind(testing::sock_addr_localhost(port)) - .expect("bind operation failed"); - sockets.close(socket); - UdpSocket::bind(testing::sock_addr_localhost(port)).expect("port was not closed correctly"); - } + use super::NetIo; - #[test] - fn bind_accept() { - let mut events = IoEvent::allocate(1, 24); - let mut sockets = Sockets::new(); - let (port, socket) = testing::get_localhost_bound_socket(); - let socket = sockets.add(socket).unwrap(); - let (test_port, test_socket) = testing::get_localhost_bound_socket(); + #[derive(Debug, Clone)] + struct DummyCtl; - // send a message to the listening socket - test_socket - .send_to(&[0x00], testing::sock_addr_localhost(port)) - .unwrap(); - let mut timeout = BusyLoopTimeout::new(Duration::from_secs(5)); - loop { - sockets.poll_events(&mut events); - if let IoEventKind::Accept(remote_address, remote_socket_id) = &events[0].kind { - // received an 'Accept' event - assert_eq!( - *remote_address, - testing::sock_addr_localhost(test_port).into() - ); - assert_eq!(events[0].socket, socket.into()); - assert_eq!(events[0].len, 1); + impl Ctl for DummyCtl { + type Error = (); - if let Socket::NetworkSocket(socket) = remote_socket_id { - // close the remote socket - sockets.close(*socket); - } else { - panic!("invalid socket returned in io event") - } - break; - } + type Output = (); - if timeout.sleep() { - panic!("timeout") - } + fn start() -> Self { + Self } - // send another message - test_socket - .send_to(&[0x00], testing::sock_addr_localhost(port)) - .unwrap(); - - // since the remote socket was closed we expect a another Accept event - let mut timeout = BusyLoopTimeout::new(Duration::from_secs(5)); - loop { - sockets.poll_events(&mut events); - if let IoEventKind::Accept(remote_address, remote_socket_id) = &events[0].kind { - assert_eq!( - *remote_address, - testing::sock_addr_localhost(test_port).into() - ); - assert_eq!(events[0].socket, socket.into()); - assert_eq!(events[0].len, 1); - break; - } - - if timeout.sleep() { - panic!("timeout") - } + fn shutdown() -> Self { + Self } - - // make sure no more events are returned and the previous event was cleared - sockets.poll_events(&mut events); - assert!(matches!( - events[0].kind, - IoEventKind::Writable(_) | IoEventKind::None - )); } - #[test] - fn accept_read() { - let mut events = IoEvent::allocate(1, 24); - let mut sockets = Sockets::new(); - let (port, socket) = testing::get_localhost_bound_socket(); - let socket = sockets.add(socket).unwrap(); - let mut remote_socket = SocketId::empty(); - let (test_port, test_socket) = testing::get_localhost_bound_socket(); - - // send a message to the listening socket - test_socket - .send_to(&[0x00], testing::sock_addr_localhost(port)) - .unwrap(); - let mut timeout = BusyLoopTimeout::new(Duration::from_secs(5)); + fn poll_read_one(io: &mut NetIo) -> Vec { + let mut buf = [0; 8192]; + let mut events = Events::::new(1); loop { - sockets.poll_events(&mut events); - if let IoEventKind::Accept(remote_address, remote_socket_id) = &events[0].kind { - // received an 'Accept' event - assert_eq!( - *remote_address, - testing::sock_addr_localhost(test_port).into() - ); - assert_eq!(events[0].socket, socket.into()); - assert_eq!(events[0].len, 1); - match *remote_socket_id { - Socket::NetworkSocket(remote_socket_id) => { - remote_socket = remote_socket_id; - } - _ => panic!("invalid socket returned in io event"), - } - break; - } + io.poll(&mut events); - if timeout.sleep() { - panic!("timeout") - } - } - - // send another message - test_socket - .send_to(&[0x00], testing::sock_addr_localhost(port)) - .unwrap(); - - // since the remote socket was closed we expect a another Accept event - let mut timeout = BusyLoopTimeout::new(Duration::from_secs(5)); - loop { - sockets.poll_events(&mut events); - if let IoEventKind::Readable(remote_socket_id) = &events[0].kind { - assert_eq!(*remote_socket_id, remote_socket.into()); - assert_eq!(events[0].socket, socket.into()); - assert_eq!(events[0].len, 1); - break; - } - - if timeout.sleep() { - panic!("timeout") + if let IOV::Readable(Socket::Net(s)) = events.pop().unwrap() { + let len = io.recv(s, &mut buf).unwrap(); + return buf.split_at(len).0.into(); } } } #[test] - fn accept_reply() { - let mut events = IoEvent::allocate(1, 24); - let mut sockets = Sockets::new(); - let (port, socket) = testing::get_localhost_bound_socket(); - let socket = sockets.add(socket).unwrap(); - let (ex_port, ex_socket) = testing::get_localhost_bound_socket(); - ex_socket - .send_to(&[0x00], testing::sock_addr_localhost(port)) - .unwrap(); - let mut timeout = BusyLoopTimeout::new(Duration::from_secs(5)); - loop { - sockets.poll_events(&mut events); - if let IoEventKind::Accept(remote_address, remote_socket_id) = &events[0].kind { - assert_eq!( - *remote_address, - testing::sock_addr_localhost(ex_port).into() - ); - assert_eq!(events[0].socket, socket.into()); - assert_eq!(events[0].len, 1); - match *remote_socket_id { - Socket::NetworkSocket(remote_socket_id) => { - sockets - .send_non_blocking(remote_socket_id, &[0xff]) - .expect("send failed"); - } - _ => panic!("invalid socket type returned in io event"), - } - break; - } - if timeout.sleep() { - panic!("timeout") - } - } - - let mut buf = [0u8; 24]; - let (len, addr) = ex_socket.recv_from(&mut buf).expect("receive failed"); - assert_eq!(len, 1); - assert_eq!(addr, testing::sock_addr_localhost(port)); - } + fn test_rx() { + tracing_subscriber::fmt() + .with_max_level(tracing::Level::TRACE) + .init(); + let packet_out = [0u8]; + let bind_addr: SocketAddr = "127.0.0.1:10229".parse().unwrap(); + let mut io = NetIo::try_new().unwrap(); + io.bind(bind_addr).unwrap(); + let sender = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)).unwrap(); + sender.send_to(&packet_out, bind_addr); + let packet_in = poll_read_one(&mut io); + assert_eq!(packet_out, packet_in.as_slice()); + } + + fn test_tx() {} } diff --git a/crates/rist-rs-std/src/transport/socket/mod.rs b/crates/rist-rs-std/src/transport/socket/mod.rs index 39b0d03..e910216 100644 --- a/crates/rist-rs-std/src/transport/socket/mod.rs +++ b/crates/rist-rs-std/src/transport/socket/mod.rs @@ -111,47 +111,4 @@ mod test { use super::*; use std::str::{from_utf8, FromStr}; - - #[test] - fn bind() { - drop(NonBlockingUdpSocket::bind(SocketAddr::from_str("0.0.0.0:0").unwrap()).unwrap()); - } - - #[test] - fn read_no_data() { - let mut socket = - NonBlockingUdpSocket::bind(SocketAddr::from_str("0.0.0.0:0").unwrap()).unwrap(); - let mut buf = []; - // should not block and return no data - assert!(matches!(socket.try_recv(&mut buf), None)); - } - - #[test] - fn transmit_non_blocking() { - let rx_listen = SocketAddr::from_str("0.0.0.0:37702").unwrap(); - let mut tx = - NonBlockingUdpSocket::bind(SocketAddr::from_str("0.0.0.0:0").unwrap()).unwrap(); - let mut rx = NonBlockingUdpSocket::bind(rx_listen).unwrap(); - tx.connect(rx_listen).unwrap(); - let buf = "Hello!".as_bytes(); - let mut rxbuf = vec![0u8; buf.len()]; - match tx.try_send(buf) { - Some(r) => match r { - Ok(len) => assert_eq!(len, buf.len()), - Err(e) => panic!("{}", e), - }, - _ => panic!(), - } - match rx.try_recv(&mut rxbuf) { - Some(r) => match r { - Ok(s) => { - assert_eq!(s, buf.len()); - assert_eq!(from_utf8(&rxbuf).unwrap(), "Hello!") - } - Err(e) => panic!("{}", e), - }, - _ => panic!(), - } - assert!(matches!(rx.try_recv(&mut rxbuf), None)); - } } diff --git a/crates/rist-rs-test/examples/test-rt-std.rs b/crates/rist-rs-test/examples/test-rt-std.rs index a3c8393..47b6bac 100644 --- a/crates/rist-rs-test/examples/test-rt-std.rs +++ b/crates/rist-rs-test/examples/test-rt-std.rs @@ -3,7 +3,7 @@ use std::net::SocketAddr; use clap::Parser; use rist_rs_std::StdRuntime; use rist_rs_test::proto::SimpleProto; -use rist_rs_types::traits::runtime::Runtime; +use rist_rs_types::traits::runtime::{Runtime, SocketFlags}; #[derive(clap::Parser)] struct Cli { @@ -20,7 +20,9 @@ fn main() { let cli = Cli::parse(); let mut runtime = StdRuntime::new(); - let socket = runtime.bind(cli.bind.into()).unwrap(); + let socket = runtime + .bind(cli.bind.into(), SocketFlags::RecvFrom) + .unwrap(); let proto = SimpleProto::new(socket, cli.peer); - runtime.run_protocol(proto); + runtime.run(proto); } diff --git a/crates/rist-rs-test/src/proto/mod.rs b/crates/rist-rs-test/src/proto/mod.rs index 70a5778..a73d534 100644 --- a/crates/rist-rs-test/src/proto/mod.rs +++ b/crates/rist-rs-test/src/proto/mod.rs @@ -1,3 +1,5 @@ +#![allow(unused)] + use std::{ collections::{hash_map::Entry, HashMap}, net::SocketAddr, @@ -5,11 +7,12 @@ use std::{ }; use rist_rs_types::traits::{ - protocol::{Ctl, Protocol, ProtocolEvent}, - runtime::{Runtime, RuntimeError, SocketAddr as TSocketAddr}, + protocol::{Ctl, Protocol, ProtocolEvent, IOV}, + runtime::{self, Runtime, SocketAddr as TSocketAddr}, time::clock::{Clock, TimePoint}, }; +#[derive(Debug, Clone)] pub struct SimpleProtoCtl; impl Ctl for SimpleProtoCtl { @@ -84,8 +87,8 @@ where > Duration::from_secs(10) { tracing::info!(remote_socket = %entry.key(), remote_address = %entry.get().address, "peer timed out"); - rt.close(entry.key().clone()); - drop(entry.remove()); + rt.close(*entry.key()); + entry.remove(); updated = true; } else if now .duration_since(entry.get().last_contact) @@ -115,7 +118,7 @@ where for peer in remote_peer_list { if !self.peers.values().any(|s| s.address == (*peer).into()) { let remote_address: R::SocketAddr = (*peer).into(); - match rt.connect(self.local_socket.clone(), remote_address.clone()) { + match rt.connect(self.local_socket, remote_address) { Ok(socket) => { tracing::info!(local_socket = %self.local_socket, remote_socket = %socket, %remote_address, "new peer from member list"); updated = true; @@ -182,12 +185,12 @@ where || now.duration_since(peer.last_send).unwrap_or(Duration::MAX) > Duration::from_millis(300) { - match rt.send(socket.clone(), buf) { + match rt.send(*socket, buf) { Ok(_) => { peer.last_send = now; peer.blocked = false; } - Err(error) if error.is_not_ready() => { + Err(runtime::Error::WouldBlock) => { peer.blocked = true; } Err(error) => { @@ -207,7 +210,7 @@ where for peer in peers { if !self.peers.values().any(|s| s.address == (*peer).into()) { let remote_address: R::SocketAddr = (*peer).into(); - match rt.connect(self.local_socket.clone(), remote_address.clone()) { + match rt.connect(self.local_socket, remote_address) { Ok(socket) => { tracing::info!(local_socket = %self.local_socket, remote_socket = %socket, %remote_address, "new peer from initial member list"); self.peers.insert(socket, Peer::new(now, remote_address)); @@ -226,6 +229,12 @@ where { type Ctl = SimpleProtoCtl; + fn run(&mut self, rt: &mut R, _iov: &[IOV]) -> ProtocolEvent { + todo!() + } + + /* + fn ctl(&mut self, rt: &mut R, _: Self::Ctl) -> Result<(), ()> { if let Some(peers) = self.start_peers.take() { self.add_start_peers(rt, &peers, None); @@ -233,22 +242,6 @@ where Ok(()) } - fn accept( - &mut self, - rt: &mut R, - local_socket: ::Socket, - remote_socket: ::Socket, - remote_address: ::SocketAddr, - ) -> ProtocolEvent { - tracing::info!(%local_socket, %remote_socket, %remote_address, "new peer"); - let now = rt.get_default_clock().now(); - self.peers - .insert(remote_socket, Peer::new(now, remote_address)); - self.cleanup_dead_peers(rt, Some(now)); - self.update_peer_list_message_cache(rt, Some(now)); - ProtocolEvent::asap(&rt.get_default_clock()) - } - fn receive( &mut self, rt: &mut R, @@ -287,4 +280,6 @@ where self.cleanup_dead_peers(rt, None); ProtocolEvent::asap(&rt.get_default_clock()) } + + */ } diff --git a/crates/rist-rs-tokio/Cargo.toml b/crates/rist-rs-tokio/Cargo.toml index 912704c..1be3987 100644 --- a/crates/rist-rs-tokio/Cargo.toml +++ b/crates/rist-rs-tokio/Cargo.toml @@ -6,6 +6,7 @@ version = "0.0.1" [dependencies] futures-channel = { version = "0.3" } rist-rs-core = { path = "../rist-rs-core", features = ["std"] } +rist-rs-types = { path = "../rist-rs-types" } slab = { version = "0.4" } tokio = { version = "1.25", features = ["net", "time"] } tracing = { version = "0.1" } diff --git a/crates/rist-rs-tokio/src/lib.rs b/crates/rist-rs-tokio/src/lib.rs index 92d0c68..f9faf2f 100644 --- a/crates/rist-rs-tokio/src/lib.rs +++ b/crates/rist-rs-tokio/src/lib.rs @@ -1,3 +1 @@ pub mod net; - -pub struct TokioRuntime {} diff --git a/crates/rist-rs-types/Cargo.toml b/crates/rist-rs-types/Cargo.toml index de4669b..6c2fa39 100644 --- a/crates/rist-rs-types/Cargo.toml +++ b/crates/rist-rs-types/Cargo.toml @@ -5,6 +5,7 @@ version = "0.0.1" [dependencies] num-traits = { version = "0.2", default-features = false } +bitflags = { version = "2.4", default-features = false } rist-rs-macros = { path = "../rist-rs-macros" } [features] diff --git a/crates/rist-rs-types/src/time/rate.rs b/crates/rist-rs-types/src/time/rate.rs index 5724614..693fca5 100644 --- a/crates/rist-rs-types/src/time/rate.rs +++ b/crates/rist-rs-types/src/time/rate.rs @@ -108,7 +108,7 @@ mod test { #[test] fn zero_rate() { let rate = Rate::from((1234, 0)); - assert!(matches!(rate.to_f64_checked(), None)); + assert!(rate.to_f64_checked().is_none()); } #[test] diff --git a/crates/rist-rs-types/src/traits/protocol/mod.rs b/crates/rist-rs-types/src/traits/protocol/mod.rs index 0524bb5..d6d8eba 100644 --- a/crates/rist-rs-types/src/traits/protocol/mod.rs +++ b/crates/rist-rs-types/src/traits/protocol/mod.rs @@ -1,8 +1,11 @@ use std::fmt::Debug; -use super::{runtime::Runtime, time::clock::Clock}; +use super::{ + runtime::{self, Runtime}, + time::clock::Clock, +}; -pub trait Ctl: Sized + Send + 'static { +pub trait Ctl: Debug + Clone + Sized + Send + 'static { type Error: Debug; type Output; @@ -28,30 +31,65 @@ where } } +pub enum IOV { + Readable(R::Socket), + Writeable(R::Socket), + Error(R::Socket, runtime::Error), + Ctl(C), + Empty, +} + +impl Clone for IOV { + fn clone(&self) -> Self { + match self { + Self::Readable(arg0) => Self::Readable(*arg0), + Self::Writeable(arg0) => Self::Writeable(*arg0), + Self::Error(arg0, arg1) => Self::Error(*arg0, arg1.clone()), + Self::Ctl(arg0) => Self::Ctl(arg0.clone()), + Self::Empty => Self::Empty, + } + } +} + +impl Debug for IOV { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Readable(arg0) => f.debug_tuple("Readable").field(arg0).finish(), + Self::Writeable(arg0) => f.debug_tuple("Writeable").field(arg0).finish(), + Self::Error(arg0, arg1) => f.debug_tuple("Error").field(arg0).field(arg1).finish(), + Self::Ctl(arg0) => f.debug_tuple("Ctl").field(arg0).finish(), + Self::Empty => write!(f, "Empty"), + } + } +} + +impl Default for IOV { + fn default() -> Self { + Self::Empty + } +} + +pub struct Events(Vec>); + +impl Events { + pub fn new(capacity: usize) -> Self { + Self(Vec::with_capacity(capacity)) + } + + pub fn push(&mut self, ev: IOV) { + self.0.push(ev) + } + + pub fn pop(&mut self) -> Option> { + self.0.pop() + } +} + pub trait Protocol: Sized + Send + 'static where R: Runtime, { type Ctl: Ctl; - fn ctl( - &mut self, - rt: &mut R, - op: Self::Ctl, - ) -> Result<::Output, ::Error>; - - /// Accept a new connection from a remote entity - fn accept( - &mut self, - rt: &mut R, - local_socket: R::Socket, - remote_socket: R::Socket, - remote_address: R::SocketAddr, - ) -> ProtocolEvent; - - fn receive(&mut self, rt: &mut R, socket: R::Socket, buf: &[u8]) -> ProtocolEvent; - - fn writeable(&mut self, rt: &mut R, socket: R::Socket) -> ProtocolEvent; - - fn wake(&mut self, rt: &mut R) -> ProtocolEvent; + fn run(&mut self, rt: &mut R, iov: &[IOV]) -> ProtocolEvent; } diff --git a/crates/rist-rs-types/src/traits/runtime/mod.rs b/crates/rist-rs-types/src/traits/runtime/mod.rs index fba1515..0412e89 100644 --- a/crates/rist-rs-types/src/traits/runtime/mod.rs +++ b/crates/rist-rs-types/src/traits/runtime/mod.rs @@ -1,29 +1,97 @@ -use std::{ +use bitflags::bitflags; +use core::{ fmt::{Debug, Display}, hash::Hash, - io, }; use super::time::clock::Clock; -pub trait RuntimeError: Debug + Display + Send + 'static { - fn is_not_ready(&self) -> bool; - fn io_error(&self) -> Option<&io::Error>; - fn into_io_error(self) -> Option; +pub trait StdError: Debug {} + +bitflags! { + pub struct SocketFlags: u32 { + const RecvFrom = 0x000001; + } +} + +pub enum Error { + WouldBlock, + AddrInUse, + AlreadyExists, + InvalidInput, + Str(&'static str), + Any(Box), + + #[cfg(feature = "std")] + Boxed(Box), +} + +#[cfg(feature = "std")] +impl From> for Error { + fn from(value: Box) -> Self { + Self::Boxed(value) + } +} + +#[cfg(feature = "std")] +impl From for Error { + fn from(value: std::io::Error) -> Self { + match value.kind() { + std::io::ErrorKind::WouldBlock => Self::WouldBlock, + std::io::ErrorKind::AddrInUse => Self::AddrInUse, + std::io::ErrorKind::AlreadyExists => Self::AlreadyExists, + std::io::ErrorKind::InvalidInput => Self::InvalidInput, + _ => Self::Any(Box::new(value)), + } + } +} + +impl From<&'static str> for Error { + fn from(value: &'static str) -> Self { + Self::Str(value) + } +} + +impl Debug for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::WouldBlock => write!(f, "WouldBlock"), + Self::AddrInUse => write!(f, "AddrInUse"), + Self::AlreadyExists => write!(f, "AlreadyExists"), + Self::InvalidInput => write!(f, "InvalidInput"), + Self::Str(s) => write!(f, "{}", s), + Self::Any(arg0) => f.debug_tuple("Any").field(&arg0.to_string()).finish(), + + #[cfg(feature = "std")] + Self::Boxed(err) => f.debug_tuple("Boxed").field(&err.to_string()).finish(), + } + } +} + +impl Clone for Error { + fn clone(&self) -> Self { + match self { + Self::WouldBlock => Self::WouldBlock, + Self::AddrInUse => Self::AddrInUse, + Self::AlreadyExists => Self::AlreadyExists, + Self::InvalidInput => Self::InvalidInput, + Self::Str(arg0) => Self::Str(arg0), + Self::Any(arg0) => Self::Any(Box::new(arg0.to_string())), + Self::Boxed(arg0) => Self::Any(Box::new(arg0.to_string())), + } + } } -pub trait SocketAddr: Debug + Display + Clone + PartialEq + Eq + Hash + Send +pub trait SocketAddr: Debug + Display + Clone + Copy + PartialEq + Eq + Hash + Send where Self: From, { fn network_address(&self) -> Option<&std::net::SocketAddr>; } -pub trait Socket: Debug + Display + Clone + PartialEq + Eq + Hash + Send {} +pub trait Socket: Debug + Display + Clone + Copy + PartialEq + Eq + Hash + Send {} pub trait Runtime: Send + 'static { - type Error: RuntimeError; - type Clock: Clock; type SocketAddr: SocketAddr; @@ -36,17 +104,36 @@ pub trait Runtime: Send + 'static { self.get_clock(None) } - fn get_remote_address(&self, remote: Self::Socket) -> Result; + fn get_remote_address(&self, remote: Self::Socket) -> Result; - fn bind(&mut self, address: Self::SocketAddr) -> Result; + fn bind( + &mut self, + address: Self::SocketAddr, + flags: SocketFlags, + ) -> Result; fn connect( &mut self, socket: Self::Socket, address: Self::SocketAddr, - ) -> Result; + ) -> Result; - fn send(&mut self, socket: Self::Socket, buf: &[u8]) -> Result<(), Self::Error>; + fn send(&mut self, socket: Self::Socket, buf: &[u8]) -> Result<(), Error>; + + fn send_to( + &mut self, + socket: Self::Socket, + buf: &[u8], + address: Self::SocketAddr, + ) -> Result<(), Error>; + + fn recv(&mut self, socket: Self::Socket, buf: &mut [u8]) -> Result; + + fn recv_from( + &mut self, + socket: Self::Socket, + buf: &mut [u8], + ) -> Result<(usize, Self::SocketAddr), Error>; fn close(&mut self, socket: Self::Socket); } diff --git a/crates/rist-rs-util/src/reorder/ring/test.rs b/crates/rist-rs-util/src/reorder/ring/test.rs index 777943d..898d41e 100644 --- a/crates/rist-rs-util/src/reorder/ring/test.rs +++ b/crates/rist-rs-util/src/reorder/ring/test.rs @@ -179,7 +179,7 @@ fn reorder_basic() { test_init(); let mut buf = TestReorderBuffer::::new(32); // push some unordered packets - send_seq(&mut buf, [4, 1, 2, 0, 5, 3].into_iter()); + send_seq(&mut buf, [4, 1, 2, 0, 5, 3]); assert_eq!(buf.len(), 6); // get back ordered packets for i in 0..6u32 { @@ -220,7 +220,7 @@ fn skip_and_drain_empty() { assert!(matches!(buf.next_event(), ReorderQueueEvent::NeedMore)); // explicitly skip the missing packet (seq: 2) and get the next one assert_eq!(buf.skip_to_next().unwrap().sequence_number(), 3); - assert!(matches!(buf.skip_to_next(), None)); + assert!(buf.skip_to_next().is_none()); // now the buffer is drained assert!(buf.is_empty()) } diff --git a/tools/netzwerker/src/ctl.rs b/tools/netzwerker/src/ctl.rs index d86ed9e..510bb90 100644 --- a/tools/netzwerker/src/ctl.rs +++ b/tools/netzwerker/src/ctl.rs @@ -205,12 +205,14 @@ impl ControlProcessorState { tracing::info!("all processors started") } Err(e) => { + let errs = e + .into_iter() + .enumerate() + .map(|(i, e)| format!("{i}: {e:?}")) + .collect::>(); tracing::error!( - "startup failed, one or more processors were not started successfully:\n{}", - e.into_iter() - .enumerate() - .map(|(i, e)| { format!("{i}: {e:?}") }) - .collect::() + ?errs, + "startup failed, one or more processors were not started successfully", ); self.abort().await.unwrap(); } diff --git a/tools/netzwerker/src/util/mod.rs b/tools/netzwerker/src/util/mod.rs index fa1b361..9ae3abc 100644 --- a/tools/netzwerker/src/util/mod.rs +++ b/tools/netzwerker/src/util/mod.rs @@ -30,7 +30,7 @@ pub fn send_packet_to(seq: &mut Vec, packet: Packet) { seq[1].send_packet(p2); } else { seq.iter_mut() - .zip(packet.into_iter()) + .zip(packet) .for_each(|(input, packet)| input.send_packet(packet)); } } else {