diff --git a/Cargo.lock b/Cargo.lock index 754e07b9a..7442fea11 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -396,9 +396,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.1.7" +version = "1.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26a5c3fd7bfa1ce3897a3a3501d362b2d87b7f2583ebcb4a949ec25911025cbc" +checksum = "504bdec147f2cc13c8b57ed9401fd8a147cc66b67ad5cb241394244f2c947549" dependencies = [ "jobserver", "libc", @@ -2076,9 +2076,9 @@ dependencies = [ [[package]] name = "object" -version = "0.36.2" +version = "0.36.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f203fa8daa7bb185f760ae12bd8e097f63d17041dcdcaf675ac54cdf863170e" +checksum = "27b64972346851a39438c60b341ebc01bba47464ae329e55cf343eb93964efd9" dependencies = [ "memchr", ] @@ -2394,6 +2394,7 @@ dependencies = [ "byteorder", "bytes", "chrono", + "chrono-tz", "ctor 0.1.26", "ds", "enum_dispatch", @@ -2830,9 +2831,9 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.204" +version = "1.0.205" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc76f558e0cbb2a839d37354c575f1dc3fdc6546b5be373ba43d95f231bf7c12" +checksum = "e33aedb1a7135da52b7c21791455563facbbcc43d0f0f66165b42c21b3dfb150" dependencies = [ "serde_derive", ] @@ -2849,9 +2850,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.204" +version = "1.0.205" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0cd7e117be63d3c3678776753929474f3b04a43a080c744d6b0ae2a8c28e222" +checksum = "692d6f5ac90220161d6774db30c662202721e64aed9058d2c394f451261420c1" dependencies = [ "proc-macro2", "quote", diff --git a/ds/src/mem/ring_slice.rs b/ds/src/mem/ring_slice.rs index aaa248b05..630c9d663 100644 --- a/ds/src/mem/ring_slice.rs +++ b/ds/src/mem/ring_slice.rs @@ -85,6 +85,9 @@ impl RingSlice { #[inline] pub fn try_str_num(&self, r: impl Range) -> Option { let (start, end) = r.range(self); + if start == end { + return None; + } let mut num = 0usize; for i in start..end { if !self[i].is_ascii_digit() { diff --git a/endpoint/src/topo.rs b/endpoint/src/topo.rs index 2f231f03c..07d81da95 100644 --- a/endpoint/src/topo.rs +++ b/endpoint/src/topo.rs @@ -36,6 +36,7 @@ procs::topology_dispatcher! { pub trait Topology : Endpoint + Hash{ fn exp_sec(&self) -> u32 {86400} + fn has_attach(&self) -> bool {false} } => where P:Protocol, E:Endpoint, R:Request, Topologies: Endpoint trait Inited { diff --git a/endpoint/src/vector/batch.rs b/endpoint/src/vector/batch.rs new file mode 100644 index 000000000..e963326b1 --- /dev/null +++ b/endpoint/src/vector/batch.rs @@ -0,0 +1,162 @@ +use super::strategy::Postfix; +use chrono::{Datelike, NaiveDate}; +use chrono_tz::Tz; +use core::fmt::Write; +use ds::RingSlice; +use protocol::Error; +use sharding::{distribution::DBRange, hash::Hasher}; + +#[derive(Clone, Debug)] +pub struct Batch { + db_prefix: String, + table_prefix: String, + table_postfix: Postfix, + hasher: Hasher, + distribution: DBRange, + keys_name: Vec, + si_cols: Vec, + si: Si, +} + +impl Batch { + pub fn new_with_db( + db_prefix: String, + table_prefix: String, + db_count: u32, + shards: u32, + table_postfix: Postfix, + keys_name: Vec, + si_cols: Vec, + si_db_prefix: String, + si_db_count: u32, + si_table_prefix: String, + si_table_count: u32, + si_shards: u32, + ) -> Self { + Self { + db_prefix, + table_prefix, + table_postfix, + distribution: DBRange::new(db_count as usize, 1usize, shards as usize), + hasher: Hasher::from("crc32"), + keys_name, + si_cols, + si: Si::new( + si_db_prefix, + si_db_count, + si_table_prefix, + si_table_count, + si_shards, + ), + } + } + + pub fn distribution(&self) -> &DBRange { + &self.distribution + } + + pub fn si_distribution(&self) -> &DBRange { + self.si.distribution() + } + + pub fn hasher(&self) -> &Hasher { + &self.hasher + } + + pub fn get_date(&self, _: &[RingSlice]) -> Result { + let now = chrono::Utc::now().with_timezone(&Tz::Asia__Shanghai); + Ok(NaiveDate::from_ymd_opt(now.year(), now.month(), now.day()).unwrap()) + } + + pub fn write_dname_with_hash(&self, buf: &mut impl Write, hash: i64) { + let db_idx: usize = self.distribution.db_idx(hash); + let _ = write!(buf, "{}_{}", self.db_prefix, db_idx); + } + + pub fn write_tname_with_date(&self, buf: &mut impl Write, date: &NaiveDate) { + let (mut year, month, day) = (date.year(), date.month(), date.day()); + year %= 100; + match self.table_postfix { + Postfix::YYMM => { + let _ = write!(buf, "{}_{:02}{:02}", &self.table_prefix, year, month); + } + //Postfix::YYMMDD + _ => { + let _ = write!( + buf, + "{}_{:02}{:02}{:02}", + &self.table_prefix, year, month, day + ); + } + } + } + + pub fn write_database_table(&self, buf: &mut impl Write, date: &NaiveDate, hash: i64) { + self.write_dname_with_hash(buf, hash); + let _ = buf.write_char('.'); + self.write_tname_with_date(buf, date) + } + + pub(crate) fn write_si_database_table(&self, buf: &mut impl Write, hash: i64) { + self.si.write_database_table(buf, hash) + } + + pub(crate) fn condition_keys(&self) -> Box> + '_> { + Box::new(self.keys_name.iter().map(|x| Some(x))) + } + + pub(crate) fn keys(&self) -> &[String] { + &self.keys_name + } + + // pub(crate) fn get_next_date(&self, year: u16, month: u8) -> NaiveDate { + // if month == 1 { + // return NaiveDate::from_ymd_opt((year - 1).into(), 12, 1).unwrap(); + // } else { + // return NaiveDate::from_ymd_opt(year.into(), (month - 1).into(), 1).unwrap(); + // } + // } + + pub(crate) fn batch(&self, limit: u64, _: &protocol::vector::VectorCmd) -> u64 { + limit + } + + pub(crate) fn si_cols(&self) -> &[String] { + &self.si_cols + } +} + +#[derive(Clone, Debug)] +struct Si { + db_prefix: String, + table_prefix: String, + distribution: DBRange, +} + +impl Si { + fn new( + db_prefix: String, + db_count: u32, + table_prefix: String, + table_count: u32, + shards: u32, + ) -> Self { + Self { + db_prefix: db_prefix, + table_prefix: table_prefix, + distribution: DBRange::new(db_count as usize, table_count as usize, shards as usize), + } + } + fn distribution(&self) -> &DBRange { + &self.distribution + } + fn write_database_table(&self, buf: &mut impl Write, hash: i64) { + let db_idx = self.distribution.db_idx(hash); + let table_idx = self.distribution.table_idx(hash); + let _ = write!( + buf, + "{}_{}.{}_{}", + self.db_prefix, db_idx, self.table_prefix, table_idx + ); + } +} diff --git a/endpoint/src/vector/config.rs b/endpoint/src/vector/config.rs index 82296a314..fe9fe870f 100644 --- a/endpoint/src/vector/config.rs +++ b/endpoint/src/vector/config.rs @@ -14,6 +14,8 @@ pub struct VectorNamespace { pub(crate) backends_flaten: Vec, #[serde(default)] pub(crate) backends: HashMap>, + #[serde(default)] + pub(crate) si_backends: Vec, } #[derive(Debug, Clone, Default, Deserialize, Serialize)] @@ -44,6 +46,20 @@ pub struct Basic { pub(crate) user: String, #[serde(default)] pub(crate) region_enabled: bool, + #[serde(default)] + pub(crate) si_db_name: String, + #[serde(default)] + pub(crate) si_table_name: String, + #[serde(default)] + pub(crate) si_db_count: u32, + #[serde(default)] + pub(crate) si_table_count: u32, + #[serde(default)] + pub(crate) si_user: String, + #[serde(default)] + pub(crate) si_password: String, + #[serde(default)] + pub(crate) si_cols: Vec, } impl VectorNamespace { @@ -66,7 +82,7 @@ impl VectorNamespace { } last_year = year.1; } - match ns.decrypt_password() { + match ns.decrypt_password(ns.basic.password.as_bytes()) { Ok(password) => ns.basic.password = password, Err(e) => { log::warn!("failed to decrypt password, e:{}", e); @@ -77,6 +93,19 @@ impl VectorNamespace { init.extend_from_slice(b.1); init }); + if ns.basic.si_password.len() > 0 { + match ns.decrypt_password(ns.basic.si_password.as_bytes()) { + Ok(password) => ns.basic.si_password = password, + Err(e) => { + log::warn!("failed to decrypt si password, e:{}", e); + return None; + } + } + } + if ns.si_backends.len() > 0 { + ns.backends_flaten + .extend_from_slice(ns.si_backends.as_slice()); + } Some(ns) } Err(e) => { @@ -87,9 +116,9 @@ impl VectorNamespace { } #[inline] - fn decrypt_password(&self) -> Result> { + fn decrypt_password(&self, data: &[u8]) -> Result> { let key_pem = fs::read_to_string(&context::get().key_path)?; - let encrypted_data = general_purpose::STANDARD.decode(self.basic.password.as_bytes())?; + let encrypted_data = general_purpose::STANDARD.decode(data)?; let decrypted_data = ds::decrypt::decrypt_password(&key_pem, &encrypted_data)?; let decrypted_string = String::from_utf8(decrypted_data)?; Ok(decrypted_string) diff --git a/endpoint/src/vector/mod.rs b/endpoint/src/vector/mod.rs index a947c7944..842ec70e6 100644 --- a/endpoint/src/vector/mod.rs +++ b/endpoint/src/vector/mod.rs @@ -1,3 +1,4 @@ +mod batch; pub(crate) mod config; mod strategy; pub mod topo; diff --git a/endpoint/src/vector/strategy.rs b/endpoint/src/vector/strategy.rs index 9367afb7a..acc0deebf 100644 --- a/endpoint/src/vector/strategy.rs +++ b/endpoint/src/vector/strategy.rs @@ -7,12 +7,14 @@ use protocol::Result; use sharding::distribution::DBRange; use sharding::hash::Hasher; +use super::batch::Batch; use super::config::VectorNamespace; use super::vectortime::VectorTime; #[derive(Debug, Clone)] pub enum Strategist { VectorTime(VectorTime), + Batch(Batch), } impl Default for Strategist { @@ -34,51 +36,121 @@ impl Default for Strategist { //1. 数据库表名的格式如 table_yymm //2. 库名表名后缀如何计算 impl Strategist { - pub fn try_from(ns: &VectorNamespace) -> Self { - Self::VectorTime(VectorTime::new_with_db( - ns.basic.db_name.clone(), - ns.basic.table_name.clone(), - ns.basic.db_count, - //此策略默认所有年都有同样的shard,basic也只配置了一项,也暗示了这个默认 - ns.backends.iter().next().unwrap().1.len() as u32, - ns.basic.table_postfix.as_str().into(), - ns.basic.keys.clone(), - )) + pub fn try_from(ns: &VectorNamespace) -> Option { + Some(match ns.basic.strategy.as_str() { + "aggregation" => { + //至少需要date和count两个字段名 + if ns.basic.si_cols.len() < 2 || ns.basic.keys.len() != 1 { + log::warn!("len si_cols < 2 or len keys != 1"); + return None; + } + Self::Batch(Batch::new_with_db( + ns.basic.db_name.clone(), + ns.basic.table_name.clone(), + ns.basic.db_count, + //此策略默认所有年都有同样的shard,basic也只配置了一项,也暗示了这个默认 + ns.backends.iter().next().unwrap().1.len() as u32, + ns.basic.table_postfix.as_str().into(), + ns.basic.keys.clone(), + ns.basic.si_cols.clone(), + ns.basic.si_db_name.clone(), + ns.basic.si_db_count, + ns.basic.si_table_name.clone(), + ns.basic.si_table_count, + ns.si_backends.len() as u32, + )) + } + _ => Self::VectorTime(VectorTime::new_with_db( + ns.basic.db_name.clone(), + ns.basic.table_name.clone(), + ns.basic.db_count, + //此策略默认所有年都有同样的shard,basic也只配置了一项,也暗示了这个默认 + ns.backends.iter().next().unwrap().1.len() as u32, + ns.basic.table_postfix.as_str().into(), + ns.basic.keys.clone(), + )), + }) } #[inline] pub fn distribution(&self) -> &DBRange { match self { Strategist::VectorTime(inner) => inner.distribution(), + Strategist::Batch(inner) => inner.distribution(), + } + } + #[inline] + pub fn si_distribution(&self) -> &DBRange { + match self { + Strategist::VectorTime(_) => panic!("not support"), + Strategist::Batch(inner) => inner.si_distribution(), } } #[inline] pub fn hasher(&self) -> &Hasher { match self { Strategist::VectorTime(inner) => inner.hasher(), + Strategist::Batch(inner) => inner.hasher(), } } #[inline] pub fn get_date(&self, keys: &[RingSlice]) -> Result { match self { Strategist::VectorTime(inner) => inner.get_date(keys), + Strategist::Batch(inner) => inner.get_date(keys), + } + } + // 请求成功后,是否有更多的数据需要请求 + #[inline] + pub fn more(&self) -> bool { + match self { + Strategist::VectorTime(_) => false, + Strategist::Batch(_) => true, } } + + // pub(crate) fn get_next_date(&self, year: u16, month: u8) -> NaiveDate { + // match self { + // Strategist::VectorTime(_) => panic!("VectorTime not support get_next_date"), + // Strategist::Batch(inner) => inner.get_next_date(year, month), + // } + // } } impl protocol::vector::Strategy for Strategist { fn keys(&self) -> &[String] { match self { Strategist::VectorTime(inner) => inner.keys(), + Strategist::Batch(inner) => inner.keys(), } } fn condition_keys(&self) -> Box> + '_> { match self { Strategist::VectorTime(inner) => inner.condition_keys(), + Strategist::Batch(inner) => inner.condition_keys(), } } fn write_database_table(&self, buf: &mut impl Write, date: &NaiveDate, hash: i64) { match self { Strategist::VectorTime(inner) => inner.write_database_table(buf, date, hash), + Strategist::Batch(inner) => inner.write_database_table(buf, date, hash), + } + } + fn write_si_database_table(&self, buf: &mut impl Write, hash: i64) { + match self { + Strategist::VectorTime(_) => panic!("not support"), + Strategist::Batch(inner) => inner.write_si_database_table(buf, hash), + } + } + fn batch(&self, limit: u64, vcmd: &protocol::vector::VectorCmd) -> u64 { + match self { + Strategist::VectorTime(_) => 0, + Strategist::Batch(inner) => inner.batch(limit, vcmd), + } + } + fn si_cols(&self) -> &[String] { + match self { + Strategist::VectorTime(_) => panic!("not support"), + Strategist::Batch(inner) => inner.si_cols(), } } } @@ -129,6 +201,13 @@ mod tests { password: Default::default(), user: Default::default(), region_enabled: Default::default(), + si_db_name: Default::default(), + si_table_name: Default::default(), + si_db_count: Default::default(), + si_table_count: Default::default(), + si_user: Default::default(), + si_password: Default::default(), + si_cols: Default::default(), }, backends_flaten: Default::default(), backends: HashMap::from([( @@ -138,8 +217,9 @@ mod tests { "127.0.0.1:8081,127.0.0.2:8081".into(), ], )]), + si_backends: Default::default(), }; - let strategy = Strategist::try_from(&ns); + let strategy = Strategist::try_from(&ns).unwrap(); let mut buf = String::new(); let buf = &mut buf; // vrange @@ -160,7 +240,8 @@ mod tests { }; let hash = strategy.hasher().hash(&"id".as_bytes()); let date = NaiveDate::from_ymd_opt(2021, 5, 1).unwrap(); - let builder = SqlBuilder::new(&vector_cmd, hash, date, &strategy).unwrap(); + let builder = + SqlBuilder::new(&vector_cmd, hash, date, &strategy, Default::default()).unwrap(); builder.write_sql(buf); println!("len: {}, act len: {}", builder.len(), buf.len()); let db_idx = strategy.distribution().db_idx(hash); @@ -184,7 +265,8 @@ mod tests { }; let hash = strategy.hasher().hash(&"id".as_bytes()); let date = NaiveDate::from_ymd_opt(2021, 5, 1).unwrap(); - let builder = SqlBuilder::new(&vector_cmd, hash, date, &strategy).unwrap(); + let builder = + SqlBuilder::new(&vector_cmd, hash, date, &strategy, Default::default()).unwrap(); buf.clear(); builder.write_sql(buf); println!("len: {}, act len: {}", builder.len(), buf.len()); @@ -231,7 +313,8 @@ mod tests { }; let hash = strategy.hasher().hash(&"id".as_bytes()); let date = NaiveDate::from_ymd_opt(2021, 5, 1).unwrap(); - let builder = SqlBuilder::new(&vector_cmd, hash, date, &strategy).unwrap(); + let builder = + SqlBuilder::new(&vector_cmd, hash, date, &strategy, Default::default()).unwrap(); buf.clear(); builder.write_sql(buf); println!("len: {}, act len: {}", builder.len(), buf.len()); @@ -273,7 +356,8 @@ mod tests { }; let hash = strategy.hasher().hash(&"id".as_bytes()); let date = NaiveDate::from_ymd_opt(2021, 5, 1).unwrap(); - let builder = SqlBuilder::new(&vector_cmd, hash, date, &strategy).unwrap(); + let builder = + SqlBuilder::new(&vector_cmd, hash, date, &strategy, Default::default()).unwrap(); buf.clear(); builder.write_sql(buf); println!("len: {}, act len: {}", builder.len(), buf.len()); @@ -313,7 +397,8 @@ mod tests { }; let hash = strategy.hasher().hash(&"id".as_bytes()); let date = NaiveDate::from_ymd_opt(2021, 5, 1).unwrap(); - let builder = SqlBuilder::new(&vector_cmd, hash, date, &strategy).unwrap(); + let builder = + SqlBuilder::new(&vector_cmd, hash, date, &strategy, Default::default()).unwrap(); buf.clear(); builder.write_sql(buf); println!("len: {}, act len: {}", builder.len(), buf.len()); @@ -366,7 +451,8 @@ mod tests { }; let hash = strategy.hasher().hash(&"id".as_bytes()); let date = NaiveDate::from_ymd_opt(2021, 5, 1).unwrap(); - let builder = SqlBuilder::new(&vector_cmd, hash, date, &strategy).unwrap(); + let builder = + SqlBuilder::new(&vector_cmd, hash, date, &strategy, Default::default()).unwrap(); buf.clear(); builder.write_sql(buf); println!("len: {}, act len: {}", builder.len(), buf.len()); @@ -408,7 +494,8 @@ mod tests { }; let hash = strategy.hasher().hash(&"id".as_bytes()); let date = NaiveDate::from_ymd_opt(2021, 5, 1).unwrap(); - let builder = SqlBuilder::new(&vector_cmd, hash, date, &strategy).unwrap(); + let builder = + SqlBuilder::new(&vector_cmd, hash, date, &strategy, Default::default()).unwrap(); buf.clear(); builder.write_sql(buf); println!("len: {}, act len: {}", builder.len(), buf.len()); @@ -436,7 +523,8 @@ mod tests { }; let hash = strategy.hasher().hash(&"id".as_bytes()); let date = NaiveDate::from_ymd_opt(2021, 5, 1).unwrap(); - let builder = SqlBuilder::new(&vector_cmd, hash, date, &strategy).unwrap(); + let builder = + SqlBuilder::new(&vector_cmd, hash, date, &strategy, Default::default()).unwrap(); buf.clear(); builder.write_sql(buf); println!("len: {}, act len: {}", builder.len(), buf.len()); @@ -461,7 +549,8 @@ mod tests { }; let hash = strategy.hasher().hash(&"id".as_bytes()); let date = NaiveDate::from_ymd_opt(2021, 5, 1).unwrap(); - let builder = SqlBuilder::new(&vector_cmd, hash, date, &strategy).unwrap(); + let builder = + SqlBuilder::new(&vector_cmd, hash, date, &strategy, Default::default()).unwrap(); buf.clear(); builder.write_sql(buf); println!("len: {}, act len: {}", builder.len(), buf.len()); @@ -508,7 +597,8 @@ mod tests { }; let hash = strategy.hasher().hash(&"id".as_bytes()); let date = NaiveDate::from_ymd_opt(2021, 5, 1).unwrap(); - let builder = SqlBuilder::new(&vector_cmd, hash, date, &strategy).unwrap(); + let builder = + SqlBuilder::new(&vector_cmd, hash, date, &strategy, Default::default()).unwrap(); buf.clear(); builder.write_sql(buf); println!("len: {}, act len: {}", builder.len(), buf.len()); diff --git a/endpoint/src/vector/topo.rs b/endpoint/src/vector/topo.rs index 0b70eb087..0aa91289f 100644 --- a/endpoint/src/vector/topo.rs +++ b/endpoint/src/vector/topo.rs @@ -1,11 +1,13 @@ use std::collections::HashMap; -use chrono::Datelike; +use chrono::{Datelike, NaiveDate}; use discovery::dns; use discovery::dns::IPPort; use discovery::TopologyWrite; use ds::MemGuard; use protocol::kv::{ContextStatus, MysqlBuilder}; +use protocol::vector::attachment::{VAttach, VecAttach}; +use protocol::vector::redis::parse_vector_detail; use protocol::Protocol; use protocol::Request; use protocol::ResOption; @@ -15,7 +17,7 @@ use sharding::hash::{Hash, HashKey}; use crate::dns::DnsConfig; use crate::Timeout; use crate::{Endpoint, Topology}; -use protocol::vector::mysql::SqlBuilder; +use protocol::vector::mysql::{SiSqlBuilder, SqlBuilder}; use super::config::VectorNamespace; use super::strategy::Strategist; @@ -25,6 +27,7 @@ use crate::shards::Shard; #[derive(Clone)] pub struct VectorService { shards: Shards, + si_shard: Vec>, strategist: Strategist, parser: P, cfg: Box>, @@ -36,6 +39,7 @@ impl From

