support doublebase
hustfisher committed Oct 26, 2023
1 parent 11a61b8 commit ab2a08d
Showing 6 changed files with 180 additions and 61 deletions.
23 changes: 21 additions & 2 deletions endpoint/src/cacheservice/config.rs
@@ -35,8 +35,9 @@ pub struct Namespace {
pub timeout_ms_slave: u32,
#[serde(default)]
pub local_affinity: bool,
/// TODO: strategies/attributes are set via bit positions; see the Flag definition below. Consider consolidating newly added bool attributes into `flag`, starting with DoubleBase. fishermen
#[serde(default)]
pub flag: u64, // strategies/attributes are set via bit positions; see the Flag definition below
pub flag: u64,
}

// Strategies/attributes are set via bit positions, laid out in order starting from the lowest bit
@@ -46,6 +47,24 @@ pub(crate) enum Flag {
ForceWriteAll = 1,
UpdateSlavel1 = 2,
LocalAffinity = 3,
/// <pre>
/// DoubleBase defaults to false, meaning master is the base: if write master or gets fails, failure is returned right away.
/// If set to true, slave can also serve as the base: at key granularity, when the master node is abnormal, slave becomes the base. This currently affects the cas/casq, add/addq, and gets commands.
/// Effect of the doubleBase setting on each command:
/// 1 cas
/// mcDoubleBase=false: if cas master fails, return immediately;
/// mcDoubleBase=true: if cas master fails, try cas slave; only if both master and slave fail is failure returned;
/// if cas slave succeeds, slave is the base, so master and all other layers are also set once, and success is returned;
/// 2 add
/// mcDoubleBase=false: if add master fails, return immediately;
/// mcDoubleBase=true: if add master fails while master's state is normal, return immediately;
/// if both master and slave fail, return failure;
/// if slave succeeds, set master and the other layers, then return success;
/// 3 gets:
/// mcDoubleBase=false: if gets master fails, return immediately;
/// mcDoubleBase=true: if gets master fails, try gets slave; only if both master and slave fail is failure returned;
/// </pre>
DoubleBase = 4,
}

impl Namespace {
@@ -193,4 +212,4 @@ impl<'a> Config<'a> {
}
None
}
}
}
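A minimal sketch of how a bit-position lookup such as `ns.flag.get(Flag::DoubleBase as u8)` presumably behaves; the real `get` helper lives elsewhere in this repository, and `get_bit` here is a hypothetical stand-in:

fn get_bit(flag: u64, bit: u8) -> bool {
    // test the bit at position `bit`, counting from the lowest bit
    flag & (1u64 << bit) != 0
}

fn main() {
    let flag: u64 = 1 << 4; // only the DoubleBase bit (position 4) is set
    assert!(get_bit(flag, 4)); // DoubleBase enabled
    assert!(!get_bit(flag, 1)); // ForceWriteAll not set
}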
26 changes: 22 additions & 4 deletions endpoint/src/cacheservice/mod.rs
@@ -9,7 +9,11 @@ struct Context {
ctx: protocol::Context,
}

// Mask over the high 16 bits, shared by the store and retrieve cmd families
const H_MASK: u64 = 0xffff << 48;
// The skip-slave bit for store cmds
const STORE_SKIP_SLAVE: u64 = 0x1 << 16;

impl Context {
#[inline]
fn from(ctx: protocol::Context) -> Self {
@@ -30,16 +34,30 @@ impl Context {
fn is_write(&self) -> bool {
self.ctx & (1 << 62) > 0
}
// Fetch the idx and increment the stored idx by 1
// Fetch the idx for a write op and increment the stored idx by 1
#[inline]
fn take_write_idx(&mut self) -> u16 {
let idx = self.ctx as u16;
self.ctx += 1;
idx
}
// The low 16 bits store the next idx
// For a write request, the low 16 bits are the index
// For a read request, ...

/// For store cmds under doublebase: when cas/casq/add/addq operates on slave, this bit must be set so that the later write-back skips slave_idx
/// This operation only takes effect for store-type cmds
#[inline(always)]
fn set_skip_slave_4store(&mut self) {
assert!(self.is_write());
self.ctx |= STORE_SKIP_SLAVE;
}

/// For store cmds: whether the write-back needs to skip slave
#[inline(always)]
fn need_skip_slave_4store(&self) -> bool {
self.ctx & STORE_SKIP_SLAVE > 0
}

// For a write request, the low 16 bits are the write index; the 17th bit records whether slave should be skipped on write-back
// For a read request, the lowest 16 bits store the current idx; when accessing the next layer, the previous idx is shifted 16 bits toward the high end
#[inline]
fn take_read_idx(&mut self) -> u16 {
let mut low_48bit = self.low();
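A self-contained sketch of the write-side context layout described above, assuming the low 16 bits hold the next write index and bit 16 (STORE_SKIP_SLAVE) carries the skip-slave marker; the names mirror the diff, but the demo itself is illustrative:

const STORE_SKIP_SLAVE: u64 = 0x1 << 16;

// Mirrors take_write_idx: return the low 16 bits, then advance the index.
fn take_write_idx(ctx: &mut u64) -> u16 {
    let idx = *ctx as u16;
    *ctx += 1;
    idx
}

fn main() {
    let mut ctx: u64 = 0;
    assert_eq!(take_write_idx(&mut ctx), 0); // first write targets master
    ctx |= STORE_SKIP_SLAVE;                 // request: skip slave on write-back
    assert_eq!(take_write_idx(&mut ctx), 1); // next layer
    assert!(ctx & STORE_SKIP_SLAVE > 0);     // marker survives index increments
}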
108 changes: 78 additions & 30 deletions endpoint/src/cacheservice/topo.rs
@@ -1,11 +1,12 @@
use crate::{Builder, Endpoint, Topology};
use discovery::TopologyWrite;
use protocol::{Protocol, Request, Resource, TryNextType};
use protocol::{Protocol, Request, Resource};
use sharding::hash::{Hash, HashKey, Hasher};
use sharding::Distance;
use std::collections::HashMap;

use super::config::Flag;
use super::Context;
use crate::shards::Shards;
use crate::PerformanceTuning;
use crate::Timeout;
@@ -16,12 +17,18 @@ pub struct CacheService<B, E, Req, P> {
// There are n groups in total, one connection per group.
// Order: master, master l1, slave, slave l1
streams: Distance<Shards<E, Req>>,
// The idx of slave within streams
slave_idx: usize,
// The first r_num entries in streams serve reads (this length excludes slave l1 and slave).
hasher: Hasher,
parser: P,
exp_sec: u32,
force_write_all: bool, // compatibility with existing business logic: whether to update the other layers after set master fails
backend_no_storage: bool, // true: no storage behind mc
/// Compatibility with existing business logic: whether to update the other layers after set master fails
force_write_all: bool,
/// true: no storage behind mc
backend_no_storage: bool,
/// false: master is the base; true: master+slave is the base
double_base: bool,
_marker: std::marker::PhantomData<(B, Req)>,
}

@@ -31,11 +38,13 @@ impl<B, E, Req, P> From<P> for CacheService<B, E, Req, P> {
Self {
parser,
streams: Distance::new(),
slave_idx: 0,
exp_sec: 0,
force_write_all: false, // defaults to false for compatibility: after set master fails, do not update the other layers; true is recommended for new business
hasher: Default::default(),
_marker: Default::default(),
backend_no_storage: false,
double_base: false,
}
}
}
@@ -91,61 +100,96 @@
fn send(&self, mut req: Self::Item) {
debug_assert!(self.streams.local_len() > 0);

let mut idx: usize = 0; // master
if !req.operation().master_only() {
let mut ctx = super::Context::from(*req.mut_context());
let (i, try_next, write_back) = if req.operation().is_store() {
self.context_store(&mut ctx, &req)
} else {
if !ctx.inited() {
// ctx is uninitialized, so this is the first read request; record the time only on the first request, because:
// the first read usually hits L1 and reads master only after a miss;
// updating the read quota by the time of the first request is more reasonable
if let Some(quota) = self.streams.quota() {
req.quota(quota);
}
}
self.context_get(&mut ctx)
};
// let idx: usize = 0; // master
let mut ctx = super::Context::from(*req.mut_context());
// Derive the send strategy from the context and request: mc pool index, whether to try next, whether to write back
let (idx, try_next, write_back) = self.get_send_strategy(&mut ctx, &mut req);
// Apply the access strategy
if ctx.ctx > 0 {
req.try_next(try_next);
req.write_back(write_back);
*req.mut_context() = ctx.ctx;
idx = i;
if idx >= self.streams.len() {
req.on_err(protocol::Error::TopChanged);
return;
}
}

log::debug!("+++ request sent prepared:{} - {} {}", idx, req, self);
debug_assert!(idx < self.streams.len(), "{} {} => {:?}", idx, self, req);

unsafe { self.streams.get_unchecked(idx).send(req) };
}
}

impl<B: Send + Sync, E, Req: Request, P: Protocol> CacheService<B, E, Req, P>
where
E: Endpoint<Item = Req>,
{
#[inline(always)]
fn get_send_strategy(&self, ctx: &mut Context, req: &mut Req) -> (usize, bool, bool) {
if !req.operation().master_only() {
// For non-master_only requests, the access strategy depends on whether the request is a store
if req.operation().is_store() {
self.context_store(ctx, &req)
} else {
if !ctx.inited() {
// ctx is uninitialized, so this is the first read request; record the time only on the first request, because:
// the first read usually hits L1 and reads master only after a miss;
// updating the read quota by the time of the first request is more reasonable
if let Some(quota) = self.streams.quota() {
req.quota(quota);
}
}
self.context_get(ctx)
}
} else if self.double_base {
// For master-only requests (gets/meta) with doubleBase on, slave can still be accessed when master is abnormal
if !ctx.check_and_inited(false) {
let try_next = self.streams.len() > self.slave_idx;
(0, try_next, false) // first attempt: access master directly
} else {
(self.slave_idx, false, false) // second attempt: try slave; gets needs no write-back
}
} else {
// master_only and not double_base: access master only
(0, false, false)
}
}

#[inline]
fn context_store(&self, ctx: &mut super::Context, req: &Req) -> (usize, bool, bool) {
let (idx, try_next, write_back);
ctx.check_and_inited(true);
use protocol::memcache::Binary;

let (mut idx, try_next, write_back);
let inited = ctx.check_and_inited(true);
if ctx.is_write() {
idx = ctx.take_write_idx() as usize;
// For cas/casq and add/addq, the first attempt accesses master; after a failure, the second must access slave, and the request is then rebuilt as a plain set for write-back
idx = if self.double_base && inited && req.is_cas_add() {
// When doubleBase is true, the second attempt accesses slave directly, and the skip-slave bit is set to 1
ctx.set_skip_slave_4store();
self.slave_idx
} else {
ctx.take_write_idx() as usize
};
if idx == self.slave_idx && ctx.need_skip_slave_4store() {
// When the update reaches the slave idx, skip the slave if skipping was requested
idx = ctx.take_write_idx() as usize;
}
write_back = idx + 1 < self.streams.len();

// try_next logic:
// 1) if this is the last layer, set it to false;
// 2) otherwise, decide from the opcode together with force_write_all
// 2) otherwise, decide from double_base, force_write_all, and idx together
try_next = if idx + 1 >= self.streams.len() {
false
} else {
use protocol::memcache::Binary;
match req.try_next_type() {
TryNextType::NotTryNext => false,
TryNextType::TryNext => true,
TryNextType::Unkown => self.force_write_all,
}
req.can_try_next(self.double_base, self.force_write_all, idx)
// let try_next_raw = match req.try_next_type() {
// TryNextType::NotTryNext => false,
// TryNextType::TryNext => true,
// TryNextType::Unkown => self.force_write_all,
// };
};
} else {
// This is a write request triggered by a read-miss back-fill
@@ -203,6 +247,7 @@
self.exp_sec = (ns.exptime / 1000) as u32; // convert to seconds
self.force_write_all = ns.flag.get(Flag::ForceWriteAll as u8);
self.backend_no_storage = ns.flag.get(Flag::BackendNoStorage as u8);
self.double_base = ns.flag.get(Flag::DoubleBase as u8);
let dist = &ns.distribution.clone();

let old_streams = self.streams.take();
@@ -232,6 +277,9 @@
local_len = backends.sort(master, |_, s| s <= l);
}

// Record the slave's index position so the doubleBase strategy can reach the slave conveniently
self.slave_idx = local_len;

let mut new = Vec::with_capacity(backends.len());
for (i, group) in backends.into_iter().enumerate() {
// The first group is master
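A hedged, standalone sketch of the master-only branch of get_send_strategy above (gets/meta under double_base): the first attempt targets master and may try next only when a slave layer exists; the fallback targets the slave and never writes back. The function name and simplified types are illustrative, not this repository's API:

/// Returns (idx, try_next, write_back) for a master-only request under double_base.
fn master_only_strategy(first_attempt: bool, slave_idx: usize, stream_len: usize) -> (usize, bool, bool) {
    if first_attempt {
        // go straight to master; a retry is only possible if a slave layer exists
        (0, stream_len > slave_idx, false)
    } else {
        // second attempt falls back to the slave; gets never writes back
        (slave_idx, false, false)
    }
}

fn main() {
    assert_eq!(master_only_strategy(true, 2, 4), (0, true, false));
    assert_eq!(master_only_strategy(false, 2, 4), (2, false, false));
}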
25 changes: 14 additions & 11 deletions protocol/src/flag.rs
@@ -101,22 +101,25 @@ impl Flag {
//}
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Copy)]
pub enum TryNextType {
NotTryNext = 0,
TryNext = 1,
Unkown = 2,
Unknown = 2,
}
// For convenient lookup
const TRY_NEXT_TABLE: [TryNextType; 3] = [
TryNextType::NotTryNext,
TryNextType::TryNext,
TryNextType::Unknown,
];

// (1) 0: not try next (applies to add/replace); (2) 1: try next; (3) 2: unknown (applies only to set; note that cas must be considered in advance)
impl TryNextType {
pub fn from(val: u8) -> Self {
match val {
0 => TryNextType::NotTryNext,
1 => TryNextType::TryNext,
2 => TryNextType::Unkown,
_ => panic!("unknow try next type"),
}
impl From<u8> for TryNextType {
fn from(val: u8) -> Self {
let idx = val as usize;
assert!(idx < TRY_NEXT_TABLE.len(), "malformed tryNextType:{}", idx);

*TRY_NEXT_TABLE.get(idx).expect("malformed tryNextType")
}
}

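A self-contained copy of the table-driven conversion above, with a usage example; it mirrors the diff rather than importing from it, and the assert rejects any value outside 0..=2:

#[derive(Debug, Clone, Copy)]
enum TryNextType {
    NotTryNext = 0,
    TryNext = 1,
    Unknown = 2,
}

const TRY_NEXT_TABLE: [TryNextType; 3] = [
    TryNextType::NotTryNext,
    TryNextType::TryNext,
    TryNextType::Unknown,
];

impl From<u8> for TryNextType {
    fn from(val: u8) -> Self {
        let idx = val as usize;
        assert!(idx < TRY_NEXT_TABLE.len(), "malformed tryNextType:{}", idx);
        TRY_NEXT_TABLE[idx] // safe after the bounds assert
    }
}

fn main() {
    let t = TryNextType::from(1u8);
    println!("{:?}", t); // TryNext
}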
3 changes: 3 additions & 0 deletions protocol/src/memcache/binary/mod.rs
@@ -16,6 +16,9 @@ use crate::{

use sharding::hash::Hash;

/// In mc's internal layout, master is stored at idx 0; the order is: master, master_l1, slave, slave_l1
pub(crate) const MASTER_IDX: usize = 0;

impl Protocol for MemcacheBinary {
#[inline]
fn config(&self) -> crate::Config {