Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed descriptor support #222

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
b5fb182
Introduce fd variations: raw and fixed
FrankReh Feb 3, 2023
a57f2ae
Adds the fixed descriptor capability to SharedFd
FrankReh Feb 3, 2023
2d67d61
fuller open fixed support
FrankReh Feb 10, 2023
24b2749
async close fixed support
FrankReh Feb 10, 2023
1baf987
File::read fixed support
FrankReh Feb 10, 2023
c9e9ceb
File::write fixed support
FrankReh Feb 10, 2023
ff699e2
support dropping fixed file table descriptors
FrankReh Feb 11, 2023
9cbca8e
general comment cleanup
FrankReh Feb 11, 2023
297477e
comment idea for fixed slot drop
FrankReh Feb 12, 2023
8f76a1f
Merge branch 'master' into frankreh/fixed-descriptor-support
FrankReh Feb 13, 2023
cbef4d2
Merge branch 'master' into frankreh/fixed-descriptor-support
FrankReh Feb 14, 2023
3debd20
opcode::Close uses Fixed(fd) now
FrankReh Feb 14, 2023
94adb7d
Merge branch 'master' into frankreh/fixed-descriptor-support
FrankReh Feb 20, 2023
b81fce0
bump io-uring dependency to 0.5.13
FrankReh Feb 20, 2023
c80ae28
remove allow(dead_code)
FrankReh Feb 20, 2023
3de2068
remove old commented-out code
FrankReh Feb 20, 2023
2390971
simplify shared_fd::CommonFd
FrankReh Feb 20, 2023
21ef00f
Merge branch 'master' into frankreh/fixed-descriptor-support
FrankReh Feb 20, 2023
7913a17
Merge branch 'master' into frankreh/fixed-descriptor-support
FrankReh Feb 21, 2023
4522d04
Merge branch 'master' into frankreh/fixed-descriptor-support
FrankReh Feb 25, 2023
60a7ca3
Merge branch 'master' into frankreh/fixed-descriptor-support
FrankReh Feb 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions src/fs/open_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub struct OpenOptions {
truncate: bool,
create: bool,
create_new: bool,
pub(crate) fixed_table_slot: bool,
pub(crate) mode: libc::mode_t,
pub(crate) custom_flags: libc::c_int,
}
Expand Down Expand Up @@ -95,6 +96,7 @@ impl OpenOptions {
truncate: false,
create: false,
create_new: false,
fixed_table_slot: false,
mode: 0o666,
custom_flags: 0,
}
Expand Down Expand Up @@ -217,6 +219,40 @@ impl OpenOptions {
self
}

/// Sets the option for using the io_uring fixed file table to manage
/// the file descriptor, rather than the kernel's process file descriptor table.
///
/// The regular file descriptor, often referred to as the raw fd,
/// will not be available.
///
/// # Examples
///
/// ```no_run
/// use tokio_uring::fs::OpenOptions;
///
/// fn main() -> Result<(), Box<dyn std::error::Error>> {
/// tokio_uring::start(async {
/// let file = OpenOptions::new()
/// .write(true)
/// .truncate(true)
/// .fixed_table_slot(true)
/// .open("foo.txt")
/// .await?;
///
/// // Write to file. And then close.
///
/// // Close, returning close result.
/// file.close().await?;
/// Ok(())
/// })
///
/// }
/// ```
pub fn fixed_table_slot(&mut self, fixed_table_slot: bool) -> &mut OpenOptions {
self.fixed_table_slot = fixed_table_slot;
self
}

/// Sets the option to create a new file, or open it if it already exists.
///
/// In order for the file to be created, [`OpenOptions::write`] or
Expand Down
31 changes: 20 additions & 11 deletions src/io/close.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,33 @@
use crate::io::shared_fd::CommonFd;
use crate::runtime::driver::op;
use crate::runtime::driver::op::{Completable, Op};
use crate::runtime::CONTEXT;
use std::io;
use std::os::unix::io::RawFd;

pub(crate) struct Close {
fd: RawFd,
}
pub(crate) struct Close {}

