Skip to content

Commit

Permalink
sqlinfo
Browse files Browse the repository at this point in the history
  • Loading branch information
viciousstar committed Aug 14, 2024
1 parent b2398ec commit a7e2f6a
Showing 1 changed file with 45 additions and 26 deletions.
71 changes: 45 additions & 26 deletions endpoint/src/vector/topo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use ds::MemGuard;
use protocol::kv::{ContextStatus, MysqlBuilder};
use protocol::vector::attachment::{VAttach, VecAttach};
use protocol::vector::redis::parse_vector_detail;
use protocol::vector::CommandType;
use protocol::Protocol;
use protocol::Request;
use protocol::ResOption;
Expand Down Expand Up @@ -74,6 +75,44 @@ where
Req: Request,
P: Protocol,
{
fn sql_info(
&self,
req: &mut Req,
round: u16,
) -> Result<(bool, u16, NaiveDate), protocol::Error> {
let vcmd = &req.attach().vcmd;
match vcmd.cmd {
CommandType::VRange => {
//根据round获取si
let si_items = req.attach().si();
assert!(si_items.len() > 0, "si_items.len() = 0");
assert!(
round <= si_items.len() as u16,
"round = {round}, si_items.len() = {}",
si_items.len()
);
let si_item = &si_items[(round - 1) as usize];

let year = si_item.date.year as u16 + 2000;
//构建sql
let limit = req.attach().left_count.min(si_item.count);
assert!(si_item.count > 0, "{}", si_item.count);
assert!(req.attach().left_count > 0, "{}", req.attach().left_count);

let Some(date) = NaiveDate::from_ymd_opt(year.into(), si_item.date.month.into(), 1)
else {
return Err(protocol::Error::ResponseInvalidMagic);
};
Ok((round == si_items.len() as u16, limit, date))
}
//相比vrange多了一个日期key
CommandType::VAdd | CommandType::VDel => {
Ok((true, 0, NaiveDate::from_ymd_opt(1, 1, 1).unwrap()))
}
_ => panic!("not sup {:?}", vcmd.cmd),
}
}

fn get_shard(&self, req: &mut Req) -> Result<&Shard<E>, protocol::Error> {
let (year, shard_idx) = if req.ctx_mut().runs == 0 {
let vcmd = parse_vector_detail(****req, req.flag())?;
Expand Down Expand Up @@ -118,8 +157,8 @@ where
assert_eq!(req.attach().left_count, 0);
assert_eq!(*req.context_mut(), 0);
let vcmd = parse_vector_detail(****req, req.flag())?;
//上行请求不需要初始化leftcount
if req.operation().is_retrival() {
req.retry_on_rsp_notok(true);
let limit = vcmd.limit();
assert!(limit > 0, "{limit}");
//需要在buildsql之前设置
Expand All @@ -136,26 +175,7 @@ where
req.set_last(false);
&self.si_shard[si_shard_idx]
} else {
//根据round获取si
let si_items = req.attach().si();
assert!(si_items.len() > 0, "si_items.len() = 0");
assert!(
round <= si_items.len() as u16,
"round = {round}, si_items.len() = {}",
si_items.len()
);
let si_item = &si_items[(round - 1) as usize];

let year = si_item.date.year as u16 + 2000;
//构建sql
let limit = req.attach().left_count.min(si_item.count);
assert!(si_item.count > 0, "{}", si_item.count);
assert!(req.attach().left_count > 0, "{}", req.attach().left_count);

let Some(date) = NaiveDate::from_ymd_opt(year.into(), si_item.date.month.into(), 1)
else {
return Err(protocol::Error::ResponseInvalidMagic);
};
let (last, limit, date) = self.sql_info(req, round)?;
let vector_builder = SqlBuilder::new(
&req.attach().vcmd,
req.hash(),
Expand All @@ -165,17 +185,15 @@ where
)?;
let cmd = MysqlBuilder::build_packets_for_vector(vector_builder)?;

//更新轮次信息
if round == si_items.len() as u16 {
if last {
req.set_last(true);
}

req.reshape(MemGuard::from_vec(cmd));
//获取shard
let shard_idx = self.shard_idx(req.hash());
req.ctx_mut().year = year;
req.ctx_mut().year = date.year() as u16;
req.ctx_mut().shard_idx = shard_idx as u16;
let shards = self.shards.get(year);
let shards = self.shards.get(date.year() as u16);
shards.get(shard_idx).ok_or(protocol::Error::TopInvalid)?
};

Expand Down Expand Up @@ -207,6 +225,7 @@ where
Ok(shard)
}
}

impl<E, Req, P> Endpoint for VectorService<E, P>
where
E: Endpoint<Item = Req>,
Expand Down

0 comments on commit a7e2f6a

Please sign in to comment.