Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bufwrite的write总是成功,声明和实现统一 #384

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions ds/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ mod bits;
pub use bits::*;

pub trait BufWriter {
fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()>;
fn write_all(&mut self, buf: &[u8]);
#[inline]
fn write_seg_all(&mut self, buf0: &[u8], buf1: &[u8]) -> std::io::Result<()> {
self.write_all(buf0)?;
self.write_all(buf1)
fn write_seg_all(&mut self, buf0: &[u8], buf1: &[u8]) {
self.write_all(buf0);
self.write_all(buf1);
}
}

Expand Down
4 changes: 2 additions & 2 deletions ds/src/mem/ring_slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,13 @@ impl RingSlice {
});
}
#[inline]
pub fn copy_to<W: crate::BufWriter>(&self, oft: usize, w: &mut W) -> std::io::Result<()> {
pub fn copy_to<W: crate::BufWriter>(&self, oft: usize, w: &mut W) {
with_segment!(
self,
oft,
|p, l| w.write_all(from_raw_parts(p, l)),
|p0, l0, p1, l1| { w.write_seg_all(from_raw_parts(p0, l0), from_raw_parts(p1, l1)) }
)
);
}
#[inline]
pub fn copy_to_vec(&self, v: &mut Vec<u8>) {
Expand Down
6 changes: 3 additions & 3 deletions protocol/src/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ impl Protocol for Kv {
// noop: 第一个字节变更为Response,其他的与Request保持一致
OP_NOOP => {
w.write_u8(RESPONSE_MAGIC)?;
w.write_slice(ctx.request(), 1)?;
w.write_slice(ctx.request(), 1);
}

//version: 返回固定rsp
Expand Down Expand Up @@ -381,10 +381,10 @@ impl Kv {
w.write_u32(extra)?;
}
if let Some(key) = &key {
w.write_ringslice(key, 0)?;
w.write_ringslice(key, 0);
}
if let Some(response) = response {
w.write_slice(response, 0)? // value
w.write_slice(response, 0) // value
}
Ok(())
}
Expand Down
5 changes: 3 additions & 2 deletions protocol/src/memcache/binary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl Protocol for MemcacheBinary {
log::debug!("+++ will write mc rsp:{:?}", rsp.data());
//let data = rsp.data_mut();
rsp.restore_op(old_op_code);
w.write_slice(rsp, 0)?;
w.write_slice(rsp, 0);

return Ok(());
}
Expand All @@ -154,7 +154,8 @@ impl Protocol for MemcacheBinary {
// noop: 第一个字节变更为Response,其他的与Request保持一致
OP_NOOP => {
w.write_u8(RESPONSE_MAGIC)?;
w.write_slice(ctx.request(), 1)
w.write_slice(ctx.request(), 1);
Ok(())
}

OP_VERSION => w.write(&VERSION_RESPONSE),
Expand Down
11 changes: 8 additions & 3 deletions protocol/src/msgque/mcq/binary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl Protocol for McqBinary {
let resp = ctx.response();
let data = resp.data();
// data.restore_op(old_op_code as u8);
w.write_slice(data, 0)?;
w.write_slice(data, 0);
Ok(0)
}
#[inline]
Expand Down Expand Up @@ -123,8 +123,13 @@ impl Protocol for McqBinary {
Ok(0)
} // get: 0x01 NotFound
_ => {
log::warn!("+++ mcq NoResponseFound req: {}/{:?}", old_op_code, ctx.request());
return Err(Error::NoResponseFound); },
log::warn!(
"+++ mcq NoResponseFound req: {}/{:?}",
old_op_code,
ctx.request()
);
return Err(Error::NoResponseFound);
}
}
}
#[inline]
Expand Down
4 changes: 2 additions & 2 deletions protocol/src/msgque/mcq/text/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl Protocol for McqText {
if let Some(rsp) = response {
// 不再创建local rsp,所有server响应的rsp data长度应该大于0
debug_assert!(rsp.len() > 0, "req:{:?}, rsp:{:?}", request, rsp);
w.write_slice(rsp, 0)?;
w.write_slice(rsp, 0);
self.metrics(request, rsp, ctx);
} else {
let padding = cfg.get_padding_rsp();
Expand All @@ -160,7 +160,7 @@ impl Protocol for McqText {
// let rsp = ctx.response().data();
// // 虽然quit的响应长度为0,但不排除有其他响应长度为0的场景,还是用quit属性来断连更安全
// if rsp.len() > 0 {
// w.write_slice(rsp, 0)?;
// w.write_slice(rsp, 0);
// }

Ok(())
Expand Down
4 changes: 2 additions & 2 deletions protocol/src/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ impl Protocol for Redis {
if !cfg.multi {
// 非multi请求,有响应直接返回client,否则构建
if let Some(rsp) = response {
w.write_slice(rsp, 0)?;
w.write_slice(rsp, 0);
} else {
// 无响应,则根据cmd name构建对应响应
match cfg.cmd_type {
Expand Down Expand Up @@ -303,7 +303,7 @@ impl Protocol for Redis {
// 如果rsp是ok,或者不需要bulk num,直接发送;否则构建rsp or padding rsp
if let Some(rsp) = response {
if rsp.ok() || !cfg.need_bulk_num {
w.write_slice(rsp, 0)?;
w.write_slice(rsp, 0);
return Ok(());
}
}
Expand Down
10 changes: 4 additions & 6 deletions protocol/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,14 @@ pub trait Writer: ds::BufWriter + Sized {
fn cache(&mut self, hint: bool);

#[inline]
fn write_slice<S: Deref<Target = MemGuard>>(&mut self, data: &S, oft: usize) -> Result<()> {
(&*data).copy_to(oft, self)?;
Ok(())
fn write_slice<S: Deref<Target = MemGuard>>(&mut self, data: &S, oft: usize) {
(&*data).copy_to(oft, self);
}

// 暂时没发现更好的实现方式,先用这个实现
#[inline]
fn write_ringslice(&mut self, data: &RingSlice, oft: usize) -> Result<()> {
data.copy_to(oft, self)?;
Ok(())
fn write_ringslice(&mut self, data: &RingSlice, oft: usize) {
data.copy_to(oft, self);
}

fn shrink(&mut self);
Expand Down
2 changes: 1 addition & 1 deletion protocol/src/uuid/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl Protocol for Uuid {
I: MetricItem,
{
if let Some(rsp) = response {
w.write_slice(rsp, 0)?;
w.write_slice(rsp, 0);
Ok(())
} else {
w.write(b"SERVER_ERROR uuid no available\r\n")?;
Expand Down
9 changes: 4 additions & 5 deletions rt/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,20 +148,19 @@ impl<S: AsyncWrite + Unpin + std::fmt::Debug> protocol::Writer for Stream<S> {
}
impl<S: AsyncWrite + Unpin + std::fmt::Debug> ds::BufWriter for Stream<S> {
#[inline]
fn write_all(&mut self, data: &[u8]) -> std::io::Result<()> {
fn write_all(&mut self, data: &[u8]) {
if data.len() <= 4 {
self.buf.write(data);
} else {
let mut ctx = Context::from_waker(&NOOP);
let _ = Pin::new(self).poll_write(&mut ctx, data);
}
Ok(())
}
#[inline]
fn write_seg_all(&mut self, buf0: &[u8], buf1: &[u8]) -> std::io::Result<()> {
fn write_seg_all(&mut self, buf0: &[u8], buf1: &[u8]) {
self.buf.enable = true;
self.write_all(buf0)?;
self.write_all(buf1)
self.write_all(buf0);
self.write_all(buf1);
}
}
impl<S> protocol::BufRead for Stream<S> {
Expand Down
2 changes: 1 addition & 1 deletion stream/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ where
while let Some(req) = ready!(self.data.poll_recv(cx)) {
self.num.tx();

self.s.write_slice(&*req, 0)?;
self.s.write_slice(&*req, 0);

match req.on_sent() {
Some(r) => self.pending.push_back((r, Instant::now())),
Expand Down
Loading