From 06f46c30b4ded475d07f67e3d6709699e355e461 Mon Sep 17 00:00:00 2001 From: kongkong Date: Thu, 9 Nov 2023 16:12:30 +0800 Subject: [PATCH 1/2] move builder to endpoint --- agent/src/main.rs | 6 +-- agent/src/service.rs | 5 +- endpoint/Cargo.toml | 2 + {stream => endpoint}/src/builder.rs | 31 ++++++++---- endpoint/src/cacheservice/topo.rs | 57 +++++++++------------ {stream => endpoint}/src/checker.rs | 4 +- {stream => endpoint}/src/handler.rs | 0 endpoint/src/kv/topo.rs | 42 ++++++---------- endpoint/src/lib.rs | 5 ++ endpoint/src/msgque/topo.rs | 78 +++++++++++++---------------- endpoint/src/phantomservice/topo.rs | 48 ++++++++---------- {stream => endpoint}/src/reconn.rs | 0 endpoint/src/redisservice/topo.rs | 50 ++++++++---------- endpoint/src/topo.rs | 65 ++++++++++++------------ endpoint/src/uuid/topo.rs | 54 ++++++++++---------- stream/Cargo.toml | 1 - stream/src/lib.rs | 8 --- tests/src/discovery.rs | 2 +- tests/src/layout.rs | 21 ++++---- 19 files changed, 222 insertions(+), 257 deletions(-) rename {stream => endpoint}/src/builder.rs (76%) rename {stream => endpoint}/src/checker.rs (98%) rename {stream => endpoint}/src/handler.rs (100%) rename {stream => endpoint}/src/reconn.rs (100%) diff --git a/agent/src/main.rs b/agent/src/main.rs index 4e2c4ff25..e09569094 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -59,10 +59,8 @@ async fn run() -> Result<()> { } use protocol::Parser; -use std::sync::Arc; -use stream::{Backend, Builder, Request}; -type Endpoint = Arc>; -type Topology = endpoint::TopologyProtocol, Endpoint, Request, Parser>; +use stream::Request; +type Topology = endpoint::TopologyProtocol; async fn discovery_init( ctx: &'static Context, rx: Receiver>, diff --git a/agent/src/service.rs b/agent/src/service.rs index 8604c7c1f..cc936fbfe 100644 --- a/agent/src/service.rs +++ b/agent/src/service.rs @@ -9,10 +9,9 @@ use ds::chan::Sender; use metrics::Path; use protocol::{Parser, Result}; use stream::pipeline::copy_bidirectional; -use stream::{Backend, Builder, CheckedTopology, Request, StreamMetrics}; +use stream::{CheckedTopology, Request, StreamMetrics}; -type Endpoint = Arc>; -type Topology = endpoint::TopologyProtocol, Endpoint, Request, Parser>; +type Topology = endpoint::TopologyProtocol; // 一直侦听,直到成功侦听或者取消侦听(当前尚未支持取消侦听) // 1. 尝试侦听之前,先确保服务配置信息已经更新完成 pub(super) async fn process_one( diff --git a/endpoint/Cargo.toml b/endpoint/Cargo.toml index 8f57ec0e8..411f13873 100644 --- a/endpoint/Cargo.toml +++ b/endpoint/Cargo.toml @@ -15,6 +15,8 @@ sharding = { path = "../sharding" } log = { path = "../log" } rt = { path = "../rt" } context = { path = "../context" } +noop-waker = "0.1.0" +metrics = { path = "../metrics" } byteorder = "1.4.3" bytes = "1.0.1" diff --git a/stream/src/builder.rs b/endpoint/src/builder.rs similarity index 76% rename from stream/src/builder.rs rename to endpoint/src/builder.rs index 5dc30bd32..e03bb6c19 100644 --- a/stream/src/builder.rs +++ b/endpoint/src/builder.rs @@ -9,7 +9,7 @@ use ds::chan::mpsc::{channel, Sender, TrySendError}; use ds::Switcher; use crate::checker::BackendChecker; -use endpoint::{Builder, Endpoint, Single, Timeout}; +use crate::{Endpoint, Single, Timeout}; use metrics::Path; use protocol::{Error, Protocol, Request, ResOption, Resource}; @@ -18,15 +18,26 @@ pub struct BackendBuilder { _marker: std::marker::PhantomData<(P, R)>, } -impl Builder>> for BackendBuilder { - fn auth_option_build( +pub type Backend = Arc>; + +impl BackendBuilder { + pub fn build( + addr: &str, + parser: P, + rsrc: Resource, + service: &str, + timeout: Timeout, + ) -> Backend { + Self::auth_option_build(addr, parser, rsrc, service, timeout, Default::default()) + } + pub fn auth_option_build( addr: &str, parser: P, rsrc: Resource, service: &str, timeout: Timeout, option: ResOption, - ) -> Arc> { + ) -> Backend { let (tx, rx) = channel(256); let finish: Switcher = false.into(); let init: Switcher = false.into(); @@ -38,7 +49,7 @@ impl Builder>> for BackendBuilder< let s = single.clone(); rt::spawn(async move { checker.start_check(s).await }); - Backend { + InnerBackend { finish, init, tx, @@ -48,7 +59,7 @@ impl Builder>> for BackendBuilder< } } -pub struct Backend { +pub struct InnerBackend { single: Arc, tx: Sender, // 实例销毁时,设置该值,通知checker,会议上check. @@ -57,7 +68,7 @@ pub struct Backend { init: Switcher, } -impl discovery::Inited for Backend { +impl discovery::Inited for InnerBackend { // 已经连接上或者至少连接了一次 #[inline] fn inited(&self) -> bool { @@ -65,13 +76,13 @@ impl discovery::Inited for Backend { } } -impl Drop for Backend { +impl Drop for InnerBackend { fn drop(&mut self) { self.finish.on(); } } -impl Endpoint for Backend { +impl Endpoint for InnerBackend { type Item = R; #[inline] fn send(&self, req: R) { @@ -84,7 +95,7 @@ impl Endpoint for Backend { } } } -impl Single for Backend { +impl Single for InnerBackend { fn single(&self) -> bool { self.single.load(Acquire) } diff --git a/endpoint/src/cacheservice/topo.rs b/endpoint/src/cacheservice/topo.rs index 9eeec78c9..9cc6435aa 100644 --- a/endpoint/src/cacheservice/topo.rs +++ b/endpoint/src/cacheservice/topo.rs @@ -1,4 +1,4 @@ -use crate::{Builder, Endpoint, Topology}; +use crate::{Backend, Endpoint, Topology}; use discovery::TopologyWrite; use protocol::{Protocol, Request, Resource, TryNextType}; use sharding::hash::{Hash, HashKey, Hasher}; @@ -12,20 +12,20 @@ use crate::Timeout; use protocol::Bit; #[derive(Clone)] -pub struct CacheService { +pub struct CacheService { // 一共有n组,每组1个连接。 // 排列顺序: master, master l1, slave, slave l1 - streams: Distance>, + streams: Distance, Req>>, // streams里面的前r_num个数据是提供读的(这个长度不包含slave l1, slave)。 hasher: Hasher, parser: P, exp_sec: u32, force_write_all: bool, // 兼容已有业务逻辑,set master失败后,是否更新其他layer backend_no_storage: bool, // true:mc后面没有存储 - _marker: std::marker::PhantomData<(B, Req)>, + _marker: std::marker::PhantomData, } -impl From

for CacheService { +impl From

for CacheService { #[inline] fn from(parser: P) -> Self { Self { @@ -40,10 +40,7 @@ impl From

for CacheService { } } -impl discovery::Inited for CacheService -where - E: discovery::Inited, -{ +impl discovery::Inited for CacheService { #[inline] fn inited(&self) -> bool { self.streams.len() > 0 @@ -54,12 +51,10 @@ where } } -impl Hash for CacheService +impl Hash for CacheService where - E: Endpoint, Req: Request, P: Protocol, - B: Send + Sync, { #[inline] fn hash(&self, k: &K) -> i64 { @@ -67,12 +62,10 @@ where } } -impl Topology for CacheService +impl Topology for CacheService where - E: Endpoint, Req: Request, P: Protocol, - B: Send + Sync, { #[inline] fn exp_sec(&self) -> u32 { @@ -80,9 +73,8 @@ where } } -impl Endpoint for CacheService +impl Endpoint for CacheService where - E: Endpoint, Req: Request, P: Protocol, { @@ -122,10 +114,7 @@ where unsafe { self.streams.get_unchecked(idx).send(req) }; } } -impl CacheService -where - E: Endpoint, -{ +impl CacheService { #[inline] fn context_store(&self, ctx: &mut super::Context, req: &Req) -> (usize, bool, bool) { let (idx, try_next, write_back); @@ -190,11 +179,10 @@ where (idx, try_next, write_back) } } -impl TopologyWrite for CacheService +impl TopologyWrite for CacheService where - B: Builder, P: Protocol, - E: Endpoint, + Req: Request, { #[inline] fn update(&mut self, namespace: &str, cfg: &str) { @@ -212,7 +200,7 @@ where let old = &mut streams; for shards in old_streams { - let group: Vec<(E, String)> = shards.into(); + let group: Vec<(Backend, String)> = shards.into(); for e in group { old.insert(e.1, e.0); } @@ -256,30 +244,35 @@ where v } } -impl CacheService +impl CacheService where - B: Builder, P: Protocol, - E: Endpoint, + Req: Request, { fn build( &self, - old: &mut HashMap, + old: &mut HashMap>, addrs: Vec, dist: &str, name: &str, timeout: Timeout, - ) -> Shards { + ) -> Shards, Req> { Shards::from(dist, addrs, |addr| { old.remove(addr).map(|e| e).unwrap_or_else(|| { - B::build(addr, self.parser.clone(), Resource::Memcache, name, timeout) + crate::BackendBuilder::build( + addr, + self.parser.clone(), + Resource::Memcache, + name, + timeout, + ) }) }) } } use std::fmt::{self, Display, Formatter}; -impl Display for CacheService { +impl Display for CacheService { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { write!( f, diff --git a/stream/src/checker.rs b/endpoint/src/checker.rs similarity index 98% rename from stream/src/checker.rs rename to endpoint/src/checker.rs index 10f08da0f..85594c6f4 100644 --- a/stream/src/checker.rs +++ b/endpoint/src/checker.rs @@ -23,7 +23,7 @@ pub struct BackendChecker { init: Switcher, parser: P, addr: String, - timeout: endpoint::Timeout, + timeout: crate::Timeout, path: Path, option: ResOption, } @@ -36,7 +36,7 @@ impl BackendChecker { init: Switcher, parser: P, path: Path, - timeout: endpoint::Timeout, + timeout: crate::Timeout, option: ResOption, ) -> Self { Self { diff --git a/stream/src/handler.rs b/endpoint/src/handler.rs similarity index 100% rename from stream/src/handler.rs rename to endpoint/src/handler.rs diff --git a/endpoint/src/kv/topo.rs b/endpoint/src/kv/topo.rs index 724d47235..bab3f40cc 100644 --- a/endpoint/src/kv/topo.rs +++ b/endpoint/src/kv/topo.rs @@ -17,7 +17,7 @@ use sharding::hash::{Hash, HashKey}; use sharding::Distance; use crate::dns::DnsConfig; -use crate::Builder; +use crate::Backend; use crate::Single; use crate::Timeout; use crate::{Endpoint, Topology}; @@ -27,16 +27,16 @@ use super::config::Years; use super::strategy::Strategist; use super::KVCtx; #[derive(Clone)] -pub struct KvService { - shards: Shards, +pub struct KvService { + shards: Shards>, // selector: Selector, strategist: Strategist, parser: P, cfg: Box>, - _mark: std::marker::PhantomData<(B, Req)>, + _mark: std::marker::PhantomData, } -impl From

for KvService { +impl From

for KvService { #[inline] fn from(parser: P) -> Self { Self { @@ -50,12 +50,10 @@ impl From

for KvService { } } -impl Hash for KvService +impl Hash for KvService where - E: Endpoint, Req: Request, P: Protocol, - B: Send + Sync, { #[inline] fn hash(&self, k: &K) -> i64 { @@ -63,18 +61,15 @@ where } } -impl Topology for KvService +impl Topology for KvService where - E: Endpoint, Req: Request, P: Protocol, - B: Send + Sync, { } -impl Endpoint for KvService +impl Endpoint for KvService where - E: Endpoint, Req: Request, P: Protocol, { @@ -157,11 +152,10 @@ where } } -impl TopologyWrite for KvService +impl TopologyWrite for KvService where - B: Builder, P: Protocol, - E: Endpoint + Single, + Req: Request, { fn need_load(&self) -> bool { self.shards.len() != self.cfg.shards_url.len() || self.cfg.need_load() @@ -176,23 +170,22 @@ where } } } -impl KvService +impl KvService where - B: Builder, P: Protocol, - E: Endpoint + Single, + Req: Request, { // #[inline] fn take_or_build( &self, - old: &mut HashMap>, + old: &mut HashMap>>, addr: &str, timeout: Timeout, res: ResOption, - ) -> E { + ) -> Backend { match old.get_mut(addr).map(|endpoints| endpoints.pop()) { Some(Some(end)) => end, - _ => B::auth_option_build( + _ => crate::BackendBuilder::auth_option_build( &addr, self.parser.clone(), Resource::Mysql, @@ -314,10 +307,7 @@ where true } } -impl discovery::Inited for KvService -where - E: discovery::Inited, -{ +impl discovery::Inited for KvService { // 每一个域名都有对应的endpoint,并且都初始化完成。 #[inline] fn inited(&self) -> bool { diff --git a/endpoint/src/lib.rs b/endpoint/src/lib.rs index 07c6e8267..e7da12993 100644 --- a/endpoint/src/lib.rs +++ b/endpoint/src/lib.rs @@ -2,12 +2,17 @@ mod shards; mod topo; pub use topo::*; +mod builder; pub mod cacheservice; +pub(crate) mod checker; +pub mod handler; pub mod kv; pub mod msgque; pub mod phantomservice; +mod reconn; pub mod redisservice; pub mod uuid; +pub use builder::*; pub(crate) mod dns; diff --git a/endpoint/src/msgque/topo.rs b/endpoint/src/msgque/topo.rs index 614e2dc7a..cc66882cc 100644 --- a/endpoint/src/msgque/topo.rs +++ b/endpoint/src/msgque/topo.rs @@ -11,7 +11,7 @@ use tokio::time::Instant; use std::collections::{BTreeMap, HashMap}; -use crate::{Builder, Endpoint, Timeout, Topology}; +use crate::{Backend, Endpoint, Timeout, Topology}; use sharding::hash::{Hash, HashKey, Hasher, Padding}; use crate::msgque::strategy::hitfirst::Node; @@ -31,15 +31,15 @@ const OFFLINE_STOP_READ_SECONDS: u64 = 60 * 20; const OFFLINE_CLEAN_SECONDS: u64 = OFFLINE_STOP_READ_SECONDS + 60 * 2; #[derive(Clone)] -pub struct MsgQue { +pub struct MsgQue { service: String, // 读写stream需要分开,读会有大量的空读 - streams_read: Vec<(String, E, usize)>, - streams_write: BTreeMap>, + streams_read: Vec<(String, Backend, usize)>, + streams_write: BTreeMap)>>, // 轮询访问,N分钟后下线 - streams_offline: Vec<(String, E)>, + streams_offline: Vec<(String, Backend)>, offline_hits: Arc, offline_time: Instant, @@ -51,10 +51,10 @@ pub struct MsgQue { timeout_write: Timeout, timeout_read: Timeout, - _marker: std::marker::PhantomData<(B, Req)>, + _marker: std::marker::PhantomData, } -impl From

for MsgQue { +impl From

for MsgQue { #[inline] fn from(parser: P) -> Self { Self { @@ -75,10 +75,7 @@ impl From

for MsgQue { } } -impl discovery::Inited for MsgQue -where - E: discovery::Inited, -{ +impl discovery::Inited for MsgQue { #[inline] fn inited(&self) -> bool { // check read streams @@ -109,12 +106,10 @@ where const PADDING: Hasher = Hasher::Padding(Padding); -impl Hash for MsgQue +impl Hash for MsgQue where - E: Endpoint, Req: Request, P: Protocol, - B: Send + Sync, { #[inline] fn hash(&self, k: &K) -> i64 { @@ -122,10 +117,8 @@ where } } -impl Topology for MsgQue +impl Topology for MsgQue where - B: Send + Sync, - E: Endpoint, Req: Request, P: Protocol, { @@ -138,10 +131,8 @@ where } //TODO: 验证的时候需要考虑512字节这种边界msg -impl Endpoint for MsgQue +impl Endpoint for MsgQue where - B: Send + Sync, - E: Endpoint, Req: Request, P: Protocol, { @@ -212,10 +203,8 @@ where } //获得待读取的streams和qid,返回的bool指示是否从offline streams中读取,true为都offline stream -impl MsgQue +impl MsgQue where - B: Send + Sync, - E: Endpoint, Req: Request, P: Protocol, { @@ -249,14 +238,16 @@ where } } -impl MsgQue +impl MsgQue where - B: Builder, - E: Endpoint, P: Protocol, + Req: Request, { // 构建下线的队列 - fn build_offline(&mut self, sized_queue: &BTreeMap>) -> Vec<(String, E)> { + fn build_offline( + &mut self, + sized_queue: &BTreeMap>, + ) -> Vec<(String, Backend)> { let mut new_addrs = HashSet::with_capacity(self.streams_read.len()); let _ = sized_queue .iter() @@ -265,7 +256,7 @@ where if !new_addrs.contains(name) { self.streams_offline.push(( name.clone(), - B::build( + crate::BackendBuilder::build( name, self.parser.clone(), Resource::MsgQue, @@ -290,11 +281,11 @@ where fn build_read_streams( &self, - old: &mut HashMap, + old: &mut HashMap>, addrs: &BTreeMap>, name: &str, timeout: Timeout, - ) -> Vec<(String, E, usize)> { + ) -> Vec<(String, Backend, usize)> { let mut streams = Vec::with_capacity(addrs.len()); for (size, servs) in addrs.iter() { for s in servs.iter() { @@ -306,7 +297,7 @@ where .map(|(srv, size)| { ( srv.clone(), - old.remove(srv).unwrap_or(B::build( + old.remove(srv).unwrap_or(crate::BackendBuilder::build( srv, self.parser.clone(), Resource::MsgQue, @@ -324,7 +315,7 @@ where addrs: &BTreeMap>, name: &str, timeout: Timeout, - ) -> BTreeMap> { + ) -> BTreeMap)>> { let mut old_streams = HashMap::with_capacity(self.streams_write.len() * 3); if self.streams_write.len() > 0 { let mut first_key = 512; @@ -348,13 +339,15 @@ where .map(|addr| { ( addr.clone(), - old_streams.remove(addr).unwrap_or(B::build( - addr, - self.parser.clone(), - Resource::MsgQue, - name, - timeout, - )), + old_streams + .remove(addr) + .unwrap_or(crate::BackendBuilder::build( + addr, + self.parser.clone(), + Resource::MsgQue, + name, + timeout, + )), ) }) .collect(), @@ -385,11 +378,10 @@ where // } } -impl TopologyWrite for MsgQue +impl TopologyWrite for MsgQue where - B: Builder, P: Protocol, - E: Endpoint, + Req: Request, { #[inline] fn update(&mut self, name: &str, cfg: &str) { @@ -403,7 +395,7 @@ where self.timeout_write.adjust(ns.timeout_write); let old_r = self.streams_read.split_off(0); - let mut old_streams_read: HashMap = + let mut old_streams_read: HashMap> = old_r.into_iter().map(|(addr, e, _)| (addr, e)).collect(); // 首先构建offline stream,如果前一次下线ips已经超时,先清理 diff --git a/endpoint/src/phantomservice/topo.rs b/endpoint/src/phantomservice/topo.rs index 56e963cf3..aeedfcfd8 100644 --- a/endpoint/src/phantomservice/topo.rs +++ b/endpoint/src/phantomservice/topo.rs @@ -1,6 +1,6 @@ use std::{collections::HashMap, marker::PhantomData}; -use crate::{dns::DnsConfig, Builder, Endpoint, Timeout, Topology}; +use crate::{dns::DnsConfig, Backend, Endpoint, Timeout, Topology}; use discovery::{ dns::{self, IPPort}, TopologyWrite, @@ -15,17 +15,17 @@ use sharding::{ use super::config::PhantomNamespace; #[derive(Clone)] -pub struct PhantomService { +pub struct PhantomService { // 一般有2组,相互做HA,每组是一个域名列表,域名下只有一个ip,但会变化 - streams: Vec>, + streams: Vec)>>, hasher: Crc32, distribution: Range, parser: P, cfg: Box>, - _mark: PhantomData<(B, Req)>, + _mark: PhantomData, } -impl From

for PhantomService { +impl From

for PhantomService { fn from(parser: P) -> Self { Self { parser, @@ -38,12 +38,10 @@ impl From

for PhantomService { } } -impl Hash for PhantomService +impl Hash for PhantomService where - E: Endpoint, Req: Request, P: Protocol, - B: Send + Sync, { #[inline] fn hash(&self, k: &K) -> i64 { @@ -51,21 +49,17 @@ where } } -impl Topology for PhantomService +impl Topology for PhantomService where - E: Endpoint, Req: Request, P: Protocol, - B: Send + Sync, { } -impl Endpoint for PhantomService +impl Endpoint for PhantomService where - E: Endpoint, Req: Request, P: Protocol, - B: Send + Sync, { type Item = Req; #[inline] @@ -92,11 +86,10 @@ where } } -impl TopologyWrite for PhantomService +impl TopologyWrite for PhantomService where - B: Builder, P: Protocol, - E: Endpoint, + Req: Request, { #[inline] fn update(&mut self, namespace: &str, cfg: &str) { @@ -129,17 +122,21 @@ where } } -impl PhantomService +impl PhantomService where - B: Builder, P: Protocol, - E: Endpoint, + Req: Request, { #[inline] - fn take_or_build(&self, old: &mut HashMap>, addr: &str, timeout: Timeout) -> E { + fn take_or_build( + &self, + old: &mut HashMap>>, + addr: &str, + timeout: Timeout, + ) -> Backend { match old.get_mut(addr).map(|endpoints| endpoints.pop()) { Some(Some(end)) => end, - _ => B::build( + _ => crate::BackendBuilder::build( &addr, self.parser.clone(), Resource::Redis, @@ -200,10 +197,7 @@ where } } -impl discovery::Inited for PhantomService -where - E: discovery::Inited, -{ +impl discovery::Inited for PhantomService { // 每一个域名都有对应的endpoint,并且都初始化完成。 #[inline] fn inited(&self) -> bool { @@ -219,7 +213,7 @@ where }) } } -impl std::fmt::Debug for PhantomService { +impl std::fmt::Debug for PhantomService { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{:?}", self.cfg) } diff --git a/stream/src/reconn.rs b/endpoint/src/reconn.rs similarity index 100% rename from stream/src/reconn.rs rename to endpoint/src/reconn.rs diff --git a/endpoint/src/redisservice/topo.rs b/endpoint/src/redisservice/topo.rs index 7d55386c3..59571dc11 100644 --- a/endpoint/src/redisservice/topo.rs +++ b/endpoint/src/redisservice/topo.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use crate::{Builder, Endpoint, Single, Topology}; +use crate::{Backend, Endpoint, Single, Topology}; use discovery::TopologyWrite; use protocol::{Protocol, RedisFlager, Request, Resource}; use sharding::distribution::Distribute; @@ -12,16 +12,16 @@ use crate::{dns::DnsConfig, Timeout}; use discovery::dns::{self, IPPort}; #[derive(Clone)] -pub struct RedisService { +pub struct RedisService { // 一共shards.len()个分片,每个分片 shard[0]是master, shard[1..]是slave - shards: Vec>, + shards: Vec>>, hasher: Hasher, distribute: Distribute, parser: P, cfg: Box>, - _mark: std::marker::PhantomData<(B, Req)>, + _mark: std::marker::PhantomData, } -impl From

for RedisService { +impl From

for RedisService { #[inline] fn from(parser: P) -> Self { Self { @@ -35,12 +35,10 @@ impl From

for RedisService { } } -impl Hash for RedisService +impl Hash for RedisService where - E: Endpoint, Req: Request, P: Protocol, - B: Send + Sync, { #[inline] fn hash(&self, k: &K) -> i64 { @@ -48,18 +46,15 @@ where } } -impl Topology for RedisService +impl Topology for RedisService where - E: Endpoint, Req: Request, P: Protocol, - B: Send + Sync, { } -impl Endpoint for RedisService +impl Endpoint for RedisService where - E: Endpoint, Req: Request, P: Protocol, { @@ -129,11 +124,10 @@ where self.distribute.index(hash) } } -impl TopologyWrite for RedisService +impl TopologyWrite for RedisService where - B: Builder, P: Protocol, - E: Endpoint + Single, + Req: Request, { #[inline] fn update(&mut self, namespace: &str, cfg: &str) { @@ -157,10 +151,7 @@ where self.cfg.load_guard().check_load(|| self.load_inner()); } } -impl discovery::Inited for RedisService -where - E: discovery::Inited, -{ +impl discovery::Inited for RedisService { // 每一个域名都有对应的endpoint,并且都初始化完成。 #[inline] fn inited(&self) -> bool { @@ -173,25 +164,29 @@ where .fold(true, |inited, shard| inited && shard.inited()) } } -impl RedisService { +impl RedisService { #[inline] fn len(&self) -> usize { self.shards.len() } } -impl RedisService +impl RedisService where - B: Builder, P: Protocol, - E: Endpoint + Single, + Req: Request, { #[inline] - fn take_or_build(&self, old: &mut HashMap>, addr: &str, timeout: Timeout) -> E { + fn take_or_build( + &self, + old: &mut HashMap>>, + addr: &str, + timeout: Timeout, + ) -> Backend { let service = &self.cfg.service; match old.get_mut(addr).map(|endpoints| endpoints.pop()) { Some(Some(end)) => end, - _ => B::build( + _ => crate::BackendBuilder::build( &addr, self.parser.clone(), Resource::Redis, @@ -342,9 +337,8 @@ impl Shard { .fold(true, |inited, (_, e)| inited && e.inited()) } } -impl std::fmt::Display for RedisService +impl std::fmt::Display for RedisService where - E: Endpoint, Req: Request, P: Protocol, { diff --git a/endpoint/src/topo.rs b/endpoint/src/topo.rs index 72223172d..15b3cd9c5 100644 --- a/endpoint/src/topo.rs +++ b/endpoint/src/topo.rs @@ -1,11 +1,10 @@ use std::io::{Error, ErrorKind, Result}; use discovery::Inited; -use protocol::{Protocol, ResOption, Resource}; +use protocol::Protocol; use sharding::hash::{Hash, HashKey}; use crate::msgque::topo::MsgQue; -use crate::Timeout; use enum_dispatch::enum_dispatch; @@ -69,31 +68,31 @@ where } } -pub trait Builder { - fn build(addr: &str, parser: P, rsrc: Resource, service: &str, timeout: Timeout) -> E { - Self::auth_option_build(addr, parser, rsrc, service, timeout, Default::default()) - } +// pub trait Builder { +// fn build(addr: &str, parser: P, rsrc: Resource, service: &str, timeout: Timeout) -> E { +// Self::auth_option_build(addr, parser, rsrc, service, timeout, Default::default()) +// } - // TODO: ResOption -> AuthOption - fn auth_option_build( - addr: &str, - parser: P, - rsrc: Resource, - service: &str, - timeout: Timeout, - option: ResOption, - ) -> E; -} +// // TODO: ResOption -> AuthOption +// fn auth_option_build( +// addr: &str, +// parser: P, +// rsrc: Resource, +// service: &str, +// timeout: Timeout, +// option: ResOption, +// ) -> E; +// } macro_rules! define_topology { ($($top:ty, $item:ident, $ep:expr);+) => { #[derive(Clone)] - pub enum TopologyProtocol { + pub enum TopologyProtocol { $($item($top)),+ } - impl TopologyProtocol { + impl TopologyProtocol { pub fn try_from(parser:P, endpoint:&str) -> Result { match &endpoint[..]{ $($ep => Ok(Self::$item(parser.into())),)+ @@ -101,7 +100,7 @@ macro_rules! define_topology { } } } - impl Inited for TopologyProtocol where E:Inited { + impl Inited for TopologyProtocol { #[inline] fn inited(&self) -> bool { match self { @@ -112,7 +111,7 @@ macro_rules! define_topology { } } -impl discovery::TopologyWrite for TopologyProtocol where P:Sync+Send+Protocol, B:Builder, E:Endpoint+Single{ +impl discovery::TopologyWrite for TopologyProtocol where P:Sync+Send+Protocol, R:protocol::Request{ #[inline] fn update(&mut self, name: &str, cfg: &str) { match self { @@ -139,8 +138,8 @@ impl discovery::TopologyWrite for TopologyProtocol where } } -impl Hash for TopologyProtocol -where P:Sync+Send+Protocol, E:Endpoint, R:protocol::Request{ +impl Hash for TopologyProtocol +where P:Sync+Send+Protocol, R:protocol::Request{ #[inline] fn hash(&self, k:&K) -> i64 { match self { @@ -151,8 +150,8 @@ where P:Sync+Send+Protocol, E:Endpoint, R:protocol::Request{ } } -impl Topology for TopologyProtocol -where P:Sync+Send+Protocol, E:Endpoint, R:protocol::Request{ +impl Topology for TopologyProtocol +where P:Sync+Send+Protocol, R:protocol::Request{ #[inline] fn exp_sec(&self) -> u32 { match self { @@ -163,11 +162,11 @@ where P:Sync+Send+Protocol, E:Endpoint, R:protocol::Request{ } } -impl Endpoint for TopologyProtocol -where P:Sync+Send+Protocol, E:Endpoint, +impl Endpoint for TopologyProtocol +where P:Sync+Send+Protocol, R: protocol::Request, P: Protocol, - B:Send+Sync, + { type Item = R; #[inline] @@ -203,12 +202,12 @@ use crate::redisservice::topo::RedisService; use crate::uuid::topo::UuidService; define_topology! { - MsgQue, MsgQue, "mq"; - RedisService, RedisService, "rs"; - CacheService, CacheService, "cs"; - PhantomService, PhantomService, "pt"; - KvService, KvService, "kv"; - UuidService, UuidService, "uuid" + MsgQue, MsgQue, "mq"; + RedisService, RedisService, "rs"; + CacheService, CacheService, "cs"; + PhantomService, PhantomService, "pt"; + KvService, KvService, "kv"; + UuidService, UuidService, "uuid" } // 从环境变量获取是否开启后端资源访问的性能模式 diff --git a/endpoint/src/uuid/topo.rs b/endpoint/src/uuid/topo.rs index b1107ba2f..f68f2836c 100644 --- a/endpoint/src/uuid/topo.rs +++ b/endpoint/src/uuid/topo.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use crate::{Builder, Endpoint, Single, Topology}; +use crate::{Backend, Endpoint, Topology}; use discovery::TopologyWrite; use protocol::{Protocol, Request, Resource}; use sharding::{ @@ -13,13 +13,13 @@ use crate::{dns::DnsConfig, Timeout}; use discovery::dns::{self, IPPort}; #[derive(Clone)] -pub struct UuidService { - shard: Distance<(String, E)>, +pub struct UuidService { + shard: Distance<(String, Backend)>, parser: P, cfg: Box>, - _mark: std::marker::PhantomData<(B, Req)>, + _mark: std::marker::PhantomData, } -impl From

for UuidService { +impl From

for UuidService { #[inline] fn from(parser: P) -> Self { Self { @@ -31,12 +31,10 @@ impl From

for UuidService { } } -impl Hash for UuidService +impl Hash for UuidService where - E: Endpoint, Req: Request, P: Protocol, - B: Send + Sync, { #[inline] fn hash(&self, _k: &K) -> i64 { @@ -44,18 +42,15 @@ where } } -impl Topology for UuidService +impl Topology for UuidService where - E: Endpoint, Req: Request, P: Protocol, - B: Send + Sync, { } -impl Endpoint for UuidService +impl Endpoint for UuidService where - E: Endpoint, Req: Request, P: Protocol, { @@ -96,11 +91,10 @@ where 0 } } -impl TopologyWrite for UuidService +impl TopologyWrite for UuidService where - B: Builder, P: Protocol, - E: Endpoint + Single, + Req: Request, { #[inline] fn update(&mut self, namespace: &str, cfg: &str) { @@ -118,10 +112,7 @@ where self.cfg.load_guard().check_load(|| self.load_inner()); } } -impl discovery::Inited for UuidService -where - E: discovery::Inited, -{ +impl discovery::Inited for UuidService { #[inline] fn inited(&self) -> bool { self.shard.len() > 0 @@ -132,18 +123,28 @@ where } } -impl UuidService +impl UuidService where - B: Builder, P: Protocol, - E: Endpoint + Single, + Req: Request, { #[inline] - fn take_or_build(&self, old: &mut HashMap>, addr: &str, timeout: Timeout) -> E { + fn take_or_build( + &self, + old: &mut HashMap>>, + addr: &str, + timeout: Timeout, + ) -> Backend { let service = &self.cfg.service; match old.get_mut(addr).map(|endpoints| endpoints.pop()) { Some(Some(end)) => end, - _ => B::build(&addr, self.parser.clone(), Resource::Uuid, service, timeout), + _ => crate::BackendBuilder::build( + &addr, + self.parser.clone(), + Resource::Uuid, + service, + timeout, + ), } } @@ -198,9 +199,8 @@ where } } -impl std::fmt::Display for UuidService +impl std::fmt::Display for UuidService where - E: Endpoint, Req: Request, P: Protocol, { diff --git a/stream/Cargo.toml b/stream/Cargo.toml index 74cab0ae9..1247349b3 100644 --- a/stream/Cargo.toml +++ b/stream/Cargo.toml @@ -17,7 +17,6 @@ rt = { path = "../rt" } log = { path = "../log" } enum_dispatch = "0.3.8" -noop-waker = "0.1.0" array-init = "2" ctor = "0.1.23" diff --git a/stream/src/lib.rs b/stream/src/lib.rs index 18e9e9433..53c8c4fa5 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -1,9 +1,7 @@ //pub mod buffer; -pub mod handler; pub mod pipeline; pub use protocol::callback::*; pub use protocol::request::*; -mod reconn; mod context; @@ -11,12 +9,6 @@ pub trait Read { fn consume (usize, Out)>(&mut self, c: C) -> Out; } -mod builder; -pub use builder::BackendBuilder as Builder; -pub use builder::*; - -pub(crate) mod checker; - mod metric; pub use metric::StreamMetrics; diff --git a/tests/src/discovery.rs b/tests/src/discovery.rs index c58341dc9..50a9b107b 100644 --- a/tests/src/discovery.rs +++ b/tests/src/discovery.rs @@ -7,7 +7,7 @@ use protocol::Parser; fn refresh() { use stream::{Backend, Builder, Request}; type Endpoint = Arc>; - type Topology = endpoint::TopologyProtocol, Endpoint, Request, Parser>; + type Topology = endpoint::TopologyProtocol; let service = "redisservice"; let p = Parser::try_from("redis").unwrap(); let top: Topology = endpoint::TopologyProtocol::try_from(p.clone(), "rs").unwrap(); diff --git a/tests/src/layout.rs b/tests/src/layout.rs index 3e904ab46..6db8d9274 100644 --- a/tests/src/layout.rs +++ b/tests/src/layout.rs @@ -1,10 +1,10 @@ // 不要轻易变更这里面的测试用例,除非你知道你在做什么。拉相关同学进行方案评审。 -use std::{mem::size_of, sync::Arc}; +use std::mem::size_of; +use endpoint::Backend; use protocol::{callback::CallbackContext, Parser}; -use stream::{Backend, Request}; -type Endpoint = Arc>; -type Topology = endpoint::TopologyProtocol; +use stream::Request; +type Topology = endpoint::TopologyProtocol; //type RefreshTopology = endpoint::RefreshTopology; type CheckedTopology = stream::CheckedTopology; @@ -12,14 +12,12 @@ type CheckedTopology = stream::CheckedTopology; type CopyBidirectional = stream::pipeline::CopyBidirectional; type Stream = rt::Stream; -type Handler<'r> = stream::handler::Handler<'r, Request, Parser, Stream>; +type Handler<'r> = endpoint::handler::Handler<'r, Request, Parser, Stream>; -type Builder = stream::Builder; -type CacheService = endpoint::cacheservice::topo::CacheService; -type RedisService = endpoint::redisservice::topo::RedisService; -type PhantomService = - endpoint::phantomservice::topo::PhantomService; -type MsgQue = endpoint::msgque::topo::MsgQue; +type CacheService = endpoint::cacheservice::topo::CacheService; +type RedisService = endpoint::redisservice::topo::RedisService; +type PhantomService = endpoint::phantomservice::topo::PhantomService; +type MsgQue = endpoint::msgque::topo::MsgQue; use rt::Entry; @@ -40,7 +38,6 @@ fn checkout_basic() { assert_eq!(64, size_of::()); assert_eq!(1, size_of::()); assert_eq!(48, size_of::>()); - assert_eq!(0, size_of::()); assert_eq!(40, size_of::()); assert_eq!(368, size_of::()); assert_eq!(24, size_of::()); From 0da31b075d0a87c402318a846f60871bff0ea637 Mon Sep 17 00:00:00 2001 From: kongkong Date: Thu, 9 Nov 2023 16:47:24 +0800 Subject: [PATCH 2/2] layout fix --- tests/src/layout.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/src/layout.rs b/tests/src/layout.rs index 6db8d9274..749a0c792 100644 --- a/tests/src/layout.rs +++ b/tests/src/layout.rs @@ -1,7 +1,7 @@ // 不要轻易变更这里面的测试用例,除非你知道你在做什么。拉相关同学进行方案评审。 use std::mem::size_of; -use endpoint::Backend; +use endpoint::InnerBackend; use protocol::{callback::CallbackContext, Parser}; use stream::Request; type Topology = endpoint::TopologyProtocol; @@ -37,7 +37,7 @@ fn checkout_basic() { assert_eq!(16, size_of::()); assert_eq!(64, size_of::()); assert_eq!(1, size_of::()); - assert_eq!(48, size_of::>()); + assert_eq!(48, size_of::>()); assert_eq!(40, size_of::()); assert_eq!(368, size_of::()); assert_eq!(24, size_of::());