Skip to content

Commit

Permalink
fix: properly close fd when return-fd op cancelled failed
Browse files Browse the repository at this point in the history
  • Loading branch information
ihciah committed Oct 30, 2024
1 parent c9708e8 commit 3c899be
Show file tree
Hide file tree
Showing 38 changed files with 538 additions and 265 deletions.
1 change: 1 addition & 0 deletions monoio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ tempfile = "3.2"
# use nightly only feature flags
unstable = []
# async-cancel will push a async-cancel entry into sq when op is canceled
# strongly recommend to enable this feature
async-cancel = []
# enanle zero copy(enable SOCK_ZEROCOPY + MSG_ZEROCOPY flag)
# WARNING: this feature may cause performance degradation
Expand Down
5 changes: 3 additions & 2 deletions monoio/src/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,11 @@ impl Inner {
}

#[allow(unused)]
fn drop_op<T: 'static>(&self, index: usize, data: &mut Option<T>) {
#[inline]
fn drop_op<T: 'static>(&self, index: usize, data: &mut Option<T>, skip_cancel: bool) {
match self {
#[cfg(all(target_os = "linux", feature = "iouring"))]
Inner::Uring(this) => UringInner::drop_op(this, index, data),
Inner::Uring(this) => UringInner::drop_op(this, index, data, skip_cancel),
#[cfg(feature = "legacy")]
Inner::Legacy(_) => {}
#[cfg(all(
Expand Down
102 changes: 80 additions & 22 deletions monoio/src/driver/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ mod symlink;
mod splice;

/// In-flight operation
pub(crate) struct Op<T: 'static> {
pub(crate) struct Op<T: 'static + OpAble> {
// Driver running the operation
pub(super) driver: driver::Inner,

Expand All @@ -58,19 +58,87 @@ pub(crate) struct Completion<T> {
/// Operation completion meta info.
#[derive(Debug)]
pub(crate) struct CompletionMeta {
pub(crate) result: io::Result<u32>,
pub(crate) result: io::Result<MaybeFd>,
#[allow(unused)]
pub(crate) flags: u32,
}

#[derive(Debug)]
pub(crate) struct MaybeFd {
is_fd: bool,
fd: u32,
}

impl MaybeFd {
#[inline]
pub(crate) unsafe fn new_result(fdr: io::Result<u32>, is_fd: bool) -> io::Result<Self> {
fdr.map(|fd| Self { is_fd, fd })
}

#[inline]
pub(crate) unsafe fn new_fd_result(fdr: io::Result<u32>) -> io::Result<Self> {
fdr.map(|fd| Self { is_fd: true, fd })
}

#[inline]
pub(crate) fn new_non_fd_result(fdr: io::Result<u32>) -> io::Result<Self> {
fdr.map(|fd| Self { is_fd: false, fd })
}

#[inline]
pub(crate) const unsafe fn new_fd(fd: u32) -> Self {
Self { is_fd: true, fd }
}

#[inline]
pub(crate) const fn new_non_fd(fd: u32) -> Self {
Self { is_fd: false, fd }
}

#[inline]
pub(crate) const fn into_inner(self) -> u32 {
let fd = self.fd;
std::mem::forget(self);
fd
}

#[inline]
pub(crate) const fn zero() -> Self {
Self {
is_fd: false,
fd: 0,
}
}
}

impl Drop for MaybeFd {
fn drop(&mut self) {
// The fd close only executed when:
// 1. the operation is cancelled
// 2. the cancellation failed
// 3. the returned result is a fd
// So this is a relatively cold path. For simplicity, we just do a close syscall here
// instead of pushing close op.
if self.is_fd {
unsafe {
libc::close(self.fd as libc::c_int);
}
}
}
}

pub(crate) trait OpAble {
#[cfg(all(target_os = "linux", feature = "iouring"))]
const RET_IS_FD: bool = false;
#[cfg(all(target_os = "linux", feature = "iouring"))]
const SKIP_CANCEL: bool = false;
#[cfg(all(target_os = "linux", feature = "iouring"))]
fn uring_op(&mut self) -> io_uring::squeue::Entry;

#[cfg(any(feature = "legacy", feature = "poll-io"))]
fn legacy_interest(&self) -> Option<(super::ready::Direction, usize)>;
#[cfg(any(feature = "legacy", feature = "poll-io"))]
fn legacy_call(&mut self) -> io::Result<u32>;
fn legacy_call(&mut self) -> io::Result<MaybeFd>;
}

/// If legacy is enabled and iouring is not, we can expose io interface in a poll-like way.
Expand All @@ -85,10 +153,7 @@ pub(crate) trait PollLegacy {
}

#[cfg(any(feature = "legacy", feature = "poll-io"))]
impl<T> PollLegacy for T
where
T: OpAble,
{
impl<T: OpAble> PollLegacy for T {
#[cfg(feature = "legacy")]
#[inline]
fn poll_legacy(&mut self, _cx: &mut std::task::Context<'_>) -> std::task::Poll<CompletionMeta> {
Expand All @@ -113,35 +178,26 @@ where
}
}

impl<T> Op<T> {
impl<T: OpAble> Op<T> {
/// Submit an operation to uring.
///
/// `state` is stored during the operation tracking any state submitted to
/// the kernel.
pub(super) fn submit_with(data: T) -> io::Result<Op<T>>
where
T: OpAble,
{
pub(super) fn submit_with(data: T) -> io::Result<Op<T>> {
driver::CURRENT.with(|this| this.submit_with(data))
}

/// Try submitting an operation to uring
#[allow(unused)]
pub(super) fn try_submit_with(data: T) -> io::Result<Op<T>>
where
T: OpAble,
{
pub(super) fn try_submit_with(data: T) -> io::Result<Op<T>> {
if driver::CURRENT.is_set() {
Op::submit_with(data)
} else {
Err(io::ErrorKind::Other.into())
}
}

pub(crate) fn op_canceller(&self) -> OpCanceller
where
T: OpAble,
{
pub(crate) fn op_canceller(&self) -> OpCanceller {
#[cfg(feature = "legacy")]
if is_legacy() {
return if let Some((dir, id)) = self.data.as_ref().unwrap().legacy_interest() {
Expand Down Expand Up @@ -181,9 +237,11 @@ where
}
}

impl<T> Drop for Op<T> {
impl<T: OpAble> Drop for Op<T> {
#[inline]
fn drop(&mut self) {
self.driver.drop_op(self.index, &mut self.data);
self.driver
.drop_op(self.index, &mut self.data, T::SKIP_CANCEL);
}
}

Expand Down
33 changes: 16 additions & 17 deletions monoio/src/driver/op/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use {

use super::{super::shared_fd::SharedFd, Op, OpAble};
#[cfg(any(feature = "legacy", feature = "poll-io"))]
use crate::driver::ready::Direction;
use super::{driver::ready::Direction, MaybeFd};
#[cfg(all(unix, any(feature = "legacy", feature = "poll-io")))]
use crate::syscall_u32;

Expand Down Expand Up @@ -54,6 +54,9 @@ impl Op<Accept> {
}

impl OpAble for Accept {
#[cfg(all(target_os = "linux", feature = "iouring"))]
const RET_IS_FD: bool = true;

#[cfg(all(target_os = "linux", feature = "iouring"))]
fn uring_op(&mut self) -> io_uring::squeue::Entry {
opcode::Accept::new(
Expand All @@ -71,16 +74,16 @@ impl OpAble for Accept {
}

#[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))]
fn legacy_call(&mut self) -> io::Result<u32> {
fn legacy_call(&mut self) -> io::Result<MaybeFd> {
let fd = self.fd.as_raw_socket();
let addr = self.addr.0.as_mut_ptr() as *mut _;
let len = &mut self.addr.1;

syscall!(accept(fd as _, addr, len), PartialEq::eq, INVALID_SOCKET)
syscall!(accept@FD(fd as _, addr, len), PartialEq::eq, INVALID_SOCKET)
}

#[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))]
fn legacy_call(&mut self) -> io::Result<u32> {
fn legacy_call(&mut self) -> io::Result<MaybeFd> {
let fd = self.fd.as_raw_fd();
let addr = self.addr.0.as_mut_ptr() as *mut _;
let len = &mut self.addr.1;
Expand All @@ -102,12 +105,10 @@ impl OpAble for Accept {
target_os = "netbsd",
target_os = "openbsd"
))]
return syscall_u32!(accept4(
fd,
addr,
len,
libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK,
));
return {
let flag = libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK;
syscall_u32!(accept4@FD(fd, addr, len, flag))
};

// But not all platforms have the `accept4(2)` call. Luckily BSD (derived)
// OSes inherit the non-blocking flag from the listener, so we just have to
Expand All @@ -119,13 +120,11 @@ impl OpAble for Accept {
target_os = "redox"
))]
return {
let stream_fd = syscall_u32!(accept(fd, addr, len))? as i32;
syscall_u32!(fcntl(stream_fd, libc::F_SETFD, libc::FD_CLOEXEC))
.and_then(|_| syscall_u32!(fcntl(stream_fd, libc::F_SETFL, libc::O_NONBLOCK)))
.inspect_err(|_| {
let _ = syscall_u32!(close(stream_fd));
})?;
Ok(stream_fd as _)
let stream_fd = syscall_u32!(accept@FD(fd, addr, len))?;
let fd = stream_fd.get() as i32;
syscall_u32!(fcntl(fd, libc::F_SETFD, libc::FD_CLOEXEC))?;
syscall_u32!(fcntl(fd, libc::F_SETFL, libc::O_NONBLOCK))?;
Ok(stream_fd)
};
}
}
11 changes: 8 additions & 3 deletions monoio/src/driver/op/close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use {
windows_sys::Win32::Networking::WinSock::closesocket,
};

#[cfg(any(feature = "legacy", feature = "poll-io"))]
use super::MaybeFd;
use super::{Op, OpAble};

pub(crate) struct Close {
Expand All @@ -33,6 +35,9 @@ impl Op<Close> {
}

impl OpAble for Close {
#[cfg(all(target_os = "linux", feature = "iouring"))]
const SKIP_CANCEL: bool = true;

#[cfg(all(target_os = "linux", feature = "iouring"))]
fn uring_op(&mut self) -> io_uring::squeue::Entry {
opcode::Close::new(types::Fd(self.fd)).build()
Expand All @@ -45,11 +50,11 @@ impl OpAble for Close {
}

#[cfg(any(feature = "legacy", feature = "poll-io"))]
fn legacy_call(&mut self) -> io::Result<u32> {
fn legacy_call(&mut self) -> io::Result<MaybeFd> {
#[cfg(unix)]
return crate::syscall_u32!(close(self.fd));
return crate::syscall_u32!(close@NON_FD(self.fd));

#[cfg(windows)]
return syscall!(closesocket(self.fd as _), PartialEq::ne, 0);
return syscall!(closesocket@NON_FD(self.fd as _), PartialEq::ne, 0);
}
}
21 changes: 10 additions & 11 deletions monoio/src/driver/op/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use windows_sys::Win32::Networking::WinSock::{

use super::{super::shared_fd::SharedFd, Op, OpAble};
#[cfg(any(feature = "legacy", feature = "poll-io"))]
use crate::driver::ready::Direction;
use super::{driver::ready::Direction, MaybeFd};

pub(crate) struct Connect {
pub(crate) fd: SharedFd,
Expand Down Expand Up @@ -59,7 +59,7 @@ impl OpAble for Connect {
}

#[cfg(any(feature = "legacy", feature = "poll-io"))]
fn legacy_call(&mut self) -> io::Result<u32> {
fn legacy_call(&mut self) -> io::Result<MaybeFd> {
// For ios/macos, if tfo is enabled, we will
// call connectx here.
// For linux/android, we have already set socket
Expand All @@ -70,7 +70,7 @@ impl OpAble for Connect {
endpoints.sae_dstaddr = self.socket_addr.as_ptr();
endpoints.sae_dstaddrlen = self.socket_addr_len;

return match crate::syscall_u32!(connectx(
return match crate::syscall_u32!(connectx@RAW(
self.fd.raw_fd(),
&endpoints as *const _,
libc::SAE_ASSOCID_ANY,
Expand All @@ -81,18 +81,18 @@ impl OpAble for Connect {
std::ptr::null_mut(),
)) {
Err(err) if err.raw_os_error() != Some(libc::EINPROGRESS) => Err(err),
_ => Ok(self.fd.raw_fd() as u32),
_ => Ok(MaybeFd::zero()),
};
}

#[cfg(unix)]
match crate::syscall_u32!(connect(
match crate::syscall_u32!(connect@RAW(
self.fd.raw_fd(),
self.socket_addr.as_ptr(),
self.socket_addr_len,
)) {
Err(err) if err.raw_os_error() != Some(libc::EINPROGRESS) => Err(err),
_ => Ok(self.fd.raw_fd() as u32),
_ => Ok(MaybeFd::zero()),
}

#[cfg(windows)]
Expand All @@ -110,8 +110,7 @@ impl OpAble for Connect {
return Err(err);
}
}
#[allow(clippy::unnecessary_cast)]
Ok(self.fd.raw_socket() as u32)
Ok(MaybeFd::zero())
}
}
}
Expand Down Expand Up @@ -158,14 +157,14 @@ impl OpAble for ConnectUnix {
}

#[cfg(any(feature = "legacy", feature = "poll-io"))]
fn legacy_call(&mut self) -> io::Result<u32> {
match crate::syscall_u32!(connect(
fn legacy_call(&mut self) -> io::Result<MaybeFd> {
match crate::syscall_u32!(connect@RAW(
self.fd.raw_fd(),
&self.socket_addr.0 as *const _ as *const _,
self.socket_addr.1
)) {
Err(err) if err.raw_os_error() != Some(libc::EINPROGRESS) => Err(err),
_ => Ok(self.fd.raw_fd() as u32),
_ => Ok(MaybeFd::zero()),
}
}
}
Expand Down
Loading

0 comments on commit 3c899be

Please sign in to comment.