Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

可用区内资源数量监控 #391

Merged
merged 4 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions discovery/src/distance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ impl DistanceCalculator {
}
}
}
pub fn region(&self) -> &Option<String> {
&self.local_region
}
}

pub trait Addr {
Expand Down Expand Up @@ -494,3 +497,15 @@ impl BClass for &String {
b
}
}

// 本机的region,优先级:
// 1. 本机IP对应的region
// 2. 未知region,返回cnx
pub fn host_region() -> String {
let cal = unsafe { DISTANCE_CALCULATOR.get_unchecked().get() };
if let Some(region) = cal.region() {
return region.clone();
}

return "cnx".to_string();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这也可以不用string吧

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

返回&str有编译报错

}
1 change: 1 addition & 0 deletions endpoint/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ sharding = { path = "../sharding" }
log = { path = "../log" }
rt = { path = "../rt" }
context = { path = "../context" }
metrics = { path = "../metrics" }
procs = { path = "../procs" }

byteorder = "1.4.3"
Expand Down
3 changes: 3 additions & 0 deletions endpoint/src/redisservice/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,7 @@ impl RedisNamespace {

true
}
pub(super) fn resource_type(&self) -> &str {
self.basic.resource_type.as_str()
}
}
33 changes: 32 additions & 1 deletion endpoint/src/redisservice/topo.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashMap;

use crate::{Builder, Endpoint, Single, Topology};
use discovery::TopologyWrite;
use discovery::{distance, TopologyWrite};
use protocol::{Protocol, RedisFlager, Request, Resource};
use sharding::distribution::Distribute;
use sharding::hash::{Hash, HashKey, Hasher};
Expand Down Expand Up @@ -262,6 +262,7 @@ where
for (master_addr, slaves) in addrs {
assert_ne!(master_addr.len(), 0);
assert_ne!(slaves.len(), 0);
let port = master_addr.port().to_string();
let master = self.take_or_build(&mut old, &master_addr, self.cfg.timeout_master());
master.enable_single();

Expand All @@ -281,6 +282,20 @@ where
replicas,
self.cfg.basic.region_enabled,
);

// 生成端口副本数量监控数据
let n: u16 = if self.cfg.basic.region_enabled {
shard.len_region() + 10000 // snapshot时>0才有输出,len_region这里加10000作为基准值,保证监控有数据;
} else {
0 // region功能关闭时,len_region为0,也就是不输出监控数据;可用区从打开到关闭场景,0也要生成监控数据,覆盖已有的同path旧数据
};
metrics::resource_num_metric(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

增加的指标数量会不会太多,内存总量有上升吗?指标动态创建会有瞬时的较大内存申请

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

从tcp copy环境看没有肉眼可见的增加

self.cfg.resource_type(),
self.cfg.service.as_str(),
(host_region() + ":" + port.as_str()).as_str(),
n,
);

self.shards.push(shard);
}
assert_eq!(self.shards.len(), self.cfg.shards_url.len());
Expand All @@ -292,6 +307,19 @@ where
true
}
}

