Skip to content

Commit

Permalink
feat: add performance metrics log task
Browse files Browse the repository at this point in the history
  • Loading branch information
vicanso committed Dec 24, 2024
1 parent 252c7c3 commit 7dbe176
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 40 deletions.
6 changes: 3 additions & 3 deletions src/acme/lets_encrypt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ async fn update_certificate_lets_encrypt(
async fn do_update_certificates(
count: u32,
params: Vec<(String, Vec<String>)>,
) -> Result<(), String> {
) -> Result<bool, String> {
// Add 1 every loop
let offset = 10;
if count % offset != 0 {
return Ok(());
return Ok(false);
}
for (name, domains) in params.iter() {
let should_renew_now =
Expand Down Expand Up @@ -123,7 +123,7 @@ async fn do_update_certificates(
),
};
}
Ok(())
Ok(true)
}

/// Create a Let's Encrypt service to generate the certificate,
Expand Down
17 changes: 10 additions & 7 deletions src/cache/http_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,30 +121,33 @@ pub trait HttpCacheStorage: Sync + Send {
}
}

async fn do_file_storage_clear(count: u32, dir: String) -> Result<(), String> {
async fn do_file_storage_clear(
count: u32,
dir: String,
) -> Result<bool, String> {
// Add 1 every loop
let offset = 60;
if count % offset != 0 {
return Ok(());
return Ok(false);
}
let Ok(storage) = file::new_file_cache(&dir) else {
return Ok(());
return Ok(false);
};

let Some(access_before) =
SystemTime::now().checked_sub(Duration::from_secs(24 * 3600))
else {
return Ok(());
return Ok(false);
};

let Ok((success, fail)) = storage.clear(access_before).await else {
return Ok(());
return Ok(true);
};
if success < 0 {
return Ok(());
return Ok(true);
}
info!(dir, success, fail, "cache storage clear");
Ok(())
Ok(true)
}

pub fn new_file_storage_clear_service(
Expand Down
8 changes: 5 additions & 3 deletions src/certificate/self_signed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ type SelfSignedCertificateMap = AHashMap<String, Arc<SelfSignedCertificate>>;
static SELF_SIGNED_CERTIFICATE_MAP: Lazy<ArcSwap<SelfSignedCertificateMap>> =
Lazy::new(|| ArcSwap::from_pointee(AHashMap::new()));

async fn do_self_signed_certificate_validity(count: u32) -> Result<(), String> {
async fn do_self_signed_certificate_validity(
count: u32,
) -> Result<bool, String> {
// Add 1 every loop
let offset = 24 * 60;
if count % offset != 0 {
return Ok(());
return Ok(false);
}
let mut m = AHashMap::new();

Expand Down Expand Up @@ -67,7 +69,7 @@ async fn do_self_signed_certificate_validity(count: u32) -> Result<(), String> {
m.insert(k.to_string(), v.clone());
}
SELF_SIGNED_CERTIFICATE_MAP.store(Arc::new(m));
Ok(())
Ok(true)
}

pub fn new_self_signed_certificate_validity_service(
Expand Down
6 changes: 3 additions & 3 deletions src/certificate/validity_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ fn validity_check(
Ok(())
}

async fn do_validity_check(count: u32) -> Result<(), String> {
async fn do_validity_check(count: u32) -> Result<bool, String> {
// Add 1 every loop
let offset = 24 * 60;
if count % offset != 0 {
return Ok(());
return Ok(false);
}
let certificate_info_list = get_certificate_info_list();
let time_offset = 7 * 24 * 3600_i64;
Expand All @@ -70,7 +70,7 @@ async fn do_validity_check(count: u32) -> Result<(), String> {
})
.await;
}
Ok(())
Ok(true)
}

/// Create certificate validate background service
Expand Down
10 changes: 5 additions & 5 deletions src/logger/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ struct LogCompressParams {
async fn do_compress(
count: u32,
params: LogCompressParams,
) -> Result<(), String> {
) -> Result<bool, String> {
let offset = 60;
if count % offset != 0 {
return Ok(());
return Ok(false);
}
if params.time_point_hour != chrono::Local::now().hour() as u8 {
return Ok(());
return Ok(false);
}
let mut days_ago = params.days_ago;
if days_ago == 0 {
Expand All @@ -97,7 +97,7 @@ async fn do_compress(
let Some(access_before) = SystemTime::now()
.checked_sub(Duration::from_secs(24 * 3600 * days_ago as u64))
else {
return Ok(());
return Ok(false);
};
let compression_exts = [GZIP_EXT.to_string(), ZSTD_EXT.to_string()];
for entry in WalkDir::new(&params.path)
Expand Down Expand Up @@ -147,7 +147,7 @@ async fn do_compress(
},
}
}
Ok(())
Ok(true)
}

fn new_log_compress_service(
Expand Down
6 changes: 5 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ use pingora::services::background::background_service;
use proxy::{new_upstream_health_check_task, Server, ServerConf};
use service::new_simple_service_task;
use service::{new_auto_restart_service, new_observer_service};
use state::{get_admin_addr, get_start_time, set_admin_addr};
use state::{
get_admin_addr, get_start_time, new_performance_metrics_log_service,
set_admin_addr,
};
use std::collections::HashMap;
use std::error::Error;
use std::ffi::OsString;
Expand Down Expand Up @@ -475,6 +478,7 @@ fn run() -> Result<(), Box<dyn Error>> {
let mut simple_tasks = vec![
new_certificate_validity_service(),
new_self_signed_certificate_validity_service(),
new_performance_metrics_log_service(),
];
if let Some(task) = new_file_storage_clear_service() {
simple_tasks.push(task);
Expand Down
46 changes: 33 additions & 13 deletions src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use tracing::{error, info};
pub static LOG_CATEGORY: &str = "service";

pub type SimpleServiceTaskFuture =
Box<dyn Fn(u32) -> BoxFuture<'static, Result<(), String>> + Sync + Send>;
Box<dyn Fn(u32) -> BoxFuture<'static, Result<bool, String>> + Sync + Send>;

pub struct SimpleServiceTask {
name: String,
Expand Down Expand Up @@ -70,22 +70,42 @@ impl BackgroundService for SimpleServiceTask {
_ = period.tick() => {
let now = SystemTime::now();
let count = self.count.fetch_add(1, Ordering::Relaxed);
let mut fails = 0;
for (name, task) in self.tasks.iter() {
if let Err(e) = task(count).await {
fails += 1;
error!(
category = LOG_CATEGORY,
name,
simple_service = self.name,
e,
);
}
let mut success_tasks = vec![];
let mut fail_tasks = vec![];
for (task_name, task) in self.tasks.iter() {
let task_start = SystemTime::now();
match task(count).await {
Err(e) => {
fail_tasks.push(task_name.to_string());
error!(
category = LOG_CATEGORY,
name = self.name,
task = task_name,
e,
);
}
Ok(executed) => {
if executed {
success_tasks.push(task_name.to_string());
info!(
category = LOG_CATEGORY,
name = self.name,
task = task_name,
elapsed = format!(
"{}ms",
task_start.elapsed().unwrap_or_default().as_millis()
),
);
}
}
};
}
info!(
category = LOG_CATEGORY,
name = self.name,
fails,
success_tasks = success_tasks.join(","),
fails = fail_tasks.len(),
fail_tasks = fail_tasks.join(","),
elapsed = format!(
"{}ms",
now.elapsed().unwrap_or_default().as_millis()
Expand Down
26 changes: 26 additions & 0 deletions src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::service::SimpleServiceTaskFuture;
#[cfg(feature = "full")]
use snafu::Snafu;
use tracing::info;

mod ctx;
mod process;
Expand All @@ -37,3 +39,27 @@ pub enum Error {
}
#[cfg(feature = "full")]
pub type Result<T, E = Error> = std::result::Result<T, E>;

pub fn new_performance_metrics_log_service() -> (String, SimpleServiceTaskFuture)
{
let task: SimpleServiceTaskFuture = Box::new(move |_count: u32| {
Box::pin({
async move {
let system_info = get_process_system_info();
let (processing, accepted) = get_processing_accepted();
info!(
threadds = system_info.threads,
accepted,
processing,
used_memory = system_info.memory,
fd_count = system_info.fd_count,
tcp_count = system_info.tcp_count,
tcp6_count = system_info.tcp6_count,
"performance metrics"
);
Ok(true)
}
})
});
("performanceMetricsLog".to_string(), task)
}
38 changes: 33 additions & 5 deletions src/state/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@ pub struct Prometheus {
http_requests_total: Box<IntCounterVec>,
http_requests_current: Box<IntGaugeVec>,
http_received: Box<HistogramVec>,
http_received_bytes: Box<IntCounterVec>,
http_responses_codes: Box<IntCounterVec>,
http_response_time: Box<HistogramVec>,
http_sent: Box<HistogramVec>,
http_sent_bytes: Box<IntCounterVec>,
connection_reuses: Box<IntCounter>,
tls_handshake_time: Box<Histogram>,
upstream_connections: Box<IntGaugeVec>,
Expand Down Expand Up @@ -113,7 +115,8 @@ impl Prometheus {
if let Some(status) = &ctx.status {
code = status.as_u16();
}
let sent = session.body_bytes_sent() as f64 / 1024.0;
let sent_bytes = session.body_bytes_sent() as u64;
let sent = sent_bytes as f64 / 1024.0;

// http response code
let code_label = match code {
Expand All @@ -134,13 +137,22 @@ impl Prometheus {
self.http_received
.with_label_values(labels)
.observe(payload_size);
self.http_received_bytes
.with_label_values(labels)
.inc_by(ctx.payload_size as u64);

// response time x second
self.http_response_time
.with_label_values(labels)
.observe(response_time);

// response body size(kb)
self.http_sent.with_label_values(labels).observe(sent);
if sent_bytes > 0 {
self.http_sent_bytes
.with_label_values(labels)
.inc_by(sent_bytes);
}
}

self.http_responses_codes
Expand Down Expand Up @@ -264,9 +276,9 @@ async fn do_push(
count: u32,
offset: u32,
params: PrometheusPushParams,
) -> Result<(), String> {
) -> Result<bool, String> {
if count % offset != 0 {
return Ok(());
return Ok(false);
}
// http push metrics
let encoder = ProtobufEncoder::new();
Expand Down Expand Up @@ -305,7 +317,7 @@ async fn do_push(
);
},
};
Ok(())
Ok(true)
}

/// Create a new prometheus push service
Expand Down Expand Up @@ -341,7 +353,7 @@ pub fn new_prometheus_push_service(
password,
p,
};
let offset = (interval.as_secs() / 60) as u32;
let offset = ((interval.as_secs() / 60) as u32).max(1);

let task: SimpleServiceTaskFuture = Box::new(move |count: u32| {
Box::pin({
Expand Down Expand Up @@ -468,6 +480,12 @@ pub fn new_prometheus(server: &str) -> Result<Prometheus> {
&["location"],
&[1.0, 5.0, 10.0, 50.0, 100.0, 1000.0],
)?);
let http_received_bytes = Box::new(new_int_counter_vec(
server,
"pingap_http_received_bytes",
"pingap http received from clients(bytes)",
&["location"],
)?);
let http_responses_codes = Box::new(new_int_counter_vec(
server,
"pingap_http_responses_codes",
Expand All @@ -490,6 +508,12 @@ pub fn new_prometheus(server: &str) -> Result<Prometheus> {
&["location"],
&[1.0, 5.0, 10.0, 50.0, 100.0, 1000.0, 10000.0],
)?);
let http_sent_bytes = Box::new(new_int_counter_vec(
server,
"pingap_http_sent_bytes",
"pingap http sent to clients(bytes)",
&["location"],
)?);
let connection_reuses = Box::new(new_int_counter(
server,
"pingap_connection_reuses",
Expand Down Expand Up @@ -602,9 +626,11 @@ pub fn new_prometheus(server: &str) -> Result<Prometheus> {
http_requests_total.clone(),
http_requests_current.clone(),
http_received.clone(),
http_received_bytes.clone(),
http_responses_codes.clone(),
http_response_time.clone(),
http_sent.clone(),
http_sent_bytes.clone(),
connection_reuses.clone(),
tls_handshake_time.clone(),
upstream_connections.clone(),
Expand Down Expand Up @@ -637,9 +663,11 @@ pub fn new_prometheus(server: &str) -> Result<Prometheus> {
http_requests_total,
http_requests_current,
http_received,
http_received_bytes,
http_responses_codes,
http_response_time,
http_sent,
http_sent_bytes,
connection_reuses,
tls_handshake_time,
upstream_connections,
Expand Down

0 comments on commit 7dbe176

Please sign in to comment.