Skip to content

Commit

Permalink
drop att
Browse files Browse the repository at this point in the history
  • Loading branch information
viciousstar committed Jul 3, 2024
1 parent 9598399 commit 8693bd7
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 1 deletion.
6 changes: 6 additions & 0 deletions protocol/src/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub struct CallbackContext {
callback: CallbackPtr,
quota: Option<BackendQuota>,
attachment: Option<Attachment>, // 附加数据,用于辅助请求和响应,目前只有kvector在使用
drop_attatch: Box<dyn Fn(Attachment)>,
}

impl CallbackContext {
Expand All @@ -61,6 +62,7 @@ impl CallbackContext {
last: bool,
retry_on_rsp_notok: bool,
max_tries: u8,
drop_attatch: Box<dyn Fn(Attachment)>,
) -> Self {
log::debug!("request prepared:{}", req);
let now = Instant::now();
Expand All @@ -84,6 +86,7 @@ impl CallbackContext {
waker,
quota: None,
attachment: None,
drop_attatch,
}
}

Expand Down Expand Up @@ -381,6 +384,9 @@ impl Drop for CallbackContext {
// 可以尝试检查double free
// 在debug环境中,设置done为false
debug_assert_eq!(*self.done.get_mut() = false, ());
if let Some(attachment) = self.attachment.take() {
(self.drop_attatch)(attachment);
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions protocol/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ pub trait Proto: Unpin + Clone + Send + Sync + 'static {
assert!(false, "{:?} {response}", attachment);
(false, true, 0)
}
#[inline]
fn drop_attach(&self, _att: Attachment) {}
}

pub trait RequestProcessor {
Expand Down
4 changes: 4 additions & 0 deletions protocol/src/vector/attachment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ impl<T: crate::Request> VAttach for T {
}

impl VecAttach {
#[inline(always)]
pub fn from(att: Attachment) -> VecAttach {
unsafe { std::mem::transmute(att) }
}
#[inline(always)]
pub fn attach(att: &Attachment) -> &VecAttach {
unsafe { std::mem::transmute(att) }
Expand Down
4 changes: 4 additions & 0 deletions protocol/src/vector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ impl Protocol for Vector {
attach.rsp_ok = true;
(true, attach.finish(), resp_count)
}
#[inline]
fn drop_attach(&self, att: Attachment) {
let _ = VecAttach::from(att);
}
}

impl Vector {
Expand Down
2 changes: 2 additions & 0 deletions stream/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ impl<'a, T: Topology<Item = Request> + TopologyCheck, P: Protocol> protocol::Req
*self.first = last;
let cb = self.top.callback();
let req_op = cmd.operation();
let parser = self.parser.clone();
let ctx = self.arena.alloc(CallbackContext::new(
cmd,
self.waker,
Expand All @@ -242,6 +243,7 @@ impl<'a, T: Topology<Item = Request> + TopologyCheck, P: Protocol> protocol::Req
last,
self.retry_on_rsp_notok,
self.parser.max_tries(req_op),
Box::new(move |att| parser.drop_attach(att)),
));
let mut ctx = CallbackContextPtr::from(ctx, self.arena);

Expand Down
6 changes: 5 additions & 1 deletion tests/src/layout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ fn checkout_basic() {
assert_eq!(40, size_of::<CheckedTopology>());
assert_eq!(192, size_of::<stream::StreamMetrics>());
assert_eq!(24, size_of::<sharding::hash::Hasher>());
assert_eq!(
size_of::<protocol::Attachment>(),
size_of::<protocol::vector::attachment::VecAttach>()
);
}

// 如果要验证 layout-min模式,需要 --features layout-min --release --no-default-features
Expand All @@ -56,7 +60,7 @@ fn check_layout_rx_buffer() {
#[ignore]
#[test]
fn check_callback_ctx() {
assert_eq!(200, size_of::<CallbackContext>());
assert_eq!(320, size_of::<CallbackContext>());
//assert_eq!(16, size_of::<protocol::callback::Context>());
}
//#[ignore]
Expand Down

0 comments on commit 8693bd7

Please sign in to comment.