Remove Builder trait #389

Closed · wants to merge 2 commits
6 changes: 2 additions & 4 deletions agent/src/main.rs
@@ -59,10 +59,8 @@ async fn run() -> Result<()> {
 }
 
 use protocol::Parser;
-use std::sync::Arc;
-use stream::{Backend, Builder, Request};
-type Endpoint = Arc<Backend<Request>>;
-type Topology = endpoint::TopologyProtocol<Builder<Parser, Request>, Endpoint, Request, Parser>;
+use stream::Request;
+type Topology = endpoint::TopologyProtocol<Request, Parser>;
 async fn discovery_init(
     ctx: &'static Context,
     rx: Receiver<TopologyWriteGuard<Topology>>,
5 changes: 2 additions & 3 deletions agent/src/service.rs
@@ -9,10 +9,9 @@ use ds::chan::Sender;
 use metrics::Path;
 use protocol::{Parser, Result};
 use stream::pipeline::copy_bidirectional;
-use stream::{Backend, Builder, CheckedTopology, Request, StreamMetrics};
+use stream::{CheckedTopology, Request, StreamMetrics};
 
-type Endpoint = Arc<Backend<Request>>;
-type Topology = endpoint::TopologyProtocol<Builder<Parser, Request>, Endpoint, Request, Parser>;
+type Topology = endpoint::TopologyProtocol<Request, Parser>;
 // Keep listening until listening succeeds or is cancelled (cancellation is not supported yet)
 // 1. Before attempting to listen, make sure the service configuration has finished updating
 pub(super) async fn process_one(
2 changes: 2 additions & 0 deletions endpoint/Cargo.toml
@@ -15,6 +15,8 @@ sharding = { path = "../sharding" }
 log = { path = "../log" }
 rt = { path = "../rt" }
 context = { path = "../context" }
+noop-waker = "0.1.0"
+metrics = { path = "../metrics" }
 
 byteorder = "1.4.3"
 bytes = "1.0.1"
31 changes: 21 additions & 10 deletions stream/src/builder.rs → endpoint/src/builder.rs
@@ -9,7 +9,7 @@ use ds::chan::mpsc::{channel, Sender, TrySendError};
 use ds::Switcher;
 
 use crate::checker::BackendChecker;
-use endpoint::{Builder, Endpoint, Single, Timeout};
+use crate::{Endpoint, Single, Timeout};
 use metrics::Path;
 use protocol::{Error, Protocol, Request, ResOption, Resource};
 
@@ -18,15 +18,26 @@ pub struct BackendBuilder<P, R> {
     _marker: std::marker::PhantomData<(P, R)>,
 }
 
-impl<P: Protocol, R: Request> Builder<P, R, Arc<Backend<R>>> for BackendBuilder<P, R> {
-    fn auth_option_build(
+pub type Backend<R> = Arc<InnerBackend<R>>;
+
+impl<P: Protocol, R: Request> BackendBuilder<P, R> {
+    pub fn build(
         addr: &str,
         parser: P,
         rsrc: Resource,
         service: &str,
         timeout: Timeout,
+    ) -> Backend<R> {
+        Self::auth_option_build(addr, parser, rsrc, service, timeout, Default::default())
+    }
+    pub fn auth_option_build(
+        addr: &str,
+        parser: P,
+        rsrc: Resource,
+        service: &str,
+        timeout: Timeout,
         option: ResOption,
-    ) -> Arc<Backend<R>> {
+    ) -> Backend<R> {
         let (tx, rx) = channel(256);
         let finish: Switcher = false.into();
         let init: Switcher = false.into();
@@ -38,7 +49,7 @@ impl<P: Protocol, R: Request> Builder<P, R, Arc<Backend<R>>> for BackendBuilder<
         let s = single.clone();
         rt::spawn(async move { checker.start_check(s).await });
 
-        Backend {
+        InnerBackend {
             finish,
             init,
             tx,
@@ -48,7 +59,7 @@ impl<P: Protocol, R: Request> Builder<P, R, Arc<Backend<R>>> for BackendBuilder<
     }
 }
 
-pub struct Backend<R> {
+pub struct InnerBackend<R> {
     single: Arc<AtomicBool>,
     tx: Sender<R>,
     // Set when the instance is dropped, to notify the checker to stop checking.
@@ -57,21 +68,21 @@ pub struct Backend<R> {
     init: Switcher,
 }
 
-impl<R> discovery::Inited for Backend<R> {
+impl<R> discovery::Inited for InnerBackend<R> {
     // Already connected, or has connected at least once
     #[inline]
     fn inited(&self) -> bool {
         self.init.get()
     }
 }
 
-impl<R> Drop for Backend<R> {
+impl<R> Drop for InnerBackend<R> {
     fn drop(&mut self) {
         self.finish.on();
     }
 }
 
-impl<R: Request> Endpoint for Backend<R> {
+impl<R: Request> Endpoint for InnerBackend<R> {
     type Item = R;
     #[inline]
     fn send(&self, req: R) {
@@ -84,7 +95,7 @@ impl<R: Request> Endpoint for Backend<R> {
         }
     }
 }
-impl<R> Single for Backend<R> {
+impl<R> Single for InnerBackend<R> {
     fn single(&self) -> bool {
         self.single.load(Acquire)
     }
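To make the new construction path easier to follow, here is a self-contained sketch of the pattern this file now uses: an inherent constructor on `BackendBuilder` instead of a `Builder` trait implementation, returning a shared `Backend` handle that is just an `Arc` over the inner struct. The types below (`Timeout` as a plain tuple struct, `()` standing in for `ResOption`, no `Protocol`/`Request` generics) are simplified stand-ins, not the repo's real definitions.

```rust
use std::sync::Arc;

// Simplified stand-ins; the real crate's Protocol/Request/ResOption generics are omitted.
#[derive(Default, Clone, Copy)]
struct Timeout(u64);

struct InnerBackend {
    addr: String,
    timeout: Timeout,
}

// Mirrors `pub type Backend<R> = Arc<InnerBackend<R>>` from the diff.
type Backend = Arc<InnerBackend>;

struct BackendBuilder;

impl BackendBuilder {
    // `build` forwards to the fuller constructor with default options,
    // the same shape as `build` -> `auth_option_build` above.
    fn build(addr: &str, timeout: Timeout) -> Backend {
        Self::auth_option_build(addr, timeout, Default::default())
    }

    fn auth_option_build(addr: &str, timeout: Timeout, _option: ()) -> Backend {
        Arc::new(InnerBackend {
            addr: addr.to_string(),
            timeout,
        })
    }
}

fn main() {
    // Callers now build a backend directly, with no `Builder` trait in between.
    let backend = BackendBuilder::build("127.0.0.1:11211", Timeout(500));
    assert_eq!(backend.addr, "127.0.0.1:11211");
    let _ = backend.timeout;
}
```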
57 changes: 25 additions & 32 deletions endpoint/src/cacheservice/topo.rs
@@ -1,4 +1,4 @@
-use crate::{Builder, Endpoint, Topology};
+use crate::{Backend, Endpoint, Topology};
 use discovery::TopologyWrite;
 use protocol::{Protocol, Request, Resource, TryNextType};
 use sharding::hash::{Hash, HashKey, Hasher};
@@ -12,20 +12,20 @@ use crate::Timeout;
 use protocol::Bit;
 
 #[derive(Clone)]
-pub struct CacheService<B, E, Req, P> {
+pub struct CacheService<Req, P> {
     // There are n groups in total, one connection per group.
     // Order: master, master l1, slave, slave l1
-    streams: Distance<Shards<E, Req>>,
+    streams: Distance<Shards<Backend<Req>, Req>>,
     // The first r_num entries in streams serve reads (this count does not include slave l1 and slave).
     hasher: Hasher,
     parser: P,
     exp_sec: u32,
     force_write_all: bool, // Compatibility with existing business logic: whether to update the other layers after a set to master fails
     backend_no_storage: bool, // true: there is no storage behind mc
-    _marker: std::marker::PhantomData<(B, Req)>,
+    _marker: std::marker::PhantomData<Req>,
 }
 
-impl<B, E, Req, P> From<P> for CacheService<B, E, Req, P> {
+impl<Req, P> From<P> for CacheService<Req, P> {
     #[inline]
     fn from(parser: P) -> Self {
         Self {
@@ -40,10 +40,7 @@ impl<B, E, Req, P> From<P> for CacheService<B, E, Req, P> {
     }
 }
 
-impl<B, E, Req, P> discovery::Inited for CacheService<B, E, Req, P>
-where
-    E: discovery::Inited,
-{
+impl<Req, P> discovery::Inited for CacheService<Req, P> {
     #[inline]
     fn inited(&self) -> bool {
         self.streams.len() > 0
@@ -54,35 +51,30 @@ where
     }
 }
 
-impl<B, E, Req, P> Hash for CacheService<B, E, Req, P>
+impl<Req, P> Hash for CacheService<Req, P>
 where
-    E: Endpoint<Item = Req>,
     Req: Request,
     P: Protocol,
-    B: Send + Sync,
 {
     #[inline]
     fn hash<K: HashKey>(&self, k: &K) -> i64 {
         self.hasher.hash(k)
     }
 }
 
-impl<B, E, Req, P> Topology for CacheService<B, E, Req, P>
+impl<Req, P> Topology for CacheService<Req, P>
 where
-    E: Endpoint<Item = Req>,
     Req: Request,
     P: Protocol,
-    B: Send + Sync,
 {
     #[inline]
     fn exp_sec(&self) -> u32 {
         self.exp_sec
     }
 }
 
-impl<B: Send + Sync, E, Req, P> Endpoint for CacheService<B, E, Req, P>
+impl<Req, P> Endpoint for CacheService<Req, P>
 where
-    E: Endpoint<Item = Req>,
     Req: Request,
     P: Protocol,
 {
@@ -122,10 +114,7 @@ where
         unsafe { self.streams.get_unchecked(idx).send(req) };
     }
 }
-impl<B: Send + Sync, E, Req: Request, P: Protocol> CacheService<B, E, Req, P>
-where
-    E: Endpoint<Item = Req>,
-{
+impl<Req: Request, P: Protocol> CacheService<Req, P> {
     #[inline]
     fn context_store(&self, ctx: &mut super::Context, req: &Req) -> (usize, bool, bool) {
         let (idx, try_next, write_back);
@@ -190,11 +179,10 @@ where
         (idx, try_next, write_back)
     }
 }
-impl<B, E, Req, P> TopologyWrite for CacheService<B, E, Req, P>
+impl<Req, P> TopologyWrite for CacheService<Req, P>
 where
-    B: Builder<P, Req, E>,
     P: Protocol,
-    E: Endpoint<Item = Req>,
+    Req: Request,
 {
     #[inline]
     fn update(&mut self, namespace: &str, cfg: &str) {
@@ -212,7 +200,7 @@ where
         let old = &mut streams;
 
         for shards in old_streams {
-            let group: Vec<(E, String)> = shards.into();
+            let group: Vec<(Backend<Req>, String)> = shards.into();
             for e in group {
                 old.insert(e.1, e.0);
             }
@@ -256,30 +244,35 @@ where
         v
     }
 }
-impl<B, E, Req, P> CacheService<B, E, Req, P>
+impl<Req, P> CacheService<Req, P>
 where
-    B: Builder<P, Req, E>,
     P: Protocol,
-    E: Endpoint<Item = Req>,
+    Req: Request,
 {
     fn build(
         &self,
-        old: &mut HashMap<String, E>,
+        old: &mut HashMap<String, Backend<Req>>,
        addrs: Vec<String>,
        dist: &str,
        name: &str,
        timeout: Timeout,
-    ) -> Shards<E, Req> {
+    ) -> Shards<Backend<Req>, Req> {
         Shards::from(dist, addrs, |addr| {
             old.remove(addr).map(|e| e).unwrap_or_else(|| {
-                B::build(addr, self.parser.clone(), Resource::Memcache, name, timeout)
+                crate::BackendBuilder::build(
+                    addr,
+                    self.parser.clone(),
+                    Resource::Memcache,
+                    name,
+                    timeout,
+                )
             })
         })
     }
 }
 
 use std::fmt::{self, Display, Formatter};
-impl<B, E, Req, P> Display for CacheService<B, E, Req, P> {
+impl<Req, P> Display for CacheService<Req, P> {
     fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
         write!(
             f,
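The call-site side of the change: with the `B`/`E` type parameters gone, the topology stores concrete `Backend<Req>` handles and builds any missing ones directly via `crate::BackendBuilder::build`. Below is a minimal stand-in sketch of the reuse-or-build pattern used in `fn build` above; the types and names are illustrative, not the repo's real `Shards`/`Distance`/`Resource` machinery.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Stand-ins for the repo's types; only the reuse-or-build pattern is illustrated.
struct InnerBackend {
    addr: String,
}
type Backend = Arc<InnerBackend>;

fn build_backend(addr: &str) -> Backend {
    // Stands in for crate::BackendBuilder::build(addr, parser, resource, name, timeout).
    Arc::new(InnerBackend {
        addr: addr.to_string(),
    })
}

// Mirrors CacheService::build: keep a previously built backend when the address
// is unchanged, otherwise construct a new one for it.
fn rebuild(old: &mut HashMap<String, Backend>, addrs: &[String]) -> Vec<Backend> {
    addrs
        .iter()
        .map(|addr| old.remove(addr).unwrap_or_else(|| build_backend(addr)))
        .collect()
}

fn main() {
    let mut old = HashMap::new();
    old.insert("10.0.0.1:11211".to_string(), build_backend("10.0.0.1:11211"));

    let addrs = vec!["10.0.0.1:11211".to_string(), "10.0.0.2:11211".to_string()];
    let shards = rebuild(&mut old, &addrs);

    assert_eq!(shards.len(), 2);
    assert!(old.is_empty()); // the reused handle was taken out of `old`
    println!("first shard: {}", shards[0].addr);
}
```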
4 changes: 2 additions & 2 deletions stream/src/checker.rs → endpoint/src/checker.rs
@@ -23,7 +23,7 @@ pub struct BackendChecker<P, Req> {
     init: Switcher,
     parser: P,
     addr: String,
-    timeout: endpoint::Timeout,
+    timeout: crate::Timeout,
     path: Path,
     option: ResOption,
 }
@@ -36,7 +36,7 @@ impl<P, Req> BackendChecker<P, Req> {
         init: Switcher,
         parser: P,
         path: Path,
-        timeout: endpoint::Timeout,
+        timeout: crate::Timeout,
         option: ResOption,
     ) -> Self {
         Self {
File renamed without changes.