Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

支持新hash&打开msgque #371

Merged
merged 6 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion endpoint/src/topo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use discovery::Inited;
use protocol::{Protocol, ResOption, Resource};
use sharding::hash::{Hash, HashKey};

use crate::msgque::topo::MsgQue;
use crate::Timeout;

use enum_dispatch::enum_dispatch;
Expand Down Expand Up @@ -215,7 +216,7 @@ use crate::redisservice::topo::RedisService;
use crate::uuid::topo::UuidService;

define_topology! {
//MsgQue<B, E, R, P>, MsgQue, "mq";
MsgQue<B, E, R, P>, MsgQue, "mq";
RedisService<B, E, R, P>, RedisService, "rs";
CacheService<B, E, R, P>, CacheService, "cs";
PhantomService<B, E, R, P>, PhantomService, "pt";
Expand Down
32 changes: 19 additions & 13 deletions protocol/src/kv/common/packets/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use lexical::parse;
use smallvec::SmallVec;
use uuid::Uuid;

use std::fmt::Display;
// use std::fmt::{write, Display};
use std::str::FromStr;
use std::{
Expand Down Expand Up @@ -1744,7 +1745,9 @@ impl HandshakePacket {
/// Actual serialization of this field depends on capability flags values.
type ScrambleBuf = Either<RawBytes<LenEnc>, Either<RawBytes<U8Bytes>, RawBytes<NullBytes>>>;

#[derive(Debug, Clone, PartialEq, Eq)]
// TODO connect_attributes 的Debug有问题,先去掉Debug属性,等修复后再打开 fishermen
// #[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Clone, PartialEq, Eq)]
pub struct HandshakeResponse {
capabilities: Const<CapabilityFlags, LeU32>,
collation: RawInt<u8>,
Expand All @@ -1755,18 +1758,21 @@ pub struct HandshakeResponse {
connect_attributes: Option<HashMap<RawBytes<LenEnc>, RawBytes<LenEnc>>>,
}

// impl<'a> Display for HandshakeResponse<'a> {
// fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// write!(
// f,
// "HandshakeResponse[capabilities:{:?}, colloation:{}, user:{}, db_name:{:?}]",
// self.capabilities.0,
// self.collation.0,
// self.user.as_str(),
// self.db_name,
// )
// }
// }
impl Display for HandshakeResponse {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"HandshakeResponse[capabilities:{:?}, colloation:{}, user:{:?}, db_name:{:?}, scrable_buf:{:?}, auth_plugin:{:?}, connect_attributes:not-print-now]",
self.capabilities.0,
self.collation.0,
self.user.as_str(),
self.db_name,
self.scramble_buf,
self.auth_plugin,
// self.connect_attributes,
)
}
}

impl HandshakeResponse {
pub fn new(
Expand Down
1 change: 1 addition & 0 deletions protocol/src/kv/rsppacket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ impl<'a, S: crate::Stream> ResponsePacket<'a, S> {
client.capability_flags,
Some(client.connect_attrs()),
);
log::debug!("+++ kv handshake rsp: {}", handshake_response);
let mut buf: Vec<u8> = Vec::with_capacity(256);
handshake_response.serialize(&mut buf);
let mut src_buf = BytesMut::with_capacity(buf.len());
Expand Down
1 change: 1 addition & 0 deletions protocol/src/msgque/mcq/text/reqpacket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ impl<'a, S: crate::Stream> RequestPacket<'a, S> {

// TODO flags 如果业务已特殊设置,则先不予支持,然后根据实际场景确定方案? fishermen
if self.flags != 0 {
log::warn!("flag in msgque should be 0 but: {}", self.flags);
return Err(crate::Error::ProtocolNotSupported);
}

Expand Down
8 changes: 6 additions & 2 deletions sharding/src/distribution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod modrange;
mod modula;
//mod padding;
mod range;
mod secmod;
mod slotmod;
mod splitmod;

Expand All @@ -12,11 +13,11 @@ pub use dbrange::DBRange;
use modrange::ModRange;
use modula::Modula;
//use padding::Padding;
use self::secmod::SecMod;
use self::slotmod::SlotMod;
pub use range::Range;
use splitmod::SplitMod;

use crate::distribution::slotmod::SlotMod;

#[derive(Clone, Debug)]
pub enum Distribute {
//Padding(Padding),
Expand All @@ -26,6 +27,7 @@ pub enum Distribute {
ModRange(ModRange),
SplitMod(SplitMod),
SlotMod(SlotMod),
SecMod(SecMod),
}

//pub const DIST_PADDING: &str = "padding";
Expand Down Expand Up @@ -69,6 +71,7 @@ impl Distribute {
"modrange" => Self::ModRange(ModRange::from(num, names.len())),
"splitmod" => Self::SplitMod(SplitMod::from(num, names.len())),
"slotmod" => Self::SlotMod(SlotMod::from(num, names.len())),
"secmod" => Self::SecMod(SecMod::from(names.len())),
_ => {
log::warn!("'{}' is not valid , use modula instead", distribution);
Self::Modula(Modula::from(names.len(), false))
Expand Down Expand Up @@ -96,6 +99,7 @@ impl Distribute {
Self::ModRange(m) => m.index(hash),
Self::SplitMod(s) => s.index(hash),
Self::SlotMod(s) => s.index(hash),
Self::SecMod(s) => s.index(hash),
}
}
}
Expand Down
26 changes: 26 additions & 0 deletions sharding/src/distribution/secmod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/// 二阶modula,算法: hash / shards % shards
#[derive(Clone, Debug, Default)]
pub struct SecMod {
shard_count: usize,
}

impl SecMod {
pub fn from(shards: usize) -> Self {
assert!(shards > 0);
Self {
shard_count: shards,
}
}

pub fn index(&self, hash: i64) -> usize {
// 理论上,使用secmod的业务,hash不应该是负数; 说人话就是:对负数hash,具体idx不保证确定结果
if hash < 0 {
log::error!("found negative hash for secmod:{}", hash);
}
let idx = (hash as usize)
.wrapping_div(self.shard_count)
.wrapping_rem(self.shard_count);

idx
}
}
2 changes: 1 addition & 1 deletion tests/src/all.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//mod cow;
// mod distribute;
mod distribute;
// mod hash_test;
mod shard_test;
//mod memcached_text;
Expand Down
21 changes: 21 additions & 0 deletions tests/src/distribute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,25 @@ mod distribute_test {
let idx = dist.index(hash);
println!("key:{}, hash:{}, idx: {}", key, hash, idx);
}

#[test]
fn secmod() {
let hasher = Hasher::from("crc32abs");

// h14243dc752b5beac 对应i64算法为2461123049,i32算法为-1833844247,abs后为1833844247,
let key = "h14243dc752b5beac".to_string();
let hash = hasher.hash(&key.as_bytes());
println!("hash: {}, key:{} ", hash, key);
assert_eq!(hash, 1833844247);

let shards = vec![
"shard_0".to_string(),
"shard_1".to_string(),
"shard_2".to_string(),
];
let dist = Distribute::from("secmod", &shards);
let idx = dist.index(hash);
println!("idx:{}", idx);
assert_eq!(idx, 2);
}
}
Loading