Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Main kvector hybrid 202405 #464

Merged
merged 67 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
6a5070b
扩展ctx
viciousstar May 10, 2024
fd37b3f
batch策略
viciousstar May 10, 2024
3eddd34
batch topo
viciousstar May 10, 2024
fda4f62
batch sql
viciousstar May 13, 2024
520bc40
协议处理增加attachment机制
hustfisher May 14, 2024
9831d64
协议处理增加attachment机制
hustfisher May 14, 2024
683affd
batch sql
viciousstar May 13, 2024
d8ab1fa
增加mrange指令
parabala May 14, 2024
a076396
kvector一条req的多个response计数
parabala May 19, 2024
b9fd391
支持attachment的写入及更新
hustfisher May 21, 2024
d7019d5
fix conflicts
hustfisher May 21, 2024
f8cded8
add attachement file
hustfisher May 21, 2024
57e1dc3
sql构建时加上mrange
parabala May 21, 2024
6e5211d
去掉VectorService里面的loop_table成员,改用strategist判断是单个请求还是多个请求
parabala May 22, 2024
2299e17
pre month
viciousstar May 22, 2024
d298590
kvector对于batch策略,空响应不计入attach,max tries设为6,在response notok时需要重试
hustfisher May 22, 2024
695c902
Merge branch 'main_kvector_hybrid_202405' of https://github.com/weibo…
hustfisher May 22, 2024
1feaab3
为打通kvector流程,增加请求开始和结束相关设置
parabala May 23, 2024
41a38ec
kvector最后一条请求不更改resp的状态
parabala May 23, 2024
d0e4c93
add attachement to write_response
hustfisher May 23, 2024
54795d5
1 分拆attachment的header、body; 2 根据attachment写response
hustfisher May 23, 2024
eda5eba
使用attatch存储
viciousstar May 24, 2024
a1bff05
rspok
viciousstar May 24, 2024
d198a31
remove不必要的代码
parabala May 24, 2024
2ab9cf1
收到请求,给attach里面的rsp_ok置为true
parabala May 24, 2024
7396ade
必须提供limit
viciousstar May 24, 2024
63700ac
不论响应里是否有数据,attach里面的rsp_ok都置为true
parabala May 24, 2024
e6599e5
支持最简vrange指令
hustfisher May 28, 2024
09cc3ff
去掉mrange指令
parabala May 31, 2024
623d449
解析si信息
parabala Jun 6, 2024
9e93895
get shard
viciousstar Jun 12, 2024
f7202bf
vectro配置中增加si配置信息
parabala Jun 13, 2024
0567a4b
更新si相关实现,并用finish替换left_count
parabala Jun 13, 2024
f358ed3
get more shard
viciousstar Jun 13, 2024
254a8c4
build si sql
viciousstar Jun 17, 2024
ab74d88
select
viciousstar Jun 18, 2024
dcd0479
where
viciousstar Jun 18, 2024
fa74613
table
viciousstar Jun 18, 2024
81ec28a
si_dist
viciousstar Jun 18, 2024
a818923
增加si实现
parabala Jun 18, 2024
f9acab1
write si db
viciousstar Jun 18, 2024
c080239
si未获取到有效内容,退出请求
parabala Jun 19, 2024
382e355
fix联调中发现的问题
parabala Jun 21, 2024
5af6f88
根据讨论修改聚合访问配置结构
parabala Jun 26, 2024
d035314
优化聚合访问
parabala Jun 26, 2024
7e0d2be
恢复vector timeout_master timeout_slave实现
parabala Jun 27, 2024
0bd0e5e
redis解析数字上限从u32变更为usize
parabala Jun 27, 2024
3488d12
优化vector响应
parabala Jun 27, 2024
64305b1
test
viciousstar Jun 28, 2024
0f62758
rm warn
viciousstar Jun 28, 2024
4c3835a
fix vector响应
parabala Jun 28, 2024
5681198
移除attach,si_item不在序列化
viciousstar Jul 2, 2024
9598399
优化VDate
parabala Jul 2, 2024
8693bd7
drop att
viciousstar Jul 3, 2024
29b0700
vcmd 存到 attach
viciousstar Jul 3, 2024
eedb08e
使用last作为是否最后一轮标记
viciousstar Jul 4, 2024
92c2b7e
按需创建drop attach
viciousstar Jul 5, 2024
fdf6d22
优化生成VDate的内存分配次数
parabala Jul 5, 2024
423bca1
body按轮次创建
viciousstar Jul 5, 2024
683b2d3
优化生成VDate
parabala Jul 5, 2024
77770b0
has attach
viciousstar Jul 5, 2024
43e20a2
vector批量访问,如果遇到失败,则给client返回失败
parabala Jul 8, 2024
f99c23f
si的长度大于0,attach_si才算成功
parabala Jul 11, 2024
8efc5bd
comment
viciousstar Jul 11, 2024
2cc02f1
数据请求完不进行下一轮
viciousstar Jul 15, 2024
0c2f9c8
Merge remote-tracking branch 'origin/main' into main_kvector_hybrid_2…
viciousstar Aug 8, 2024
ffd12f1
rm merge err
viciousstar Aug 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions ds/src/mem/ring_slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ impl RingSlice {
#[inline]
pub fn try_str_num(&self, r: impl Range) -> Option<usize> {
let (start, end) = r.range(self);
if start == end {
return None;
}
let mut num = 0usize;
for i in start..end {
if !self[i].is_ascii_digit() {
Expand Down
1 change: 1 addition & 0 deletions endpoint/src/topo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ procs::topology_dispatcher! {

pub trait Topology : Endpoint + Hash{
fn exp_sec(&self) -> u32 {86400}
fn has_attach(&self) -> bool {false}
} => where P:Protocol, E:Endpoint<Item = R>, R:Request, Topologies<E, P>: Endpoint

trait Inited {
Expand Down
162 changes: 162 additions & 0 deletions endpoint/src/vector/batch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
use super::strategy::Postfix;
use chrono::{Datelike, NaiveDate};
use chrono_tz::Tz;
use core::fmt::Write;
use ds::RingSlice;
use protocol::Error;
use sharding::{distribution::DBRange, hash::Hasher};

#[derive(Clone, Debug)]
pub struct Batch {
db_prefix: String,
table_prefix: String,
table_postfix: Postfix,
hasher: Hasher,
distribution: DBRange,
keys_name: Vec<String>,
si_cols: Vec<String>,
si: Si,
}

impl Batch {
pub fn new_with_db(
db_prefix: String,
table_prefix: String,
db_count: u32,
shards: u32,
table_postfix: Postfix,
keys_name: Vec<String>,
si_cols: Vec<String>,
si_db_prefix: String,
si_db_count: u32,
si_table_prefix: String,
si_table_count: u32,
si_shards: u32,
) -> Self {
Self {
db_prefix,
table_prefix,
table_postfix,
distribution: DBRange::new(db_count as usize, 1usize, shards as usize),
hasher: Hasher::from("crc32"),
keys_name,
si_cols,
si: Si::new(
si_db_prefix,
si_db_count,
si_table_prefix,
si_table_count,
si_shards,
),
}
}

pub fn distribution(&self) -> &DBRange {
&self.distribution
}

pub fn si_distribution(&self) -> &DBRange {
self.si.distribution()
}

pub fn hasher(&self) -> &Hasher {
&self.hasher
}

pub fn get_date(&self, _: &[RingSlice]) -> Result<NaiveDate, Error> {
let now = chrono::Utc::now().with_timezone(&Tz::Asia__Shanghai);
Ok(NaiveDate::from_ymd_opt(now.year(), now.month(), now.day()).unwrap())
}

pub fn write_dname_with_hash(&self, buf: &mut impl Write, hash: i64) {
let db_idx: usize = self.distribution.db_idx(hash);
let _ = write!(buf, "{}_{}", self.db_prefix, db_idx);
}

pub fn write_tname_with_date(&self, buf: &mut impl Write, date: &NaiveDate) {
let (mut year, month, day) = (date.year(), date.month(), date.day());
year %= 100;
match self.table_postfix {
Postfix::YYMM => {
let _ = write!(buf, "{}_{:02}{:02}", &self.table_prefix, year, month);
}
//Postfix::YYMMDD
_ => {
let _ = write!(
buf,
"{}_{:02}{:02}{:02}",
&self.table_prefix, year, month, day
);
}
}
}

pub fn write_database_table(&self, buf: &mut impl Write, date: &NaiveDate, hash: i64) {
self.write_dname_with_hash(buf, hash);
let _ = buf.write_char('.');
self.write_tname_with_date(buf, date)
}

pub(crate) fn write_si_database_table(&self, buf: &mut impl Write, hash: i64) {
self.si.write_database_table(buf, hash)
}

pub(crate) fn condition_keys(&self) -> Box<dyn Iterator<Item = Option<&String>> + '_> {
Box::new(self.keys_name.iter().map(|x| Some(x)))
}

pub(crate) fn keys(&self) -> &[String] {
&self.keys_name
}

// pub(crate) fn get_next_date(&self, year: u16, month: u8) -> NaiveDate {
// if month == 1 {
// return NaiveDate::from_ymd_opt((year - 1).into(), 12, 1).unwrap();
// } else {
// return NaiveDate::from_ymd_opt(year.into(), (month - 1).into(), 1).unwrap();
// }
// }

pub(crate) fn batch(&self, limit: u64, _: &protocol::vector::VectorCmd) -> u64 {
limit
}

pub(crate) fn si_cols(&self) -> &[String] {
&self.si_cols
}
}

#[derive(Clone, Debug)]
struct Si {
db_prefix: String,
table_prefix: String,
distribution: DBRange,
}

impl Si {
fn new(
db_prefix: String,
db_count: u32,
table_prefix: String,
table_count: u32,
shards: u32,
) -> Self {
Self {
db_prefix: db_prefix,
table_prefix: table_prefix,
distribution: DBRange::new(db_count as usize, table_count as usize, shards as usize),
}
}
fn distribution(&self) -> &DBRange {
&self.distribution
}
fn write_database_table(&self, buf: &mut impl Write, hash: i64) {
let db_idx = self.distribution.db_idx(hash);
let table_idx = self.distribution.table_idx(hash);
let _ = write!(
buf,
"{}_{}.{}_{}",
self.db_prefix, db_idx, self.table_prefix, table_idx
);
}
}
35 changes: 32 additions & 3 deletions endpoint/src/vector/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ pub struct VectorNamespace {
pub(crate) backends_flaten: Vec<String>,
#[serde(default)]
pub(crate) backends: HashMap<Years, Vec<String>>,
#[serde(default)]
pub(crate) si_backends: Vec<String>,
}

#[derive(Debug, Clone, Default, Deserialize, Serialize)]
Expand Down Expand Up @@ -44,6 +46,20 @@ pub struct Basic {
pub(crate) user: String,
#[serde(default)]
pub(crate) region_enabled: bool,
#[serde(default)]
pub(crate) si_db_name: String,
#[serde(default)]
pub(crate) si_table_name: String,
#[serde(default)]
pub(crate) si_db_count: u32,
#[serde(default)]
pub(crate) si_table_count: u32,
#[serde(default)]
pub(crate) si_user: String,
#[serde(default)]
pub(crate) si_password: String,
#[serde(default)]
pub(crate) si_cols: Vec<String>,
}

impl VectorNamespace {
Expand All @@ -66,7 +82,7 @@ impl VectorNamespace {
}
last_year = year.1;
}
match ns.decrypt_password() {
match ns.decrypt_password(ns.basic.password.as_bytes()) {
Ok(password) => ns.basic.password = password,
Err(e) => {
log::warn!("failed to decrypt password, e:{}", e);
Expand All @@ -77,6 +93,19 @@ impl VectorNamespace {
init.extend_from_slice(b.1);
init
});
if ns.basic.si_password.len() > 0 {
match ns.decrypt_password(ns.basic.si_password.as_bytes()) {
Ok(password) => ns.basic.si_password = password,
Err(e) => {
log::warn!("failed to decrypt si password, e:{}", e);
return None;
}
}
}
if ns.si_backends.len() > 0 {
ns.backends_flaten
.extend_from_slice(ns.si_backends.as_slice());
}
Some(ns)
}
Err(e) => {
Expand All @@ -87,9 +116,9 @@ impl VectorNamespace {
}

#[inline]
fn decrypt_password(&self) -> Result<String, Box<dyn std::error::Error>> {
fn decrypt_password(&self, data: &[u8]) -> Result<String, Box<dyn std::error::Error>> {
let key_pem = fs::read_to_string(&context::get().key_path)?;
let encrypted_data = general_purpose::STANDARD.decode(self.basic.password.as_bytes())?;
let encrypted_data = general_purpose::STANDARD.decode(data)?;
let decrypted_data = ds::decrypt::decrypt_password(&key_pem, &encrypted_data)?;
let decrypted_string = String::from_utf8(decrypted_data)?;
Ok(decrypted_string)
Expand Down
1 change: 1 addition & 0 deletions endpoint/src/vector/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod batch;
pub(crate) mod config;
mod strategy;
pub mod topo;
Expand Down
Loading
Loading