From 4c932a88185c0c76abf0d1f04048a25cd006d0bc Mon Sep 17 00:00:00 2001 From: kongkong Date: Thu, 2 Nov 2023 14:38:07 +0800 Subject: [PATCH] =?UTF-8?q?bufwrite=E7=9A=84write=E6=80=BB=E6=98=AF?= =?UTF-8?q?=E6=88=90=E5=8A=9F=EF=BC=8C=E5=A3=B0=E6=98=8E=E5=92=8C=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0=E7=BB=9F=E4=B8=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ds/src/lib.rs | 8 ++++---- ds/src/mem/ring_slice.rs | 4 ++-- protocol/src/kv/mod.rs | 6 +++--- protocol/src/memcache/binary/mod.rs | 5 +++-- protocol/src/msgque/mcq/binary/mod.rs | 11 ++++++++--- protocol/src/msgque/mcq/text/mod.rs | 4 ++-- protocol/src/redis/mod.rs | 4 ++-- protocol/src/stream.rs | 10 ++++------ protocol/src/uuid/mod.rs | 2 +- rt/src/stream/mod.rs | 9 ++++----- stream/src/handler.rs | 2 +- 11 files changed, 34 insertions(+), 31 deletions(-) diff --git a/ds/src/lib.rs b/ds/src/lib.rs index a051d9dc3..3ae2f3027 100644 --- a/ds/src/lib.rs +++ b/ds/src/lib.rs @@ -27,11 +27,11 @@ mod bits; pub use bits::*; pub trait BufWriter { - fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()>; + fn write_all(&mut self, buf: &[u8]); #[inline] - fn write_seg_all(&mut self, buf0: &[u8], buf1: &[u8]) -> std::io::Result<()> { - self.write_all(buf0)?; - self.write_all(buf1) + fn write_seg_all(&mut self, buf0: &[u8], buf1: &[u8]) { + self.write_all(buf0); + self.write_all(buf1); } } diff --git a/ds/src/mem/ring_slice.rs b/ds/src/mem/ring_slice.rs index 4354c27f1..e5e370401 100644 --- a/ds/src/mem/ring_slice.rs +++ b/ds/src/mem/ring_slice.rs @@ -194,13 +194,13 @@ impl RingSlice { }); } #[inline] - pub fn copy_to(&self, oft: usize, w: &mut W) -> std::io::Result<()> { + pub fn copy_to(&self, oft: usize, w: &mut W) { with_segment!( self, oft, |p, l| w.write_all(from_raw_parts(p, l)), |p0, l0, p1, l1| { w.write_seg_all(from_raw_parts(p0, l0), from_raw_parts(p1, l1)) } - ) + ); } #[inline] pub fn copy_to_vec(&self, v: &mut Vec) { diff --git a/protocol/src/kv/mod.rs b/protocol/src/kv/mod.rs index 30fbeea8d..17e2bc972 100644 --- a/protocol/src/kv/mod.rs +++ b/protocol/src/kv/mod.rs @@ -190,7 +190,7 @@ impl Protocol for Kv { // noop: 第一个字节变更为Response,其他的与Request保持一致 OP_NOOP => { w.write_u8(RESPONSE_MAGIC)?; - w.write_slice(ctx.request(), 1)?; + w.write_slice(ctx.request(), 1); } //version: 返回固定rsp @@ -381,10 +381,10 @@ impl Kv { w.write_u32(extra)?; } if let Some(key) = &key { - w.write_ringslice(key, 0)?; + w.write_ringslice(key, 0); } if let Some(response) = response { - w.write_slice(response, 0)? // value + w.write_slice(response, 0) // value } Ok(()) } diff --git a/protocol/src/memcache/binary/mod.rs b/protocol/src/memcache/binary/mod.rs index 49c5e0c67..d2fff8658 100644 --- a/protocol/src/memcache/binary/mod.rs +++ b/protocol/src/memcache/binary/mod.rs @@ -143,7 +143,7 @@ impl Protocol for MemcacheBinary { log::debug!("+++ will write mc rsp:{:?}", rsp.data()); //let data = rsp.data_mut(); rsp.restore_op(old_op_code); - w.write_slice(rsp, 0)?; + w.write_slice(rsp, 0); return Ok(()); } @@ -154,7 +154,8 @@ impl Protocol for MemcacheBinary { // noop: 第一个字节变更为Response,其他的与Request保持一致 OP_NOOP => { w.write_u8(RESPONSE_MAGIC)?; - w.write_slice(ctx.request(), 1) + w.write_slice(ctx.request(), 1); + Ok(()) } OP_VERSION => w.write(&VERSION_RESPONSE), diff --git a/protocol/src/msgque/mcq/binary/mod.rs b/protocol/src/msgque/mcq/binary/mod.rs index 34a187ce7..049025e75 100644 --- a/protocol/src/msgque/mcq/binary/mod.rs +++ b/protocol/src/msgque/mcq/binary/mod.rs @@ -87,7 +87,7 @@ impl Protocol for McqBinary { let resp = ctx.response(); let data = resp.data(); // data.restore_op(old_op_code as u8); - w.write_slice(data, 0)?; + w.write_slice(data, 0); Ok(0) } #[inline] @@ -123,8 +123,13 @@ impl Protocol for McqBinary { Ok(0) } // get: 0x01 NotFound _ => { - log::warn!("+++ mcq NoResponseFound req: {}/{:?}", old_op_code, ctx.request()); - return Err(Error::NoResponseFound); }, + log::warn!( + "+++ mcq NoResponseFound req: {}/{:?}", + old_op_code, + ctx.request() + ); + return Err(Error::NoResponseFound); + } } } #[inline] diff --git a/protocol/src/msgque/mcq/text/mod.rs b/protocol/src/msgque/mcq/text/mod.rs index daa1251ec..23a0d6395 100644 --- a/protocol/src/msgque/mcq/text/mod.rs +++ b/protocol/src/msgque/mcq/text/mod.rs @@ -148,7 +148,7 @@ impl Protocol for McqText { if let Some(rsp) = response { // 不再创建local rsp,所有server响应的rsp data长度应该大于0 debug_assert!(rsp.len() > 0, "req:{:?}, rsp:{:?}", request, rsp); - w.write_slice(rsp, 0)?; + w.write_slice(rsp, 0); self.metrics(request, rsp, ctx); } else { let padding = cfg.get_padding_rsp(); @@ -160,7 +160,7 @@ impl Protocol for McqText { // let rsp = ctx.response().data(); // // 虽然quit的响应长度为0,但不排除有其他响应长度为0的场景,还是用quit属性来断连更安全 // if rsp.len() > 0 { - // w.write_slice(rsp, 0)?; + // w.write_slice(rsp, 0); // } Ok(()) diff --git a/protocol/src/redis/mod.rs b/protocol/src/redis/mod.rs index 0fab41083..e3780262b 100644 --- a/protocol/src/redis/mod.rs +++ b/protocol/src/redis/mod.rs @@ -268,7 +268,7 @@ impl Protocol for Redis { if !cfg.multi { // 非multi请求,有响应直接返回client,否则构建 if let Some(rsp) = response { - w.write_slice(rsp, 0)?; + w.write_slice(rsp, 0); } else { // 无响应,则根据cmd name构建对应响应 match cfg.cmd_type { @@ -303,7 +303,7 @@ impl Protocol for Redis { // 如果rsp是ok,或者不需要bulk num,直接发送;否则构建rsp or padding rsp if let Some(rsp) = response { if rsp.ok() || !cfg.need_bulk_num { - w.write_slice(rsp, 0)?; + w.write_slice(rsp, 0); return Ok(()); } } diff --git a/protocol/src/stream.rs b/protocol/src/stream.rs index 3a842527e..fdd1728af 100644 --- a/protocol/src/stream.rs +++ b/protocol/src/stream.rs @@ -62,16 +62,14 @@ pub trait Writer: ds::BufWriter + Sized { fn cache(&mut self, hint: bool); #[inline] - fn write_slice>(&mut self, data: &S, oft: usize) -> Result<()> { - (&*data).copy_to(oft, self)?; - Ok(()) + fn write_slice>(&mut self, data: &S, oft: usize) { + (&*data).copy_to(oft, self); } // 暂时没发现更好的实现方式,先用这个实现 #[inline] - fn write_ringslice(&mut self, data: &RingSlice, oft: usize) -> Result<()> { - data.copy_to(oft, self)?; - Ok(()) + fn write_ringslice(&mut self, data: &RingSlice, oft: usize) { + data.copy_to(oft, self); } fn shrink(&mut self); diff --git a/protocol/src/uuid/mod.rs b/protocol/src/uuid/mod.rs index 472a9d78c..c4afcf6c0 100644 --- a/protocol/src/uuid/mod.rs +++ b/protocol/src/uuid/mod.rs @@ -55,7 +55,7 @@ impl Protocol for Uuid { I: MetricItem, { if let Some(rsp) = response { - w.write_slice(rsp, 0)?; + w.write_slice(rsp, 0); Ok(()) } else { w.write(b"SERVER_ERROR uuid no available\r\n")?; diff --git a/rt/src/stream/mod.rs b/rt/src/stream/mod.rs index 9b6e00051..25067aa7c 100644 --- a/rt/src/stream/mod.rs +++ b/rt/src/stream/mod.rs @@ -148,20 +148,19 @@ impl protocol::Writer for Stream { } impl ds::BufWriter for Stream { #[inline] - fn write_all(&mut self, data: &[u8]) -> std::io::Result<()> { + fn write_all(&mut self, data: &[u8]) { if data.len() <= 4 { self.buf.write(data); } else { let mut ctx = Context::from_waker(&NOOP); let _ = Pin::new(self).poll_write(&mut ctx, data); } - Ok(()) } #[inline] - fn write_seg_all(&mut self, buf0: &[u8], buf1: &[u8]) -> std::io::Result<()> { + fn write_seg_all(&mut self, buf0: &[u8], buf1: &[u8]) { self.buf.enable = true; - self.write_all(buf0)?; - self.write_all(buf1) + self.write_all(buf0); + self.write_all(buf1); } } impl protocol::BufRead for Stream { diff --git a/stream/src/handler.rs b/stream/src/handler.rs index 588dabc25..cc58a79e2 100644 --- a/stream/src/handler.rs +++ b/stream/src/handler.rs @@ -119,7 +119,7 @@ where while let Some(req) = ready!(self.data.poll_recv(cx)) { self.num.tx(); - self.s.write_slice(&*req, 0)?; + self.s.write_slice(&*req, 0); match req.on_sent() { Some(r) => self.pending.push_back((r, Instant::now())),