Skip to content

Commit

Permalink
topo 发送过程中错误返回给sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
viciousstar committed Nov 13, 2023
1 parent 5ec1ba7 commit f8425c4
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 30 deletions.
25 changes: 1 addition & 24 deletions endpoint/src/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,4 @@ pub mod strategy;
pub mod topo;
pub mod uuid;

struct Context {
runs: u16, // 运行的次数
idx: u16, //最多有65535个主从
shard_idx: u16,
year: u16,
}

// #[inline]
// fn transmute(ctx: &mut u64) -> &mut Context {
// // 这个放在layout的单元测试里面
// //assert_eq!(std::mem::size_of::<Context>(), 8);
// unsafe { std::mem::transmute(ctx) }
// }

trait KVCtx {
fn ctx(&mut self) -> &mut Context;
}

impl<T: protocol::Request> KVCtx for T {
#[inline(always)]
fn ctx(&mut self) -> &mut Context {
unsafe { std::mem::transmute(self.context_mut()) }
}
}
pub(crate) use protocol::kv::KVCtx;
5 changes: 3 additions & 2 deletions endpoint/src/kv/topo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use discovery::dns::IPPort;
use discovery::TopologyWrite;
use ds::MemGuard;
use protocol::kv::Binary;
use protocol::kv::ContextError;
use protocol::kv::MysqlBuilder;
use protocol::kv::Strategy;
use protocol::Protocol;
Expand Down Expand Up @@ -102,8 +103,8 @@ where

let shards = self.shards.get(intyear);
if shards.len() == 0 {
//todo 错误类型不合适
req.on_err(protocol::Error::TopChanged);
req.ctx().error = ContextError::TopInvalid;
req.on_err(protocol::Error::TopInvalid);
return;
}
debug_assert!(
Expand Down
5 changes: 5 additions & 0 deletions protocol/src/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ impl CallbackContext {
}
}

#[inline]
pub fn flag(&self) -> crate::Context {
self.flag
}

#[inline]
pub(crate) fn on_noforward(&mut self) {
debug_assert!(self.request().noforward(), "{:?}", self);
Expand Down
1 change: 1 addition & 0 deletions protocol/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub enum Error {
ResponseProtocolInvalid,
ProtocolNotSupported,
TopChanged,
TopInvalid,
WriteResponseErr,
OpCodeNotSupported(u16),
BufferFull,
Expand Down
57 changes: 53 additions & 4 deletions protocol/src/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ mod reqpacket;
mod rsppacket;

mod mc2mysql;
use std::ops::Deref;

pub use mc2mysql::{MysqlBuilder, Strategy};

use self::common::proto::Text;
Expand Down Expand Up @@ -213,7 +215,7 @@ impl Protocol for Kv {
ctx.request(),
response
);
self.write_mc_response(ctx.request(), response.map(|r| &*r), w)?
self.write_mc_response(ctx.request(), response.map(|r| &*r), ctx.ctx(), w)?
}
// self.build_empty_response(RespStatus::NotStored, req)

Expand Down Expand Up @@ -352,7 +354,7 @@ impl Kv {
status: RespStatus,
key: Option<RingSlice>,
extra: Option<u32>,
response: Option<&crate::Command>,
response: Option<&RingSlice>,
w: &mut W,
) -> crate::Result<()>
where
Expand All @@ -371,7 +373,7 @@ impl Kv {

w.write_u16(status as u16)?; // Status 2byte

let response_len = response.as_ref().map_or(0, |r| r.len());
let response_len = response.map_or(0, |r| r.len());
let total_body_len = extra_len as u32 + key_len as u32 + response_len as u32;
w.write_u32(total_body_len)?; // total body len: 4 bytes
w.write_u32(0)?; //opaque: 4bytes
Expand All @@ -384,7 +386,7 @@ impl Kv {
w.write_ringslice(key, 0)?;
}
if let Some(response) = response {
w.write_slice(response, 0)? // value
w.write_ringslice(response, 0)? // value
}
Ok(())
}
Expand All @@ -394,6 +396,7 @@ impl Kv {
&self,
request: &HashedCommand,
response: Option<&crate::Command>,
ctx: u64,
w: &mut W,
) -> crate::Result<()>
where
Expand Down Expand Up @@ -434,6 +437,14 @@ impl Kv {
OP_GET | OP_GETQ => (None, Some(MARKER_BYTE_ARR)),
_ => (None, None),
};
let err_response;
let response = match ctx.ctx_unmut().error {
ContextError::TopInvalid => {
err_response = Some(RingSlice::from_slice(b"invalid request: no top match"));
err_response.as_ref()
}
ContextError::None => response.map(|r| r.deref().deref()),
};
if status != RespStatus::NoError && status != RespStatus::NotFound {
log::error!(
"+++ write_mc_packet error req:{:?}, rsp:{:?} status:{:?}",
Expand Down Expand Up @@ -477,3 +488,41 @@ pub enum ConnState {
// 对于AuthError,通过直接返回异常来标志
AuthOk,
}

#[repr(u8)]
pub enum ContextError {
None,
TopInvalid,
}

#[repr(C)]
pub struct Context {
pub runs: u8, // 运行的次数
pub idx: u16, //最多有65535个主从
pub shard_idx: u16,
pub year: u16,
pub error: ContextError,
}

pub trait KVCtx {
fn ctx(&mut self) -> &mut Context;
fn ctx_unmut(&self) -> &Context {
panic!("not implemented");
}
}

impl KVCtx for u64 {
fn ctx(&mut self) -> &mut Context {
unsafe { std::mem::transmute(self) }
}
fn ctx_unmut(&self) -> &Context {
unsafe { std::mem::transmute(self) }
}
}

impl<T: crate::Request> KVCtx for T {
#[inline(always)]
fn ctx(&mut self) -> &mut Context {
unsafe { std::mem::transmute(self.context_mut()) }
}
}
1 change: 1 addition & 0 deletions protocol/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ pub trait Commander<M: Metric<I>, I: MetricItem> {
// 请求所在的分片位置
fn request_shard(&self) -> usize;
fn metric(&self) -> &M;
fn ctx(&self) -> u64;
}

pub enum MetricName {
Expand Down
3 changes: 3 additions & 0 deletions stream/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,7 @@ impl<'a, M: Metric<T>, T: MetricItem, F: Fn(i64) -> usize> Commander<M, T>
fn metric(&self) -> &M {
&self.metrics
}
fn ctx(&self) -> u64 {
self.ctx.flag()
}
}
4 changes: 4 additions & 0 deletions tests/src/layout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ fn checkout_basic() {
assert_eq!(24, size_of::<ds::RingSlice>());
assert_eq!(8, size_of::<protocol::Context>());
assert_eq!(size_of::<protocol::Context>(), 8);
assert_eq!(
size_of::<protocol::Context>(),
size_of::<protocol::kv::Context>()
);
assert_eq!(size_of::<protocol::StreamContext>(), 16);
assert_eq!(
size_of::<protocol::StreamContext>(),
Expand Down

0 comments on commit f8425c4

Please sign in to comment.