impl Op<Close> {
pub(crate) fn close(fd: RawFd) -> io::Result<Op<Close>> {
pub(crate) fn close(fd: CommonFd) -> io::Result<Op<Close>> {
use io_uring::{opcode, types};

CONTEXT.with(|x| {
x.handle()
.expect("Not in a runtime context")
.submit_op(Close { fd }, |close| {
opcode::Close::new(types::Fd(close.fd)).build()
match fd {
CommonFd::Raw(raw) => {
let fd = types::Fd(raw);
CONTEXT.with(|x| {
x.handle()
.expect("Not in a runtime context")
.submit_op(Close {}, |_close| opcode::Close::new(fd).build())
})
}
CommonFd::Fixed(fixed) => {
let fd = types::Fixed(fixed);
CONTEXT.with(|x| {
x.handle()
.expect("Not in a runtime context")
.submit_op(Close {}, |_close| opcode::Close::new(fd).build())
})
})
}
}
}
}

Expand Down
35 changes: 28 additions & 7 deletions src/io/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ use crate::runtime::driver::op::{Completable, CqeResult, Op};
use crate::runtime::CONTEXT;
use std::ffi::CString;
use std::io;
use std::os::unix::io::RawFd;
use std::path::Path;

/// Open a file
#[allow(dead_code)]
pub(crate) struct Open {
pub(crate) path: CString,
pub(crate) flags: libc::c_int,
path: CString,
flags: libc::c_int,
fixed_table_auto_select: bool,
}

impl Op<Open> {
Expand All @@ -24,10 +26,20 @@ impl Op<Open> {
| options.creation_mode()?
| (options.custom_flags & !libc::O_ACCMODE);

let (file_index, fixed_table_auto_select) = if options.fixed_table_slot {
(Some(types::DestinationSlot::auto_target()), true)
} else {
(None, false)
};

CONTEXT.with(|x| {
x.handle()
.expect("Not in a runtime context")
.submit_op(Open { path, flags }, |open| {
x.handle().expect("Not in a runtime context").submit_op(
Open {
path,
flags,
fixed_table_auto_select,
},
|open| {
// Get a reference to the memory. The string will be held by the
// operation state and will not be accessed again until the operation
// completes.
Expand All @@ -36,8 +48,10 @@ impl Op<Open> {
opcode::OpenAt::new(types::Fd(libc::AT_FDCWD), p_ref)
.flags(flags)
.mode(options.mode)
.file_index(file_index)
.build()
})
},
)
})
}
}
Expand All @@ -46,6 +60,13 @@ impl Completable for Open {
type Output = io::Result<File>;

fn complete(self, cqe: CqeResult) -> Self::Output {
Ok(File::from_shared_fd(SharedFd::new(cqe.result? as _)))
let result = cqe.result?;
let shared_fd = if self.fixed_table_auto_select {
SharedFd::new_fixed(result)
} else {
SharedFd::new(result as RawFd)
};

Ok(File::from_shared_fd(shared_fd))
}
}
18 changes: 15 additions & 3 deletions src/io/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::buf::BoundedBufMut;
use crate::io::SharedFd;
use crate::BufResult;

use crate::io::shared_fd::CommonFd;
use crate::runtime::driver::op::{Completable, CqeResult, Op};
use crate::runtime::CONTEXT;
use std::io;
Expand Down Expand Up @@ -30,9 +31,20 @@ impl<T: BoundedBufMut> Op<Read<T>> {
// Get raw buffer info
let ptr = read.buf.stable_mut_ptr();
let len = read.buf.bytes_total();
opcode::Read::new(types::Fd(fd.raw_fd()), ptr, len as _)
.offset(offset as _)
.build()
match read.fd.common_fd() {
CommonFd::Raw(raw) => {
let fd = types::Fd(raw);
opcode::Read::new(fd, ptr, len as _)
.offset(offset as _)
.build()
}
CommonFd::Fixed(fixed) => {
let fd = types::Fixed(fixed);
opcode::Read::new(fd, ptr, len as _)
.offset(offset as _)
.build()
}
}
},
)
})
Expand Down
81 changes: 76 additions & 5 deletions src/io/shared_fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{
};

use crate::runtime::driver::op::Op;
use crate::runtime::CONTEXT;

// Tracks in-flight operations on a file descriptor. Ensures all in-flight
// operations complete before submitting the close.
Expand All @@ -23,9 +24,15 @@ pub(crate) struct SharedFd {
inner: Rc<Inner>,
}

#[derive(Debug, Clone, Copy)]
pub(crate) enum CommonFd {
Raw(RawFd),
Fixed(u32),
}

struct Inner {
// Open file descriptor
fd: RawFd,
fd: CommonFd,

// Track the sharing state of the file descriptor:
// normal, being waited on to allow a close by the parent's owner, or already closed.
Expand All @@ -45,6 +52,14 @@ enum State {

impl SharedFd {
pub(crate) fn new(fd: RawFd) -> SharedFd {
Self::_new(CommonFd::Raw(fd))
}

pub(crate) fn new_fixed(slot: u32) -> SharedFd {
Self::_new(CommonFd::Fixed(slot))
}

fn _new(fd: CommonFd) -> SharedFd {
SharedFd {
inner: Rc::new(Inner {
fd,
Expand All @@ -53,8 +68,26 @@ impl SharedFd {
}
}

/// Returns the RawFd
/*
* This function name won't make sense when this fixed file feature
* is fully fleshed out. For now, we panic if called on
* a fixed file.
*/
/// Returns the RawFd.
pub(crate) fn raw_fd(&self) -> RawFd {
match self.inner.fd {
CommonFd::Raw(raw) => raw,
CommonFd::Fixed(_fixed) => {
// TODO remove this function completely once all the uring opcodes that accept
// a fixed file table slot have been modified. For now, we have to keep it to avoid
// too many file changes all at once.
unreachable!("fixed file support not yet added for this call stack");
}
}
}

// Returns the common fd, either a RawFd or the fixed fd slot number.
pub(crate) fn common_fd(&self) -> CommonFd {
self.inner.fd
}

Expand Down Expand Up @@ -147,14 +180,52 @@ impl Drop for SharedFd {

impl Drop for Inner {
fn drop(&mut self) {
// If the inner state isn't `Closed`, the user hasn't called close().await
// so do it synchronously.
// If the inner state isn't `Closed`, the user hasn't called close().await so close it now.
// At least for the case of a regular file descriptor we can do it synchronously. For the
// case of a fixed file table descriptor, we may already be out of the driver's context,
// but if we aren't we resort to the io_uring close operation - and spawn a task to do it.

let state = self.state.borrow_mut();

if let State::Closed = *state {
return;
}
let _ = unsafe { std::fs::File::from_raw_fd(self.fd) };

// Perform one form of close or the other.
match self.fd {
CommonFd::Raw(raw) => {
let _ = unsafe { std::fs::File::from_raw_fd(raw) };
}

CommonFd::Fixed(fixed) => {
// As there is no synchronous close for a fixed file table slot, we have to resort
// to the async close provided by the io_uring device. If we knew the fixed file
// table had been unregistered, this wouldn't be necessary either.

match CONTEXT.try_with(|cx| cx.is_set()) {
Ok(true) => {}
// If the driver is gone, nothing to do. The fixed table has already been taken
// down by the device anyway.
_ => return,
}

// TODO Investigate the idea from the liburing team of replacing the one slot with
// a -1 by using the register/files_update synchronous command. If the current
// scheme that uses a spawn is initiallly acceptable, probably leave it like this
// for now and wait to be able to benchmark once we have streaming tcp sockets.

crate::spawn(async move {
if let Ok(true) = CONTEXT.try_with(|cx| cx.is_set()) {
let fd = CommonFd::Fixed(fixed);
if let Ok(op) = Op::close(fd) {
let _ = op.await;
}
// Else, should warn or panic if the Op::Close can't be built? It would
// mean the fixed value was out of reach which would not be expected at
// this point.
}
});
}
}
}
}
26 changes: 20 additions & 6 deletions src/io/write.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use crate::{buf::BoundedBuf, io::SharedFd, BufResult, OneshotOutputTransform, UnsubmittedOneshot};
use crate::{
buf::BoundedBuf,
io::{shared_fd::CommonFd, SharedFd},
BufResult, OneshotOutputTransform, UnsubmittedOneshot,
};
use io_uring::cqueue::Entry;
use std::io;
use std::marker::PhantomData;
use std::{io, marker::PhantomData};

/// An unsubmitted write operation.
pub type UnsubmittedWrite<T> = UnsubmittedOneshot<WriteData<T>, WriteTransform<T>>;
Expand Down Expand Up @@ -51,9 +54,20 @@ impl<T: BoundedBuf> UnsubmittedWrite<T> {
WriteTransform {
_phantom: PhantomData::default(),
},
opcode::Write::new(types::Fd(fd.raw_fd()), ptr, len as _)
.offset(offset as _)
.build(),
match fd.common_fd() {
CommonFd::Raw(raw) => {
let fd = types::Fd(raw);
opcode::Write::new(fd, ptr, len as _)
.offset(offset as _)
.build()
}
CommonFd::Fixed(fixed) => {
let fd = types::Fixed(fixed);
opcode::Write::new(fd, ptr, len as _)
.offset(offset as _)
.build()
}
},
)
}
}