Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: Introduce UnsubmittedRead #321

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions src/fs/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::fs::OpenOptions;
use crate::io::SharedFd;

use crate::runtime::driver::op::Op;
use crate::{UnsubmittedOneshot, UnsubmittedWrite};
use crate::{UnsubmittedOneshot, UnsubmittedRead, UnsubmittedWrite};
use std::fmt;
use std::io;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
Expand Down Expand Up @@ -177,9 +177,23 @@ impl File {
/// }
/// ```
pub async fn read_at<T: BoundedBufMut>(&self, buf: T, pos: u64) -> crate::BufResult<usize, T> {
// Submit the read operation
let op = Op::read_at(&self.fd, buf, pos).unwrap();
op.await
UnsubmittedOneshot::read_at(&self.fd, buf, pos)
.submit()
.await
}

/// Read some bytes at the specified offset from the file into the specified
/// buffer, returning how many bytes were read.
///
/// Like [`read`], but returns unsubmitted.
///
/// Returns an UnsubmittedRead could be submitted.
pub async fn unsubmitted_read_at<T: BoundedBufMut>(
&self,
buf: T,
pos: u64,
) -> UnsubmittedRead<T> {
UnsubmittedOneshot::read_at(&self.fd, buf, pos)
}

/// Read some bytes at the specified offset from the file into the specified
Expand Down
2 changes: 1 addition & 1 deletion src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub(crate) use noop::NoOp;

mod open;

mod read;
pub(crate) mod read;

mod read_fixed;

Expand Down
91 changes: 45 additions & 46 deletions src/io/read.rs
Original file line number Diff line number Diff line change
@@ -1,64 +1,63 @@
use io_uring::cqueue::Entry;

use crate::buf::BoundedBufMut;
use crate::io::SharedFd;
use crate::BufResult;
use crate::{BufResult, OneshotOutputTransform, UnsubmittedOneshot};

use crate::runtime::driver::op::{Completable, CqeResult, Op};
use crate::runtime::CONTEXT;
use std::io;
use std::marker::PhantomData;

/// An unsubmitted read operation.
pub type UnsubmittedRead<T> = UnsubmittedOneshot<ReadData<T>, ReadTransform<T>>;

pub(crate) struct Read<T> {
#[allow(missing_docs)]
pub struct ReadData<T> {
/// Holds a strong ref to the FD, preventing the file from being closed
/// while the operation is in-flight.
#[allow(dead_code)]
fd: SharedFd,
_fd: SharedFd,

/// Reference to the in-flight buffer.
pub(crate) buf: T,
buf: T,
}

impl<T: BoundedBufMut> Op<Read<T>> {
pub(crate) fn read_at(fd: &SharedFd, buf: T, offset: u64) -> io::Result<Op<Read<T>>> {
use io_uring::{opcode, types};

CONTEXT.with(|x| {
x.handle().expect("Not in a runtime context").submit_op(
Read {
fd: fd.clone(),
buf,
},
|read| {
// Get raw buffer info
let ptr = read.buf.stable_mut_ptr();
let len = read.buf.bytes_total();
opcode::Read::new(types::Fd(fd.raw_fd()), ptr, len as _)
.offset(offset as _)
.build()
},
)
})
}
#[allow(missing_docs)]
pub struct ReadTransform<T> {
_phantom: PhantomData<T>,
}

impl<T> Completable for Read<T>
where
T: BoundedBufMut,
{
impl<T> OneshotOutputTransform for ReadTransform<T> {
type Output = BufResult<usize, T>;
type StoredData = ReadData<T>;

fn complete(self, cqe: CqeResult) -> Self::Output {
// Convert the operation result to `usize`
let res = cqe.result.map(|v| v as usize);
// Recover the buffer
let mut buf = self.buf;
fn transform_oneshot_output(self, data: Self::StoredData, cqe: Entry) -> Self::Output {
let res = if cqe.result() >= 0 {
Ok(cqe.result() as usize)
} else {
Err(io::Error::from_raw_os_error(-cqe.result()))
};

// If the operation was successful, advance the initialized cursor.
if let Ok(n) = res {
// Safety: the kernel wrote `n` bytes to the buffer.
unsafe {
buf.set_init(n);
}
}
(res, data.buf)
}
}

impl<T: BoundedBufMut> UnsubmittedRead<T> {
pub(crate) fn read_at(fd: &SharedFd, mut buf: T, offset: u64) -> Self {
use io_uring::{opcode, types};

(res, buf)
// Get raw buffer info
let ptr = buf.stable_mut_ptr();
let len = buf.bytes_total();

Self::new(
ReadData {
_fd: fd.clone(),
buf,
},
ReadTransform {
_phantom: PhantomData,
},
opcode::Read::new(types::Fd(fd.raw_fd()), ptr, len as _)
.offset(offset as _)
.build(),
)
}
}
6 changes: 3 additions & 3 deletions src/io/socket.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::io::write::UnsubmittedWrite;
use crate::runtime::driver::op::Op;
use crate::UnsubmittedRead;
use crate::{
buf::fixed::FixedBuf,
buf::{BoundedBuf, BoundedBufMut, IoBuf, Slice},
Expand Down Expand Up @@ -168,9 +169,8 @@ impl Socket {
op.await
}

pub(crate) async fn read<T: BoundedBufMut>(&self, buf: T) -> crate::BufResult<usize, T> {
let op = Op::read_at(&self.fd, buf, 0).unwrap();
op.await
pub(crate) fn read<T: BoundedBufMut>(&self, buf: T) -> UnsubmittedRead<T> {
UnsubmittedOneshot::read_at(&self.fd, buf, 0)
}

pub(crate) async fn read_fixed<T>(&self, buf: T) -> crate::BufResult<usize, T>
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ pub mod buf;
pub mod fs;
pub mod net;

pub use io::read::*;
pub use io::write::*;

pub use runtime::driver::op::{InFlightOneshot, OneshotOutputTransform, UnsubmittedOneshot};
pub use runtime::spawn;
pub use runtime::Runtime;
Expand Down
16 changes: 12 additions & 4 deletions src/net/tcp/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ use std::{
};

use crate::{
buf::fixed::FixedBuf,
buf::{BoundedBuf, BoundedBufMut},
buf::{fixed::FixedBuf, BoundedBuf, BoundedBufMut},
io::{SharedFd, Socket},
UnsubmittedWrite,
UnsubmittedRead, UnsubmittedWrite,
};

/// A TCP stream between a local and a remote socket.
Expand Down Expand Up @@ -75,7 +74,16 @@ impl TcpStream {
///
/// Returns the original buffer and quantity of data read.
pub async fn read<T: BoundedBufMut>(&self, buf: T) -> crate::BufResult<usize, T> {
self.inner.read(buf).await
self.inner.read(buf).submit().await
}

/// Read some data from the stream
///
/// Like [`read`], but returns unsubmitted.
///
/// Returns an UnsubmittedRead could be submitted.
pub fn unsubmitted_read<T: BoundedBufMut>(&self, buf: T) -> UnsubmittedRead<T> {
self.inner.read(buf)
}

/// Read some data from the stream into a registered buffer.
Expand Down
16 changes: 12 additions & 4 deletions src/net/udp.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use crate::{
buf::fixed::FixedBuf,
buf::{BoundedBuf, BoundedBufMut},
buf::{fixed::FixedBuf, BoundedBuf, BoundedBufMut},
io::{SharedFd, Socket},
UnsubmittedWrite,
UnsubmittedRead, UnsubmittedWrite,
};
use socket2::SockAddr;
use std::{
Expand Down Expand Up @@ -317,7 +316,16 @@ impl UdpSocket {
///
/// Returns the original buffer and quantity of data read.
pub async fn read<T: BoundedBufMut>(&self, buf: T) -> crate::BufResult<usize, T> {
self.inner.read(buf).await
self.inner.read(buf).submit().await
}

/// Read some data from the stream
///
/// Like [`read`], but returns unsubmitted.
///
/// Returns an UnsubmittedRead could be submitted.
pub fn unsubmitted_read<T: BoundedBufMut>(&self, buf: T) -> UnsubmittedRead<T> {
self.inner.read(buf)
}

/// Receives a single datagram message into a registered buffer.
Expand Down
16 changes: 12 additions & 4 deletions src/net/unix/stream.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use crate::{
buf::fixed::FixedBuf,
buf::{BoundedBuf, BoundedBufMut},
buf::{fixed::FixedBuf, BoundedBuf, BoundedBufMut},
io::{SharedFd, Socket},
UnsubmittedWrite,
UnsubmittedRead, UnsubmittedWrite,
};
use socket2::SockAddr;
use std::{
Expand Down Expand Up @@ -76,7 +75,16 @@ impl UnixStream {
/// Read some data from the stream into the buffer, returning the original buffer and
/// quantity of data read.
pub async fn read<T: BoundedBufMut>(&self, buf: T) -> crate::BufResult<usize, T> {
self.inner.read(buf).await
self.inner.read(buf).submit().await
}

/// Read some data from the stream
///
/// Like [`read`], but returns unsubmitted.
///
/// Returns an UnsubmittedRead could be submitted.
pub fn unsubmitted_read<T: BoundedBufMut>(&self, buf: T) -> UnsubmittedRead<T> {
self.inner.read(buf)
}

/// Like [`read`], but using a pre-mapped buffer
Expand Down