From 711bea32302b25fd0966e5ce92a1691ff4f11151 Mon Sep 17 00:00:00 2001 From: parabala Date: Fri, 10 Nov 2023 10:28:15 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E5=BC=80=E5=90=AF=E5=8F=AF=E7=94=A8?= =?UTF-8?q?=E5=8C=BA=E6=97=B6=EF=BC=8C=E7=94=9F=E6=88=90=E5=8F=AF=E7=94=A8?= =?UTF-8?q?=E5=8C=BA=E5=86=85=E5=AE=9E=E4=BE=8B=E6=95=B0=E9=87=8F=E7=9A=84?= =?UTF-8?q?=E7=9B=91=E6=8E=A7=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- discovery/Cargo.toml | 1 + discovery/src/distance.rs | 20 ++++++++++++++++++++ endpoint/Cargo.toml | 1 + endpoint/src/redisservice/topo.rs | 27 ++++++++++++++++++++++++++- metrics/src/lib.rs | 11 +++++++++++ metrics/src/macros.rs | 12 ++++++++++++ metrics/src/types/number.rs | 4 ++++ sharding/src/select/by_distance.rs | 16 ++++++++++------ 8 files changed, 85 insertions(+), 7 deletions(-) diff --git a/discovery/Cargo.toml b/discovery/Cargo.toml index b6e8590f4..e06860b13 100644 --- a/discovery/Cargo.toml +++ b/discovery/Cargo.toml @@ -10,6 +10,7 @@ edition = "2021" log = { path = "../log" } ds = { path = "../ds" } metrics = { path = "../metrics" } +context = { path = "../context" } url = "2.2.2" async-trait.workspace = true diff --git a/discovery/src/distance.rs b/discovery/src/distance.rs index 3e06c21bf..3a8160535 100644 --- a/discovery/src/distance.rs +++ b/discovery/src/distance.rs @@ -208,6 +208,9 @@ impl DistanceCalculator { } } } + pub fn region(&self) -> &Option { + &self.local_region + } } pub trait Addr { @@ -494,3 +497,20 @@ impl BClass for &String { b } } + +// 本机的region,优先级: +// 1. 启动参数/环境变量的region +// 2. 本机IP对应的region +// 3. 本机IP +pub fn region() -> String { + if let Some(region) = context::get().region() { + return region.to_string(); + } + + let cal = unsafe { DISTANCE_CALCULATOR.get_unchecked().get() }; + if let Some(region) = cal.region() { + return region.clone(); + } + + return metrics::raw_local_ip().to_string(); +} diff --git a/endpoint/Cargo.toml b/endpoint/Cargo.toml index 8f57ec0e8..f8bfa5c0a 100644 --- a/endpoint/Cargo.toml +++ b/endpoint/Cargo.toml @@ -15,6 +15,7 @@ sharding = { path = "../sharding" } log = { path = "../log" } rt = { path = "../rt" } context = { path = "../context" } +metrics = { path = "../metrics" } byteorder = "1.4.3" bytes = "1.0.1" diff --git a/endpoint/src/redisservice/topo.rs b/endpoint/src/redisservice/topo.rs index 7d55386c3..29e79b90b 100644 --- a/endpoint/src/redisservice/topo.rs +++ b/endpoint/src/redisservice/topo.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use crate::{Builder, Endpoint, Single, Topology}; -use discovery::TopologyWrite; +use discovery::{distance, TopologyWrite}; use protocol::{Protocol, RedisFlager, Request, Resource}; use sharding::distribution::Distribute; use sharding::hash::{Hash, HashKey, Hasher}; @@ -10,6 +10,7 @@ use sharding::Distance; use super::config::RedisNamespace; use crate::{dns::DnsConfig, Timeout}; use discovery::dns::{self, IPPort}; +use metrics::Path; #[derive(Clone)] pub struct RedisService { @@ -262,6 +263,7 @@ where for (master_addr, slaves) in addrs { assert_ne!(master_addr.len(), 0); assert_ne!(slaves.len(), 0); + let port = master_addr.port().to_string(); let master = self.take_or_build(&mut old, &master_addr, self.cfg.timeout_master()); master.enable_single(); @@ -281,6 +283,13 @@ where replicas, self.cfg.basic.region_enabled, ); + + // 推送可用区内的len + if self.cfg.basic.region_enabled { + let len_region = shard.len_region(); + self.len_region_metric(len_region, port.as_str()); + } + self.shards.push(shard); } assert_eq!(self.shards.len(), self.cfg.shards_url.len()); @@ -291,6 +300,19 @@ where true } + + fn len_region_metric(&self, len_region: u16, port: &str) { + let path = Path::new(vec![ + Resource::Redis.name(), + port, + distance::region().as_str(), + ]); + let mut metric = path.num("region_resource"); + if metric.inited() { + metric.zero_num() + }; + metric += len_region + 10000; // snapshot时>0才有输出,len_region这里加10000作为基准值,保证监控有数据; + } } #[derive(Clone)] struct Shard { @@ -327,6 +349,9 @@ impl Shard { fn next(&self, idx: usize, runs: usize) -> (usize, &(String, E)) { unsafe { self.slaves.unsafe_next(idx, runs) } } + pub fn len_region(&self) -> u16 { + self.slaves.len_region() + } } impl Shard { // 1. 主已经初始化 diff --git a/metrics/src/lib.rs b/metrics/src/lib.rs index ad3985782..5c34ff2de 100644 --- a/metrics/src/lib.rs +++ b/metrics/src/lib.rs @@ -53,6 +53,17 @@ impl Metric { &mut *(self as *const _ as *mut _) } } + pub fn inited(&mut self) -> bool { + self.item.inited() + } + // num类型,若未初始化则尝试初始化;若已经初始化,则值清0 + pub fn zero_num(&mut self) { + if !self.inited() { + self.try_inited(); + } else { + self.item.data().zero_num(); + } + } } impl AddAssign for Metric { #[inline] diff --git a/metrics/src/macros.rs b/metrics/src/macros.rs index 11a3e708e..3c422e099 100644 --- a/metrics/src/macros.rs +++ b/metrics/src/macros.rs @@ -4,6 +4,7 @@ use std::sync::Arc; pub trait MetricData: Sized { fn incr_to(self, data: &ItemData); fn incr_to_cache(self, _id: &Arc) {} + fn zero(self, _data: &ItemData) {} } #[derive(Default, Debug)] @@ -30,6 +31,13 @@ impl ItemData { assert!(self.id.t.is_num()); self.inner.num.incr(num); } + #[inline] + pub fn zero_num(&self) { + assert!(self.id.t.is_num()); + unsafe { + self.inner.num.zero(); + } + } } use crate::ToNumber; impl MetricData for T { @@ -41,6 +49,10 @@ impl MetricData for T { fn incr_to_cache(self, id: &Arc) { crate::register_cache(id, self.int()); } + #[inline] + fn zero(self, data: &ItemData) { + data.zero_num(); + } } use ds::time::Duration; impl MetricData for Duration { diff --git a/metrics/src/types/number.rs b/metrics/src/types/number.rs index 536551b72..fe43949ad 100644 --- a/metrics/src/types/number.rs +++ b/metrics/src/types/number.rs @@ -55,6 +55,10 @@ impl Number { pub(crate) fn incr(&self, v: i64) { self.inner.incr(v); } + #[inline] + pub(crate) fn zero(&self) { + self.inner.zero(); + } } pub trait ToNumber { diff --git a/sharding/src/select/by_distance.rs b/sharding/src/select/by_distance.rs index e27acc274..429cd98f2 100644 --- a/sharding/src/select/by_distance.rs +++ b/sharding/src/select/by_distance.rs @@ -31,7 +31,8 @@ impl BackendQuota { // 2. 其他资源,replicas长度与len_local相同 #[derive(Clone)] pub struct Distance { - len_local: u16, + len_local: u16, // 实际使用的local实例数量 + len_region: u16, // 通过排序计算出的可用区内的实例数量,len_region <= len_local backend_quota: bool, idx: Arc, replicas: Vec<(T, BackendQuota)>, @@ -40,6 +41,7 @@ impl Distance { pub fn new() -> Self { Self { len_local: 0, + len_region: 0, backend_quota: false, idx: Default::default(), replicas: Vec::new(), @@ -72,11 +74,10 @@ impl Distance { // 按distance选local // 1. 距离小于等于4为local // 2. local为0,则全部为local - let l = replicas.sort_by_region( - Vec::new(), - context::get().region(), - |d, _| d <= discovery::distance::DISTANCE_VAL_REGION, - ); + let l = replicas.sort_by_region(Vec::new(), context::get().region(), |d, _| { + d <= discovery::distance::DISTANCE_VAL_REGION + }); + me.len_region = l as u16; // 可用区内的实例数量 if l == 0 { log::warn!( "too few instance in region:{} total:{}, {:?}", @@ -103,6 +104,9 @@ impl Distance { me } + pub fn len_region(&self) -> u16 { + self.len_region + } #[inline] pub fn from(replicas: Vec) -> Self { Self::with_performance_tuning(replicas, true, false) From 5a04f664c36acc1c6be1d50c45fd2aab2e18dadc Mon Sep 17 00:00:00 2001 From: parabala Date: Thu, 16 Nov 2023 11:08:27 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E6=A0=B9=E6=8D=AE=E8=AF=84=E5=AE=A1?= =?UTF-8?q?=E6=84=8F=E8=A7=81=EF=BC=8C=E7=A7=BB=E9=99=A4discovery=E5=AF=B9?= =?UTF-8?q?context=E7=9A=84=E4=BE=9D=E8=B5=96=EF=BC=8C=E7=A7=BB=E5=8A=A8re?= =?UTF-8?q?gion=E7=9B=B8=E5=85=B3metics=E7=9B=B8=E5=85=B3=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0=E7=9A=84=E4=BD=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- discovery/Cargo.toml | 1 - discovery/src/distance.rs | 13 +++------- endpoint/src/redisservice/config.rs | 3 +++ endpoint/src/redisservice/topo.rs | 40 +++++++++++++++++------------ metrics/src/types/host.rs | 11 ++++++++ metrics/src/types/mod.rs | 2 +- 6 files changed, 42 insertions(+), 28 deletions(-) diff --git a/discovery/Cargo.toml b/discovery/Cargo.toml index e06860b13..b6e8590f4 100644 --- a/discovery/Cargo.toml +++ b/discovery/Cargo.toml @@ -10,7 +10,6 @@ edition = "2021" log = { path = "../log" } ds = { path = "../ds" } metrics = { path = "../metrics" } -context = { path = "../context" } url = "2.2.2" async-trait.workspace = true diff --git a/discovery/src/distance.rs b/discovery/src/distance.rs index 3a8160535..5358dc1cb 100644 --- a/discovery/src/distance.rs +++ b/discovery/src/distance.rs @@ -499,18 +499,13 @@ impl BClass for &String { } // 本机的region,优先级: -// 1. 启动参数/环境变量的region -// 2. 本机IP对应的region -// 3. 本机IP -pub fn region() -> String { - if let Some(region) = context::get().region() { - return region.to_string(); - } - +// 1. 本机IP对应的region +// 2. 未知region,返回cnx +pub fn host_region() -> String { let cal = unsafe { DISTANCE_CALCULATOR.get_unchecked().get() }; if let Some(region) = cal.region() { return region.clone(); } - return metrics::raw_local_ip().to_string(); + return "cnx".to_string(); } diff --git a/endpoint/src/redisservice/config.rs b/endpoint/src/redisservice/config.rs index e809b1f92..c54b16ed4 100644 --- a/endpoint/src/redisservice/config.rs +++ b/endpoint/src/redisservice/config.rs @@ -104,4 +104,7 @@ impl RedisNamespace { true } + pub(super) fn resource_type(&self) -> &str { + self.basic.resource_type.as_str() + } } diff --git a/endpoint/src/redisservice/topo.rs b/endpoint/src/redisservice/topo.rs index 29e79b90b..a4ba54532 100644 --- a/endpoint/src/redisservice/topo.rs +++ b/endpoint/src/redisservice/topo.rs @@ -10,7 +10,6 @@ use sharding::Distance; use super::config::RedisNamespace; use crate::{dns::DnsConfig, Timeout}; use discovery::dns::{self, IPPort}; -use metrics::Path; #[derive(Clone)] pub struct RedisService { @@ -284,11 +283,18 @@ where self.cfg.basic.region_enabled, ); - // 推送可用区内的len - if self.cfg.basic.region_enabled { - let len_region = shard.len_region(); - self.len_region_metric(len_region, port.as_str()); - } + // 生成端口副本数量监控数据 + let n: u16 = if self.cfg.basic.region_enabled { + shard.len_region() + 10000 // snapshot时>0才有输出,len_region这里加10000作为基准值,保证监控有数据; + } else { + 0 // region功能关闭时,len_region为0,也就是不输出监控数据;可用区从打开到关闭场景,0也要生成监控数据,覆盖已有的同path旧数据 + }; + metrics::resource_num_metric( + self.cfg.resource_type(), + self.cfg.service.as_str(), + (host_region() + ":" + port.as_str()).as_str(), + n, + ); self.shards.push(shard); } @@ -300,20 +306,20 @@ where true } +} - fn len_region_metric(&self, len_region: u16, port: &str) { - let path = Path::new(vec![ - Resource::Redis.name(), - port, - distance::region().as_str(), - ]); - let mut metric = path.num("region_resource"); - if metric.inited() { - metric.zero_num() - }; - metric += len_region + 10000; // snapshot时>0才有输出,len_region这里加10000作为基准值,保证监控有数据; +// 本机的region,优先级: +// 1. 启动参数/环境变量的region +// 2. 通过本机IP计算出来的region +// TODO 目前仅本文件内被使用,后续可以考虑放在一个公共的地方 +fn host_region() -> String { + if let Some(region) = context::get().region() { + return region.to_string(); } + + distance::host_region() } + #[derive(Clone)] struct Shard { master: (String, E), diff --git a/metrics/src/types/host.rs b/metrics/src/types/host.rs index c1894986a..08e3239b2 100644 --- a/metrics/src/types/host.rs +++ b/metrics/src/types/host.rs @@ -121,3 +121,14 @@ pub fn decr_task() { pub fn set_sockfile_failed(failed_count: usize) { SOCKFILE_FAILED.store(failed_count as i64, Relaxed); } + +// fn unchange_number_metric(region_enable:bool,len_region: u16, port: &str, rsname: &str, region: &str) { +pub fn resource_num_metric(source: &str, namespace: &str, bip: &str, n: u16) { + let path = crate::Path::new(vec![source, namespace, bip]); + let mut metric = path.num("region_resource"); + if metric.inited() { + metric.zero_num() + }; + + metric += n; +} diff --git a/metrics/src/types/mod.rs b/metrics/src/types/mod.rs index 3e8d3138b..990448051 100644 --- a/metrics/src/types/mod.rs +++ b/metrics/src/types/mod.rs @@ -6,7 +6,7 @@ mod rtt; mod status; pub(crate) use host::*; -pub use host::{decr_task, incr_task, set_sockfile_failed}; +pub use host::{decr_task, incr_task, resource_num_metric, set_sockfile_failed}; pub(crate) use number::*; pub(crate) use qps::*; pub(crate) use ratio::*;