diff --git a/monoio/Cargo.toml b/monoio/Cargo.toml index eb25f186..29390fb2 100644 --- a/monoio/Cargo.toml +++ b/monoio/Cargo.toml @@ -63,6 +63,7 @@ tempfile = "3.2" # use nightly only feature flags unstable = [] # async-cancel will push a async-cancel entry into sq when op is canceled +# strongly recommend to enable this feature async-cancel = [] # enanle zero copy(enable SOCK_ZEROCOPY + MSG_ZEROCOPY flag) # WARNING: this feature may cause performance degradation diff --git a/monoio/src/driver/mod.rs b/monoio/src/driver/mod.rs index 3e7c86cd..820e685c 100644 --- a/monoio/src/driver/mod.rs +++ b/monoio/src/driver/mod.rs @@ -157,10 +157,11 @@ impl Inner { } #[allow(unused)] - fn drop_op(&self, index: usize, data: &mut Option) { + #[inline] + fn drop_op(&self, index: usize, data: &mut Option, skip_cancel: bool) { match self { #[cfg(all(target_os = "linux", feature = "iouring"))] - Inner::Uring(this) => UringInner::drop_op(this, index, data), + Inner::Uring(this) => UringInner::drop_op(this, index, data, skip_cancel), #[cfg(feature = "legacy")] Inner::Legacy(_) => {} #[cfg(all( diff --git a/monoio/src/driver/op.rs b/monoio/src/driver/op.rs index 822f15bf..163d6ed7 100644 --- a/monoio/src/driver/op.rs +++ b/monoio/src/driver/op.rs @@ -37,7 +37,7 @@ mod symlink; mod splice; /// In-flight operation -pub(crate) struct Op { +pub(crate) struct Op { // Driver running the operation pub(super) driver: driver::Inner, @@ -58,19 +58,87 @@ pub(crate) struct Completion { /// Operation completion meta info. #[derive(Debug)] pub(crate) struct CompletionMeta { - pub(crate) result: io::Result, + pub(crate) result: io::Result, #[allow(unused)] pub(crate) flags: u32, } +#[derive(Debug)] +pub(crate) struct MaybeFd { + is_fd: bool, + fd: u32, +} + +impl MaybeFd { + #[inline] + pub(crate) unsafe fn new_result(fdr: io::Result, is_fd: bool) -> io::Result { + fdr.map(|fd| Self { is_fd, fd }) + } + + #[inline] + pub(crate) unsafe fn new_fd_result(fdr: io::Result) -> io::Result { + fdr.map(|fd| Self { is_fd: true, fd }) + } + + #[inline] + pub(crate) fn new_non_fd_result(fdr: io::Result) -> io::Result { + fdr.map(|fd| Self { is_fd: false, fd }) + } + + #[inline] + pub(crate) const unsafe fn new_fd(fd: u32) -> Self { + Self { is_fd: true, fd } + } + + #[inline] + pub(crate) const fn new_non_fd(fd: u32) -> Self { + Self { is_fd: false, fd } + } + + #[inline] + pub(crate) const fn into_inner(self) -> u32 { + let fd = self.fd; + std::mem::forget(self); + fd + } + + #[inline] + pub(crate) const fn zero() -> Self { + Self { + is_fd: false, + fd: 0, + } + } +} + +impl Drop for MaybeFd { + fn drop(&mut self) { + // The fd close only executed when: + // 1. the operation is cancelled + // 2. the cancellation failed + // 3. the returned result is a fd + // So this is a relatively cold path. For simplicity, we just do a close syscall here + // instead of pushing close op. + if self.is_fd { + unsafe { + libc::close(self.fd as libc::c_int); + } + } + } +} + pub(crate) trait OpAble { + #[cfg(all(target_os = "linux", feature = "iouring"))] + const RET_IS_FD: bool = false; + #[cfg(all(target_os = "linux", feature = "iouring"))] + const SKIP_CANCEL: bool = false; #[cfg(all(target_os = "linux", feature = "iouring"))] fn uring_op(&mut self) -> io_uring::squeue::Entry; #[cfg(any(feature = "legacy", feature = "poll-io"))] fn legacy_interest(&self) -> Option<(super::ready::Direction, usize)>; #[cfg(any(feature = "legacy", feature = "poll-io"))] - fn legacy_call(&mut self) -> io::Result; + fn legacy_call(&mut self) -> io::Result; } /// If legacy is enabled and iouring is not, we can expose io interface in a poll-like way. @@ -85,10 +153,7 @@ pub(crate) trait PollLegacy { } #[cfg(any(feature = "legacy", feature = "poll-io"))] -impl PollLegacy for T -where - T: OpAble, -{ +impl PollLegacy for T { #[cfg(feature = "legacy")] #[inline] fn poll_legacy(&mut self, _cx: &mut std::task::Context<'_>) -> std::task::Poll { @@ -113,24 +178,18 @@ where } } -impl Op { +impl Op { /// Submit an operation to uring. /// /// `state` is stored during the operation tracking any state submitted to /// the kernel. - pub(super) fn submit_with(data: T) -> io::Result> - where - T: OpAble, - { + pub(super) fn submit_with(data: T) -> io::Result> { driver::CURRENT.with(|this| this.submit_with(data)) } /// Try submitting an operation to uring #[allow(unused)] - pub(super) fn try_submit_with(data: T) -> io::Result> - where - T: OpAble, - { + pub(super) fn try_submit_with(data: T) -> io::Result> { if driver::CURRENT.is_set() { Op::submit_with(data) } else { @@ -138,10 +197,7 @@ impl Op { } } - pub(crate) fn op_canceller(&self) -> OpCanceller - where - T: OpAble, - { + pub(crate) fn op_canceller(&self) -> OpCanceller { #[cfg(feature = "legacy")] if is_legacy() { return if let Some((dir, id)) = self.data.as_ref().unwrap().legacy_interest() { @@ -181,9 +237,11 @@ where } } -impl Drop for Op { +impl Drop for Op { + #[inline] fn drop(&mut self) { - self.driver.drop_op(self.index, &mut self.data); + self.driver + .drop_op(self.index, &mut self.data, T::SKIP_CANCEL); } } diff --git a/monoio/src/driver/op/accept.rs b/monoio/src/driver/op/accept.rs index e683ed96..bec6fb42 100644 --- a/monoio/src/driver/op/accept.rs +++ b/monoio/src/driver/op/accept.rs @@ -18,7 +18,7 @@ use { use super::{super::shared_fd::SharedFd, Op, OpAble}; #[cfg(any(feature = "legacy", feature = "poll-io"))] -use crate::driver::ready::Direction; +use super::{driver::ready::Direction, MaybeFd}; #[cfg(all(unix, any(feature = "legacy", feature = "poll-io")))] use crate::syscall_u32; @@ -54,6 +54,9 @@ impl Op { } impl OpAble for Accept { + #[cfg(all(target_os = "linux", feature = "iouring"))] + const RET_IS_FD: bool = true; + #[cfg(all(target_os = "linux", feature = "iouring"))] fn uring_op(&mut self) -> io_uring::squeue::Entry { opcode::Accept::new( @@ -71,16 +74,16 @@ impl OpAble for Accept { } #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] - fn legacy_call(&mut self) -> io::Result { + fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_socket(); let addr = self.addr.0.as_mut_ptr() as *mut _; let len = &mut self.addr.1; - syscall!(accept(fd as _, addr, len), PartialEq::eq, INVALID_SOCKET) + syscall!(accept@FD(fd as _, addr, len), PartialEq::eq, INVALID_SOCKET) } #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] - fn legacy_call(&mut self) -> io::Result { + fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_fd(); let addr = self.addr.0.as_mut_ptr() as *mut _; let len = &mut self.addr.1; @@ -102,12 +105,10 @@ impl OpAble for Accept { target_os = "netbsd", target_os = "openbsd" ))] - return syscall_u32!(accept4( - fd, - addr, - len, - libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK, - )); + return { + let flag = libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK; + syscall_u32!(accept4@FD(fd, addr, len, flag)) + }; // But not all platforms have the `accept4(2)` call. Luckily BSD (derived) // OSes inherit the non-blocking flag from the listener, so we just have to @@ -119,13 +120,11 @@ impl OpAble for Accept { target_os = "redox" ))] return { - let stream_fd = syscall_u32!(accept(fd, addr, len))? as i32; - syscall_u32!(fcntl(stream_fd, libc::F_SETFD, libc::FD_CLOEXEC)) - .and_then(|_| syscall_u32!(fcntl(stream_fd, libc::F_SETFL, libc::O_NONBLOCK))) - .inspect_err(|_| { - let _ = syscall_u32!(close(stream_fd)); - })?; - Ok(stream_fd as _) + let stream_fd = syscall_u32!(accept@FD(fd, addr, len))?; + let fd = stream_fd.get() as i32; + syscall_u32!(fcntl(fd, libc::F_SETFD, libc::FD_CLOEXEC))?; + syscall_u32!(fcntl(fd, libc::F_SETFL, libc::O_NONBLOCK))?; + Ok(stream_fd) }; } } diff --git a/monoio/src/driver/op/close.rs b/monoio/src/driver/op/close.rs index 0ac7a731..5ee6c66e 100644 --- a/monoio/src/driver/op/close.rs +++ b/monoio/src/driver/op/close.rs @@ -10,6 +10,8 @@ use { windows_sys::Win32::Networking::WinSock::closesocket, }; +#[cfg(any(feature = "legacy", feature = "poll-io"))] +use super::MaybeFd; use super::{Op, OpAble}; pub(crate) struct Close { @@ -33,6 +35,9 @@ impl Op { } impl OpAble for Close { + #[cfg(all(target_os = "linux", feature = "iouring"))] + const SKIP_CANCEL: bool = true; + #[cfg(all(target_os = "linux", feature = "iouring"))] fn uring_op(&mut self) -> io_uring::squeue::Entry { opcode::Close::new(types::Fd(self.fd)).build() @@ -45,11 +50,11 @@ impl OpAble for Close { } #[cfg(any(feature = "legacy", feature = "poll-io"))] - fn legacy_call(&mut self) -> io::Result { + fn legacy_call(&mut self) -> io::Result { #[cfg(unix)] - return crate::syscall_u32!(close(self.fd)); + return crate::syscall_u32!(close@NON_FD(self.fd)); #[cfg(windows)] - return syscall!(closesocket(self.fd as _), PartialEq::ne, 0); + return syscall!(closesocket@NON_FD(self.fd as _), PartialEq::ne, 0); } } diff --git a/monoio/src/driver/op/connect.rs b/monoio/src/driver/op/connect.rs index 1fa923e9..46fd5787 100644 --- a/monoio/src/driver/op/connect.rs +++ b/monoio/src/driver/op/connect.rs @@ -10,7 +10,7 @@ use windows_sys::Win32::Networking::WinSock::{ use super::{super::shared_fd::SharedFd, Op, OpAble}; #[cfg(any(feature = "legacy", feature = "poll-io"))] -use crate::driver::ready::Direction; +use super::{driver::ready::Direction, MaybeFd}; pub(crate) struct Connect { pub(crate) fd: SharedFd, @@ -59,7 +59,7 @@ impl OpAble for Connect { } #[cfg(any(feature = "legacy", feature = "poll-io"))] - fn legacy_call(&mut self) -> io::Result { + fn legacy_call(&mut self) -> io::Result { // For ios/macos, if tfo is enabled, we will // call connectx here. // For linux/android, we have already set socket @@ -70,7 +70,7 @@ impl OpAble for Connect { endpoints.sae_dstaddr = self.socket_addr.as_ptr(); endpoints.sae_dstaddrlen = self.socket_addr_len; - return match crate::syscall_u32!(connectx( + return match crate::syscall_u32!(connectx@RAW( self.fd.raw_fd(), &endpoints as *const _, libc::SAE_ASSOCID_ANY, @@ -81,18 +81,18 @@ impl OpAble for Connect { std::ptr::null_mut(), )) { Err(err) if err.raw_os_error() != Some(libc::EINPROGRESS) => Err(err), - _ => Ok(self.fd.raw_fd() as u32), + _ => Ok(MaybeFd::zero()), }; } #[cfg(unix)] - match crate::syscall_u32!(connect( + match crate::syscall_u32!(connect@RAW( self.fd.raw_fd(), self.socket_addr.as_ptr(), self.socket_addr_len, )) { Err(err) if err.raw_os_error() != Some(libc::EINPROGRESS) => Err(err), - _ => Ok(self.fd.raw_fd() as u32), + _ => Ok(MaybeFd::zero()), } #[cfg(windows)] @@ -110,8 +110,7 @@ impl OpAble for Connect { return Err(err); } } - #[allow(clippy::unnecessary_cast)] - Ok(self.fd.raw_socket() as u32) + Ok(MaybeFd::zero()) } } } @@ -158,14 +157,14 @@ impl OpAble for ConnectUnix { } #[cfg(any(feature = "legacy", feature = "poll-io"))] - fn legacy_call(&mut self) -> io::Result { - match crate::syscall_u32!(connect( + fn legacy_call(&mut self) -> io::Result { + match crate::syscall_u32!(connect@RAW( self.fd.raw_fd(), &self.socket_addr.0 as *const _ as *const _, self.socket_addr.1 )) { Err(err) if err.raw_os_error() != Some(libc::EINPROGRESS) => Err(err), - _ => Ok(self.fd.raw_fd() as u32), + _ => Ok(MaybeFd::zero()), } } } diff --git a/monoio/src/driver/op/fsync.rs b/monoio/src/driver/op/fsync.rs index 0dfd1707..799a7521 100644 --- a/monoio/src/driver/op/fsync.rs +++ b/monoio/src/driver/op/fsync.rs @@ -2,17 +2,10 @@ use std::io; #[cfg(all(target_os = "linux", feature = "iouring"))] use io_uring::{opcode, types}; -#[cfg(windows)] -use { - crate::syscall, std::os::windows::prelude::AsRawHandle, - windows_sys::Win32::Storage::FileSystem::FlushFileBuffers, -}; use super::{super::shared_fd::SharedFd, Op, OpAble}; #[cfg(any(feature = "legacy", feature = "poll-io"))] -use crate::driver::ready::Direction; -#[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] -use crate::syscall_u32; +use super::{driver::ready::Direction, MaybeFd}; pub(crate) struct Fsync { #[allow(unused)] @@ -56,23 +49,27 @@ impl OpAble for Fsync { } #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] - fn legacy_call(&mut self) -> io::Result { - syscall!( - FlushFileBuffers(self.fd.as_raw_handle() as _), + fn legacy_call(&mut self) -> io::Result { + use std::os::windows::prelude::AsRawHandle; + + crate::syscall!( + windows_sys::Win32::Storage::FileSystem::FlushFileBuffers@NON_FD(self.fd.as_raw_handle() as _), PartialEq::eq, 0 ) } #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] - fn legacy_call(&mut self) -> io::Result { + fn legacy_call(&mut self) -> io::Result { + use crate::syscall_u32; + #[cfg(target_os = "linux")] if self.data_sync { - syscall_u32!(fdatasync(self.fd.raw_fd())) + syscall_u32!(fdatasync@NON_FD(self.fd.raw_fd())) } else { - syscall_u32!(fsync(self.fd.raw_fd())) + syscall_u32!(fsync@NON_FD(self.fd.raw_fd())) } #[cfg(not(target_os = "linux"))] - syscall_u32!(fsync(self.fd.raw_fd())) + syscall_u32!(fsync@NON_FD(self.fd.raw_fd())) } } diff --git a/monoio/src/driver/op/mkdir.rs b/monoio/src/driver/op/mkdir.rs index be418c51..9ef00fa9 100644 --- a/monoio/src/driver/op/mkdir.rs +++ b/monoio/src/driver/op/mkdir.rs @@ -2,6 +2,8 @@ use std::{ffi::CString, path::Path}; use libc::mode_t; +#[cfg(any(feature = "legacy", feature = "poll-io"))] +use super::MaybeFd; use super::{Op, OpAble}; use crate::driver::util::cstr; @@ -34,14 +36,18 @@ impl OpAble for MkDir { } #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] - fn legacy_call(&mut self) -> std::io::Result { + fn legacy_call(&mut self) -> std::io::Result { use crate::syscall_u32; - syscall_u32!(mkdirat(libc::AT_FDCWD, self.path.as_ptr(), self.mode)) + syscall_u32!(mkdirat@NON_FD( + libc::AT_FDCWD, + self.path.as_ptr(), + self.mode + )) } #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] - fn legacy_call(&mut self) -> io::Result { + fn legacy_call(&mut self) -> io::Result { unimplemented!() } } diff --git a/monoio/src/driver/op/open.rs b/monoio/src/driver/op/open.rs index 003dc34c..1673a3ce 100644 --- a/monoio/src/driver/op/open.rs +++ b/monoio/src/driver/op/open.rs @@ -2,14 +2,10 @@ use std::{ffi::CString, io, path::Path}; #[cfg(all(target_os = "linux", feature = "iouring"))] use io_uring::{opcode, types}; -#[cfg(windows)] -use windows_sys::Win32::{Foundation::INVALID_HANDLE_VALUE, Storage::FileSystem::CreateFileW}; -use super::{Op, OpAble}; #[cfg(any(feature = "legacy", feature = "poll-io"))] -use crate::driver::ready::Direction; -#[cfg(windows)] -use crate::syscall; +use super::{driver::ready::Direction, MaybeFd}; +use super::{Op, OpAble}; #[cfg(all(unix, any(feature = "legacy", feature = "poll-io")))] use crate::syscall_u32; use crate::{driver::util::cstr, fs::OpenOptions}; @@ -54,6 +50,9 @@ impl Op { } impl OpAble for Open { + #[cfg(all(target_os = "linux", feature = "iouring"))] + const RET_IS_FD: bool = true; + #[cfg(all(target_os = "linux", feature = "iouring"))] fn uring_op(&mut self) -> io_uring::squeue::Entry { opcode::OpenAt::new(types::Fd(libc::AT_FDCWD), self.path.as_c_str().as_ptr()) @@ -69,8 +68,8 @@ impl OpAble for Open { } #[cfg(all(any(feature = "legacy", feature = "poll-io"), not(windows)))] - fn legacy_call(&mut self) -> io::Result { - syscall_u32!(open( + fn legacy_call(&mut self) -> io::Result { + syscall_u32!(open@FD( self.path.as_c_str().as_ptr(), self.flags, self.mode as libc::c_int @@ -78,15 +77,20 @@ impl OpAble for Open { } #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] - fn legacy_call(&mut self) -> io::Result { + fn legacy_call(&mut self) -> io::Result { use std::{ffi::OsString, os::windows::ffi::OsStrExt}; + use windows_sys::Win32::{ + Foundation::INVALID_HANDLE_VALUE, Storage::FileSystem::CreateFileW, + }; + let os_str = OsString::from(self.path.to_string_lossy().into_owned()); // Convert OsString to wide character format (Vec). let wide_path: Vec = os_str.encode_wide().chain(Some(0)).collect(); - syscall!( - CreateFileW( + + crate::syscall!( + CreateFileW@FD( wide_path.as_ptr(), self.opts.access_mode()?, self.opts.share_mode, diff --git a/monoio/src/driver/op/poll.rs b/monoio/src/driver/op/poll.rs index 56aaae0e..d9d51e0f 100644 --- a/monoio/src/driver/op/poll.rs +++ b/monoio/src/driver/op/poll.rs @@ -1,20 +1,8 @@ use std::io; -#[cfg(windows)] -use std::{ - io::{Error, ErrorKind}, - os::windows::prelude::AsRawSocket, -}; - -#[cfg(all(target_os = "linux", feature = "iouring"))] -use io_uring::{opcode, types}; -#[cfg(windows)] -use windows_sys::Win32::Networking::WinSock::{ - WSAGetLastError, WSAPoll, POLLIN, POLLOUT, SOCKET_ERROR, WSAPOLLFD, -}; use super::{super::shared_fd::SharedFd, Op, OpAble}; #[cfg(any(feature = "legacy", feature = "poll-io"))] -use crate::driver::ready::Direction; +use super::{driver::ready::Direction, MaybeFd}; pub(crate) struct PollAdd { /// Holds a strong ref to the FD, preventing the file from being closed @@ -55,6 +43,8 @@ impl Op { impl OpAble for PollAdd { #[cfg(all(target_os = "linux", feature = "iouring"))] fn uring_op(&mut self) -> io_uring::squeue::Entry { + use io_uring::{opcode, types}; + opcode::PollAdd::new( types::Fd(self.fd.raw_fd()), if self.is_read { @@ -82,7 +72,7 @@ impl OpAble for PollAdd { } #[cfg(all(any(feature = "legacy", feature = "poll-io"), not(windows)))] - fn legacy_call(&mut self) -> io::Result { + fn legacy_call(&mut self) -> io::Result { if !self.relaxed { use std::{io::ErrorKind, os::fd::AsRawFd}; @@ -95,16 +85,25 @@ impl OpAble for PollAdd { }, revents: 0, }; - let ret = crate::syscall_u32!(poll(&mut pollfd as *mut _, 1, 0))?; + let ret = crate::syscall_u32!(poll@RAW(&mut pollfd as *mut _, 1, 0))?; if ret == 0 { return Err(ErrorKind::WouldBlock.into()); } } - Ok(0) + Ok(MaybeFd::new_non_fd(1)) } #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] - fn legacy_call(&mut self) -> io::Result { + fn legacy_call(&mut self) -> io::Result { + use std::{ + io::{Error, ErrorKind}, + os::windows::prelude::AsRawSocket, + }; + + use windows_sys::Win32::Networking::WinSock::{ + WSAGetLastError, WSAPoll, POLLIN, POLLOUT, SOCKET_ERROR, WSAPOLLFD, + }; + if !self.relaxed { let mut pollfd = WSAPOLLFD { fd: self.fd.as_raw_socket() as _, @@ -125,6 +124,6 @@ impl OpAble for PollAdd { _ => (), } } - Ok(0) + Ok(MaybeFd::new_non_fd(1)) } } diff --git a/monoio/src/driver/op/read.rs b/monoio/src/driver/op/read.rs index 2c0c20e7..0dede746 100644 --- a/monoio/src/driver/op/read.rs +++ b/monoio/src/driver/op/read.rs @@ -9,7 +9,7 @@ use io_uring::{opcode, types}; use super::{super::shared_fd::SharedFd, Op, OpAble}; #[cfg(any(feature = "legacy", feature = "poll-io"))] -use crate::driver::ready::Direction; +use super::{driver::ready::Direction, MaybeFd}; use crate::{ buf::{IoBufMut, IoVecBufMut}, BufResult, @@ -23,7 +23,7 @@ macro_rules! read_result { let complete = self.await; // Convert the operation result to `usize` - let res = complete.meta.result.map(|v| v as usize); + let res = complete.meta.result.map(|v| v.into_inner() as usize); // Recover the buffer let mut buf = complete.data.$buf; @@ -86,7 +86,7 @@ impl OpAble for Read { } #[cfg(any(feature = "legacy", feature = "poll-io"))] - fn legacy_call(&mut self) -> io::Result { + fn legacy_call(&mut self) -> io::Result { #[cfg(unix)] let fd = self.fd.as_raw_fd(); @@ -131,7 +131,7 @@ impl OpAble for ReadAt { } #[cfg(any(feature = "legacy", feature = "poll-io"))] - fn legacy_call(&mut self) -> io::Result { + fn legacy_call(&mut self) -> io::Result { #[cfg(unix)] let fd = self.fd.as_raw_fd(); #[cfg(windows)] @@ -179,7 +179,7 @@ impl OpAble for ReadVec { } #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] - fn legacy_call(&mut self) -> io::Result { + fn legacy_call(&mut self) -> io::Result { read_vectored( self.fd.raw_fd(), self.buf_vec.write_iovec_ptr(), @@ -188,7 +188,7 @@ impl OpAble for ReadVec { } #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] - fn legacy_call(&mut self) -> io::Result { + fn legacy_call(&mut self) -> io::Result { // There is no `readv`-like syscall of file on windows, but this will be used to send // socket message. @@ -208,11 +208,11 @@ impl OpAble for ReadVec { ) }; match ret { - 0 => Ok(nread), + 0 => Ok(MaybeFd::new_non_fd(nread)), _ => { let error = unsafe { WSAGetLastError() }; if error == WSAESHUTDOWN { - Ok(0) + Ok(MaybeFd::zero()) } else { Err(io::Error::from_raw_os_error(error)) } @@ -257,7 +257,7 @@ impl OpAble for ReadVecAt { } #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] - fn legacy_call(&mut self) -> io::Result { + fn legacy_call(&mut self) -> io::Result { read_vectored_at( self.fd.raw_fd(), self.buf_vec.write_iovec_ptr(), @@ -267,7 +267,7 @@ impl OpAble for ReadVecAt { } #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] - fn legacy_call(&mut self) -> io::Result { + fn legacy_call(&mut self) -> io::Result { // There is no `readv` like syscall of file on windows, but this will be used to send // socket message. @@ -295,11 +295,11 @@ impl OpAble for ReadVecAt { ) }; match ret { - 0 => Ok(nread), + 0 => Ok(MaybeFd::new_non_fd(nread)), _ => { let error = unsafe { WSAGetLastError() }; if error == WSAESHUTDOWN { - Ok(0) + Ok(MaybeFd::zero()) } else { Err(io::Error::from_raw_os_error(error)) } @@ -316,21 +316,21 @@ pub(crate) mod impls { use crate::syscall_u32; /// A wrapper for [`libc::read`] - pub(crate) fn read(fd: i32, buf: *mut u8, len: usize) -> io::Result { - syscall_u32!(read(fd, buf as _, len)) + pub(crate) fn read(fd: i32, buf: *mut u8, len: usize) -> io::Result { + syscall_u32!(read@NON_FD(fd, buf as _, len)) } /// A wrapper of [`libc::pread`] - pub(crate) fn read_at(fd: i32, buf: *mut u8, len: usize, offset: u64) -> io::Result { + pub(crate) fn read_at(fd: i32, buf: *mut u8, len: usize, offset: u64) -> io::Result { let offset = libc::off_t::try_from(offset) .map_err(|_| io::Error::new(io::ErrorKind::Other, "offset too big"))?; - syscall_u32!(pread(fd, buf as _, len, offset)) + syscall_u32!(pread@NON_FD(fd, buf as _, len, offset)) } /// A wrapper of [`libc::readv`] - pub(crate) fn read_vectored(fd: i32, buf_vec: *mut iovec, len: usize) -> io::Result { - syscall_u32!(readv(fd, buf_vec as _, len as _)) + pub(crate) fn read_vectored(fd: i32, buf_vec: *mut iovec, len: usize) -> io::Result { + syscall_u32!(readv@NON_FD(fd, buf_vec as _, len as _)) } /// A wrapper of [`libc::preadv`] @@ -339,11 +339,11 @@ pub(crate) mod impls { buf_vec: *mut iovec, len: usize, offset: u64, - ) -> io::Result { + ) -> io::Result { let offset = libc::off_t::try_from(offset) .map_err(|_| io::Error::new(io::ErrorKind::Other, "offset too big"))?; - syscall_u32!(preadv(fd, buf_vec as _, len as _, offset)) + syscall_u32!(preadv@NON_FD(fd, buf_vec as _, len as _, offset)) } } diff --git a/monoio/src/driver/op/recv.rs b/monoio/src/driver/op/recv.rs index ae9b2571..6972832a 100644 --- a/monoio/src/driver/op/recv.rs +++ b/monoio/src/driver/op/recv.rs @@ -11,9 +11,10 @@ use { crate::net::unix::SocketAddr as UnixSocketAddr, libc::{sockaddr_in, sockaddr_in6, sockaddr_storage, socklen_t, AF_INET, AF_INET6}, }; +#[cfg(all(unix, any(feature = "legacy", feature = "poll-io")))] +use {crate::syscall_u32, std::os::unix::prelude::AsRawFd}; #[cfg(all(windows, any(feature = "legacy", feature = "poll-io")))] use { - crate::syscall, std::os::windows::io::AsRawSocket, windows_sys::Win32::Networking::WinSock::recv, windows_sys::{ @@ -30,12 +31,10 @@ use { }, }, }; -#[cfg(all(unix, any(feature = "legacy", feature = "poll-io")))] -use {crate::syscall_u32, std::os::unix::prelude::AsRawFd}; use super::{super::shared_fd::SharedFd, Op, OpAble}; #[cfg(any(feature = "legacy", feature = "poll-io"))] -use crate::driver::ready::Direction; +use super::{driver::ready::Direction, MaybeFd}; use crate::{ buf::{IoBufMut, IoVecBufMut, IoVecMeta, MsgMeta}, BufResult, @@ -66,7 +65,7 @@ impl Op> { pub(crate) async fn result(self) -> BufResult { let complete = self.await; - let res = complete.meta.result.map(|v| v as _); + let res = complete.meta.result.map(|v| v.into_inner() as _); let mut buf = complete.data.buf; if let Ok(n) = res { @@ -97,9 +96,9 @@ impl OpAble for Recv { } #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] - fn legacy_call(&mut self) -> io::Result { + fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_fd(); - syscall_u32!(recv( + syscall_u32!(recv@NON_FD( fd, self.buf.write_ptr() as _, self.buf.bytes_total().min(u32::MAX as usize), @@ -108,9 +107,9 @@ impl OpAble for Recv { } #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] - fn legacy_call(&mut self) -> io::Result { + fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_socket(); - syscall!( + MaybeFd::new_non_fd_result(crate::syscall!( recv( fd as _, self.buf.write_ptr(), @@ -119,7 +118,7 @@ impl OpAble for Recv { ), PartialOrd::lt, 0 - ) + )) } } @@ -162,7 +161,7 @@ impl Op> { pub(crate) async fn wait(self) -> BufResult<(usize, SocketAddr), T> { let complete = self.await; - let res = complete.meta.result.map(|v| v as _); + let res = complete.meta.result.map(|v| v.into_inner() as _); let mut buf = complete.data.buf; let res = res.map(|n| { @@ -236,13 +235,13 @@ impl OpAble for RecvMsg { } #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] - fn legacy_call(&mut self) -> io::Result { + fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_fd(); - syscall_u32!(recvmsg(fd, &mut *self.info.2, 0)) + syscall_u32!(recvmsg@NON_FD(fd, &mut *self.info.2, 0)) } #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] - fn legacy_call(&mut self) -> io::Result { + fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_socket() as _; let func_ptr = WSA_RECV_MSG.get_or_init(|| unsafe { let mut wsa_recv_msg: LPFN_WSARECVMSG = None; @@ -281,7 +280,7 @@ impl OpAble for RecvMsg { if r == SOCKET_ERROR { unsafe { Err(io::Error::from_raw_os_error(WSAGetLastError())) } } else { - Ok(recved) + Ok(MaybeFd::new_non_fd(recved)) } } } @@ -317,7 +316,7 @@ impl Op> { pub(crate) async fn wait(self) -> BufResult<(usize, UnixSocketAddr), T> { let complete = self.await; - let res = complete.meta.result.map(|v| v as _); + let res = complete.meta.result.map(|v| v.into_inner() as _); let mut buf = complete.data.buf; let res = res.map(|n| { @@ -354,8 +353,8 @@ impl OpAble for RecvMsgUnix { } #[cfg(any(feature = "legacy", feature = "poll-io"))] - fn legacy_call(&mut self) -> io::Result { + fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_fd(); - syscall_u32!(recvmsg(fd, &mut self.info.2 as *mut _, 0)) + syscall_u32!(recvmsg@NON_FD(fd, &mut self.info.2 as *mut _, 0)) } } diff --git a/monoio/src/driver/op/rename.rs b/monoio/src/driver/op/rename.rs index df9719a0..2bf770ad 100644 --- a/monoio/src/driver/op/rename.rs +++ b/monoio/src/driver/op/rename.rs @@ -1,5 +1,7 @@ use std::{ffi::CString, path::Path}; +#[cfg(any(feature = "legacy", feature = "poll-io"))] +use super::MaybeFd; use super::{Op, OpAble}; use crate::driver::util::cstr; @@ -37,10 +39,10 @@ impl OpAble for Rename { } #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] - fn legacy_call(&mut self) -> std::io::Result { + fn legacy_call(&mut self) -> std::io::Result { use crate::syscall_u32; - syscall_u32!(renameat( + syscall_u32!(renameat@NON_FD( libc::AT_FDCWD, self.from.as_ptr(), libc::AT_FDCWD, @@ -49,7 +51,7 @@ impl OpAble for Rename { } #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] - fn legacy_call(&mut self) -> io::Result { + fn legacy_call(&mut self) -> io::Result { unimplemented!() } } diff --git a/monoio/src/driver/op/send.rs b/monoio/src/driver/op/send.rs index d49e13b9..f4431b0d 100644 --- a/monoio/src/driver/op/send.rs +++ b/monoio/src/driver/op/send.rs @@ -14,7 +14,7 @@ use {crate::syscall_u32, std::os::unix::prelude::AsRawFd}; use super::{super::shared_fd::SharedFd, Op, OpAble}; #[cfg(any(feature = "legacy", feature = "poll-io"))] -use crate::driver::ready::Direction; +use super::{driver::ready::Direction, MaybeFd}; #[cfg(unix)] use crate::net::unix::SocketAddr as UnixSocketAddr; use crate::{ @@ -46,7 +46,10 @@ impl Op> { pub(crate) async fn result(self) -> BufResult { let complete = self.await; - (complete.meta.result.map(|v| v as _), complete.data.buf) + ( + complete.meta.result.map(|v| v.into_inner() as _), + complete.data.buf, + ) } } @@ -93,7 +96,7 @@ impl OpAble for Send { } #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] - fn legacy_call(&mut self) -> io::Result { + fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_fd(); #[cfg(target_os = "linux")] #[allow(deprecated)] @@ -101,7 +104,7 @@ impl OpAble for Send { #[cfg(not(target_os = "linux"))] let flags = 0; - syscall_u32!(send( + syscall_u32!(send@NON_FD( fd, self.buf.read_ptr() as _, self.buf.bytes_init(), @@ -113,7 +116,7 @@ impl OpAble for Send { fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_socket(); syscall!( - send(fd as _, self.buf.read_ptr(), self.buf.bytes_init() as _, 0), + send@NON_FD(fd as _, self.buf.read_ptr(), self.buf.bytes_init() as _, 0), PartialOrd::lt, 0 ) @@ -180,7 +183,7 @@ impl Op> { pub(crate) async fn wait(self) -> BufResult { let complete = self.await; - let res = complete.meta.result.map(|v| v as _); + let res = complete.meta.result.map(|v| v.into_inner() as _); let buf = complete.data.buf; (res, buf) } @@ -205,14 +208,14 @@ impl OpAble for SendMsg { } #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] - fn legacy_call(&mut self) -> io::Result { + fn legacy_call(&mut self) -> io::Result { #[cfg(target_os = "linux")] #[allow(deprecated)] const FLAGS: libc::c_int = libc::MSG_NOSIGNAL as libc::c_int; #[cfg(not(target_os = "linux"))] const FLAGS: libc::c_int = 0; let fd = self.fd.as_raw_fd(); - syscall_u32!(sendmsg(fd, &*self.info.2, FLAGS)) + syscall_u32!(sendmsg@NON_FD(fd, &*self.info.2, FLAGS)) } #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] @@ -282,7 +285,7 @@ impl Op> { pub(crate) async fn wait(self) -> BufResult { let complete = self.await; - let res = complete.meta.result.map(|v| v as _); + let res = complete.meta.result.map(|v| v.into_inner() as _); let buf = complete.data.buf; (res, buf) } @@ -309,13 +312,13 @@ impl OpAble for SendMsgUnix { #[cfg(any(feature = "legacy", feature = "poll-io"))] #[inline] - fn legacy_call(&mut self) -> io::Result { + fn legacy_call(&mut self) -> io::Result { #[cfg(target_os = "linux")] #[allow(deprecated)] const FLAGS: libc::c_int = libc::MSG_NOSIGNAL as libc::c_int; #[cfg(not(target_os = "linux"))] const FLAGS: libc::c_int = 0; let fd = self.fd.as_raw_fd(); - syscall_u32!(sendmsg(fd, &mut self.info.2 as *mut _, FLAGS)) + syscall_u32!(sendmsg@NON_FD(fd, &mut self.info.2 as *mut _, FLAGS)) } } diff --git a/monoio/src/driver/op/splice.rs b/monoio/src/driver/op/splice.rs index 21940576..47771b81 100644 --- a/monoio/src/driver/op/splice.rs +++ b/monoio/src/driver/op/splice.rs @@ -6,7 +6,10 @@ use std::io; use io_uring::{opcode, types}; #[cfg(all(unix, feature = "legacy"))] use { - crate::{driver::ready::Direction, syscall_u32}, + crate::{ + driver::{op::MaybeFd, ready::Direction}, + syscall_u32, + }, std::os::unix::prelude::AsRawFd, }; @@ -53,7 +56,7 @@ impl Op { pub(crate) async fn splice(self) -> io::Result { let complete = self.await; - complete.meta.result + complete.meta.result.map(|v| v.into_inner()) } } @@ -88,13 +91,13 @@ impl OpAble for Splice { } #[cfg(all(unix, feature = "legacy"))] - fn legacy_call(&mut self) -> io::Result { + fn legacy_call(&mut self) -> io::Result { const FLAG: u32 = libc::SPLICE_F_MOVE | libc::SPLICE_F_NONBLOCK; let fd_in = self.fd_in.as_raw_fd(); let fd_out = self.fd_out.as_raw_fd(); let off_in = std::ptr::null_mut::(); let off_out = std::ptr::null_mut::(); - syscall_u32!(splice( + syscall_u32!(splice@NON_FD( fd_in, off_in, fd_out, diff --git a/monoio/src/driver/op/statx.rs b/monoio/src/driver/op/statx.rs index 971ee3c0..e535fb9d 100644 --- a/monoio/src/driver/op/statx.rs +++ b/monoio/src/driver/op/statx.rs @@ -5,9 +5,9 @@ use io_uring::{opcode, types}; #[cfg(target_os = "linux")] use libc::statx; -use super::{Op, OpAble}; #[cfg(any(feature = "legacy", feature = "poll-io"))] -use crate::driver::ready::Direction; +use super::{driver::ready::Direction, MaybeFd}; +use super::{Op, OpAble}; use crate::driver::{shared_fd::SharedFd, util::cstr}; #[derive(Debug)] @@ -83,12 +83,10 @@ impl OpAble for FdStatx { } #[cfg(all(any(feature = "legacy", feature = "poll-io"), target_os = "linux"))] - fn legacy_call(&mut self) -> std::io::Result { + fn legacy_call(&mut self) -> std::io::Result { use std::os::fd::AsRawFd; - use crate::syscall_u32; - - syscall_u32!(statx( + crate::syscall_u32!(statx@NON_FD( self.inner.as_raw_fd(), c"".as_ptr(), libc::AT_EMPTY_PATH, @@ -98,17 +96,15 @@ impl OpAble for FdStatx { } #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] - fn legacy_call(&mut self) -> std::io::Result { + fn legacy_call(&mut self) -> std::io::Result { unimplemented!() } #[cfg(all(any(feature = "legacy", feature = "poll-io"), target_os = "macos"))] - fn legacy_call(&mut self) -> std::io::Result { + fn legacy_call(&mut self) -> std::io::Result { use std::os::fd::AsRawFd; - use crate::syscall_u32; - - syscall_u32!(fstat( + crate::syscall_u32!(fstat@NON_FD( self.inner.as_raw_fd(), self.stat_buf.as_mut_ptr() as *mut _ )) @@ -176,10 +172,8 @@ impl OpAble for PathStatx { } #[cfg(all(any(feature = "legacy", feature = "poll-io"), target_os = "linux"))] - fn legacy_call(&mut self) -> std::io::Result { - use crate::syscall_u32; - - syscall_u32!(statx( + fn legacy_call(&mut self) -> std::io::Result { + crate::syscall_u32!(statx@NON_FD( libc::AT_FDCWD, self.inner.as_ptr(), self.flags, @@ -189,21 +183,19 @@ impl OpAble for PathStatx { } #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] - fn legacy_call(&mut self) -> std::io::Result { + fn legacy_call(&mut self) -> std::io::Result { unimplemented!() } #[cfg(all(any(feature = "legacy", feature = "poll-io"), target_os = "macos"))] - fn legacy_call(&mut self) -> std::io::Result { - use crate::syscall_u32; - + fn legacy_call(&mut self) -> std::io::Result { if self.follow_symlinks { - syscall_u32!(stat( + crate::syscall_u32!(stat@NON_FD( self.inner.as_ptr(), self.stat_buf.as_mut_ptr() as *mut _ )) } else { - syscall_u32!(lstat( + crate::syscall_u32!(lstat@NON_FD( self.inner.as_ptr(), self.stat_buf.as_mut_ptr() as *mut _ )) diff --git a/monoio/src/driver/op/symlink.rs b/monoio/src/driver/op/symlink.rs index 33785d9c..fb4cf7ef 100644 --- a/monoio/src/driver/op/symlink.rs +++ b/monoio/src/driver/op/symlink.rs @@ -1,5 +1,7 @@ use std::{ffi::CString, io, path::Path}; +#[cfg(any(feature = "legacy", feature = "poll-io"))] +use super::{driver::ready::Direction, MaybeFd}; use super::{Op, OpAble}; use crate::driver::util::cstr; @@ -28,14 +30,13 @@ impl OpAble for Symlink { } #[cfg(any(feature = "legacy", feature = "poll-io"))] - fn legacy_interest(&self) -> Option<(crate::driver::ready::Direction, usize)> { + fn legacy_interest(&self) -> Option<(Direction, usize)> { None } #[cfg(any(feature = "legacy", feature = "poll-io"))] - fn legacy_call(&mut self) -> std::io::Result { - use crate::syscall_u32; - syscall_u32!(symlink( + fn legacy_call(&mut self) -> std::io::Result { + crate::syscall_u32!(symlink@NON_FD( self.from.as_c_str().as_ptr(), self.to.as_c_str().as_ptr() )) diff --git a/monoio/src/driver/op/unlink.rs b/monoio/src/driver/op/unlink.rs index 3871d29c..8ecd0f97 100644 --- a/monoio/src/driver/op/unlink.rs +++ b/monoio/src/driver/op/unlink.rs @@ -8,7 +8,10 @@ use libc::{AT_FDCWD, AT_REMOVEDIR}; use super::{Op, OpAble}; use crate::driver::util::cstr; #[cfg(any(feature = "legacy", feature = "poll-io"))] -use crate::{driver::ready::Direction, syscall_u32}; +use crate::{ + driver::{op::MaybeFd, ready::Direction}, + syscall_u32, +}; pub(crate) struct Unlink { path: CString, @@ -47,11 +50,11 @@ impl OpAble for Unlink { } #[cfg(any(feature = "legacy", feature = "poll-io"))] - fn legacy_call(&mut self) -> io::Result { + fn legacy_call(&mut self) -> io::Result { if self.remove_dir { - syscall_u32!(rmdir(self.path.as_c_str().as_ptr())) + syscall_u32!(rmdir@NON_FD(self.path.as_c_str().as_ptr())) } else { - syscall_u32!(unlink(self.path.as_c_str().as_ptr())) + syscall_u32!(unlink@NON_FD(self.path.as_c_str().as_ptr())) } } } diff --git a/monoio/src/driver/op/write.rs b/monoio/src/driver/op/write.rs index 364f7819..c88234f0 100644 --- a/monoio/src/driver/op/write.rs +++ b/monoio/src/driver/op/write.rs @@ -13,7 +13,7 @@ use windows_sys::Win32::{Foundation::TRUE, Storage::FileSystem::WriteFile}; use super::{super::shared_fd::SharedFd, Op, OpAble}; #[cfg(any(feature = "legacy", feature = "poll-io"))] -use crate::driver::ready::Direction; +use super::{driver::ready::Direction, MaybeFd}; use crate::{ buf::{IoBuf, IoVecBuf}, BufResult, @@ -25,7 +25,7 @@ macro_rules! write_result { impl<$T: $Trait> super::Op<$name<$T>> { pub(crate) async fn result(self) -> BufResult { let complete = self.await; - (complete.meta.result.map(|v| v as _), complete.data.$buf) + (complete.meta.result.map(|v| v.into_inner() as _), complete.data.$buf) } } )* @@ -79,7 +79,7 @@ impl OpAble for Write { } #[cfg(any(feature = "legacy", feature = "poll-io"))] - fn legacy_call(&mut self) -> io::Result { + fn legacy_call(&mut self) -> io::Result { #[cfg(windows)] let fd = self.fd.as_raw_handle() as _; #[cfg(unix)] @@ -128,7 +128,7 @@ impl OpAble for WriteAt { } #[cfg(any(feature = "legacy", feature = "poll-io"))] - fn legacy_call(&mut self) -> io::Result { + fn legacy_call(&mut self) -> io::Result { #[cfg(windows)] let fd = self.fd.as_raw_handle() as _; #[cfg(unix)] @@ -181,7 +181,7 @@ impl OpAble for WriteVec { } #[cfg(any(feature = "legacy", feature = "poll-io"))] - fn legacy_call(&mut self) -> io::Result { + fn legacy_call(&mut self) -> io::Result { #[cfg(windows)] let fd = self.fd.as_raw_handle() as _; #[cfg(unix)] @@ -237,7 +237,7 @@ impl OpAble for WriteVecAt { } #[cfg(any(feature = "legacy", feature = "poll-io"))] - fn legacy_call(&mut self) -> io::Result { + fn legacy_call(&mut self) -> io::Result { write_vectored_at( self.fd.raw_fd(), self.buf_vec.read_iovec_ptr(), @@ -255,21 +255,30 @@ pub(crate) mod impls { use crate::syscall_u32; /// A wrapper of [`libc::write`] - pub(crate) fn write(fd: i32, buf: *const u8, len: usize) -> io::Result { - syscall_u32!(write(fd, buf as _, len)) + pub(crate) fn write(fd: i32, buf: *const u8, len: usize) -> io::Result { + syscall_u32!(write@NON_FD(fd, buf as _, len)) } /// A wrapper of [`libc::write`] - pub(crate) fn write_at(fd: i32, buf: *const u8, len: usize, offset: u64) -> io::Result { + pub(crate) fn write_at( + fd: i32, + buf: *const u8, + len: usize, + offset: u64, + ) -> io::Result { let offset = libc::off_t::try_from(offset) .map_err(|_| io::Error::new(io::ErrorKind::Other, "offset too big"))?; - syscall_u32!(pwrite(fd, buf as _, len, offset)) + syscall_u32!(pwrite@NON_FD(fd, buf as _, len, offset)) } /// A wrapper of [`libc::writev`] - pub(crate) fn write_vectored(fd: i32, buf_vec: *const iovec, len: usize) -> io::Result { - syscall_u32!(writev(fd, buf_vec as _, len as _)) + pub(crate) fn write_vectored( + fd: i32, + buf_vec: *const iovec, + len: usize, + ) -> io::Result { + syscall_u32!(writev@NON_FD(fd, buf_vec as _, len as _)) } /// A wrapper of [`libc::pwritev`] @@ -278,11 +287,11 @@ pub(crate) mod impls { buf_vec: *const iovec, len: usize, offset: u64, - ) -> io::Result { + ) -> io::Result { let offset = libc::off_t::try_from(offset) .map_err(|_| io::Error::new(io::ErrorKind::Other, "offset too big"))?; - syscall_u32!(pwritev(fd, buf_vec as _, len as _, offset)) + syscall_u32!(pwritev@NON_FD(fd, buf_vec as _, len as _, offset)) } } diff --git a/monoio/src/driver/poll.rs b/monoio/src/driver/poll.rs index e0ae0b41..b918f95b 100644 --- a/monoio/src/driver/poll.rs +++ b/monoio/src/driver/poll.rs @@ -1,6 +1,6 @@ use std::{io, task::Context, time::Duration}; -use super::{ready::Direction, scheduled_io::ScheduledIo}; +use super::{op::MaybeFd, ready::Direction, scheduled_io::ScheduledIo}; use crate::{driver::op::CompletionMeta, utils::slab::Slab}; /// Poller with io dispatch. @@ -77,7 +77,7 @@ impl Poll { cx: &mut Context<'_>, token: usize, direction: Direction, - syscall: impl FnOnce() -> io::Result, + syscall: impl FnOnce() -> io::Result, ) -> std::task::Poll { let mut scheduled_io = self.io_dispatch.get(token).expect("scheduled_io lost"); let ref_mut = scheduled_io.as_mut(); diff --git a/monoio/src/driver/shared_fd.rs b/monoio/src/driver/shared_fd.rs index b9f370fb..0a57b799 100644 --- a/monoio/src/driver/shared_fd.rs +++ b/monoio/src/driver/shared_fd.rs @@ -45,7 +45,7 @@ impl State { // TODO: only Init state can convert? if matches!(state, UringState::Init) { let mut source = mio::unix::SourceFd(&fd); - crate::syscall!(fcntl(fd, libc::F_SETFL, libc::O_NONBLOCK))?; + crate::syscall!(fcntl@RAW(fd, libc::F_SETFL, libc::O_NONBLOCK))?; let reg = CURRENT .with(|inner| match inner { #[cfg(all(target_os = "linux", feature = "iouring"))] @@ -58,7 +58,7 @@ impl State { crate::driver::Inner::Legacy(_) => panic!("unexpected legacy runtime"), }) .inspect_err(|_| { - let _ = crate::syscall!(fcntl(fd, libc::F_SETFL, 0)); + let _ = crate::syscall!(fcntl@RAW(fd, libc::F_SETFL, 0)); })?; *state = UringState::Legacy(Some(reg)); } else { @@ -86,7 +86,7 @@ impl State { return Err(io::Error::new(io::ErrorKind::Other, "empty token")); }; let mut source = mio::unix::SourceFd(&fd); - crate::syscall!(fcntl(fd, libc::F_SETFL, 0))?; + crate::syscall!(fcntl@RAW(fd, libc::F_SETFL, 0))?; CURRENT .with(|inner| match inner { #[cfg(all(target_os = "linux", feature = "iouring"))] @@ -97,7 +97,7 @@ impl State { crate::driver::Inner::Legacy(_) => panic!("unexpected legacy runtime"), }) .inspect_err(|_| { - let _ = crate::syscall!(fcntl(fd, libc::F_SETFL, libc::O_NONBLOCK)); + let _ = crate::syscall!(fcntl@RAW(fd, libc::F_SETFL, libc::O_NONBLOCK)); })?; *self = State::Uring(UringState::Init); Ok(()) diff --git a/monoio/src/driver/uring/lifecycle.rs b/monoio/src/driver/uring/lifecycle.rs index 3efa69a3..dddaf21e 100644 --- a/monoio/src/driver/uring/lifecycle.rs +++ b/monoio/src/driver/uring/lifecycle.rs @@ -6,9 +6,12 @@ use std::{ task::{Context, Poll, Waker}, }; -use crate::{driver::op::CompletionMeta, utils::slab::Ref}; +use crate::{ + driver::op::{CompletionMeta, MaybeFd}, + utils::slab::Ref, +}; -pub(crate) enum Lifecycle { +enum Lifecycle { /// The operation has been submitted to uring and is currently in-flight Submitted, @@ -21,12 +24,30 @@ pub(crate) enum Lifecycle { Ignored(Box), /// The operation has completed. - Completed(io::Result, u32), + Completed(io::Result, u32), +} + +pub(crate) struct MaybeFdLifecycle { + is_fd: bool, + lifecycle: Lifecycle, +} + +impl MaybeFdLifecycle { + #[inline] + pub(crate) const fn new(is_fd: bool) -> Self { + Self { + is_fd, + lifecycle: Lifecycle::Submitted, + } + } } -impl Ref<'_, Lifecycle> { - pub(crate) fn complete(mut self, result: io::Result, flags: u32) { - let ref_mut = &mut *self; +impl Ref<'_, MaybeFdLifecycle> { + // # Safety + // Caller must make sure the result is valid since it may contain fd or a length hint. + pub(crate) unsafe fn complete(mut self, result: io::Result, flags: u32) { + let result = MaybeFd::new_result(result, self.is_fd); + let ref_mut = &mut self.lifecycle; match ref_mut { Lifecycle::Submitted => { *ref_mut = Lifecycle::Completed(result, flags); @@ -37,19 +58,19 @@ impl Ref<'_, Lifecycle> { Lifecycle::Waiting(waker) => { waker.wake(); } - _ => unsafe { std::hint::unreachable_unchecked() }, + _ => std::hint::unreachable_unchecked(), } } Lifecycle::Ignored(..) => { self.remove(); } - Lifecycle::Completed(..) => unsafe { std::hint::unreachable_unchecked() }, + Lifecycle::Completed(..) => std::hint::unreachable_unchecked(), } } #[allow(clippy::needless_pass_by_ref_mut)] pub(crate) fn poll_op(mut self, cx: &mut Context<'_>) -> Poll { - let ref_mut = &mut *self; + let ref_mut = &mut self.lifecycle; match ref_mut { Lifecycle::Submitted => { *ref_mut = Lifecycle::Waiting(cx.waker().clone()); @@ -64,7 +85,7 @@ impl Ref<'_, Lifecycle> { _ => {} } - match self.remove() { + match self.remove().lifecycle { Lifecycle::Completed(result, flags) => Poll::Ready(CompletionMeta { result, flags }), _ => unsafe { std::hint::unreachable_unchecked() }, } @@ -72,7 +93,7 @@ impl Ref<'_, Lifecycle> { // return if the op must has been finished pub(crate) fn drop_op(mut self, data: &mut Option) -> bool { - let ref_mut = &mut *self; + let ref_mut = &mut self.lifecycle; match ref_mut { Lifecycle::Submitted | Lifecycle::Waiting(_) => { if let Some(data) = data.take() { diff --git a/monoio/src/driver/uring/mod.rs b/monoio/src/driver/uring/mod.rs index e170d12a..f0407aaa 100644 --- a/monoio/src/driver/uring/mod.rs +++ b/monoio/src/driver/uring/mod.rs @@ -11,7 +11,7 @@ use std::{ }; use io_uring::{cqueue, opcode, types::Timespec, IoUring}; -use lifecycle::Lifecycle; +use lifecycle::MaybeFdLifecycle; use super::{ op::{CompletionMeta, Op, OpAble}, @@ -87,7 +87,7 @@ pub(crate) struct UringInner { // When dropping the driver, all in-flight operations must have completed. This // type wraps the slab and ensures that, on drop, the slab is empty. struct Ops { - slab: Slab, + slab: Slab, } impl IoUringDriver { @@ -129,7 +129,7 @@ impl IoUringDriver { // Create eventfd and register it to the ring. let waker = { - let fd = crate::syscall!(eventfd(0, libc::EFD_CLOEXEC))?; + let fd = crate::syscall!(eventfd@RAW(0, libc::EFD_CLOEXEC))?; unsafe { use std::os::unix::io::FromRawFd; std::fs::File::from_raw_fd(fd) @@ -391,7 +391,9 @@ impl UringInner { self.poll.tick(Some(Duration::ZERO))?; } _ if index >= MIN_REVERSED_USERDATA => (), - _ => self.ops.complete(index as _, resultify(&cqe), cqe.flags()), + // # Safety + // Here we can make sure the result is valid. + _ => unsafe { self.ops.complete(index as _, resultify(&cqe), cqe.flags()) }, } } Ok(()) @@ -421,10 +423,10 @@ impl UringInner { } } - fn new_op(data: T, inner: &mut UringInner, driver: Inner) -> Op { + fn new_op(data: T, inner: &mut UringInner, driver: Inner) -> Op { Op { driver, - index: inner.ops.insert(), + index: inner.ops.insert(T::RET_IS_FD), data: Some(data), } } @@ -509,6 +511,7 @@ impl UringInner { this: &Rc>, index: usize, data: &mut Option, + _skip_cancel: bool, ) { let inner = unsafe { &mut *this.get() }; if index == usize::MAX { @@ -518,7 +521,7 @@ impl UringInner { if let Some(lifecycle) = inner.ops.slab.get(index) { let _must_finished = lifecycle.drop_op(data); #[cfg(feature = "async-cancel")] - if !_must_finished { + if !_must_finished && !_skip_cancel { unsafe { let cancel = opcode::AsyncCancel::new(index as u64) .build() @@ -597,11 +600,16 @@ impl Ops { } // Insert a new operation - pub(crate) fn insert(&mut self) -> usize { - self.slab.insert(Lifecycle::Submitted) + #[inline] + pub(crate) fn insert(&mut self, is_fd: bool) -> usize { + self.slab.insert(MaybeFdLifecycle::new(is_fd)) } - fn complete(&mut self, index: usize, result: io::Result, flags: u32) { + // Complete an operation + // # Safety + // Caller must make sure the result is valid. + #[inline] + unsafe fn complete(&mut self, index: usize, result: io::Result, flags: u32) { let lifecycle = unsafe { self.slab.get(index).unwrap_unchecked() }; lifecycle.complete(result, flags); } diff --git a/monoio/src/driver/util.rs b/monoio/src/driver/util.rs index 20f16fc5..dec31a07 100644 --- a/monoio/src/driver/util.rs +++ b/monoio/src/driver/util.rs @@ -31,7 +31,23 @@ pub(super) fn timespec(duration: std::time::Duration) -> io_uring::types::Timesp #[cfg(unix)] #[macro_export] macro_rules! syscall { - ($fn: ident ( $($arg: expr),* $(,)* ) ) => {{ + ($fn: ident @FD ( $($arg: expr),* $(,)* ) ) => {{ + let res = unsafe { libc::$fn($($arg, )*) }; + if res == -1 { + Err(std::io::Error::last_os_error()) + } else { + Ok(unsafe { $crate::driver::op::MaybeFd::new_fd(res) }) + } + }}; + ($fn: ident @NON_FD ( $($arg: expr),* $(,)* ) ) => {{ + let res = unsafe { libc::$fn($($arg, )*) }; + if res == -1 { + Err(std::io::Error::last_os_error()) + } else { + Ok($crate::driver::op::MaybeFd::new_non_fd(res)) + } + }}; + ($fn: ident @RAW ( $($arg: expr),* $(,)* ) ) => {{ let res = unsafe { libc::$fn($($arg, )*) }; if res == -1 { Err(std::io::Error::last_os_error()) @@ -45,7 +61,23 @@ macro_rules! syscall { #[cfg(windows)] #[macro_export] macro_rules! syscall { - ($fn: ident ( $($arg: expr),* $(,)* ), $err_test: path, $err_value: expr) => {{ + ($fn: ident @FD ( $($arg: expr),* $(,)* ), $err_test: path, $err_value: expr) => {{ + let res = unsafe { $fn($($arg, )*) }; + if $err_test(&res, &$err_value) { + Err(std::io::Error::last_os_error()) + } else { + Ok(unsafe { $crate::driver::op::MaybeFd::new_fd(res.try_into().unwrap()) }) + } + }}; + ($fn: ident @NON_FD ( $($arg: expr),* $(,)* ), $err_test: path, $err_value: expr) => {{ + let res = unsafe { $fn($($arg, )*) }; + if $err_test(&res, &$err_value) { + Err(std::io::Error::last_os_error()) + } else { + Ok($crate::driver::op::MaybeFd::new_non_fd(res.try_into().unwrap())) + } + }}; + ($fn: ident @RAW ( $($arg: expr),* $(,)* ), $err_test: path, $err_value: expr) => {{ let res = unsafe { $fn($($arg, )*) }; if $err_test(&res, &$err_value) { Err(std::io::Error::last_os_error()) @@ -58,17 +90,39 @@ macro_rules! syscall { /// Do syscall and return Result #[macro_export] macro_rules! syscall_u32 { - ($fn: ident ( $($arg: expr),* $(,)* ) ) => {{ + ($fn: ident @RAW ( $($arg: expr),* $(,)* ) ) => {{ #[cfg(windows)] let res = unsafe { $fn($($arg, )*) }; #[cfg(unix)] - let res = unsafe { libc::$fn($($arg, )*) }; + let res = unsafe { ::libc::$fn($($arg, )*) }; if res < 0 { - Err(std::io::Error::last_os_error()) + Err(::std::io::Error::last_os_error()) } else { Ok(res as u32) } }}; + ($fn: ident @FD ( $($arg: expr),* $(,)* ) ) => {{ + #[cfg(windows)] + let res = unsafe { $fn($($arg, )*) }; + #[cfg(unix)] + let res = unsafe { ::libc::$fn($($arg, )*) }; + if res < 0 { + Err(::std::io::Error::last_os_error()) + } else { + Ok(unsafe { $crate::driver::op::MaybeFd::new_fd(res as u32) }) + } + }}; + ($fn: ident @NON_FD ( $($arg: expr),* $(,)* ) ) => {{ + #[cfg(windows)] + let res = unsafe { $fn($($arg, )*) }; + #[cfg(unix)] + let res = unsafe { ::libc::$fn($($arg, )*) }; + if res < 0 { + Err(::std::io::Error::last_os_error()) + } else { + Ok($crate::driver::op::MaybeFd::new_non_fd(res as u32)) + } + }}; } #[cfg(all( diff --git a/monoio/src/fs/open_options.rs b/monoio/src/fs/open_options.rs index 73c3d737..0efb7742 100644 --- a/monoio/src/fs/open_options.rs +++ b/monoio/src/fs/open_options.rs @@ -352,7 +352,7 @@ impl OpenOptions { // The file is open Ok(File::from_shared_fd(SharedFd::new_without_register( - completion.meta.result? as _, + completion.meta.result?.into_inner() as _, ))) } diff --git a/monoio/src/net/mod.rs b/monoio/src/net/mod.rs index 7aa47a1d..4a92aeec 100644 --- a/monoio/src/net/mod.rs +++ b/monoio/src/net/mod.rs @@ -53,12 +53,12 @@ pub(crate) fn new_socket( // Gives a warning for platforms without SOCK_NONBLOCK. #[allow(clippy::let_and_return)] #[cfg(unix)] - let socket = crate::syscall!(socket(domain, socket_type, 0)); + let socket = crate::syscall!(socket@RAW(domain, socket_type, 0)); // Mimic `libstd` and set `SO_NOSIGPIPE` on apple systems. #[cfg(target_vendor = "apple")] let socket = socket.and_then(|socket| { - crate::syscall!(setsockopt( + crate::syscall!(setsockopt@RAW( socket, libc::SOL_SOCKET, libc::SO_NOSIGPIPE, @@ -73,14 +73,14 @@ pub(crate) fn new_socket( let socket = socket.and_then(|socket| { // For platforms that don't support flags in socket, we need to // set the flags ourselves. - crate::syscall!(fcntl(socket, libc::F_SETFL, libc::O_NONBLOCK)) + crate::syscall!(fcntl@RAW(socket, libc::F_SETFL, libc::O_NONBLOCK)) .and_then(|_| { - crate::syscall!(fcntl(socket, libc::F_SETFD, libc::FD_CLOEXEC)).map(|_| socket) + crate::syscall!(fcntl@RAW(socket, libc::F_SETFD, libc::FD_CLOEXEC)).map(|_| socket) }) .inspect_err(|_| { // If either of the `fcntl` calls failed, ensure the socket is // closed and return the error. - let _ = crate::syscall!(close(socket)); + let _ = crate::syscall!(close@RAW(socket)); }) }); @@ -100,17 +100,17 @@ pub(crate) fn new_socket( socket_type: WINSOCK_SOCKET_TYPE, ) -> std::io::Result { let _: i32 = crate::syscall!( - WSAStartup(MAKEWORD(2, 2), std::ptr::null_mut()), + WSAStartup@RAW(MAKEWORD(2, 2), std::ptr::null_mut()), PartialEq::eq, NO_ERROR as _ )?; let socket = crate::syscall!( - socket(domain as _, socket_type, 0), + socket@RAW(domain as _, socket_type, 0), PartialEq::eq, INVALID_SOCKET )?; crate::syscall!( - ioctlsocket(socket, FIONBIO, &mut 1), + ioctlsocket@RAW(socket, FIONBIO, &mut 1), PartialEq::ne, NO_ERROR as _ ) diff --git a/monoio/src/net/tcp/listener.rs b/monoio/src/net/tcp/listener.rs index 09ae6844..7848a320 100644 --- a/monoio/src/net/tcp/listener.rs +++ b/monoio/src/net/tcp/listener.rs @@ -117,7 +117,7 @@ impl TcpListener { let fd = completion.meta.result?; // Construct stream - let stream = TcpStream::from_shared_fd(SharedFd::new::(fd as _)?); + let stream = TcpStream::from_shared_fd(SharedFd::new::(fd.into_inner() as _)?); // Construct SocketAddr let storage = completion.data.addr.0.as_ptr(); @@ -174,7 +174,7 @@ impl TcpListener { let fd = completion.meta.result?; // Construct stream - let stream = TcpStream::from_shared_fd(SharedFd::new::(fd as _)?); + let stream = TcpStream::from_shared_fd(SharedFd::new::(fd.into_inner() as _)?); // Construct SocketAddr let storage = completion.data.addr.0.as_ptr(); diff --git a/monoio/src/net/tcp/stream.rs b/monoio/src/net/tcp/stream.rs index 19978767..bf09de92 100644 --- a/monoio/src/net/tcp/stream.rs +++ b/monoio/src/net/tcp/stream.rs @@ -473,6 +473,7 @@ impl tokio::io::AsyncRead for TcpStream { let ret = ready!(crate::driver::op::PollLegacy::poll_legacy(&mut recv, cx)); std::task::Poll::Ready(ret.result.map(|n| { + let n = n.into_inner(); buf.assume_init(n as usize); buf.advance(n as usize); })) @@ -492,7 +493,7 @@ impl tokio::io::AsyncWrite for TcpStream { let mut send = Op::send_raw(&self.fd, raw_buf); let ret = ready!(crate::driver::op::PollLegacy::poll_legacy(&mut send, cx)); - std::task::Poll::Ready(ret.result.map(|n| n as usize)) + std::task::Poll::Ready(ret.result.map(|n| n.into_inner() as usize)) } } @@ -526,7 +527,7 @@ impl tokio::io::AsyncWrite for TcpStream { let mut writev = Op::writev_raw(&self.fd, raw_buf); let ret = ready!(crate::driver::op::PollLegacy::poll_legacy(&mut writev, cx)); - std::task::Poll::Ready(ret.result.map(|n| n as usize)) + std::task::Poll::Ready(ret.result.map(|n| n.into_inner() as usize)) } } diff --git a/monoio/src/net/tcp/stream_poll.rs b/monoio/src/net/tcp/stream_poll.rs index 0d1b7bc5..591ee31b 100644 --- a/monoio/src/net/tcp/stream_poll.rs +++ b/monoio/src/net/tcp/stream_poll.rs @@ -76,6 +76,7 @@ impl tokio::io::AsyncRead for TcpStreamPoll { let ret = ready!(crate::driver::op::PollLegacy::poll_io(&mut recv, cx)); std::task::Poll::Ready(ret.result.map(|n| { + let n = n.into_inner(); buf.assume_init(n as usize); buf.advance(n as usize); })) @@ -95,7 +96,7 @@ impl tokio::io::AsyncWrite for TcpStreamPoll { let mut send = Op::send_raw(&self.0.fd, raw_buf); let ret = ready!(crate::driver::op::PollLegacy::poll_io(&mut send, cx)); - std::task::Poll::Ready(ret.result.map(|n| n as usize)) + std::task::Poll::Ready(ret.result.map(|n| n.into_inner() as usize)) } } @@ -134,7 +135,7 @@ impl tokio::io::AsyncWrite for TcpStreamPoll { let mut writev = Op::writev_raw(&self.0.fd, raw_buf); let ret = ready!(crate::driver::op::PollLegacy::poll_io(&mut writev, cx)); - std::task::Poll::Ready(ret.result.map(|n| n as usize)) + std::task::Poll::Ready(ret.result.map(|n| n.into_inner() as usize)) } } diff --git a/monoio/src/net/tcp/tfo/linux.rs b/monoio/src/net/tcp/tfo/linux.rs index bea85ed6..b6ee81fa 100644 --- a/monoio/src/net/tcp/tfo/linux.rs +++ b/monoio/src/net/tcp/tfo/linux.rs @@ -11,7 +11,7 @@ thread_local! { /// Call before listen. pub(crate) fn set_tcp_fastopen(fd: &S, fast_open: i32) -> io::Result<()> { - crate::syscall!(setsockopt( + crate::syscall!(setsockopt@RAW( fd.as_raw_fd(), libc::SOL_TCP, libc::TCP_FASTOPEN, @@ -26,7 +26,7 @@ pub(crate) fn set_tcp_fastopen(fd: &S, fast_open: i32) -> io::Result pub(crate) fn set_tcp_fastopen_connect(fd: &S) -> io::Result<()> { const ENABLED: libc::c_int = 0x1; - crate::syscall!(setsockopt( + crate::syscall!(setsockopt@RAW( fd.as_raw_fd(), libc::SOL_TCP, libc::TCP_FASTOPEN_CONNECT, diff --git a/monoio/src/net/tcp/tfo/macos.rs b/monoio/src/net/tcp/tfo/macos.rs index 3465fe54..1e8d6996 100644 --- a/monoio/src/net/tcp/tfo/macos.rs +++ b/monoio/src/net/tcp/tfo/macos.rs @@ -3,7 +3,7 @@ use std::{io, os::fd::AsRawFd}; /// Call before listen. pub(crate) fn set_tcp_fastopen(fd: &S) -> io::Result<()> { const ENABLED: libc::c_int = 0x1; - crate::syscall!(setsockopt( + crate::syscall!(setsockopt@RAW( fd.as_raw_fd(), libc::IPPROTO_TCP, libc::TCP_FASTOPEN, @@ -19,7 +19,7 @@ pub(crate) fn set_tcp_fastopen_force_enable(fd: &S) -> io::Result<() const TCP_FASTOPEN_FORCE_ENABLE: libc::c_int = 0x218; const ENABLED: libc::c_int = 0x1; - crate::syscall!(setsockopt( + crate::syscall!(setsockopt@RAW( fd.as_raw_fd(), libc::IPPROTO_TCP, TCP_FASTOPEN_FORCE_ENABLE, diff --git a/monoio/src/net/unix/listener.rs b/monoio/src/net/unix/listener.rs index fba686bb..f930f786 100644 --- a/monoio/src/net/unix/listener.rs +++ b/monoio/src/net/unix/listener.rs @@ -75,7 +75,7 @@ impl UnixListener { let fd = completion.meta.result?; // Construct stream - let stream = UnixStream::from_shared_fd(SharedFd::new::(fd as _)?); + let stream = UnixStream::from_shared_fd(SharedFd::new::(fd.into_inner() as _)?); // Construct SocketAddr let mut storage = unsafe { std::mem::MaybeUninit::assume_init(completion.data.addr.0) }; @@ -105,7 +105,7 @@ impl UnixListener { let fd = completion.meta.result?; // Construct stream - let stream = UnixStream::from_shared_fd(SharedFd::new::(fd as _)?); + let stream = UnixStream::from_shared_fd(SharedFd::new::(fd.into_inner() as _)?); // Construct SocketAddr let mut storage = unsafe { std::mem::MaybeUninit::assume_init(completion.data.addr.0) }; diff --git a/monoio/src/net/unix/pipe.rs b/monoio/src/net/unix/pipe.rs index 760a0332..dd240704 100644 --- a/monoio/src/net/unix/pipe.rs +++ b/monoio/src/net/unix/pipe.rs @@ -30,8 +30,8 @@ pub fn new_pipe() -> io::Result<(Pipe, Pipe)> { } }; #[cfg(target_os = "linux")] - crate::syscall!(pipe2(pipes.as_mut_ptr() as _, flag))?; + crate::syscall!(pipe2@RAW(pipes.as_mut_ptr() as _, flag))?; #[cfg(not(target_os = "linux"))] - crate::syscall!(pipe(pipes.as_mut_ptr() as _))?; + crate::syscall!(pipe@RAW(pipes.as_mut_ptr() as _))?; Ok((Pipe::from_raw_fd(pipes[0]), Pipe::from_raw_fd(pipes[1]))) } diff --git a/monoio/src/net/unix/seq_packet/listener.rs b/monoio/src/net/unix/seq_packet/listener.rs index 0fa3ca5c..f3fc3a6f 100644 --- a/monoio/src/net/unix/seq_packet/listener.rs +++ b/monoio/src/net/unix/seq_packet/listener.rs @@ -26,8 +26,8 @@ impl UnixSeqpacketListener { pub fn bind_with_backlog>(path: P, backlog: libc::c_int) -> io::Result { let (addr, addr_len) = socket_addr(path.as_ref())?; let socket = new_socket(libc::AF_UNIX, libc::SOCK_SEQPACKET)?; - crate::syscall!(bind(socket, &addr as *const _ as *const _, addr_len))?; - crate::syscall!(listen(socket, backlog))?; + crate::syscall!(bind@RAW(socket, &addr as *const _ as *const _, addr_len))?; + crate::syscall!(listen@RAW(socket, backlog))?; Ok(Self { fd: SharedFd::new::(socket)?, }) @@ -50,7 +50,7 @@ impl UnixSeqpacketListener { let fd = completion.meta.result?; // Construct stream - let stream = UnixSeqpacket::from_shared_fd(SharedFd::new::(fd as _)?); + let stream = UnixSeqpacket::from_shared_fd(SharedFd::new::(fd.into_inner() as _)?); // Construct SocketAddr let mut storage = unsafe { std::mem::MaybeUninit::assume_init(completion.data.addr.0) }; diff --git a/monoio/src/net/unix/socket_addr.rs b/monoio/src/net/unix/socket_addr.rs index adf8ff80..983f3d93 100644 --- a/monoio/src/net/unix/socket_addr.rs +++ b/monoio/src/net/unix/socket_addr.rs @@ -233,15 +233,15 @@ where }; let mut fds = [-1; 2]; - crate::syscall!(socketpair(libc::AF_UNIX, flags, 0, fds.as_mut_ptr()))?; + crate::syscall!(socketpair@RAW(libc::AF_UNIX, flags, 0, fds.as_mut_ptr()))?; let pair = unsafe { (T::from_raw_fd(fds[0]), T::from_raw_fd(fds[1])) }; Ok(pair) } pub(crate) fn local_addr(socket: RawFd) -> io::Result { - SocketAddr::new(|sockaddr, socklen| crate::syscall!(getsockname(socket, sockaddr, socklen))) + SocketAddr::new(|sockaddr, socklen| crate::syscall!(getsockname@RAW(socket, sockaddr, socklen))) } pub(crate) fn peer_addr(socket: RawFd) -> io::Result { - SocketAddr::new(|sockaddr, socklen| crate::syscall!(getpeername(socket, sockaddr, socklen))) + SocketAddr::new(|sockaddr, socklen| crate::syscall!(getpeername@RAW(socket, sockaddr, socklen))) } diff --git a/monoio/src/net/unix/stream.rs b/monoio/src/net/unix/stream.rs index 7705b747..e8ee2c06 100644 --- a/monoio/src/net/unix/stream.rs +++ b/monoio/src/net/unix/stream.rs @@ -323,6 +323,7 @@ impl tokio::io::AsyncRead for UnixStream { let ret = ready!(crate::driver::op::PollLegacy::poll_legacy(&mut recv, cx)); std::task::Poll::Ready(ret.result.map(|n| { + let n = n.into_inner(); buf.assume_init(n as usize); buf.advance(n as usize); })) @@ -342,7 +343,7 @@ impl tokio::io::AsyncWrite for UnixStream { let mut send = Op::send_raw(&self.fd, raw_buf); let ret = ready!(crate::driver::op::PollLegacy::poll_legacy(&mut send, cx)); - std::task::Poll::Ready(ret.result.map(|n| n as usize)) + std::task::Poll::Ready(ret.result.map(|n| n.into_inner() as usize)) } } diff --git a/monoio/src/net/unix/stream_poll.rs b/monoio/src/net/unix/stream_poll.rs index f3516dd5..893771f1 100644 --- a/monoio/src/net/unix/stream_poll.rs +++ b/monoio/src/net/unix/stream_poll.rs @@ -65,6 +65,7 @@ impl tokio::io::AsyncRead for UnixStreamPoll { let ret = ready!(crate::driver::op::PollLegacy::poll_io(&mut recv, cx)); std::task::Poll::Ready(ret.result.map(|n| { + let n = n.into_inner(); buf.assume_init(n as usize); buf.advance(n as usize); })) @@ -84,7 +85,7 @@ impl tokio::io::AsyncWrite for UnixStreamPoll { let mut send = Op::send_raw(&self.0.fd, raw_buf); let ret = ready!(crate::driver::op::PollLegacy::poll_io(&mut send, cx)); - std::task::Poll::Ready(ret.result.map(|n| n as usize)) + std::task::Poll::Ready(ret.result.map(|n| n.into_inner() as usize)) } } @@ -121,7 +122,7 @@ impl tokio::io::AsyncWrite for UnixStreamPoll { let mut writev = Op::writev_raw(&self.0.fd, raw_buf); let ret = ready!(crate::driver::op::PollLegacy::poll_io(&mut writev, cx)); - std::task::Poll::Ready(ret.result.map(|n| n as usize)) + std::task::Poll::Ready(ret.result.map(|n| n.into_inner() as usize)) } } diff --git a/monoio/tests/fd_leak.rs b/monoio/tests/fd_leak.rs new file mode 100644 index 00000000..774f62fd --- /dev/null +++ b/monoio/tests/fd_leak.rs @@ -0,0 +1,105 @@ +use std::{ + io::{Read, Write}, + sync::mpsc::channel, + time::Duration, +}; + +use monoio::io::AsyncReadRentExt; + +// This test is used to prove the runtime can close the cancelled(but failed to cancel) op's fd +// result. +// 1. accept(push accept op) and poll the future to Pending +// 2. spawn another thread to connect the listener(will start connecting after task drop) +// 3. cancel(drop) the accept task +// 4. spin for a while to delay the iouring enter which submit the cancel op +// 5. the other thread should be able to get a connection +// 6. if the other thread can read eof, then it can prove the runtime close the fd correctly +// 7. if the read blocked, then the runtime failed to close the fd +#[monoio::test_all(timer_enabled = true)] +async fn test_fd_leak_cancel_fail() { + // step 1 and 2 + let listener = monoio::net::TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = listener.local_addr().unwrap(); + let mut incoming = listener.accept(); + let fut = unsafe { std::pin::Pin::new_unchecked(&mut incoming) }; + assert!(monoio::select! { + result = fut => Ok(result), + _ = monoio::time::sleep(Duration::from_millis(200)) => Err(()), + } + .is_err()); + + // step 2 + let (tx1, rx1) = channel::<()>(); + let (tx2, rx2) = channel::<()>(); + std::thread::spawn(move || { + rx1.recv().unwrap(); + // step 5 + let mut conn = std::net::TcpStream::connect(addr).unwrap(); + tx2.send(()).unwrap(); + let mut buf = [0u8; 1]; + conn.write_all(&buf).unwrap(); + // step 6 + let ret = conn.read(&mut buf[..]); + assert!( + matches!(ret, Ok(0)) + || matches!(ret, Err(e) if e.kind() == std::io::ErrorKind::ConnectionReset) + ); + tx2.send(()).unwrap(); + }); + + // step 3: cancel the accept op but not submit the cancel op + drop(incoming); + tx1.send(()).unwrap(); + // step 4: block the thread with sync channel + rx2.recv().unwrap(); + // step 7: wait for 1 second to make sure the runtime can close the fd + monoio::time::sleep(Duration::from_secs(1)).await; + + if rx2.try_recv().is_ok() { + // With iouring, the fd is accepted and closed by the runtime. + // So here it will return. + return; + } + // With legacy driver, the accept syscall is not executed. + // So we can accept now and check if it is the connection established by the other thread. + // We can read 1 byte to check if it is zero. Then we close the fd and wait for the other + // thread. + // So we can prove the connection at server side is either not accepted or closed by the + // runtime. + let (mut conn, _) = listener.accept().await.unwrap(); + let buf = vec![1; 1]; + let (r, buf) = conn.read_exact(buf).await; + assert_eq!(r.unwrap(), 1); + assert_eq!(buf[0], 0); + drop(conn); + rx2.recv().unwrap(); +} + +// This test is used to prove the runtime try best to cancel pending op when op is dropped. +#[cfg(feature = "async-cancel")] +#[monoio::test_all(timer_enabled = true)] +async fn test_fd_leak_try_cancel() { + let listener = monoio::net::TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = listener.local_addr().unwrap(); + let incoming = listener.accept(); + assert!(monoio::select! { + result = incoming => Ok(result), + _ = monoio::time::sleep(Duration::from_millis(200)) => Err(()), + } + .is_err()); + // The future is dropped now and the cancel op is pushed. + monoio::time::sleep(Duration::from_millis(200)).await; + let (tx, rx) = channel::<()>(); + std::thread::spawn(move || { + let mut conn = std::net::TcpStream::connect(addr).unwrap(); + let buf = [0u8; 1]; + conn.write_all(&buf).unwrap(); + tx.send(()).unwrap(); + }); + rx.recv().unwrap(); + let mut conn = listener.accept().await.unwrap().0; + let buf = vec![1; 1]; + let (r, buf) = conn.read_exact(buf).await; + assert_eq!(r.unwrap(), 1); + assert_eq!(buf[0], 0); +}