Skip to content

Commit

Permalink
Merge pull request #479 from weibocom/redisproto
Browse files Browse the repository at this point in the history
redis协议可重入优化
  • Loading branch information
viciousstar authored Jul 23, 2024
2 parents 14d579c + aa0bec6 commit 2dc8b28
Show file tree
Hide file tree
Showing 22 changed files with 1,205 additions and 576 deletions.
3 changes: 2 additions & 1 deletion protocol/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ pub enum Error {
ChanDisabled,
ChanWriteClosed,
ChanReadClosed,
ProtocolIncomplete,
//协议完整至少还需要x个字节
ProtocolIncomplete(usize),
RequestInvalidMagic,
ResponseInvalidMagic,
RequestProtocolInvalid,
Expand Down
2 changes: 1 addition & 1 deletion protocol/src/kv/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl Into<crate::Error> for Error {
log::warn!("found unhandle response: {}", packet.utf8());
crate::Error::ResponseProtocolInvalid
}
Self::ProtocolIncomplete => crate::Error::ProtocolIncomplete,
Self::ProtocolIncomplete => crate::Error::ProtocolIncomplete(0),
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions protocol/src/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl Protocol for Kv {
) -> crate::Result<HandShake> {
match self.handshake_inner(stream, option) {
Ok(h) => Ok(h),
Err(crate::Error::ProtocolIncomplete) => Ok(HandShake::Continue),
Err(crate::Error::ProtocolIncomplete(0)) => Ok(HandShake::Continue),
Err(e) => {
log::warn!("+++ found err when shake hand:{:?}", e);
Err(e)
Expand Down Expand Up @@ -143,7 +143,7 @@ impl Protocol for Kv {
// 解析完毕rsp后,除了数据未读完的场景,其他不管是否遇到err,都要进行take
match self.parse_response_inner(&mut rsp_packet) {
Ok(cmd) => Ok(Some(cmd)),
Err(crate::Error::ProtocolIncomplete) => Ok(None),
Err(crate::Error::ProtocolIncomplete(0)) => Ok(None),
Err(e) => {
// 非MysqlError需要日志并外层断连处理
log::error!("+++ err when parse mysql response: {:?}", e);
Expand Down
2 changes: 1 addition & 1 deletion protocol/src/kv/rsppacket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ impl<'a, S: crate::Stream> ResponsePacket<'a, S> {
// fn _next_packet(&mut self) -> Result<RingSlice> {
// match self.try_next_packet() {
// Ok(pld) => Ok(pld),
// Err(Error::ProtocolIncomplete) => Err(crate::Error::ProtocolIncomplete),
// Err(Error::ProtocolIncomplete(0)) => Err(crate::Error::ProtocolIncomplete(0)),
// Err(e) => {
// // 发现异常,说明异常数据已读完,此处统一take
// self.take();
Expand Down
6 changes: 3 additions & 3 deletions protocol/src/msgque/mcq_bk/text/mcreqpacket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl<'a, S: crate::Stream> RequestPacket<'a, S> {
if self.oft <= self.data.len() {
return Ok(());
}
return Err(super::Error::ProtocolIncomplete);
return Err(super::Error::ProtocolIncomplete(0));
}

#[inline]
Expand Down Expand Up @@ -331,7 +331,7 @@ impl<'a, S: crate::Stream> RequestPacket<'a, S> {
ReqPacketState::Val => {
m = self.oft + self.vlen;
if m >= self.data.len() {
return Err(super::Error::ProtocolIncomplete);
return Err(super::Error::ProtocolIncomplete(0));
}
if self.data.at(m) == CR {
self.skip(self.vlen)?;
Expand Down Expand Up @@ -435,7 +435,7 @@ impl<'a, S: crate::Stream> RequestPacket<'a, S> {
}
}
}
Err(super::Error::ProtocolIncomplete)
Err(super::Error::ProtocolIncomplete(0))
}

#[inline]
Expand Down
8 changes: 4 additions & 4 deletions protocol/src/msgque/mcq_bk/text/mcrsppacket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl<'a, S: crate::Stream> RspPacket<'a, S> {
let mut state = self.state;

if self.data.len() < 2 {
return Err(super::Error::ProtocolIncomplete);
return Err(super::Error::ProtocolIncomplete(0));
}

if self.current().is_ascii_digit() {
Expand Down Expand Up @@ -252,7 +252,7 @@ impl<'a, S: crate::Stream> RspPacket<'a, S> {
m = self.oft + self.vlen;
// TODO 提升解析性能 speedup fishermen
if m >= self.data.len() {
return Err(super::Error::ProtocolIncomplete);
return Err(super::Error::ProtocolIncomplete(0));
}
match self.data.at(m) {
CR => {
Expand Down Expand Up @@ -317,7 +317,7 @@ impl<'a, S: crate::Stream> RspPacket<'a, S> {

self.skip(1)?;
}
Err(super::Error::ProtocolIncomplete)
Err(super::Error::ProtocolIncomplete(0))
}

#[inline]
Expand All @@ -340,7 +340,7 @@ impl<'a, S: crate::Stream> RspPacket<'a, S> {
if self.oft <= self.data.len() {
return Ok(());
}
return Err(super::Error::ProtocolIncomplete);
return Err(super::Error::ProtocolIncomplete(0));
}

#[inline]
Expand Down
4 changes: 2 additions & 2 deletions protocol/src/msgque/mcq_bk/text/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl Protocol for McqText {
) -> Result<()> {
match self.parse_request_inner(stream, alg, process) {
Ok(_) => Ok(()),
Err(Error::ProtocolIncomplete) => Ok(()),
Err(Error::ProtocolIncomplete(0)) => Ok(()),
e => e,
}
}
Expand All @@ -106,7 +106,7 @@ impl Protocol for McqText {
fn parse_response<S: Stream>(&self, data: &mut S) -> Result<Option<Command>> {
match self.parse_response_inner(data) {
Ok(cmd) => Ok(cmd),
Err(Error::ProtocolIncomplete) => Ok(None),
Err(Error::ProtocolIncomplete(0)) => Ok(None),
e => e,
}
}
Expand Down
10 changes: 5 additions & 5 deletions protocol/src/msgque/mcq_bk/text/reqpacket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl<'a, S: crate::Stream> RequestPacket<'a, S> {
if self.oft <= self.data.len() {
return Ok(());
}
return Err(super::Error::ProtocolIncomplete);
return Err(super::Error::ProtocolIncomplete(0));
}

#[inline]
Expand Down Expand Up @@ -229,7 +229,7 @@ impl<'a, S: crate::Stream> RequestPacket<'a, S> {
ReqPacketState::Val => {
m = self.oft + vlen;
if m >= self.data.len() {
return Err(super::Error::ProtocolIncomplete);
return Err(super::Error::ProtocolIncomplete(0));
}
if self.data.at(m) == CR {
self.skip(vlen)?;
Expand All @@ -251,7 +251,7 @@ impl<'a, S: crate::Stream> RequestPacket<'a, S> {
// 当前字节处理完毕,继续下一个字节
self.skip(1)?;
}
Err(super::Error::ProtocolIncomplete)
Err(super::Error::ProtocolIncomplete(0))
}

#[inline]
Expand Down Expand Up @@ -324,7 +324,7 @@ impl Packet for RingSlice {
*oft = idx + 2;
Ok(())
} else {
Err(crate::Error::ProtocolIncomplete)
Err(crate::Error::ProtocolIncomplete(0))
}
}

Expand All @@ -351,7 +351,7 @@ impl Packet for RingSlice {
return Ok(());
}
}
Err(super::Error::ProtocolIncomplete)
Err(super::Error::ProtocolIncomplete(0))
}
}
// mcq 解析时状态
Expand Down
10 changes: 5 additions & 5 deletions protocol/src/msgque/mcq_bk/text/rsppacket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl<'a, S: crate::Stream> RspPacket<'a, S> {
#[inline]
fn start_with(&self, oft: usize, s: &[u8]) -> Result<bool> {
if oft + s.len() > self.data.len() {
Err(crate::Error::ProtocolIncomplete)
Err(crate::Error::ProtocolIncomplete(0))
} else {
Ok(self.data.start_with(oft, s))
}
Expand All @@ -64,7 +64,7 @@ impl<'a, S: crate::Stream> RspPacket<'a, S> {
// memcache rsponse 解析的状态机,后续考虑优化 fishermen
pub(super) fn parse(&mut self) -> Result<()> {
if self.data.len() < 2 {
return Err(super::Error::ProtocolIncomplete);
return Err(super::Error::ProtocolIncomplete(0));
}

let mut state = RspPacketState::RspStr;
Expand Down Expand Up @@ -209,7 +209,7 @@ impl<'a, S: crate::Stream> RspPacket<'a, S> {
RspPacketState::Val => {
token = self.oft + vlen;
if token >= self.data.len() {
return Err(super::Error::ProtocolIncomplete);
return Err(super::Error::ProtocolIncomplete(0));
}
self.skip(vlen)?;
match self.current() {
Expand Down Expand Up @@ -276,7 +276,7 @@ impl<'a, S: crate::Stream> RspPacket<'a, S> {

self.skip(1)?;
}
Err(super::Error::ProtocolIncomplete)
Err(super::Error::ProtocolIncomplete(0))
}

pub(super) fn delay_metric(&mut self) -> Result<()> {
Expand Down Expand Up @@ -344,7 +344,7 @@ impl<'a, S: crate::Stream> RspPacket<'a, S> {
if self.oft <= self.data.len() {
return Ok(());
}
return Err(super::Error::ProtocolIncomplete);
return Err(super::Error::ProtocolIncomplete(0));
}

#[inline]
Expand Down
2 changes: 1 addition & 1 deletion protocol/src/redis/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl CommandHasher {
}
h.hash(slice[i]);
}
Err(crate::Error::ProtocolIncomplete)
Err(crate::Error::ProtocolIncomplete(0))
}
}

Expand Down
37 changes: 10 additions & 27 deletions protocol/src/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,32 +70,23 @@ impl Redis {
}

#[inline]
fn parse_response_inner<S: Stream>(&self, s: &mut S) -> Result<Option<Command>> {
pub fn parse_response_inner<S: Stream>(&self, s: &mut S) -> Result<Option<Command>> {
let data: Packet = s.slice().into();
let ctx: &mut ResponseContext = transmute(s.context());
log::debug!("+++ will parse redis rsp:{:?}", data);
// data.check_onetoken(*oft)?;

match data.at(0) {
b'-' | b':' | b'+' => data.line(&mut ctx.oft)?,
b'$' => ctx.oft += data.num_of_string(&mut ctx.oft)? + 2,
b'*' => data.skip_multibulks(ctx)?,
b'$' => data.skip_string_check(&mut ctx.oft)?,
b'*' => data.skip_multibulks_with_ctx(ctx)?,
_ => return Err(RedisError::RespInvalid.into()),
}

let oft = ctx.oft;
ctx.oft = 0; // 响应消息是b'$',若数据未接收完整,下次需要从起始位置开始解析
match oft <= data.len() {
true => Ok(Some(Command::from_ok(s.take(oft)))),
false => Err(Error::ProtocolIncomplete),
}
// Ok((*oft <= data.len()).then(|| Command::from_ok(s.take(*oft))))
}
#[inline(always)]
fn left_bytes<S: Stream>(&self, s: &mut S) -> usize {
let ctx = transmute(s.context());
// 64是经验值
ctx.bulk * 64
assert!(oft != 0);
assert!(oft <= data.len());
ctx.oft = 0;
Ok(Some(Command::from_ok(s.take(oft))))
}
}

Expand All @@ -117,7 +108,7 @@ impl Protocol for Redis {
let mut packet = RequestPacket::new(stream);
match self.parse_request_inner(&mut packet, alg, process) {
Ok(_) => Ok(()),
Err(Error::ProtocolIncomplete) => {
Err(Error::ProtocolIncomplete(0)) => {
// 如果解析数据不够,提前reserve stream的空间
packet.reserve_stream_buff();
Ok(())
Expand All @@ -134,18 +125,10 @@ impl Protocol for Redis {
fn parse_response<S: Stream>(&self, data: &mut S) -> Result<Option<Command>> {
match self.parse_response_inner(data) {
Ok(cmd) => Ok(cmd),
Err(Error::ProtocolIncomplete) => {
let ctx = transmute(data.context());
let oft = ctx.oft;
//assert!(oft + 3 >= data.len(), "oft:{} => {:?}", oft, data.slice());
if ctx.bulk > 0 {
// 响应消息是array场景
let left = self.left_bytes(data);
Err(Error::ProtocolIncomplete(left)) => {
if left > 0 {
data.reserve(left);
} else if oft > data.len() {
data.reserve(oft - data.len());
}

Ok(None)
}
e => e,
Expand Down
Loading

0 comments on commit 2dc8b28

Please sign in to comment.