From ab2a08d860d9ed68d868f1f24086fbd7ddd6d128 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=B3=A2?= Date: Thu, 26 Oct 2023 16:53:15 +0800 Subject: [PATCH 1/4] support doublebase --- endpoint/src/cacheservice/config.rs | 23 +++++- endpoint/src/cacheservice/mod.rs | 26 +++++- endpoint/src/cacheservice/topo.rs | 108 ++++++++++++++++++------- protocol/src/flag.rs | 25 +++--- protocol/src/memcache/binary/mod.rs | 3 + protocol/src/memcache/binary/packet.rs | 56 +++++++++---- 6 files changed, 180 insertions(+), 61 deletions(-) diff --git a/endpoint/src/cacheservice/config.rs b/endpoint/src/cacheservice/config.rs index fefbffd95..9f0642506 100644 --- a/endpoint/src/cacheservice/config.rs +++ b/endpoint/src/cacheservice/config.rs @@ -35,8 +35,9 @@ pub struct Namespace { pub timeout_ms_slave: u32, #[serde(default)] pub local_affinity: bool, + /// TODO 通过bit位,设置不同的策略/属性,详见下面Flag定义,新增bool属性,考虑统一设置到flag,从DoubleBase开始 fishermen #[serde(default)] - pub flag: u64, // 通过bit位,设置不同的策略/属性,详见下面Flag定义 + pub flag: u64, } // 通过bit位,设置不同的策略/属性;从低位开始依次排列 @@ -46,6 +47,24 @@ pub(crate) enum Flag { ForceWriteAll = 1, UpdateSlavel1 = 2, LocalAffinity = 3, + ///
+    /// DoubleBase默认为false,以master为准,即write master或gets失败后,即返回失败。
+    /// 若设置为true,则可以将slave也可以作为基准,在key维度,如果master节点异常,就以slave作为基准,目前影响cas/casq、add/addq、gets指令;
+    /// doubleBase设置不同值,对指令的影响:
+    /// 1 cas
+    ///   mcDoubleBase=false: cas master失败,则直接返回;
+    ///   mcDoubleBase=true:  cas master失败,再尝试cas slave,如果master、slave都失败,才返回;
+    ///                        cas slave 成功,说明以slave为准,此时则将master及其他所有层也set一次,并返回成功;
+    /// 2 add
+    ///   mcDoubleBase=false: add master失败,则直接返回;
+    ///   mcDoubleBase=true:  add master失败,如果master状态正常,直接返回;
+    ///   					  如果master、slave都失败,则返回失败;
+    ///   					  如果slave成功,则set master及其他层,并返回成功;
+    /// 3 gets:
+    ///   mcDoubleBase=false: gets master失败,则直接返回;
+    ///   mcDoubleBase=true:  gets master失败,再尝试getes slave,如果master、slave都失败,才返回;
+    /// 
+ DoubleBase = 4, } impl Namespace { @@ -193,4 +212,4 @@ impl<'a> Config<'a> { } None } -} \ No newline at end of file +} diff --git a/endpoint/src/cacheservice/mod.rs b/endpoint/src/cacheservice/mod.rs index a92d56ec7..bb1065104 100644 --- a/endpoint/src/cacheservice/mod.rs +++ b/endpoint/src/cacheservice/mod.rs @@ -9,7 +9,11 @@ struct Context { ctx: protocol::Context, } +// 高16位的mask,供store、retrive两种cmds共享 const H_MASK: u64 = 0xffff << 48; +// store cmds的skip slave的bit位 +const STORE_SKIP_SLAVE: u64 = 0x1 << 16; + impl Context { #[inline] fn from(ctx: protocol::Context) -> Self { @@ -30,16 +34,30 @@ impl Context { fn is_write(&self) -> bool { self.ctx & (1 << 62) > 0 } - // 获取idx,并将原有的idx+1 + // 获取write操作的idx,并将原有的idx+1 #[inline] fn take_write_idx(&mut self) -> u16 { let idx = self.ctx as u16; self.ctx += 1; idx } - // 低16位存储是下一次的idx - // 如果是写请求,低16位,是索引 - // 如果是读请求,则 + + /// 对store类型,doublebase时,对于cas/casq/add/addq操作slave时,需要设置该bit,后面回写时,需要skip slave_idx + /// 本操作仅对store类型cmds生效 + #[inline(always)] + fn set_skip_slave_4store(&mut self) { + assert!(self.is_write()); + self.ctx |= STORE_SKIP_SLAVE; + } + + /// 对store类型,回写时是否需要skip slave + #[inline(always)] + fn need_skip_slave_4store(&self) -> bool { + self.ctx & STORE_SKIP_SLAVE > 0 + } + + // 如果是写请求,低16位是写索引,第17个bit存放是否回写slave + // 如果是读请求,则最低16bits存访当前idx,访问下一层时,将前一次的idx向高位左移16bits #[inline] fn take_read_idx(&mut self) -> u16 { let mut low_48bit = self.low(); diff --git a/endpoint/src/cacheservice/topo.rs b/endpoint/src/cacheservice/topo.rs index 9eeec78c9..d2a2da95f 100644 --- a/endpoint/src/cacheservice/topo.rs +++ b/endpoint/src/cacheservice/topo.rs @@ -1,11 +1,12 @@ use crate::{Builder, Endpoint, Topology}; use discovery::TopologyWrite; -use protocol::{Protocol, Request, Resource, TryNextType}; +use protocol::{Protocol, Request, Resource}; use sharding::hash::{Hash, HashKey, Hasher}; use sharding::Distance; use std::collections::HashMap; use super::config::Flag; +use super::Context; use crate::shards::Shards; use crate::PerformanceTuning; use crate::Timeout; @@ -16,12 +17,18 @@ pub struct CacheService { // 一共有n组,每组1个连接。 // 排列顺序: master, master l1, slave, slave l1 streams: Distance>, + // streams 中slave所在的idx + slave_idx: usize, // streams里面的前r_num个数据是提供读的(这个长度不包含slave l1, slave)。 hasher: Hasher, parser: P, exp_sec: u32, - force_write_all: bool, // 兼容已有业务逻辑,set master失败后,是否更新其他layer - backend_no_storage: bool, // true:mc后面没有存储 + /// 兼容已有业务逻辑,set master失败后,是否更新其他layer + force_write_all: bool, + /// true:mc后面没有存储 + backend_no_storage: bool, + /// false: 以master为准; true:以master+slave为准 + double_base: bool, _marker: std::marker::PhantomData<(B, Req)>, } @@ -31,11 +38,13 @@ impl From

