Skip to content

Commit

Permalink
vcmd 存到 attach
Browse files Browse the repository at this point in the history
  • Loading branch information
viciousstar committed Jul 3, 2024
1 parent 8693bd7 commit 5e368a1
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 9 deletions.
11 changes: 8 additions & 3 deletions endpoint/src/vector/topo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ where

let si_shard_idx = self.strategist.si_distribution().index(req.hash());
req.ctx_mut().shard_idx = si_shard_idx as u16;
req.attach_mut().vcmd = vcmd;
&self.si_shard[si_shard_idx]
} else {
let vcmd = parse_vector_detail(**req.origin_data(), req.flag())?;
//根据round获取si
let si_items = req.attach().si();
assert!(si_items.len() > 0, "si_items.len() = 0");
Expand All @@ -150,8 +150,13 @@ where
else {
return Err(protocol::Error::ResponseInvalidMagic);
};
let vector_builder =
SqlBuilder::new(&vcmd, req.hash(), date, &self.strategist, limit as u64)?;
let vector_builder = SqlBuilder::new(
&req.attach().vcmd,
req.hash(),
date,
&self.strategist,
limit as u64,
)?;
let cmd = MysqlBuilder::build_packets_for_vector(vector_builder)?;

//更新轮次信息
Expand Down
8 changes: 4 additions & 4 deletions protocol/src/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub struct CallbackContext {
callback: CallbackPtr,
quota: Option<BackendQuota>,
attachment: Option<Attachment>, // 附加数据,用于辅助请求和响应,目前只有kvector在使用
drop_attatch: Box<dyn Fn(Attachment)>,
drop_attach: Box<dyn Fn(Attachment)>,
}

impl CallbackContext {
Expand All @@ -62,7 +62,7 @@ impl CallbackContext {
last: bool,
retry_on_rsp_notok: bool,
max_tries: u8,
drop_attatch: Box<dyn Fn(Attachment)>,
drop_attach: Box<dyn Fn(Attachment)>,
) -> Self {
log::debug!("request prepared:{}", req);
let now = Instant::now();
Expand All @@ -86,7 +86,7 @@ impl CallbackContext {
waker,
quota: None,
attachment: None,
drop_attatch,
drop_attach,
}
}

Expand Down Expand Up @@ -385,7 +385,7 @@ impl Drop for CallbackContext {
// 在debug环境中,设置done为false
debug_assert_eq!(*self.done.get_mut() = false, ());
if let Some(attachment) = self.attachment.take() {
(self.drop_attatch)(attachment);
(self.drop_attach)(attachment);
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion protocol/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ pub trait Proto: Unpin + Clone + Send + Sync + 'static {
(false, true, 0)
}
#[inline]
fn drop_attach(&self, _att: Attachment) {}
fn drop_attach(&self, _att: Attachment) {
panic!("unreachable");
}
}

pub trait RequestProcessor {
Expand Down
2 changes: 1 addition & 1 deletion protocol/src/req.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::Arc;
use crate::{Command, HashedCommand};

pub type Context = u64;
pub type Attachment = [u8; 80];
pub type Attachment = [u8; 280];
#[repr(transparent)]
#[derive(Clone, Default)]
pub struct BackendQuota {
Expand Down
4 changes: 4 additions & 0 deletions protocol/src/vector/attachment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use crate::Attachment;
use crate::Command;
use crate::Packet;
use ds::RingSlice;

use super::VectorCmd;
#[derive(Debug, Default)]
#[repr(C)]
pub struct VecAttach {
Expand All @@ -18,6 +20,7 @@ pub struct VecAttach {
body: Vec<Vec<u8>>,
// 查询响应的body中token数量
si: Vec<SiItem>, // si表中查询到的数据, si字段信息在配置里存放
pub vcmd: VectorCmd,
}

#[derive(Debug, Default)]
Expand Down Expand Up @@ -114,6 +117,7 @@ impl VecAttach {
body_token_count: 0,
rsp_ok: false,
si: Vec::with_capacity(6),
vcmd: Default::default(),
};
}
#[inline]
Expand Down

0 comments on commit 5e368a1

Please sign in to comment.