Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support doublebase #379

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions endpoint/src/cacheservice/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ pub struct Namespace {
pub timeout_ms_slave: u32,
#[serde(default)]
pub local_affinity: bool,
/// TODO 通过bit位,设置不同的策略/属性,详见下面Flag定义,新增bool属性,考虑统一设置到flag,从DoubleBase开始 fishermen
#[serde(default)]
pub flag: u64, // 通过bit位,设置不同的策略/属性,详见下面Flag定义
pub flag: u64,
}

// 通过bit位,设置不同的策略/属性;从低位开始依次排列
Expand All @@ -46,6 +47,24 @@ pub(crate) enum Flag {
ForceWriteAll = 1,
UpdateSlavel1 = 2,
LocalAffinity = 3,
/// <pre>
/// DoubleBase默认为false,以master为准,即write master或gets失败后,即返回失败。
/// 若设置为true,则可以将slave也可以作为基准,在key维度,如果master节点异常,就以slave作为基准,目前影响cas/casq、add/addq、gets指令;
/// doubleBase设置不同值,对指令的影响:
/// 1 cas
/// mcDoubleBase=false: cas master失败,则直接返回;
/// mcDoubleBase=true: cas master失败,再尝试cas slave,如果master、slave都失败,才返回;
/// cas slave 成功,说明以slave为准,此时则将master及其他所有层也set一次,并返回成功;
/// 2 add
/// mcDoubleBase=false: add master失败,则直接返回;
/// mcDoubleBase=true: add master失败,如果master状态正常,直接返回;
/// 如果master、slave都失败,则返回失败;
/// 如果slave成功,则set master及其他层,并返回成功;
/// 3 gets:
/// mcDoubleBase=false: gets master失败,则直接返回;
/// mcDoubleBase=true: gets master失败,再尝试gets slave,如果master、slave都失败,才返回;
/// </pre>
DoubleBase = 4,
}

impl Namespace {
Expand Down Expand Up @@ -193,4 +212,4 @@ impl<'a> Config<'a> {
}
None
}
}
}
26 changes: 22 additions & 4 deletions endpoint/src/cacheservice/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ struct Context {
ctx: protocol::Context,
}

// Mask for the high 16 bits of the context; shared by both store and retrieve cmds.
const H_MASK: u64 = 0xffff << 48;
// Bit flag (bit 16) marking that store cmds must skip the slave during write-back.
const STORE_SKIP_SLAVE: u64 = 0x1 << 16;