for CacheService { Self { parser, streams: Distance::new(), + slave_idx: 0, exp_sec: 0, force_write_all: false, // 兼容考虑默认为false,set master失败后,不更新其他layers,新业务推荐用true hasher: Default::default(), _marker: Default::default(), backend_no_storage: false, + double_base: false, } } } @@ -91,61 +100,96 @@ where fn send(&self, mut req: Self::Item) { debug_assert!(self.streams.local_len() > 0); - let mut idx: usize = 0; // master - if !req.operation().master_only() { - let mut ctx = super::Context::from(*req.mut_context()); - let (i, try_next, write_back) = if req.operation().is_store() { - self.context_store(&mut ctx, &req) - } else { - if !ctx.inited() { - // ctx未初始化, 是第一次读请求;仅第一次请求记录时间,原因如下: - // 第一次读一般访问L1,miss之后再读master; - // 读quota的更新根据第一次的请求时间更合理 - if let Some(quota) = self.streams.quota() { - req.quota(quota); - } - } - self.context_get(&mut ctx) - }; + // let idx: usize = 0; // master + let mut ctx = super::Context::from(*req.mut_context()); + // 根据context、request 获取send的策略:mc pool索引,是否try next,是否回写 + let (idx, try_next, write_back) = self.get_send_strategy(&mut ctx, &mut req); + // 设置访问策略 + if ctx.ctx > 0 { req.try_next(try_next); req.write_back(write_back); *req.mut_context() = ctx.ctx; - idx = i; if idx >= self.streams.len() { req.on_err(protocol::Error::TopChanged); return; } } + log::debug!("+++ request sent prepared:{} - {} {}", idx, req, self); debug_assert!(idx < self.streams.len(), "{} {} => {:?}", idx, self, req); unsafe { self.streams.get_unchecked(idx).send(req) }; } } + impl CacheService where E: Endpoint, { + #[inline(always)] + fn get_send_strategy(&self, ctx: &mut Context, req: &mut Req) -> (usize, bool, bool) { + if !req.operation().master_only() { + // 对于非master_only,根据请求是否store,来确定访问策略 + if req.operation().is_store() { + self.context_store(ctx, &req) + } else { + if !ctx.inited() { + // ctx未初始化, 是第一次读请求;仅第一次请求记录时间,原因如下: + // 第一次读一般访问L1,miss之后再读master; + // 读quota的更新根据第一次的请求时间更合理 + if let Some(quota) = self.streams.quota() { + req.quota(quota); + } + } + self.context_get(ctx) + } + } else if self.double_base { + // 对于master only的请求(gets/meta),如果开启doubleBase,master异常,还可以访问slave + if !ctx.check_and_inited(false) { + let try_next = self.streams.len() > self.slave_idx; + (0, try_next, false) // 第一次访问,直接访问master + } else { + (self.slave_idx, false, false) // 第二次访问,尝试访问slave,gets毋需回写 + } + } else { + // master_only,同时非double_base,只访问master + (0, false, false) + } + } + #[inline] fn context_store(&self, ctx: &mut super::Context, req: &Req) -> (usize, bool, bool) { - let (idx, try_next, write_back); - ctx.check_and_inited(true); + use protocol::memcache::Binary; + + let (mut idx, try_next, write_back); + let inited = ctx.check_and_inited(true); if ctx.is_write() { - idx = ctx.take_write_idx() as usize; + // 对于cas/casq、add/addq,第一次访问master,失败后第二次需要访问slave,然后再重构成普通set协议回写 + idx = if self.double_base && inited && req.is_cas_add() { + // doubleBase为true时,第二次直接访问slave,同时设置skip slave bit位为1 + ctx.set_skip_slave_4store(); + self.slave_idx + } else { + ctx.take_write_idx() as usize + }; + if idx == self.slave_idx && ctx.need_skip_slave_4store() { + // 当更新到slave idx时,如果需要skip,则skip掉slave + idx = ctx.take_write_idx() as usize; + } write_back = idx + 1 < self.streams.len(); // try_next逻辑: // 1)如果当前为最后一个layer,设为false; - // 2)否则,根据opcode、force_write_all一起确定。 + // 2)否则,根据double_base、force_write_all、idx一起确定。 try_next = if idx + 1 >= self.streams.len() { false } else { - use protocol::memcache::Binary; - match req.try_next_type() { - TryNextType::NotTryNext => false, - TryNextType::TryNext => true, - TryNextType::Unkown => self.force_write_all, - } + req.can_try_next(self.double_base, self.force_write_all, idx) + // let try_next_raw = match req.try_next_type() { + // TryNextType::NotTryNext => false, + // TryNextType::TryNext => true, + // TryNextType::Unkown => self.force_write_all, + // }; }; } else { // 是读触发的回种的写请求 @@ -203,6 +247,7 @@ where self.exp_sec = (ns.exptime / 1000) as u32; // 转换成秒 self.force_write_all = ns.flag.get(Flag::ForceWriteAll as u8); self.backend_no_storage = ns.flag.get(Flag::BackendNoStorage as u8); + self.double_base = ns.flag.get(Flag::DoubleBase as u8); let dist = &ns.distribution.clone(); let old_streams = self.streams.take(); @@ -232,6 +277,9 @@ where local_len = backends.sort(master, |_, s| s <= l); } + // 设置slave的索引位置,方便doubleBase策略访问slave + self.slave_idx = local_len; + let mut new = Vec::with_capacity(backends.len()); for (i, group) in backends.into_iter().enumerate() { // 第一组是master diff --git a/protocol/src/flag.rs b/protocol/src/flag.rs index 26daac6b5..5965d3856 100644 --- a/protocol/src/flag.rs +++ b/protocol/src/flag.rs @@ -101,22 +101,25 @@ impl Flag { //} } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy)] pub enum TryNextType { NotTryNext = 0, TryNext = 1, - Unkown = 2, + Unknown = 2, } +// 方便检索 +const TRY_NEXT_TABLE: [TryNextType; 3] = [ + TryNextType::NotTryNext, + TryNextType::TryNext, + TryNextType::Unknown, +]; -// (1) 0: not try next(对add/replace生效); (2) 1: try next; (3) 2:unkown (仅对set生效,注意提前考虑cas) -impl TryNextType { - pub fn from(val: u8) -> Self { - match val { - 0 => TryNextType::NotTryNext, - 1 => TryNextType::TryNext, - 2 => TryNextType::Unkown, - _ => panic!("unknow try next type"), - } +impl From for TryNextType { + fn from(val: u8) -> Self { + let idx = val as usize; + assert!(idx < TRY_NEXT_TABLE.len(), "malformed tryNextType:{}", idx); + + *TRY_NEXT_TABLE.get(idx).expect("malformed tryNextType") } } diff --git a/protocol/src/memcache/binary/mod.rs b/protocol/src/memcache/binary/mod.rs index 49c5e0c67..91e9a351b 100644 --- a/protocol/src/memcache/binary/mod.rs +++ b/protocol/src/memcache/binary/mod.rs @@ -16,6 +16,9 @@ use crate::{ use sharding::hash::Hash; +/// mc的master在内部结构存储时,idx为0,结构顺序为: master, master_l1, slave, slave_l1 +pub(crate) const MASTER_IDX: usize = 0; + impl Protocol for MemcacheBinary { #[inline] fn config(&self) -> crate::Config { diff --git a/protocol/src/memcache/binary/packet.rs b/protocol/src/memcache/binary/packet.rs index a7fce5545..0d2305932 100644 --- a/protocol/src/memcache/binary/packet.rs +++ b/protocol/src/memcache/binary/packet.rs @@ -63,7 +63,7 @@ pub(crate) const COMMAND_IDX: [u8; 128] = [ // cas 变更为setq pub(crate) const NOREPLY_MAPPING: [u8; 128] = [ 0x09, 0x11, 0x11, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x09, 0x00, 0x00, 0x0d, 0x0d, 0x19, 0x1a, - 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, 0x1b, 0x1c, 0x1d, 0x1e, 0x1f, + 0x10, 0x11, 0x11, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, 0x1b, 0x1c, 0x1d, 0x1e, 0x1f, 0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x2a, 0x2b, 0x2c, 0x2d, 0x2e, 0x2f, 0x30, 0x32, 0x32, 0x34, 0x34, 0x36, 0x36, 0x38, 0x38, 0x3a, 0x3a, 0x3c, 0x3c, 0x3d, 0x3e, 0x3f, 0x41, 0x42, 0x43, 0x44, 0x45, 0x46, 0x47, 0x49, 0x49, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, @@ -79,9 +79,9 @@ pub(crate) const NOREPLY_MAPPING: [u8; 128] = [ //]; // 请求完毕后,不考虑layer及其他配置,如果cmd失败,是否继续try_next: -// (1) 0: not try next(对add/replace生效); (2) 1: try next; (3) 2:unkown (仅对set生效,注意提前考虑cas) +// (1) 0: not try next(对add/replace生效); (2) 1: try next; (3) 2:unkown (仅对cas id 大于0的set/setq、add生效,注意提前考虑cas) const TRY_NEXT_TABLE: [u8; 128] = [ - 1, 2, 0, 0, 1, 1, 1, 1, 0, 1, 0, 1, 1, 1, 0, 0, 1, 2, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 0, + 1, 2, 2, 0, 1, 1, 1, 1, 0, 1, 0, 1, 1, 1, 0, 0, 1, 2, 2, 0, 1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 1, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, @@ -124,6 +124,7 @@ pub const OP_DEL: u8 = 0x04; pub const OP_ADD: u8 = 0x02; pub const OP_GETK: u8 = 0x0c; pub(crate) const OP_SETQ: u8 = 0x11; +pub(super) const OP_ADDQ: u8 = 0x12; // 这个专门为gets扩展 pub const OP_GETS: u8 = 0x48; // 这个没有业务使用,先注销掉 @@ -190,9 +191,10 @@ pub trait Binary { fn hash(&self, alg: &H) -> i64; fn check_request(&self) -> Result<()>; fn check_response(&self) -> Result<()>; - fn try_next_type(&self) -> TryNextType; + fn can_try_next(&self, double_base: bool, force_write_all: bool, cur_idx: usize) -> bool; fn sentonly(&self) -> bool; fn noforward(&self) -> bool; + fn is_cas_add(&self) -> bool; } pub trait Op {} @@ -362,22 +364,39 @@ impl Binary for RingSlice { } } + /// 以master为准时,仅仅从协议层面考虑,计算是否重试 #[inline(always)] - fn try_next_type(&self) -> TryNextType { + fn can_try_next(&self, double_base: bool, force_write_all: bool, cur_idx: usize) -> bool { let op = self.op() as usize; assert!(op < TRY_NEXT_TABLE.len()); - let try_next = TRY_NEXT_TABLE[op]; - if try_next != TryNextType::Unkown as u8 { - return TryNextType::from(try_next); + let try_next = TRY_NEXT_TABLE[op].into(); + match try_next { + TryNextType::TryNext => true, + TryNextType::NotTryNext => false, + TryNextType::Unknown => { + // Unknown type 对应的cmd只有set/setq、cas/casq以及add/addq + if double_base { + // 开启double base后,对于cas、add类型,如果当前idx为master,后面可以再try slave + cur_idx == super::MASTER_IDX || force_write_all + } else { + // 如果没开启双基准,对于set(非cas/add),根据force_write_all 来确定是否try write,而cas/add 不用try + !self.is_cas_add() && force_write_all + } + } } - // 只有set、setq 才会是unknown,此时只需要对cas再设置为NotTryNext即可 - if self.cas() > 0 { - log::debug!("not try next for cas"); - return TryNextType::NotTryNext; - } - return TryNextType::from(try_next); + // TODO 待测试稳定后,后清理 fishermen + // if try_next != TryNextType::Unkown as u8 { + // return TryNextType::from(try_next); + // } + + // // 只有set、setq 才会是unknown,此时只需要对cas再设置为NotTryNext即可 + // if self.cas() > 0 { + // log::debug!("not try next for cas"); + // return TryNextType::NotTryNext; + // } + // return TryNextType::from(try_next); } #[inline(always)] @@ -393,4 +412,13 @@ impl Binary for RingSlice { _ => false, } } + + #[inline(always)] + fn is_cas_add(&self) -> bool { + match self.op() { + OP_ADD | OP_ADDQ => true, + OP_SET | OP_SETQ => self.cas() > 0, + _ => false, + } + } } From ffa59e399ae608f8582fb53d9016989c639adbbe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=B3=A2?= Date: Thu, 26 Oct 2023 17:08:32 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E6=9A=82=E7=95=99=E4=B9=8B=E5=89=8D?= =?UTF-8?q?=E9=80=BB=E8=BE=91=EF=BC=8C=E7=A8=B3=E5=AE=9A=E5=90=8E=E5=86=8D?= =?UTF-8?q?=E6=B8=85=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- endpoint/src/cacheservice/topo.rs | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/endpoint/src/cacheservice/topo.rs b/endpoint/src/cacheservice/topo.rs index d2a2da95f..4c0769e05 100644 --- a/endpoint/src/cacheservice/topo.rs +++ b/endpoint/src/cacheservice/topo.rs @@ -100,7 +100,32 @@ where fn send(&self, mut req: Self::Item) { debug_assert!(self.streams.local_len() > 0); + // TODO 测试稳定后清理,预计2023.11.30后可清理 fishermen // let idx: usize = 0; // master + // if !req.operation().master_only() { + // let mut ctx = super::Context::from(*req.mut_context()); + // let (i, try_next, write_back) = if req.operation().is_store() { + // self.context_store(&mut ctx, &req) + // } else { + // if !ctx.inited() { + // // ctx未初始化, 是第一次读请求;仅第一次请求记录时间,原因如下: + // // 第一次读一般访问L1,miss之后再读master; + // // 读quota的更新根据第一次的请求时间更合理 + // if let Some(quota) = self.streams.quota() { + // req.quota(quota); + // } + // } + // self.context_get(&mut ctx) + // }; + // req.try_next(try_next); + // req.write_back(write_back); + // *req.mut_context() = ctx.ctx; + // idx = i; + // if idx >= self.streams.len() { + // req.on_err(protocol::Error::TopChanged); + // return; + // } + // } let mut ctx = super::Context::from(*req.mut_context()); // 根据context、request 获取send的策略:mc pool索引,是否try next,是否回写 let (idx, try_next, write_back) = self.get_send_strategy(&mut ctx, &mut req); From 31aff939b400d06b85796da35339b68d6eb17c93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=B3=A2?= Date: Thu, 26 Oct 2023 17:10:46 +0800 Subject: [PATCH 3/4] add comments --- endpoint/src/cacheservice/topo.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/endpoint/src/cacheservice/topo.rs b/endpoint/src/cacheservice/topo.rs index 4c0769e05..d5b3bec78 100644 --- a/endpoint/src/cacheservice/topo.rs +++ b/endpoint/src/cacheservice/topo.rs @@ -136,6 +136,7 @@ where *req.mut_context() = ctx.ctx; if idx >= self.streams.len() { req.on_err(protocol::Error::TopChanged); + log::warn!("+++ idx/{} top changed? {},req:{}", idx, self, req); return; } } From d1b5e603ee446823fa625b109bd36e16dd1955cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=B3=A2?= Date: Thu, 26 Oct 2023 17:13:54 +0800 Subject: [PATCH 4/4] update comments --- endpoint/src/cacheservice/topo.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/endpoint/src/cacheservice/topo.rs b/endpoint/src/cacheservice/topo.rs index d5b3bec78..35d767992 100644 --- a/endpoint/src/cacheservice/topo.rs +++ b/endpoint/src/cacheservice/topo.rs @@ -135,8 +135,8 @@ where req.write_back(write_back); *req.mut_context() = ctx.ctx; if idx >= self.streams.len() { - req.on_err(protocol::Error::TopChanged); log::warn!("+++ idx/{} top changed? {},req:{}", idx, self, req); + req.on_err(protocol::Error::TopChanged); return; } }