for VectorService { Self { parser, shards: Default::default(), + si_shard: Default::default(), strategist: Default::default(), cfg: Default::default(), } @@ -59,59 +63,164 @@ where Req: Request, P: Protocol, { + fn has_attach(&self) -> bool { + self.strategist.more() + } } -impl Endpoint for VectorService +impl VectorService where E: Endpoint, Req: Request, P: Protocol, { - type Item = Req; + fn get_shard(&self, req: &mut Req) -> Result<&Shard, protocol::Error> { + let (year, shard_idx) = if req.ctx_mut().runs == 0 { + let vcmd = parse_vector_detail(****req, req.flag())?; + //定位年库 + let date = self.strategist.get_date(&vcmd.keys)?; + let year = date.year() as u16; - fn send(&self, mut req: Self::Item) { - let shard = (|| -> Result<&Shard, protocol::Error> { - let (year, shard_idx) = if req.ctx_mut().runs == 0 { - let vcmd = protocol::vector::redis::parse_vector_detail(&req)?; - //定位年库 - let date = self.strategist.get_date(&vcmd.keys)?; - let year = date.year() as u16; + let shard_idx = self.shard_idx(req.hash()); + req.ctx_mut().year = year; + req.ctx_mut().shard_idx = shard_idx as u16; - let shard_idx = self.shard_idx(req.hash()); - req.ctx_mut().year = year; - req.ctx_mut().shard_idx = shard_idx as u16; + let vector_builder = SqlBuilder::new(&vcmd, req.hash(), date, &self.strategist, 0)?; + let cmd = MysqlBuilder::build_packets_for_vector(vector_builder)?; + req.reshape(MemGuard::from_vec(cmd)); - let vector_builder = SqlBuilder::new(&vcmd, req.hash(), date, &self.strategist)?; - let cmd = MysqlBuilder::build_packets_for_vector(vector_builder)?; + (year, shard_idx) + } else { + (req.ctx_mut().year, req.ctx_mut().shard_idx as usize) + }; + + let shards = self.shards.get(year); + let shard = shards.get(shard_idx).ok_or(protocol::Error::TopInvalid)?; + log::debug!( + "+++ mysql {} send {} year {} shards {:?} => {:?}", + self.cfg.service, + shard_idx, + year, + shards, + req + ); + Ok(shard) + } + fn more_get_shard(&self, req: &mut Req) -> Result<&Shard, protocol::Error> { + req.attachment_mut() + .get_or_insert(VecAttach::default().to_attach()); + //分别代表请求的轮次和每轮重试次数 + let (round, runs) = (req.attach().round, req.ctx_mut().runs); + //runs == 0 表示第一轮第一次请求 + let shard = if runs == 0 || req.attach().rsp_ok { + let shard = if runs == 0 { + //请求si表 + assert_eq!(req.attach().left_count, 0); + assert_eq!(*req.context_mut(), 0); + let vcmd = parse_vector_detail(****req, req.flag())?; + if req.operation().is_retrival() { + req.retry_on_rsp_notok(true); + let limit = vcmd.limit(); + assert!(limit > 0, "{limit}"); + //需要在buildsql之前设置 + req.attach_mut().init(limit as u16); + } + + let si_sql = SiSqlBuilder::new(&vcmd, req.hash(), &self.strategist)?; + let cmd = MysqlBuilder::build_packets_for_vector(si_sql)?; req.reshape(MemGuard::from_vec(cmd)); - (year, shard_idx) + let si_shard_idx = self.strategist.si_distribution().index(req.hash()); + req.ctx_mut().shard_idx = si_shard_idx as u16; + req.attach_mut().vcmd = vcmd; + req.set_last(false); + &self.si_shard[si_shard_idx] } else { - (req.ctx_mut().year, req.ctx_mut().shard_idx as usize) + //根据round获取si + let si_items = req.attach().si(); + assert!(si_items.len() > 0, "si_items.len() = 0"); + assert!( + round <= si_items.len() as u16, + "round = {round}, si_items.len() = {}", + si_items.len() + ); + let si_item = &si_items[(round - 1) as usize]; + + let year = si_item.date.year as u16 + 2000; + //构建sql + let limit = req.attach().left_count.min(si_item.count); + assert!(si_item.count > 0, "{}", si_item.count); + assert!(req.attach().left_count > 0, "{}", req.attach().left_count); + + let Some(date) = NaiveDate::from_ymd_opt(year.into(), si_item.date.month.into(), 1) + else { + return Err(protocol::Error::ResponseInvalidMagic); + }; + let vector_builder = SqlBuilder::new( + &req.attach().vcmd, + req.hash(), + date, + &self.strategist, + limit as u64, + )?; + let cmd = MysqlBuilder::build_packets_for_vector(vector_builder)?; + + //更新轮次信息 + if round == si_items.len() as u16 { + req.set_last(true); + } + + req.reshape(MemGuard::from_vec(cmd)); + //获取shard + let shard_idx = self.shard_idx(req.hash()); + req.ctx_mut().year = year; + req.ctx_mut().shard_idx = shard_idx as u16; + let shards = self.shards.get(year); + shards.get(shard_idx).ok_or(protocol::Error::TopInvalid)? }; - let shards = self.shards.get(year); - if shards.len() == 0 { - return Err(protocol::Error::TopInvalid); + //重新发送后,视作新的请求,重置响应和runs + req.attach_mut().rsp_ok = false; + req.attach_mut().round += 1; + req.ctx_mut().runs = 0; + req.set_fitst_try(); + shard + } else { + if round - 1 == 0 { + //上一轮si表重试 + &self.si_shard[req.ctx_mut().shard_idx as usize] + } else { + let (year, shard_idx) = (req.ctx_mut().year, req.ctx_mut().shard_idx); + let shards = self.shards.get(year); + shards + .get(shard_idx as usize) + .ok_or(protocol::Error::TopInvalid)? } - debug_assert!( - shard_idx < shards.len(), - "mysql: {}/{} req:{:?}", - shard_idx, - shards.len(), - req - ); - let shard = unsafe { shards.get_unchecked(shard_idx) }; - log::debug!( - "+++ mysql {} send {} year {} shards {:?} => {:?}", - self.cfg.service, - shard_idx, - year, - shards, - req - ); - Ok(shard) - })(); + }; + + log::debug!( + "+++ mysql {} shards {:?} => {:?}", + self.cfg.service, + shard, + req + ); + Ok(shard) + } +} +impl Endpoint for VectorService +where + E: Endpoint, + Req: Request, + P: Protocol, +{ + type Item = Req; + + fn send(&self, mut req: Self::Item) { + let shard = if !self.strategist.more() { + self.get_shard(&mut req) + } else { + self.more_get_shard(&mut req) + }; let shard = match shard { Ok(shard) => shard, Err(e) => { @@ -119,6 +228,7 @@ where protocol::Error::TopInvalid => ContextStatus::TopInvalid, _ => ContextStatus::ReqInvalid, }; + req.try_next(false); req.on_err(e); return; } @@ -162,14 +272,18 @@ where E: Endpoint, { fn need_load(&self) -> bool { - self.shards.len() != self.cfg.shards_url.len() || self.cfg.need_load() + (self.shards.len() + self.si_shard.len()) != self.cfg.shards_url.len() + || self.cfg.need_load() } fn load(&mut self) -> bool { self.cfg.load_guard().check_load(|| self.load_inner()) } fn update(&mut self, namespace: &str, cfg: &str) { if let Some(ns) = VectorNamespace::try_from(cfg) { - self.strategist = Strategist::try_from(&ns); + let Some(strategist) = Strategist::try_from(&ns) else { + return; + }; + self.strategist = strategist; self.cfg.update(namespace, ns); } } @@ -297,13 +411,114 @@ where } self.shards.push((interval, shards_per_interval)); } - assert_eq!(self.shards.len(), self.cfg.shards_url.len()); + if !self.load_inner_si() { + return false; + } + assert_eq!( + self.shards.len() + self.si_shard.len(), + self.cfg.shards_url.len() + ); + log::info!("{} load complete. dropping:{:?}", self.cfg.service, { old.retain(|_k, v| v.len() > 0); old.keys() }); true } + + #[inline] + fn load_inner_si(&mut self) -> bool { + // 所有的ip要都能解析出主从域名 + let mut addrs = Vec::with_capacity(self.cfg.si_backends.len()); + for shard in &self.cfg.si_backends { + let shard: Vec<&str> = shard.split(",").collect(); + if shard.len() < 2 { + log::warn!("{} si both master and slave required.", self.cfg.service); + return false; + } + let master_url = &shard[0]; + let mut master = String::new(); + dns::lookup_ips(master_url.host(), |ips| { + if ips.len() > 0 { + master = ips[0].to_string() + ":" + master_url.port(); + } + }); + let mut slaves = Vec::with_capacity(8); + for url_port in &shard[1..] { + let url = url_port.host(); + let port = url_port.port(); + use ds::vec::Add; + dns::lookup_ips(url, |ips| { + for ip in ips { + slaves.add(ip.to_string() + ":" + port); + } + }); + } + if master.len() == 0 || slaves.len() == 0 { + log::warn!( + "master:({}=>{}) or slave ({:?}=>{:?}) not looked up", + master_url, + master, + &shard[1..], + slaves + ); + return false; + } + addrs.push((master, slaves)); + } + + // 到这之后,所有的shard都能解析出ip + let mut old = HashMap::with_capacity(addrs.len()); + self.si_shard.split_off(0).into_iter().for_each(|shard| { + old.entry(shard.master.addr().to_string()) + .or_insert(Vec::new()) + .push(shard.master); + for endpoint in shard.slaves.into_inner() { + let addr = endpoint.addr().to_string(); + // 一个ip可能存在于多个域名中。 + old.entry(addr).or_insert(Vec::new()).push(endpoint); + } + }); + + // 遍历所有的shards_url + for (master_addr, slaves) in addrs { + assert_ne!(master_addr.len(), 0); + assert_ne!(slaves.len(), 0); + // 用户名和密码 + let res_option = ResOption { + token: self.cfg.basic.si_password.clone(), + username: self.cfg.basic.si_user.clone(), + }; + let master = self.take_or_build( + &mut old, + &master_addr, + self.cfg.timeout_master(), + res_option.clone(), + ); + // slave + let mut replicas = Vec::with_capacity(8); + for addr in slaves { + let slave = self.take_or_build( + &mut old, + &addr, + self.cfg.timeout_slave(), + res_option.clone(), + ); + replicas.push(slave); + } + + use crate::PerformanceTuning; + let shard = Shard::selector( + self.cfg.basic.selector.tuning_mode(), + master, + replicas, + self.cfg.basic.region_enabled, + ); + self.si_shard.push(shard); + } + + true + } } impl discovery::Inited for VectorService where @@ -314,7 +529,7 @@ where fn inited(&self) -> bool { // 每一个分片都有初始, 并且至少有一主一从。 self.shards.len() > 0 - && self.shards.len() == self.cfg.shards_url.len() + && (self.shards.len() + self.si_shard.len()) == self.cfg.shards_url.len() && self.shards.inited() } } diff --git a/protocol/Cargo.toml b/protocol/Cargo.toml index df7a9b676..767987c18 100644 --- a/protocol/Cargo.toml +++ b/protocol/Cargo.toml @@ -42,6 +42,7 @@ url = "2.1" percent-encoding = "2.1.0" seq-macro = "*" chrono = "0.4" +chrono-tz = { version = "0.5", default-features = false } paste = "1.0" [features] diff --git a/protocol/src/callback.rs b/protocol/src/callback.rs index 5cd43460e..f7471921e 100644 --- a/protocol/src/callback.rs +++ b/protocol/src/callback.rs @@ -1,5 +1,4 @@ use std::{ - cell::OnceCell, mem::MaybeUninit, ptr::{self, NonNull}, sync::{ @@ -8,7 +7,7 @@ use std::{ }, }; -use crate::BackendQuota; +use crate::{Attachment, BackendQuota}; use ds::{time::Instant, AtomicWaker}; use crate::{request::Request, Command, Error, HashedCommand}; @@ -38,7 +37,7 @@ pub struct CallbackContext { pub(crate) try_next: bool, // 请求失败后,topo层面是否允许重试 pub(crate) retry_on_rsp_notok: bool, // 有响应且响应不ok时,协议层面是否允许重试 pub(crate) write_back: bool, // 请求结束后,是否需要回写。 - pub(crate) max_tries: OnceCell, // 最大重试次数 + pub(crate) max_tries: u8, // 最大重试次数 first: bool, // 当前请求是否是所有子请求的第一个 last: bool, // 当前请求是否是所有子请求的最后一个 tries: AtomicU8, @@ -48,6 +47,8 @@ pub struct CallbackContext { waker: *const Arc, callback: CallbackPtr, quota: Option, + attachment: Option, // 附加数据,用于辅助请求和响应,目前只有kvector在使用 + drop_attach: Option>, } impl CallbackContext { @@ -60,6 +61,7 @@ impl CallbackContext { last: bool, retry_on_rsp_notok: bool, max_tries: u8, + drop_attach: Option>, ) -> Self { log::debug!("request prepared:{}", req); let now = Instant::now(); @@ -73,7 +75,7 @@ impl CallbackContext { try_next: false, retry_on_rsp_notok, write_back: false, - max_tries: OnceCell::from(max_tries), + max_tries, request: req, response: MaybeUninit::uninit(), callback: cb, @@ -81,6 +83,8 @@ impl CallbackContext { tries: 0.into(), waker, quota: None, + attachment: None, + drop_attach, } } @@ -114,16 +118,48 @@ impl CallbackContext { } } #[inline] - pub fn on_complete(&mut self, resp: Command) { + pub fn on_complete(&mut self, parser: &P, resp: Command) { log::debug!("on-complete:{} resp:{}", self, resp); // 异步请求不关注response。 if !self.async_mode { debug_assert!(!self.complete(), "{:?}", self); - self.swap_response(resp); + if self.attachment.is_some() { + // vector聚合场景 + self.on_complete_aggregate(parser, resp); + } else { + self.swap_response(resp); + } } self.on_done(); } + #[inline] + pub fn on_complete_aggregate(&mut self, parser: &P, mut resp: Command) { + // 返回成功: + // 1. 第一轮获取si;若si获取失败(例如si为空),则终止请求 + // 2. 后续轮次更新attachment,并判断是否是最后一轮。 + // 返回失败,则终止请求。 + if resp.ok() { + // 更新attachment + let attach = self.attachment.as_mut().expect("attach"); + let last = parser.update_attachment(attach, &mut resp); + if last { + self.set_last(true); + } + // 更新attachment不成功,或者响应数足够,终止请求 + } else { + self.set_last(true); + } + if self.last() { + // 中间轮次的resp没有被使用,可忽略; + self.swap_response(resp); + } else { + // 重置下一轮访问需要的变量 + self.try_next = true; // 可以进入下一轮访问 + self.set_fitst_try(); + } + } + #[inline] pub fn take_response(&mut self) -> Option { match self.inited.compare_exchange(true, false, AcqRel, Acquire) { @@ -142,7 +178,7 @@ impl CallbackContext { // 当前重试条件为 rsp == None || ("mc" && !rsp.ok()) if self.inited() { // 优先筛出正常的请求,便于理解 - // rsp.ok 不需要重试 + // rsp.ok if unsafe { self.unchecked_response().ok() } { return false; } @@ -151,8 +187,8 @@ impl CallbackContext { return false; } } - let max_tries = *self.max_tries.get().expect("max tries"); - self.try_next && self.tries.fetch_add(1, Release) < max_tries + + self.try_next && self.tries.fetch_add(1, Release) < self.max_tries } else { // write back请求 self.write_back @@ -172,6 +208,10 @@ impl CallbackContext { // 需要重试或回写 return self.goon(); } + + // 改到这里,不需要额外判断逻辑了 + self.set_last(true); + //markdone后,req标记为已完成,那么CallbackContext和CopyBidirectional都有可能被释放 //CopyBidirectional会提前释放,所以需要提前clone一份 //CallbackContext会提前释放,则需要在此clone到栈上 @@ -290,6 +330,27 @@ impl CallbackContext { pub fn quota(&mut self, quota: BackendQuota) { self.quota = Some(quota); } + #[inline] + pub fn attachment(&self) -> Option<&Attachment> { + self.attachment.as_ref() + } + #[inline] + pub fn attachment_mut(&mut self) -> &mut Option { + &mut self.attachment + } + #[inline] + pub fn set_last(&mut self, last: bool) { + // todo: 可优化为依据请求数或者响应数量判断可以设置last为true + self.last = last; + } + #[inline] + pub fn set_max_tries(&mut self, max_tries: u8) { + self.max_tries = max_tries; + } + #[inline] + pub fn set_fitst_try(&mut self) { + self.tries = 0.into(); + } } impl Drop for CallbackContext { @@ -300,6 +361,9 @@ impl Drop for CallbackContext { // 可以尝试检查double free // 在debug环境中,设置done为false debug_assert_eq!(*self.done.get_mut() = false, ()); + if let Some(attachment) = self.attachment.take() { + (self.drop_attach.as_ref().expect("should has drop_attach"))(attachment); + } } } diff --git a/protocol/src/flag.rs b/protocol/src/flag.rs index dda81ba37..4c8accf64 100644 --- a/protocol/src/flag.rs +++ b/protocol/src/flag.rs @@ -79,38 +79,20 @@ impl Flag { } } -// TODO 暂时保留备查,2024.2后再考虑清理 fishermen -// #[derive(Debug, Clone)] -// pub enum TryNextType { -// NotTryNext = 0, -// TryNext = 1, -// // 去掉unknow类型,统一逻辑处理,测试稳定后清理,预计2024.1后清理 fishermen -// // Unkown = 2, -// } - -// // (1) 0: not try next(对add/replace生效); (2) 1: try next; (3) 2:unkown (仅对set生效,注意提前考虑cas) -// impl From for TryNextType { -// fn from(val: u8) -> Self { -// match val { -// 0 => TryNextType::NotTryNext, -// 1 => TryNextType::TryNext, -// // 2 => TryNextType::Unkown, -// _ => panic!("unknow try next type"), -// } -// } -// // TODO 暂时保留,线上稳定后清理,预计2024.2之后 fishermen -// // pub fn from(val: u8) -> Self { -// // match val { -// // 0 => TryNextType::NotTryNext, -// // 1 => TryNextType::TryNext, -// // // 2 => TryNextType::Unkown, -// // _ => panic!("unknow try next type"), -// // } -// // } -// } +#[derive(Debug, Default)] +pub struct ResponseHeader { + pub(crate) header: Vec, + pub(crate) rows: u16, // 包含的响应行数 + pub(crate) columns: u16, // 每行的响应列数 +} -// impl Default for TryNextType { -// fn default() -> Self { -// TryNextType::TryNext -// } -// } +impl ResponseHeader { + #[inline] + pub fn new(header: Vec, rows: u16, columns: u16) -> Self { + ResponseHeader { + header, + rows, + columns, + } + } +} diff --git a/protocol/src/kv/common/constants.rs b/protocol/src/kv/common/constants.rs index 29fda03df..2d401e4bd 100644 --- a/protocol/src/kv/common/constants.rs +++ b/protocol/src/kv/common/constants.rs @@ -640,6 +640,7 @@ impl ColumnType { | MYSQL_TYPE_INT24 | MYSQL_TYPE_LONG | MYSQL_TYPE_LONGLONG + | MYSQL_TYPE_NEWDECIMAL ) } diff --git a/protocol/src/kv/common/error/mod.rs b/protocol/src/kv/common/error/mod.rs index 371720428..6b731b557 100644 --- a/protocol/src/kv/common/error/mod.rs +++ b/protocol/src/kv/common/error/mod.rs @@ -63,7 +63,9 @@ pub enum Error { UrlError(UrlError), #[cfg(any(feature = "native-tls", feature = "rustls"))] TlsError(tls::TlsError), + #[allow(dead_code)] FromValueError(Value), + #[allow(dead_code)] FromRowError(Row), } diff --git a/protocol/src/kv/common/query_result.rs b/protocol/src/kv/common/query_result.rs index 199cc24eb..2e41335c1 100644 --- a/protocol/src/kv/common/query_result.rs +++ b/protocol/src/kv/common/query_result.rs @@ -64,6 +64,7 @@ pub enum SetIteratorState { /// Iterator is in a non-empty set. InSet(Arc<[Column]>), /// Iterator is in an empty set. + #[allow(dead_code)] InEmptySet(OkPacket), /// Iterator is in an errored result set. Errored(Error), diff --git a/protocol/src/parser.rs b/protocol/src/parser.rs index 97b1e7ddf..397f69470 100644 --- a/protocol/src/parser.rs +++ b/protocol/src/parser.rs @@ -9,7 +9,7 @@ use crate::msgque::MsgQue; use crate::redis::Redis; use crate::uuid::Uuid; use crate::vector::Vector; -use crate::{Error, Flag, OpCode, Operation, Result, Stream, Writer}; +use crate::{Attachment, Error, Flag, OpCode, Operation, ResponseHeader, Result, Stream, Writer}; #[derive(Clone)] #[enum_dispatch(Proto)] @@ -128,6 +128,17 @@ pub trait Proto: Unpin + Clone + Send + Sync + 'static { fn max_tries(&self, _req_op: Operation) -> u8 { 1_u8 } + + #[inline] + fn update_attachment(&self, attachment: &mut Attachment, response: &mut Command) -> bool { + // 默认情况下,attachment应该为空 + assert!(false, "{:?} {response}", attachment); + false + } + #[inline] + fn drop_attach(&self, _att: Attachment) { + panic!("unreachable"); + } } pub trait RequestProcessor { @@ -139,8 +150,11 @@ pub trait RequestProcessor { fn process(&mut self, req: HashedCommand, last: bool); } +// TODO Command实质就是response,考虑直接用response? fishermen pub struct Command { ok: bool, + pub(crate) header: ResponseHeader, + count: u32, cmd: MemGuard, } @@ -156,8 +170,24 @@ pub struct HashedCommand { impl Command { #[inline] pub fn from(ok: bool, cmd: ds::MemGuard) -> Self { - Self { ok, cmd } + Self { + ok, + header: Default::default(), + count: 0, + cmd, + } } + #[inline] + pub fn with_assemble_pack(ok: bool, header: ResponseHeader, body: ds::MemGuard) -> Self { + let count = header.rows as u32; + Self { + ok, + header, + count, + cmd: body, + } + } + pub fn from_ok(cmd: ds::MemGuard) -> Self { Self::from(true, cmd) } @@ -165,6 +195,18 @@ impl Command { pub fn ok(&self) -> bool { self.ok } + #[inline] + + pub fn update_ok(&mut self, ok: bool) { + self.ok = ok; + } + pub fn count(&self) -> u32 { + self.count + } + #[inline] + pub fn set_count(&mut self, n: u32) { + self.count = n; + } } impl std::ops::Deref for Command { type Target = MemGuard; @@ -250,14 +292,13 @@ impl HashedCommand { } #[inline] pub fn reshape(&mut self, mut dest_cmd: MemGuard) { - assert!( - self.origin_cmd.is_none(), - "origin cmd should be none: {:?}", - self.origin_cmd - ); - // 将dest cmd设给cmd,并将换出的cmd保留在origin_cmd中 - mem::swap(&mut self.cmd, &mut dest_cmd); - self.origin_cmd = Some(dest_cmd); + if self.origin_cmd.is_none() { + // 将dest cmd设给cmd,并将换出的cmd保留在origin_cmd中 + mem::swap(&mut self.cmd, &mut dest_cmd); + self.origin_cmd = Some(dest_cmd); + } else { + self.cmd = dest_cmd; + } } } @@ -294,6 +335,7 @@ pub trait Commander, I: MetricItem> { fn request_shard(&self) -> usize; fn metric(&self) -> &M; fn ctx(&self) -> u64; + fn attachment(&self) -> Option<&Attachment>; } pub enum MetricName { diff --git a/protocol/src/redis/packet.rs b/protocol/src/redis/packet.rs index 634ca0604..a21c589be 100644 --- a/protocol/src/redis/packet.rs +++ b/protocol/src/redis/packet.rs @@ -486,7 +486,10 @@ impl Packet { pub fn num(&self, oft: &mut usize) -> crate::Result { // 至少4个字节 if *oft + 4 <= self.len() { - debug_assert!(self[*oft] == b'*' || self[*oft] == b'$', "packet:{self:?}"); + debug_assert!( + self[*oft] == b'*' || self[*oft] == b'$' || self[*oft] == b':', + "packet:{self:?}" + ); let start = *oft; *oft += num_skips(self.at(*oft + 1)); let mut val: usize = 0; @@ -512,7 +515,7 @@ impl Packet { } if b.is_ascii_digit() { val = val * 10 + (b - b'0') as usize; - if val <= std::u32::MAX as usize { + if val <= std::usize::MAX as usize { continue; } } diff --git a/protocol/src/req.rs b/protocol/src/req.rs index 4577248bc..d3b88a268 100644 --- a/protocol/src/req.rs +++ b/protocol/src/req.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use crate::{Command, HashedCommand}; pub type Context = u64; - +pub type Attachment = [u8; 280]; #[repr(transparent)] #[derive(Clone, Default)] pub struct BackendQuota { @@ -49,7 +49,7 @@ pub trait Request: fn start_at(&self) -> Instant; fn on_noforward(&mut self); fn on_sent(self) -> Option; - fn on_complete(self, resp: Command); + fn on_complete(self, parser: &P, resp: Command); fn on_err(self, err: crate::Error); #[inline] fn context_mut(&mut self) -> &mut Context { @@ -67,4 +67,11 @@ pub trait Request: fn retry_on_rsp_notok(&mut self, retry: bool); // 初始化quota fn quota(&mut self, quota: BackendQuota); + // 对request增加附加信息 + fn attachment_mut(&mut self) -> &mut Option; + // 获取附加信息 + fn attachment(&self) -> Option<&Attachment>; + fn set_max_tries(&mut self, max_tries: u8); + fn set_fitst_try(&mut self); + fn set_last(&mut self, last: bool); } diff --git a/protocol/src/request.rs b/protocol/src/request.rs index f376095d4..c66adbdf3 100644 --- a/protocol/src/request.rs +++ b/protocol/src/request.rs @@ -1,4 +1,6 @@ -use crate::{callback::CallbackContext, BackendQuota, Command, Context, Error, HashedCommand}; +use crate::{ + callback::CallbackContext, Attachment, BackendQuota, Command, Context, Error, HashedCommand, +}; use std::{ fmt::{self, Debug, Display, Formatter}, ptr::NonNull, @@ -26,8 +28,8 @@ impl crate::Request for Request { } } #[inline] - fn on_complete(self, resp: Command) { - self.ctx().on_complete(resp); + fn on_complete(self, parser: &P, resp: Command) { + self.ctx().on_complete(parser, resp); } #[inline] fn on_err(self, err: Error) { @@ -57,6 +59,25 @@ impl crate::Request for Request { fn quota(&mut self, quota: BackendQuota) { self.ctx().quota(quota); } + #[inline] + fn attachment(&self) -> Option<&Attachment> { + self.ctx().attachment() + } + #[inline] + fn attachment_mut(&mut self) -> &mut Option { + self.ctx().attachment_mut() + } + #[inline] + fn set_max_tries(&mut self, max_tries: u8) { + self.ctx().set_max_tries(max_tries); + } + #[inline] + fn set_fitst_try(&mut self) { + self.ctx().set_fitst_try(); + } + fn set_last(&mut self, last: bool) { + self.ctx().set_last(last); + } } impl Request { #[inline] diff --git a/protocol/src/vector/attachment.rs b/protocol/src/vector/attachment.rs new file mode 100644 index 000000000..f45ec8a8a --- /dev/null +++ b/protocol/src/vector/attachment.rs @@ -0,0 +1,213 @@ +use crate::Attachment; +use crate::Command; +use crate::Packet; +use ds::RingSlice; + +use super::VectorCmd; +#[derive(Debug, Default)] +#[repr(C)] +pub struct VecAttach { + pub rsp_ok: bool, + // 查询的轮次,0代表si + pub round: u16, + // 待查询数量,不能超过u16::MAX + pub left_count: u16, + body_token_count: u16, + //本轮响应是否成功 + // header,*2 + column names + header: Vec, + body: Vec>, + // 查询响应的body中token数量 + si: Vec, // si表中查询到的数据, si字段信息在配置里存放 + pub vcmd: VectorCmd, +} + +#[derive(Debug, Default)] +pub struct SiItem { + pub date: VDate, + pub count: u16, +} +impl SiItem { + pub fn new(yy: u8, mm: u8, count: u16) -> Self { + Self { + date: VDate { + year: yy, + month: mm, + }, + count, + } + } +} +#[derive(Debug, Default)] +pub struct VDate { + pub year: u8, // year + pub month: u8, // month +} +impl VDate { + // 2024-11-01 -> {24, 11} + pub fn from(d: &RingSlice) -> Self { + if let Some(first) = d.find(0, b'-') { + let y = d.try_str_num(0..first).unwrap_or(0); + if let Some(second) = d.find(first + 1, b'-') { + let m = d.try_str_num(first + 1..second).unwrap_or(0); + if y > 0 && m > 0 { + return Self { + year: y.checked_rem(100).unwrap_or(0) as u8, + month: m as u8, + }; + } + } + } + Self { year: 0, month: 0 } + } + #[inline] + pub fn year(&self) -> u8 { + self.year + } + #[inline] + pub fn month(&self) -> u8 { + self.month + } + #[inline] + pub fn is_valid(&self) -> bool { + self.month > 0 && self.year > 0 + } +} + +pub trait VAttach { + fn attach(&self) -> &VecAttach; + fn attach_mut(&mut self) -> &mut VecAttach; +} + +impl VAttach for T { + #[inline(always)] + fn attach(&self) -> &VecAttach { + unsafe { std::mem::transmute(self.attachment().expect("attach is none")) } + } + #[inline(always)] + fn attach_mut(&mut self) -> &mut VecAttach { + unsafe { std::mem::transmute(self.attachment_mut().as_mut().expect("attach is none")) } + } +} + +impl VecAttach { + #[inline(always)] + pub fn from(att: Attachment) -> VecAttach { + unsafe { std::mem::transmute(att) } + } + #[inline(always)] + pub fn attach(att: &Attachment) -> &VecAttach { + unsafe { std::mem::transmute(att) } + } + #[inline(always)] + pub fn attach_mut(att: &mut Attachment) -> &mut VecAttach { + unsafe { std::mem::transmute(att) } + } + #[inline(always)] + pub fn to_attach(self) -> Attachment { + unsafe { std::mem::transmute(self) } + } + #[inline] + pub fn init(&mut self, left_count: u16) { + *self = VecAttach { + round: 0, + left_count, + header: Vec::with_capacity(8), + body: Vec::with_capacity(12), + body_token_count: 0, + rsp_ok: false, + si: Vec::with_capacity(6), + vcmd: Default::default(), + }; + } + #[inline] + pub fn is_empty(&self) -> bool { + self.body.is_empty() + } + + pub fn attach_header(&mut self, header: Vec) { + self.header = header; + } + #[inline] + pub fn attach_body(&mut self, body_data: Vec, rows: u16, columns: u16) { + self.body.push(body_data); + self.body_token_count += rows * columns; + self.left_count = self.left_count.saturating_sub(rows); + } + + #[inline] + pub fn header(&self) -> &Vec { + &self.header + } + + #[inline] + pub fn body(&self) -> &Vec> { + &self.body + } + + #[inline] + pub fn body_token_count(&self) -> u16 { + self.body_token_count + } + // 从response中解析si + // 约定:si返回结果的结构: uid、date、count顺序排列 + #[inline] + pub fn attach_si(&mut self, response: &Command) -> bool { + let rows = response.header.rows; + let cols = response.header.columns; + debug_assert_eq!(cols, 3); + self.si.reserve(rows as usize); + let data = Packet::from(***response); + let mut oft: usize = 0; + while oft < data.len() { + if data.num(&mut oft).is_err() { + return false; + } + let d = data.bulk_string(&mut oft); + if d.is_err() { + return false; + } + if let Ok(count) = data.num(&mut oft) { + if count > 0 { + let date = VDate::from(&d.unwrap()); + if date.is_valid() { + let si_item = SiItem::new(date.year(), date.month(), count as u16); + self.si.push(si_item); + } + } + } + } + self.si.len() > 0 + } + #[inline] + pub fn has_si(&mut self) -> bool { + self.si.len() > 0 + } + #[inline] + pub fn si(&self) -> &Vec { + &self.si + } +} + +#[cfg(test)] +mod tests { + use ds::MemGuard; + + use crate::ResponseHeader; + + use super::*; + #[test] + fn test_attach_si() { + let header: ResponseHeader = ResponseHeader::new( + "*3\r\n$3\r\nuid\r\n$10\r\nstart_date\r\n$5\r\ncount\r\n".into(), + 246, + 3, + ); + let body: MemGuard = MemGuard::from_vec(":6351590999\r\n$10\r\n2024-06-01\r\n:674\r\n:6351590999\r\n$10\r\n2024-05-01\r\n:1113\r\n:6351590999\r\n$10\r\n2024-04-01\r\n:833\r\n:6351590999\r\n$10\r\n2024-03-01\r\n:45\r\n:6351590999\r\n$10\r\n2024-02-01\r\n:61\r\n:6351590999\r\n$10\r\n2024-01-01\r\n:59\r\n:6351590999\r\n$10\r\n2023-12-01\r\n:20\r\n:6351590999\r\n$10\r\n2023-11-01\r\n:9\r\n:6351590999\r\n$10\r\n2023-10-01\r\n:13\r\n:6351590999\r\n$10\r\n2023-09-01\r\n:50\r\n:6351590999\r\n$10\r\n2023-08-01\r\n:16\r\n:6351590999\r\n$10\r\n2023-07-01\r\n:61\r\n:6351590999\r\n$10\r\n2023-06-01\r\n:30\r\n:6351590999\r\n$10\r\n2023-05-01\r\n:41\r\n:6351590999\r\n$10\r\n2023-04-01\r\n:54\r\n:6351590999\r\n$10\r\n2023-03-01\r\n:108\r\n:6351590999\r\n$10\r\n2023-02-01\r\n:213\r\n:6351590999\r\n$10\r\n2023-01-01\r\n:159\r\n:6351590999\r\n$10\r\n2022-12-01\r\n:26\r\n:6351590999\r\n$10\r\n2022-11-01\r\n:16\r\n:6351590999\r\n$10\r\n2022-10-01\r\n:14\r\n:6351590999\r\n$10\r\n2022-09-01\r\n:3\r\n:6351590999\r\n$10\r\n2022-08-01\r\n:10\r\n:6351590999\r\n$10\r\n2022-07-01\r\n:9\r\n:6351590999\r\n$10\r\n2022-06-01\r\n:4\r\n:6351590999\r\n$10\r\n2022-05-01\r\n:23\r\n:6351590999\r\n$10\r\n2022-04-01\r\n:4\r\n:6351590999\r\n$10\r\n2022-03-01\r\n:4\r\n:6351590999\r\n$10\r\n2022-02-01\r\n:4\r\n:6351590999\r\n$10\r\n2022-01-01\r\n:5\r\n:6351590999\r\n$10\r\n2021-12-01\r\n:14\r\n:6351590999\r\n$10\r\n2021-11-01\r\n:4\r\n:6351590999\r\n$10\r\n2021-10-01\r\n:2\r\n:6351590999\r\n$10\r\n2021-09-01\r\n:3\r\n:6351590999\r\n$10\r\n2021-08-01\r\n:25\r\n:6351590999\r\n$10\r\n2021-07-01\r\n:36\r\n:6351590999\r\n$10\r\n2021-06-01\r\n:30\r\n:6351590999\r\n$10\r\n2021-05-01\r\n:18\r\n:6351590999\r\n$10\r\n2021-04-01\r\n:20\r\n:6351590999\r\n$10\r\n2021-03-01\r\n:21\r\n:6351590999\r\n$10\r\n2021-02-01\r\n:35\r\n:6351590999\r\n$10\r\n2021-01-01\r\n:22\r\n:6351590999\r\n$10\r\n2020-12-01\r\n:55\r\n:6351590999\r\n$10\r\n2020-11-01\r\n:22\r\n:6351590999\r\n$10\r\n2020-10-01\r\n:37\r\n:6351590999\r\n$10\r\n2020-09-01\r\n:33\r\n:6351590999\r\n$10\r\n2020-08-01\r\n:15\r\n:6351590999\r\n$10\r\n2020-07-01\r\n:12\r\n:6351590999\r\n$10\r\n2020-06-01\r\n:26\r\n:6351590999\r\n$10\r\n2020-05-01\r\n:54\r\n:6351590999\r\n$10\r\n2020-04-01\r\n:38\r\n:6351590999\r\n$10\r\n2020-03-01\r\n:27\r\n:6351590999\r\n$10\r\n2020-02-01\r\n:80\r\n:6351590999\r\n$10\r\n2020-01-01\r\n:99\r\n:6351590999\r\n$10\r\n2019-12-01\r\n:67\r\n:6351590999\r\n$10\r\n2019-11-01\r\n:120\r\n:6351590999\r\n$10\r\n2019-10-01\r\n:80\r\n:6351590999\r\n$10\r\n2019-09-01\r\n:76\r\n:6351590999\r\n$10\r\n2019-08-01\r\n:120\r\n:6351590999\r\n$10\r\n2019-07-01\r\n:140\r\n:6351590999\r\n$10\r\n2019-06-01\r\n:118\r\n:6351590999\r\n$10\r\n2019-05-01\r\n:146\r\n:6351590999\r\n$10\r\n2019-04-01\r\n:287\r\n:6351590999\r\n$10\r\n2019-03-01\r\n:83\r\n:6351590999\r\n$10\r\n2019-02-01\r\n:88\r\n:6351590999\r\n$10\r\n2019-01-01\r\n:262\r\n:6351590999\r\n$10\r\n2018-12-01\r\n:213\r\n:6351590999\r\n$10\r\n2018-11-01\r\n:251\r\n:6351590999\r\n$10\r\n2018-10-01\r\n:215\r\n:6351590999\r\n$10\r\n2018-09-01\r\n:192\r\n:6351590999\r\n$10\r\n2018-08-01\r\n:208\r\n:6351590999\r\n$10\r\n2018-07-01\r\n:339\r\n:6351590999\r\n$10\r\n2018-06-01\r\n:97\r\n:6351590999\r\n$10\r\n2018-05-01\r\n:162\r\n:6351590999\r\n$10\r\n2018-04-01\r\n:127\r\n:6351590999\r\n$10\r\n2018-03-01\r\n:147\r\n:6351590999\r\n$10\r\n2018-02-01\r\n:529\r\n:6351590999\r\n$10\r\n2018-01-01\r\n:702\r\n:6351590999\r\n$10\r\n2017-12-01\r\n:453\r\n:6351590999\r\n$10\r\n2017-11-01\r\n:70\r\n:6351590999\r\n$10\r\n2017-10-01\r\n:1\r\n:6351590999\r\n$10\r\n2017-08-01\r\n:1\r\n".into()); + let response: Command = Command::with_assemble_pack(true, header, body); + let mut att = VecAttach::default(); + att.init(1); + let r = att.attach_si(&response); + assert!(r); + } +} diff --git a/protocol/src/vector/mod.rs b/protocol/src/vector/mod.rs index 65917f384..047d22cf1 100644 --- a/protocol/src/vector/mod.rs +++ b/protocol/src/vector/mod.rs @@ -1,3 +1,4 @@ +pub mod attachment; mod command; pub(crate) mod error; pub mod flager; @@ -9,15 +10,18 @@ mod reqpacket; mod rsppacket; use std::fmt::Write; +use std::mem; use crate::{ - Command, Commander, Error, HashedCommand, Metric, MetricItem, Protocol, RequestProcessor, - Result, Stream, Writer, + Attachment, Command, Commander, Error, HashedCommand, Metric, MetricItem, Protocol, + RequestProcessor, Result, Stream, Writer, }; use chrono::NaiveDate; use ds::RingSlice; use sharding::hash::Hash; +use self::attachment::VecAttach; +use self::packet::RedisPack; use self::reqpacket::RequestPacket; use self::rsppacket::ResponsePacket; use crate::kv::client::Client; @@ -117,13 +121,34 @@ impl Protocol for Vector { w.write("-ERR ".as_bytes())?; w.write_slice(response, 0)?; // mysql返回的错误信息 w.write("\r\n".as_bytes())?; - return Ok(()); } else { - // response已封装为redis协议。正常响应有三种: - // 1. 只返回影响的行数 - // 2. 一行或多行数据 - // 3. 结果为空 - w.write_slice(response, 0)?; // value + if ctx.attachment().is_some() { + // 有attachment: 组装rsp: header(vec[0]) + *body_tokens + vec[1..] + let attach = VecAttach::attach(ctx.attachment().unwrap()); + if attach.body_token_count() > 0 { + w.write(attach.header())?; + w.write(format!("*{}\r\n", attach.body_token_count()).as_bytes())?; + for b in attach.body() { + w.write(b.as_slice())?; + } + } else { + // 返回空 + w.write("$-1\r\n".as_bytes())?; + } + } else { + // 无attachment: response已封装为redis协议。正常响应有三种: + // 1. 只返回影响的行数 + // 2. 一行或多行数据 + // 3. 结果为空 + if response.header.rows > 0 { + w.write(response.header.header.as_ref())?; + w.write( + format!("*{}\r\n", response.header.rows * response.header.columns) + .as_bytes(), + )?; + } + w.write_slice(response, 0)?; // value + } } return Ok(()); } @@ -149,6 +174,42 @@ impl Protocol for Vector { log::debug!("+++ send to client padding {:?}", ctx.request()); Ok(()) } + + // 将中间响应放到attachment中,方便后续继续查询 + // 先收集si信息,再收集body + // 返回值:是否需要继续查询 + #[inline] + fn update_attachment(&self, attachment: &mut Attachment, response: &mut Command) -> bool { + assert!(response.ok()); + let attach = VecAttach::attach_mut(attachment); + //收到响应就算ok,响应有问题也不会发送到topo了 + attach.rsp_ok = true; + + if attach.is_empty() { + // TODO 先打通,此处的内存操作需要考虑优化 fishermen + let mut header_data = Vec::new(); + let header = &mut response.header; + mem::swap(&mut header_data, &mut header.header); + attach.attach_header(header_data); + } + + // TODO 先打通,此处的内存操作需要考虑优化 fishermen + match attach.has_si() { + true => { + if response.header.rows > 0 { + let header = &response.header; + attach.attach_body(response.data().0.to_vec(), header.rows, header.columns); + } + attach.left_count == 0 + } + // 按si解析响应: 未成功获取有效si信息或者解析si失败,并终止后续请求 + false => response.count() == 0 || !attach.attach_si(response), + } + } + #[inline] + fn drop_attach(&self, att: Attachment) { + let _ = VecAttach::from(att); + } } impl Vector { @@ -213,41 +274,11 @@ impl Vector { Ok(cmd) => Ok(cmd), Err(crate::kv::error::Error::UnhandleResponseError(emsg)) => { // 对于UnhandleResponseError,需要构建rsp,发给client - let cmd = rsp_packet.build_final_rsp_cmd(false, emsg); + let cmd = rsp_packet.build_final_rsp_cmd(false, RedisPack::with_simple(emsg)); Ok(cmd) } Err(e) => Err(e.into()), } - - // let meta = match rsp_packet.parse_result_set_meta() { - // Ok(meta) => meta, - // Err(crate::kv::error::Error::UnhandleResponseError(emsg)) => { - // // 对于UnhandleResponseError,需要构建rsp,发给client - // let cmd = rsp_packet.build_final_rsp_cmd(false, emsg); - // return Ok(cmd); - // } - // Err(e) => return Err(e.into()), - // }; - - // // 如果是只有meta的ok packet,直接返回影响的列数,如insert/delete/update - // if let Or::B(ok) = meta { - // let affected = ok.affected_rows(); - // let cmd = rsp_packet.build_final_affected_rows_rsp_cmd(affected); - // return Ok(cmd); - // } - - // // 解析meta后面的rows,返回列记录,如select - // // 有可能多行数据,直接build成 - // let mut query_result: QueryResult = QueryResult::new(rsp_packet, meta); - // match query_result.parse_rows_to_cmd() { - // Ok(cmd) => Ok(cmd), - // Err(crate::kv::error::Error::UnhandleResponseError(emsg)) => { - // // 对于UnhandleResponseError,需要构建rsp,发给client - // let cmd = query_result.build_final_rsp_cmd(false, emsg); - // Ok(cmd) - // } - // Err(e) => Err(e.into()), - // } } } @@ -272,6 +303,8 @@ pub(crate) const COND_ORDER: &[u8] = b"ORDER"; pub(crate) const COND_LIMIT: &[u8] = b"LIMIT"; pub(crate) const COND_GROUP: &[u8] = b"GROUP"; +const DEFAULT_LIMIT: usize = 15; + #[derive(Debug, Clone, Default)] pub struct Condition { pub field: RingSlice, @@ -326,6 +359,16 @@ pub struct VectorCmd { pub group_by: GroupBy, } +impl VectorCmd { + #[inline(always)] + pub fn limit(&self) -> usize { + match self.limit.limit.try_str_num(..) { + Some(limit) => limit, + None => DEFAULT_LIMIT, + } + } +} + /// field 字段的值,对于‘field’关键字,值是|分隔的field names,否则就是二进制value #[derive(Debug, Clone)] pub enum FieldVal { @@ -375,4 +418,7 @@ pub trait Strategy { //todo 通过代理类型实现 fn condition_keys(&self) -> Box> + '_>; fn write_database_table(&self, buf: &mut impl Write, date: &NaiveDate, hash: i64); + fn write_si_database_table(&self, buf: &mut impl Write, hash: i64); + fn batch(&self, limit: u64, vcmd: &VectorCmd) -> u64; + fn si_cols(&self) -> &[String]; } diff --git a/protocol/src/vector/mysql.rs b/protocol/src/vector/mysql.rs index 33315317a..8c960f8dc 100644 --- a/protocol/src/vector/mysql.rs +++ b/protocol/src/vector/mysql.rs @@ -157,12 +157,12 @@ impl<'a> Display for UpdateFields<'a> { } } -struct KeysAndCondsAndOrderAndLimit<'a, S>(&'a S, &'a VectorCmd); +struct KeysAndCondsAndOrderAndLimit<'a, S>(&'a S, &'a VectorCmd, u64); impl<'a, S: Strategy> Display for KeysAndCondsAndOrderAndLimit<'a, S> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let &Self( strategy, - VectorCmd { + vcmd @ VectorCmd { cmd: _, keys, fields: _, @@ -171,6 +171,7 @@ impl<'a, S: Strategy> Display for KeysAndCondsAndOrderAndLimit<'a, S> { limit, group_by, }, + extra, ) = self; for (i, key) in (&mut strategy.condition_keys()).enumerate() { if let Some(key) = key { @@ -195,7 +196,10 @@ impl<'a, S: Strategy> Display for KeysAndCondsAndOrderAndLimit<'a, S> { VRingSlice(&order.order) ); } - if limit.offset.len() != 0 { + let strategy_limit = strategy.batch(extra, vcmd); + if strategy_limit > 0 { + let _ = write!(f, " limit {}", strategy.batch(extra, vcmd),); + } else if limit.offset.len() != 0 { let _ = write!( f, " limit {} offset {}", @@ -212,10 +216,17 @@ pub struct SqlBuilder<'a, S> { hash: i64, date: NaiveDate, strategy: &'a S, + limit: u64, } impl<'a, S: Strategy> SqlBuilder<'a, S> { - pub fn new(vcmd: &'a VectorCmd, hash: i64, date: NaiveDate, strategy: &'a S) -> Result { + pub fn new( + vcmd: &'a VectorCmd, + hash: i64, + date: NaiveDate, + strategy: &'a S, + limit: u64, + ) -> Result { if vcmd.keys.len() != strategy.keys().len() { Err(Error::RequestProtocolInvalid) } else { @@ -224,6 +235,7 @@ impl<'a, S: Strategy> SqlBuilder<'a, S> { hash, date, strategy, + limit, }) } } @@ -280,7 +292,6 @@ impl<'a, S: Strategy> VectorSqlBuilder for SqlBuilder<'a, S> { } fn write_sql(&self, buf: &mut impl Write) { - // let cmd_type = vector::get_cmd_type(self.op).unwrap_or(vector::CommandType::Unknown); match self.vcmd.cmd { CommandType::VRange | CommandType::VGet => { let _ = write!( @@ -288,7 +299,7 @@ impl<'a, S: Strategy> VectorSqlBuilder for SqlBuilder<'a, S> { "select {} from {} where {}", Select(self.vcmd.fields.get(0)), Table(self.strategy, &self.date, self.hash), - KeysAndCondsAndOrderAndLimit(self.strategy, &self.vcmd), + KeysAndCondsAndOrderAndLimit(self.strategy, &self.vcmd, self.limit), ); } CommandType::VCard => { @@ -296,7 +307,7 @@ impl<'a, S: Strategy> VectorSqlBuilder for SqlBuilder<'a, S> { buf, "select count(*) from {} where {}", Table(self.strategy, &self.date, self.hash), - KeysAndCondsAndOrderAndLimit(self.strategy, &self.vcmd), + KeysAndCondsAndOrderAndLimit(self.strategy, &self.vcmd, self.limit), ); } CommandType::VAdd => { @@ -314,7 +325,7 @@ impl<'a, S: Strategy> VectorSqlBuilder for SqlBuilder<'a, S> { "update {} set {} where {}", Table(self.strategy, &self.date, self.hash), UpdateFields(&self.vcmd.fields), - KeysAndCondsAndOrderAndLimit(self.strategy, &self.vcmd), + KeysAndCondsAndOrderAndLimit(self.strategy, &self.vcmd, self.limit), ); } CommandType::VDel => { @@ -322,7 +333,7 @@ impl<'a, S: Strategy> VectorSqlBuilder for SqlBuilder<'a, S> { buf, "delete from {} where {}", Table(self.strategy, &self.date, self.hash), - KeysAndCondsAndOrderAndLimit(self.strategy, &self.vcmd), + KeysAndCondsAndOrderAndLimit(self.strategy, &self.vcmd, self.limit), ); } _ => { @@ -332,3 +343,110 @@ impl<'a, S: Strategy> VectorSqlBuilder for SqlBuilder<'a, S> { } } } + +pub struct SiSqlBuilder<'a, S> { + vcmd: &'a VectorCmd, + hash: i64, + strategy: &'a S, +} + +impl<'a, S: Strategy> SiSqlBuilder<'a, S> { + pub fn new(vcmd: &'a VectorCmd, hash: i64, strategy: &'a S) -> Result { + if vcmd.keys.len() != strategy.keys().len() { + Err(Error::RequestProtocolInvalid) + } else { + Ok(Self { + vcmd, + hash, + strategy, + }) + } + } +} + +impl<'a, S> MysqlBinary for SiSqlBuilder<'a, S> { + fn mysql_cmd(&self) -> Command { + Command::COM_QUERY + } +} + +impl<'a, S: Strategy> VectorSqlBuilder for SiSqlBuilder<'a, S> { + fn len(&self) -> usize { + 128 + } + + // (1) 根据object_type查用户的si数据 + // select uid, start_date as stat_date, sum(count) as count from $db$.$tb$ where uid=? and object_type in(?) group by uid, start_date order by start_date desc + + // (2) 查用户所有的si数据 + // select uid, start_date as stat_date, sum(count) as count from $db$.$tb$ where uid=? group by start_date order by start_date desc + + // select date,count字段名, + // 条件需要key,字段名 + fn write_sql(&self, buf: &mut impl Write) { + match self.vcmd.cmd { + CommandType::VRange => { + let _ = write!( + buf, + "select {} from {} where {}", + SiSelect(self.strategy.keys(), self.strategy.si_cols()), + SiTable(self.strategy, self.hash), + SiKeysAndCondsAndOrder(self.strategy, &self.vcmd), + ); + } + _ => { + //校验应该在parser_req出 + panic!("not support cmd_type:{:?}", self.vcmd.cmd); + } + } + } +} + +//keys, cols +struct SiSelect<'a>(&'a [String], &'a [String]); +impl<'a> Display for SiSelect<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // select key, start_date, sum(count) + write!( + f, + "{},{},sum({})", + self.0[0], + self.1[0], + self.1.last().unwrap() + ) + } +} + +struct SiKeysAndCondsAndOrder<'a, S>(&'a S, &'a VectorCmd); +impl<'a, S: Strategy> Display for SiKeysAndCondsAndOrder<'a, S> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let &Self(strategy, VectorCmd { keys, wheres, .. }) = self; + let key_name = &strategy.keys()[0]; + let cols = strategy.si_cols(); + let _ = write!(f, "`{}`={}", key_name, Val(&keys[0])); + for w in wheres { + //条件中和si相同的列写入条件 + for col in cols { + if w.field.equal(col.as_bytes()) { + let _ = write!(f, " and {}", ConditionDisplay(w)); + break; + } + } + } + //按key和日期group,按日期倒叙排 + let _ = write!( + f, + " group by {},{} order by {} desc", + key_name, cols[0], cols[0] + ); + Ok(()) + } +} + +struct SiTable<'a, S>(&'a S, i64); +impl<'a, S: Strategy> Display for SiTable<'a, S> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.write_si_database_table(f, self.1); + Ok(()) + } +} diff --git a/protocol/src/vector/packet.rs b/protocol/src/vector/packet.rs index 8a9302fd4..8d4898a9a 100644 --- a/protocol/src/vector/packet.rs +++ b/protocol/src/vector/packet.rs @@ -4,14 +4,17 @@ use std::{fmt::Display, num::NonZeroUsize}; use ds::{ByteOrder, RingSlice}; -use crate::kv::{ - common::{ - constants::{CapabilityFlags, StatusFlags}, - error::Error::MySqlError, - packets::{ErrPacket, OkPacket, OkPacketDeserializer, OkPacketKind}, - ParseBuf, +use crate::{ + kv::{ + common::{ + constants::{CapabilityFlags, StatusFlags}, + error::Error::MySqlError, + packets::{ErrPacket, OkPacket, OkPacketDeserializer, OkPacketKind}, + ParseBuf, + }, + error::{Error, Result}, }, - error::{Error, Result}, + Command, ResponseHeader, }; const HEADER_LEN: usize = 4; @@ -191,3 +194,56 @@ impl Display for MysqlPacket { write!(f, "seq: {}, payload:{:?}", self.seq, self.payload) } } + +#[derive(Debug)] +pub struct RedisPackAssemble { + pub(crate) header: Vec, + pub(crate) body: Vec, + pub(crate) rows: u16, + pub(crate) columns: u16, +} + +#[derive(Debug)] +pub struct RedisPackSimple { + pub(crate) packet: Vec, +} + +#[derive(Debug)] +pub enum RedisPack { + Assemble(RedisPackAssemble), + Simple(RedisPackSimple), +} + +impl RedisPack { + #[inline] + pub fn with_simple(packet: Vec) -> Self { + let pack = RedisPackSimple { packet }; + RedisPack::Simple(pack) + } + + #[inline] + pub fn with_assamble(header: Vec, body: Vec, rows: u16, columns: u16) -> Self { + let assemble = RedisPackAssemble { + header, + body, + rows, + columns, + }; + RedisPack::Assemble(assemble) + } + + #[inline] + pub fn to_cmd(self, ok: bool) -> Command { + match self { + RedisPack::Simple(pack) => { + let mem = ds::MemGuard::from_vec(pack.packet); + Command::from(ok, mem) + } + RedisPack::Assemble(assemble) => { + let body = ds::MemGuard::from_vec(assemble.body); + let header = ResponseHeader::new(assemble.header, assemble.rows, assemble.columns); + Command::with_assemble_pack(ok, header, body) + } + } + } +} diff --git a/protocol/src/vector/query_result.rs b/protocol/src/vector/query_result.rs index 0b00872c8..994d86428 100644 --- a/protocol/src/vector/query_result.rs +++ b/protocol/src/vector/query_result.rs @@ -4,7 +4,7 @@ use crate::kv::error::Result; pub use crate::kv::common::proto::Text; -use super::packet::MysqlRawPacket; +use super::packet::{MysqlRawPacket, RedisPack}; use crate::kv::common::{io::ParseBuf, packets::OkPacket, row::RowDeserializer}; use std::marker::PhantomData; @@ -125,7 +125,7 @@ impl QueryResult { /// 解析meta后面的rows #[inline(always)] - pub(crate) fn parse_rows_to_redis(&mut self, oft: &mut usize) -> Result> { + pub(crate) fn parse_rows_to_redis(&mut self, oft: &mut usize) -> Result { // 解析出mysql rows // 改为每次只处理本次的响应 let mut rows = Vec::with_capacity(8); @@ -182,12 +182,12 @@ impl QueryResult { } #[inline] -pub fn format_to_redis(rows: &Vec) -> Vec { +pub fn format_to_redis(rows: &Vec) -> RedisPack { let mut data = Vec::with_capacity(32 * rows.len()); // 响应为空,返回 if rows.len() == 0 { data.extend_from_slice("$-1\r\n".as_bytes()); - return data; + return RedisPack::with_simple(data); } let columns = rows.get(0).expect("columns unexists").columns_ref(); @@ -198,12 +198,11 @@ pub fn format_to_redis(rows: &Vec) -> Vec { const VCARD_NAME: &[u8] = b"count(*)"; if columns.len() == 1 && columns[0].name_ref().eq(VCARD_NAME) { format_for_vcard(rows, &mut data); - return data; + return RedisPack::with_simple(data); } // 至此,只有vrange了(select * ..),后续可能还有其他协议 - format_for_commons(rows, &mut data, columns); - data + format_for_commons(rows, columns) } fn format_for_vcard(rows: &Vec, data: &mut Vec) { @@ -218,29 +217,37 @@ fn format_for_vcard(rows: &Vec, data: &mut Vec) { } /// 为vrange等带column header + rows的指令构建响应 -fn format_for_commons(rows: &Vec, data: &mut Vec, columns: &[Column]) { +fn format_for_commons(rows: &Vec, columns: &[Column]) -> RedisPack { + // TODO old 实现,测试完毕后清理 fishermen // 构建 resp协议的header总array计数 以及 column的计数 - data.put(&b"*2\r\n*"[..]); - data.put(columns.len().to_string().as_bytes()); - data.put(CRLF); + // header.put(&b"*2\r\n*"[..]); + + // header记录*2以及column name,最后发送时再拼装,其只拼装第一个包的header + let mut header = Vec::with_capacity(6 + 8 * columns.len()); + header.put(&b"*2\r\n*"[..]); + header.put(columns.len().to_string().as_bytes()); + header.put(CRLF); // 构建columns 内容 for idx in 0..columns.len() { let col = columns.get(idx).expect("column"); - data.push(b'+'); - data.put(col.name_str().as_bytes()); - data.put(CRLF); + header.push(b'+'); + header.put(col.name_str().as_bytes()); + header.put(CRLF); } // 构建column values // 先构建value的header - let val_count = columns.len() * rows.len(); - data.put_u8(b'*'); - data.put(val_count.to_string().as_bytes()); - data.put(CRLF); + // let val_count = columns.len() * rows.len(); + // data.put_u8(b'*'); + // data.put(val_count.to_string().as_bytes()); + // data.put(CRLF); // 再写入每个row的val + + let mut body = Vec::with_capacity(64 * rows.len()); for ri in 0..rows.len() { let row = rows.get(ri).expect("row unexists"); - row.write_as_redis(data); + row.write_as_redis(&mut body); } + RedisPack::with_assamble(header, body, rows.len() as u16, columns.len() as u16) } diff --git a/protocol/src/vector/redis.rs b/protocol/src/vector/redis.rs index aa6bc4f06..2eb025183 100644 --- a/protocol/src/vector/redis.rs +++ b/protocol/src/vector/redis.rs @@ -1,15 +1,14 @@ use super::{command::get_cfg, flager::KvFlager, *}; -use crate::{HashedCommand, Packet, Result}; + +use crate::{Flag, Packet, Result}; use ds::RingSlice; pub(crate) const FIELD_BYTES: &'static [u8] = b"FIELD"; - pub(crate) const KVECTOR_SEPARATOR: u8 = b','; /// 根据parse的结果,此处进一步获得kvector的detail/具体字段信息,以便进行sql构建 -pub fn parse_vector_detail(cmd: &HashedCommand) -> crate::Result { - let data = Packet::from(cmd.sub_slice(0, cmd.len())); - let flag = cmd.flag(); +pub fn parse_vector_detail(cmd: RingSlice, flag: &Flag) -> crate::Result { + let data = Packet::from(cmd); let mut vcmd: VectorCmd = Default::default(); vcmd.cmd = get_cfg(flag.op_code())?.cmd_type; diff --git a/protocol/src/vector/reqpacket.rs b/protocol/src/vector/reqpacket.rs index e00d3fddc..a8e05dd36 100644 --- a/protocol/src/vector/reqpacket.rs +++ b/protocol/src/vector/reqpacket.rs @@ -103,10 +103,14 @@ impl<'a, S: crate::Stream> RequestPacket<'a, S> { assert!(cfg.has_key, "{:?}", self); let key = self.parse_key(flag)?; + if self.bulks == 0 { + return Ok(Some(self.main_key(&key))); + } + // 如果有field,则接下来解析fields,不管是否有field,统一先把where这个token消费掉,方便后续统一从condition位置解析 if cfg.can_hold_field { self.parse_fields(flag)?; - } else if self.bulks > 0 && cfg.can_hold_where_condition { + } else if cfg.can_hold_where_condition { // 如果还有bulks,且该cmd可以hold where condition,此处肯定是where token,直接读出skip掉 let token = self.next_bulk_string()?; if !token.equal_ignore_case(BYTES_WHERE) || self.bulks < 1 { @@ -128,8 +132,7 @@ impl<'a, S: crate::Stream> RequestPacket<'a, S> { log::debug!("++++ after condition parsed oft:{}", self.oft); // 返回main-key,不带sub-key、ext-key - let main_key_len = key.find(0, KEY_SEPERATOR).map_or(key.len(), |len| len); - Ok(Some(key.sub_slice(0, main_key_len))) + Ok(Some(self.main_key(&key))) } #[inline] @@ -159,16 +162,12 @@ impl<'a, S: crate::Stream> RequestPacket<'a, S> { } } - // #[inline] - // fn parse_cmd_name(&mut self) -> Result<()> { - // // 第一个bulk是bulk-string类型的cmd - // let cmd = self.next_bulk_string()?; - // self.cmd_type = cmd.into(); - // if self.cmd_type.is_invalid() { - // return Err(Error::FlushOnClose(ERR_UNSUPPORT_CMD.into())); - // } - // Ok(()) - // } + /// 获取主key,不带sub-key、ext-key + #[inline] + fn main_key(&self, key: &RingSlice) -> RingSlice { + let main_key_len = key.find(0, KEY_SEPERATOR).map_or(key.len(), |len| len); + key.sub_slice(0, main_key_len) + } /// 读取下一个bulk string,bulks会减1 #[inline] diff --git a/protocol/src/vector/rsppacket.rs b/protocol/src/vector/rsppacket.rs index 1be3b8a2d..ba6e1bad9 100644 --- a/protocol/src/vector/rsppacket.rs +++ b/protocol/src/vector/rsppacket.rs @@ -19,6 +19,7 @@ use crate::{Command, StreamContext}; use ds::RingSlice; use super::packet::MysqlRawPacket; +use super::packet::RedisPack; use super::query_result::{QueryResult, Text}; // const HEADER_LEN: usize = 4; @@ -92,6 +93,7 @@ impl<'a, S: crate::Stream> ResponsePacket<'a, S> { let mut query_result: QueryResult = QueryResult::new(self.data.clone(), self.has_results, meta); // 解析出mysql rows + // let (redis_data, count) = query_result.parse_rows_to_redis(&mut self.oft)?; let redis_data = query_result.parse_rows_to_redis(&mut self.oft)?; // 构建响应 @@ -136,10 +138,16 @@ impl<'a, S: crate::Stream> ResponsePacket<'a, S> { /// 构建最终的响应,并对已解析的内容进行take #[inline(always)] - pub(super) fn build_final_rsp_cmd(&mut self, ok: bool, rsp_data: Vec) -> Command { + pub(super) fn build_final_rsp_cmd(&mut self, ok: bool, redis_pack: RedisPack) -> Command { // 构建最终返回给client的响应内容 - let mem = ds::MemGuard::from_vec(rsp_data); - let cmd = Command::from(ok, mem); + let cmd = redis_pack.to_cmd(ok); + + // TODO 冲突,暂时注释掉 + // // 构建最终返回给client的响应内容 + // let mem = ds::MemGuard::from_vec(rsp_data); + // let mut cmd = Command::from(ok, mem); + // cmd.set_count(count); + log::debug!("+++ build kvector rsp, ok:{} => {:?}", ok, cmd); // 返回最终响应前,take走已经解析的数据 diff --git a/stream/src/context.rs b/stream/src/context.rs index 6987f406f..10df719b3 100644 --- a/stream/src/context.rs +++ b/stream/src/context.rs @@ -1,8 +1,8 @@ use std::{marker::PhantomData, ptr::NonNull, sync::Arc}; use protocol::{ - callback::CallbackContext, request::Request, Command, Commander, HashedCommand, Metric, - MetricItem, Protocol, + callback::CallbackContext, request::Request, Attachment, Command, Commander, HashedCommand, + Metric, MetricItem, Protocol, }; use crate::arena::CallbackContextArena; @@ -118,4 +118,8 @@ impl<'a, M: Metric, T: MetricItem, F: Fn(i64) -> usize> Commander fn ctx(&self) -> u64 { self.ctx.flag() } + #[inline] + fn attachment(&self) -> Option<&Attachment> { + self.ctx.attachment() + } } diff --git a/stream/src/handler.rs b/stream/src/handler.rs index 42e31194f..f0b805d78 100644 --- a/stream/src/handler.rs +++ b/stream/src/handler.rs @@ -144,7 +144,7 @@ where // 统计请求耗时。 self.rtt += start.elapsed(); self.parser.check(&*req, &cmd); - req.on_complete(cmd); + req.on_complete(&self.parser, cmd); continue; } if l == self.s.len() { diff --git a/stream/src/pipeline.rs b/stream/src/pipeline.rs index f0e8490c2..b7a9e6163 100644 --- a/stream/src/pipeline.rs +++ b/stream/src/pipeline.rs @@ -11,7 +11,7 @@ use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use crate::topology::TopologyCheck; use ds::{time::Instant, AtomicWaker}; use endpoint::Topology; -use protocol::Error::FlushOnClose; +use protocol::{Attachment, Error::FlushOnClose}; use protocol::{HashedCommand, Protocol, Result, Stream}; use crate::{ @@ -124,6 +124,7 @@ where arena: &mut self.arena, retry_on_rsp_notok: self.parser.config().retry_on_rsp_notok, parser: &self.parser, + has_attach: self.top.has_attach(), }; self.parser @@ -163,7 +164,6 @@ where *self.metrics.key() += 1; let mut response = ctx.take_response(); - self.parser.write_response( &mut ResponseContext::new(&mut ctx, &self.metrics, |hash| self.top.shard_idx(hash)), response.as_mut(), @@ -222,6 +222,7 @@ struct Visitor<'a, T, P> { first: &'a mut bool, arena: &'a mut CallbackContextArena, retry_on_rsp_notok: bool, + has_attach: bool, } impl<'a, T: Topology + TopologyCheck, P: Protocol> protocol::RequestProcessor @@ -235,6 +236,12 @@ impl<'a, T: Topology + TopologyCheck, P: Protocol> protocol::Req *self.first = last; let cb = self.top.callback(); let req_op = cmd.operation(); + let drop_attach: Option> = if self.has_attach { + let parser = self.parser.clone(); + Some(Box::new(move |att| parser.drop_attach(att))) + } else { + None + }; let ctx = self.arena.alloc(CallbackContext::new( cmd, self.waker, @@ -243,6 +250,7 @@ impl<'a, T: Topology + TopologyCheck, P: Protocol> protocol::Req last, self.retry_on_rsp_notok, self.parser.max_tries(req_op), + drop_attach, )); let mut ctx = CallbackContextPtr::from(ctx, self.arena); diff --git a/stream/src/topology.rs b/stream/src/topology.rs index dd08023df..a185c2125 100644 --- a/stream/src/topology.rs +++ b/stream/src/topology.rs @@ -83,4 +83,7 @@ impl Topology for CheckedTopology { fn exp_sec(&self) -> u32 { self.top.exp_sec() } + fn has_attach(&self) -> bool { + self.top.has_attach() + } } diff --git a/tests/src/benches/redis.rs b/tests/src/benches/redis.rs index 5f681223a..99031b731 100644 --- a/tests/src/benches/redis.rs +++ b/tests/src/benches/redis.rs @@ -173,6 +173,10 @@ mod proto_hook { fn ctx(&self) -> u64 { todo!() } + + fn attachment(&self) -> Option<&protocol::Attachment> { + todo!() + } } #[derive(Debug)] pub(crate) struct TestStream { diff --git a/tests/src/layout.rs b/tests/src/layout.rs index 293972061..1518bb3a0 100644 --- a/tests/src/layout.rs +++ b/tests/src/layout.rs @@ -49,6 +49,10 @@ fn checkout_basic() { assert_eq!(40, size_of::()); assert_eq!(192, size_of::()); assert_eq!(24, size_of::()); + assert_eq!( + size_of::(), + size_of::() + ); } // 如果要验证 layout-min模式,需要 --features layout-min --release --no-default-features @@ -60,7 +64,7 @@ fn check_layout_rx_buffer() { #[ignore] #[test] fn check_callback_ctx() { - assert_eq!(192, size_of::()); + assert_eq!(520, size_of::()); //assert_eq!(16, size_of::()); } //#[ignore] diff --git a/tests/src/mq/protocol.rs b/tests/src/mq/protocol.rs index a3f225327..f24612a8f 100644 --- a/tests/src/mq/protocol.rs +++ b/tests/src/mq/protocol.rs @@ -1,11 +1,11 @@ +use std::cell::UnsafeCell; + use crate::proto_hook; use protocol::{ msgque::{MsgQue, OP_GET, OP_QUIT, OP_SET, OP_STATS, OP_VERSION}, - Error, Proto, + Attachment, BufRead, Commander, Error, HashedCommand, Metric, Proto, }; -use protocol::BufRead; - /// 请求以任意长度发送 #[test] fn test_req_reenter() { @@ -228,6 +228,67 @@ fn test_rsp() { } } +#[allow(dead_code)] +struct TestCtx { + req: HashedCommand, + metric: TestMetric, +} + +impl TestCtx { + #[allow(dead_code)] + fn new(req: HashedCommand) -> Self { + Self { + req, + metric: TestMetric { + item: UnsafeCell::new(TestMetricItem {}), + }, + } + } +} + +struct TestMetricItem {} +impl std::ops::AddAssign for TestMetricItem { + fn add_assign(&mut self, _rhs: i64) {} +} +impl std::ops::AddAssign for TestMetricItem { + fn add_assign(&mut self, _rhs: bool) {} +} + +struct TestMetric { + item: UnsafeCell, +} +impl Metric for TestMetric { + fn get(&self, _name: protocol::MetricName) -> &mut TestMetricItem { + unsafe { &mut *self.item.get() } + } +} + +impl Commander for TestCtx { + fn request_mut(&mut self) -> &mut HashedCommand { + todo!() + } + + fn request(&self) -> &HashedCommand { + &self.req + } + + fn request_shard(&self) -> usize { + todo!() + } + + fn metric(&self) -> &TestMetric { + &self.metric + } + + fn ctx(&self) -> u64 { + todo!() + } + + fn attachment(&self) -> Option<&Attachment> { + todo!() + } +} + #[test] fn test_write_response() { let proto = MsgQue; diff --git a/tests/src/proto_hook.rs b/tests/src/proto_hook.rs index 42ede040c..22d3952ad 100644 --- a/tests/src/proto_hook.rs +++ b/tests/src/proto_hook.rs @@ -77,6 +77,10 @@ impl Commander for TestCtx { fn ctx(&self) -> u64 { todo!() } + + fn attachment(&self) -> Option<&protocol::Attachment> { + todo!() + } } #[derive(Debug)] pub(crate) struct TestStream {