impl Context {
#[inline]
fn from(ctx: protocol::Context) -> Self {
Expand All @@ -30,16 +34,30 @@ impl Context {
/// Whether this context belongs to a write request (bit 62 of the context).
fn is_write(&self) -> bool {
    (self.ctx >> 62) & 1 == 1
}
/// Return the current write index (the low 16 bits of the context) and
/// advance it so the next call targets the following layer.
#[inline]
fn take_write_idx(&mut self) -> u16 {
    let current = self.ctx as u16;
    self.ctx += 1;
    current
}
// 低16位存储是下一次的idx
// 如果是写请求,低16位,是索引
// 如果是读请求,则

/// For store-type cmds under doublebase: when a cas/casq/add/addq operation
/// falls back to the slave, set this bit so that the subsequent write-back
/// skips slave_idx. This operation is only meaningful for store-type cmds.
///
/// Panics (debug contract) if the context is not a write request.
#[inline(always)]
fn set_skip_slave_4store(&mut self) {
    assert!(self.is_write());
    self.ctx |= STORE_SKIP_SLAVE;
}

/// For store-type cmds: whether the write-back must skip the slave layer.
#[inline(always)]
fn need_skip_slave_4store(&self) -> bool {
    (self.ctx & STORE_SKIP_SLAVE) != 0
}

// 如果是写请求,低16位是写索引,第17个bit存放是否回写slave
// 如果是读请求,则最低16bits存访当前idx,访问下一层时,将前一次的idx向高位左移16bits
#[inline]
fn take_read_idx(&mut self) -> u16 {
let mut low_48bit = self.low();
Expand Down
134 changes: 104 additions & 30 deletions endpoint/src/cacheservice/topo.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::{Builder, Endpoint, Topology};
use discovery::TopologyWrite;
use protocol::{Protocol, Request, Resource, TryNextType};
use protocol::{Protocol, Request, Resource};
use sharding::hash::{Hash, HashKey, Hasher};
use sharding::Distance;
use std::collections::HashMap;

use super::config::Flag;
use super::Context;
use crate::shards::Shards;
use crate::PerformanceTuning;
use crate::Timeout;
Expand All @@ -16,12 +17,18 @@ pub struct CacheService<B, E, Req, P> {
// 一共有n组,每组1个连接。
// 排列顺序: master, master l1, slave, slave l1
streams: Distance<Shards<E, Req>>,
// streams 中slave所在的idx
slave_idx: usize,
// streams里面的前r_num个数据是提供读的(这个长度不包含slave l1, slave)。
hasher: Hasher,
parser: P,
exp_sec: u32,
force_write_all: bool, // 兼容已有业务逻辑,set master失败后,是否更新其他layer
backend_no_storage: bool, // true:mc后面没有存储
/// 兼容已有业务逻辑,set master失败后,是否更新其他layer
force_write_all: bool,
/// true:mc后面没有存储
backend_no_storage: bool,
/// false: 以master为准; true:以master+slave为准
double_base: bool,
_marker: std::marker::PhantomData<(B, Req)>,
}

Expand All @@ -31,11 +38,13 @@ impl<B, E, Req, P> From<P> for CacheService<B, E, Req, P> {
Self {
parser,
streams: Distance::new(),
slave_idx: 0,
exp_sec: 0,
force_write_all: false, // 兼容考虑默认为false,set master失败后,不更新其他layers,新业务推荐用true
hasher: Default::default(),
_marker: Default::default(),
backend_no_storage: false,
double_base: false,
}
}
}
Expand Down Expand Up @@ -91,61 +100,122 @@ where
fn send(&self, mut req: Self::Item) {
debug_assert!(self.streams.local_len() > 0);

let mut idx: usize = 0; // master
if !req.operation().master_only() {
let mut ctx = super::Context::from(*req.mut_context());
let (i, try_next, write_back) = if req.operation().is_store() {
self.context_store(&mut ctx, &req)
} else {
if !ctx.inited() {
// ctx未初始化, 是第一次读请求;仅第一次请求记录时间,原因如下:
// 第一次读一般访问L1,miss之后再读master;
// 读quota的更新根据第一次的请求时间更合理
if let Some(quota) = self.streams.quota() {
req.quota(quota);
}
}
self.context_get(&mut ctx)
};
// TODO 测试稳定后清理,预计2023.11.30后可清理 fishermen
// let idx: usize = 0; // master
// if !req.operation().master_only() {
// let mut ctx = super::Context::from(*req.mut_context());
// let (i, try_next, write_back) = if req.operation().is_store() {
// self.context_store(&mut ctx, &req)
// } else {
// if !ctx.inited() {
// // ctx未初始化, 是第一次读请求;仅第一次请求记录时间,原因如下:
// // 第一次读一般访问L1,miss之后再读master;
// // 读quota的更新根据第一次的请求时间更合理
// if let Some(quota) = self.streams.quota() {
// req.quota(quota);
// }
// }
// self.context_get(&mut ctx)
// };
// req.try_next(try_next);
// req.write_back(write_back);
// *req.mut_context() = ctx.ctx;
// idx = i;
// if idx >= self.streams.len() {
// req.on_err(protocol::Error::TopChanged);
// return;
// }
// }
let mut ctx = super::Context::from(*req.mut_context());
// 根据context、request 获取send的策略:mc pool索引,是否try next,是否回写
let (idx, try_next, write_back) = self.get_send_strategy(&mut ctx, &mut req);
// 设置访问策略
if ctx.ctx > 0 {
req.try_next(try_next);
req.write_back(write_back);
*req.mut_context() = ctx.ctx;
idx = i;
if idx >= self.streams.len() {
log::warn!("+++ idx/{} top changed? {},req:{}", idx, self, req);
req.on_err(protocol::Error::TopChanged);
return;
}
}

log::debug!("+++ request sent prepared:{} - {} {}", idx, req, self);
debug_assert!(idx < self.streams.len(), "{} {} => {:?}", idx, self, req);

unsafe { self.streams.get_unchecked(idx).send(req) };
}
}

impl<B: Send + Sync, E, Req: Request, P: Protocol> CacheService<B, E, Req, P>
where
E: Endpoint<Item = Req>,
{
/// Decide the send strategy for a request, returning `(idx, try_next, write_back)`:
/// the mc pool index to hit, whether a failed attempt may try the next layer,
/// and whether the result must be written back to upper layers.
///
/// Side effects: may initialize/advance `ctx` (via `check_and_inited` /
/// `context_store` / `context_get`) and record the read quota on `req`.
#[inline(always)]
fn get_send_strategy(&self, ctx: &mut Context, req: &mut Req) -> (usize, bool, bool) {
    if !req.operation().master_only() {
        // Not master-only: the access strategy depends on whether this is a store request.
        if req.operation().is_store() {
            self.context_store(ctx, &req)
        } else {
            if !ctx.inited() {
                // ctx uninitialized => first read request. Only the first request
                // records the quota, because the first read usually hits L1 and
                // only falls back to master on a miss; basing the read-quota
                // update on the first request's time is more reasonable.
                if let Some(quota) = self.streams.quota() {
                    req.quota(quota);
                }
            }
            self.context_get(ctx)
        }
    } else if self.double_base {
        // Master-only request (gets/meta): with doubleBase enabled, the slave
        // may still be visited after the master fails.
        if !ctx.check_and_inited(false) {
            let try_next = self.streams.len() > self.slave_idx;
            (0, try_next, false) // first visit: go straight to the master
        } else {
            (self.slave_idx, false, false) // second visit: try the slave; gets needs no write-back
        }
    } else {
        // Master-only and doubleBase disabled: only the master is visited.
        (0, false, false)
    }
}

#[inline]
fn context_store(&self, ctx: &mut super::Context, req: &Req) -> (usize, bool, bool) {
let (idx, try_next, write_back);
ctx.check_and_inited(true);
use protocol::memcache::Binary;

let (mut idx, try_next, write_back);
let inited = ctx.check_and_inited(true);
if ctx.is_write() {
idx = ctx.take_write_idx() as usize;
// 对于cas/casq、add/addq,第一次访问master,失败后第二次需要访问slave,然后再重构成普通set协议回写
idx = if self.double_base && inited && req.is_cas_add() {
// doubleBase为true时,第二次直接访问slave,同时设置skip slave bit位为1
ctx.set_skip_slave_4store();
self.slave_idx
} else {
ctx.take_write_idx() as usize
};
if idx == self.slave_idx && ctx.need_skip_slave_4store() {
// 当更新到slave idx时,如果需要skip,则skip掉slave
idx = ctx.take_write_idx() as usize;
}
write_back = idx + 1 < self.streams.len();

// try_next逻辑:
// 1)如果当前为最后一个layer,设为false;
// 2)否则,根据opcode、force_write_all一起确定
// 2)否则,根据double_base、force_write_all、idx一起确定
try_next = if idx + 1 >= self.streams.len() {
false
} else {
use protocol::memcache::Binary;
match req.try_next_type() {
TryNextType::NotTryNext => false,
TryNextType::TryNext => true,
TryNextType::Unkown => self.force_write_all,
}
req.can_try_next(self.double_base, self.force_write_all, idx)
// let try_next_raw = match req.try_next_type() {
// TryNextType::NotTryNext => false,
// TryNextType::TryNext => true,
// TryNextType::Unkown => self.force_write_all,
// };
};
} else {
// 是读触发的回种的写请求
Expand Down Expand Up @@ -203,6 +273,7 @@ where
self.exp_sec = (ns.exptime / 1000) as u32; // 转换成秒
self.force_write_all = ns.flag.get(Flag::ForceWriteAll as u8);
self.backend_no_storage = ns.flag.get(Flag::BackendNoStorage as u8);
self.double_base = ns.flag.get(Flag::DoubleBase as u8);
let dist = &ns.distribution.clone();

let old_streams = self.streams.take();
Expand Down Expand Up @@ -232,6 +303,9 @@ where
local_len = backends.sort(master, |_, s| s <= l);
}

// 设置slave的索引位置,方便doubleBase策略访问slave
self.slave_idx = local_len;

let mut new = Vec::with_capacity(backends.len());
for (i, group) in backends.into_iter().enumerate() {
// 第一组是master
Expand Down
25 changes: 14 additions & 11 deletions protocol/src/flag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,22 +101,25 @@ impl Flag {
//}
}

/// Whether a store command may be propagated to the next layer after a failure.
/// (1) 0: do not try next (applies to add/replace); (2) 1: try next;
/// (3) 2: unknown (only meaningful for set; mind cas handling).
///
/// NOTE(review): the scraped diff interleaved old and new lines here
/// (duplicate derives, duplicate discriminant `2`, leftover old
/// `impl TryNextType::from`); this is the reconstructed post-change version,
/// with the redundant `assert!` + `.expect` double bounds check collapsed
/// into a single asserted table index.
#[derive(Debug, Clone, Copy)]
pub enum TryNextType {
    NotTryNext = 0,
    TryNext = 1,
    Unknown = 2,
}

// Lookup table for O(1) value -> variant conversion.
const TRY_NEXT_TABLE: [TryNextType; 3] = [
    TryNextType::NotTryNext,
    TryNextType::TryNext,
    TryNextType::Unknown,
];

impl From<u8> for TryNextType {
    /// Converts the raw wire value into a `TryNextType`.
    ///
    /// # Panics
    /// Panics when `val` is not a known variant value (i.e. `val >= 3`).
    fn from(val: u8) -> Self {
        let idx = val as usize;
        assert!(idx < TRY_NEXT_TABLE.len(), "malformed tryNextType:{}", idx);
        TRY_NEXT_TABLE[idx]
    }
}

Expand Down
3 changes: 3 additions & 0 deletions protocol/src/memcache/binary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ use crate::{

use sharding::hash::Hash;

/// In mc's internal layout the master is stored at idx 0; the layer order is:
/// master, master_l1, slave, slave_l1.
pub(crate) const MASTER_IDX: usize = 0;

impl Protocol for MemcacheBinary {
#[inline]
fn config(&self) -> crate::Config {
Expand Down
Loading
Loading