// 本机的region,优先级:
// 1. 启动参数/环境变量的region
// 2. 通过本机IP计算出来的region
// TODO 目前仅本文件内被使用,后续可以考虑放在一个公共的地方
fn host_region() -> String {
if let Some(region) = context::get().region() {
return region.to_string();
}

distance::host_region()
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为啥返回string呢?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

与distance::host_region()保持一致;distance::host_region()返回string的原因是返回str会报错;


#[derive(Clone)]
struct Shard<E> {
master: (String, E),
Expand Down Expand Up @@ -327,6 +355,9 @@ impl<E> Shard<E> {
fn next(&self, idx: usize, runs: usize) -> (usize, &(String, E)) {
unsafe { self.slaves.unsafe_next(idx, runs) }
}
pub fn len_region(&self) -> u16 {
self.slaves.len_region()
}
}
impl<E: discovery::Inited> Shard<E> {
// 1. 主已经初始化
Expand Down
11 changes: 11 additions & 0 deletions metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ impl Metric {
&mut *(self as *const _ as *mut _)
}
}
pub fn inited(&mut self) -> bool {
self.item.inited()
}
// num类型,若未初始化则尝试初始化;若已经初始化,则值清0
pub fn zero_num(&mut self) {
if !self.inited() {
self.try_inited();
} else {
self.item.data().zero_num();
}
}
}
impl<T: MetricData + Debug> AddAssign<T> for Metric {
#[inline]
Expand Down
12 changes: 12 additions & 0 deletions metrics/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::sync::Arc;
pub trait MetricData: Sized {
fn incr_to(self, data: &ItemData);
fn incr_to_cache(self, _id: &Arc<Id>) {}
fn zero(self, _data: &ItemData) {}
}

#[derive(Default, Debug)]
Expand All @@ -30,6 +31,13 @@ impl ItemData {
assert!(self.id.t.is_num());
self.inner.num.incr(num);
}
#[inline]
pub fn zero_num(&self) {
assert!(self.id.t.is_num());
unsafe {
self.inner.num.zero();
}
}
}
use crate::ToNumber;
impl<T: ToNumber> MetricData for T {
Expand All @@ -41,6 +49,10 @@ impl<T: ToNumber> MetricData for T {
fn incr_to_cache(self, id: &Arc<Id>) {
crate::register_cache(id, self.int());
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

增加个set方法更合适吧

#[inline]
fn zero(self, data: &ItemData) {
data.zero_num();
}
}
use ds::time::Duration;
impl MetricData for Duration {
Expand Down
11 changes: 11 additions & 0 deletions metrics/src/types/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,14 @@ pub fn decr_task() {
pub fn set_sockfile_failed(failed_count: usize) {
SOCKFILE_FAILED.store(failed_count as i64, Relaxed);
}

// fn unchange_number_metric(region_enable:bool,len_region: u16, port: &str, rsname: &str, region: &str) {
pub fn resource_num_metric(source: &str, namespace: &str, bip: &str, n: u16) {
let path = crate::Path::new(vec![source, namespace, bip]);
let mut metric = path.num("region_resource");
if metric.inited() {
metric.zero_num()
};

metric += n;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

可用区指标相关不应放在host下吧

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

没合适的位置;新增一个文件似乎也不值当。有建议的地方吗?

2 changes: 1 addition & 1 deletion metrics/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod rtt;
mod status;

pub(crate) use host::*;
pub use host::{decr_task, incr_task, set_sockfile_failed};
pub use host::{decr_task, incr_task, resource_num_metric, set_sockfile_failed};
pub(crate) use number::*;
pub(crate) use qps::*;
pub(crate) use ratio::*;
Expand Down
4 changes: 4 additions & 0 deletions metrics/src/types/number.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ impl Number {
pub(crate) fn incr(&self, v: i64) {
self.inner.incr(v);
}
#[inline]
pub(crate) fn zero(&self) {
self.inner.zero();
}
}

pub trait ToNumber {
Expand Down
16 changes: 10 additions & 6 deletions sharding/src/select/by_distance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ impl BackendQuota {
// 2. 其他资源,replicas长度与len_local相同
#[derive(Clone)]
pub struct Distance<T> {
len_local: u16,
len_local: u16, // 实际使用的local实例数量
len_region: u16, // 通过排序计算出的可用区内的实例数量,len_region <= len_local
backend_quota: bool,
idx: Arc<AtomicUsize>,
replicas: Vec<(T, BackendQuota)>,
Expand All @@ -40,6 +41,7 @@ impl<T: Addr> Distance<T> {
pub fn new() -> Self {
Self {
len_local: 0,
len_region: 0,
backend_quota: false,
idx: Default::default(),
replicas: Vec::new(),
Expand Down Expand Up @@ -72,11 +74,10 @@ impl<T: Addr> Distance<T> {
// 按distance选local
// 1. 距离小于等于4为local
// 2. local为0,则全部为local
let l = replicas.sort_by_region(
Vec::new(),
context::get().region(),
|d, _| d <= discovery::distance::DISTANCE_VAL_REGION,
);
let l = replicas.sort_by_region(Vec::new(), context::get().region(), |d, _| {
d <= discovery::distance::DISTANCE_VAL_REGION
});
me.len_region = l as u16; // 可用区内的实例数量
if l == 0 {
log::warn!(
"too few instance in region:{} total:{}, {:?}",
Expand All @@ -103,6 +104,9 @@ impl<T: Addr> Distance<T> {

me
}
pub fn len_region(&self) -> u16 {
self.len_region
}
#[inline]
pub fn from(replicas: Vec<T>) -> Self {
Self::with_performance_tuning(replicas, true, false)
Expand Down
Loading