diff --git a/src/main.rs b/src/main.rs index 197e031..cf04c0c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -530,10 +530,7 @@ fn run() -> Result<(), Box> { ps.enable_lets_encrypt(); } if let Some(service) = ps.get_prometheus_push_service() { - my_server.add_service(background_service( - "prometheus push service", - service, - )); + simple_tasks.push(service); } let services = ps.run(&my_server.configuration)?; my_server.add_service(services.lb); diff --git a/src/proxy/server.rs b/src/proxy/server.rs index 69fb6da..0fa2126 100644 --- a/src/proxy/server.rs +++ b/src/proxy/server.rs @@ -24,7 +24,7 @@ use crate::http_extra::{HttpResponse, HTTP_HEADER_NAME_X_REQUEST_ID}; use crate::otel; use crate::plugin::{get_plugin, ADMIN_SERVER_PLUGIN}; use crate::proxy::location::get_location; -use crate::service::CommonServiceTask; +use crate::service::SimpleServiceTaskFuture; #[cfg(feature = "full")] use crate::state::OtelTracer; use crate::state::{accept_request, end_request}; @@ -233,7 +233,9 @@ impl Server { self.lets_encrypt_enabled = true; } /// Get the prometheus push service - pub fn get_prometheus_push_service(&self) -> Option { + pub fn get_prometheus_push_service( + &self, + ) -> Option<(String, SimpleServiceTaskFuture)> { if !self.prometheus_push_mode { return None; } diff --git a/src/state/prom.rs b/src/state/prom.rs index afc18cb..d4e981c 100644 --- a/src/state/prom.rs +++ b/src/state/prom.rs @@ -13,9 +13,8 @@ // limitations under the License. use super::{get_hostname, get_process_system_info, Error, Result, State}; -use crate::service::{CommonServiceTask, ServiceTask}; +use crate::service::SimpleServiceTaskFuture; use crate::util; -use async_trait::async_trait; use humantime::parse_duration; use once_cell::sync::Lazy; use pingora::proxy::Session; @@ -213,12 +212,69 @@ impl Prometheus { } } +#[derive(Clone)] +struct PrometheusPushParams { + name: String, + url: String, + p: Arc, + username: String, + password: Option, +} + +async fn do_push( + count: u32, + offset: u32, + params: PrometheusPushParams, +) -> Result<(), String> { + if count % offset != 0 { + return Ok(()); + } + // http push metrics + let encoder = ProtobufEncoder::new(); + let mut buf = Vec::new(); + + for mf in params.p.gather() { + let _ = encoder.encode(&[mf], &mut buf); + } + let client = reqwest::Client::new(); + let mut builder = client + .post(¶ms.url) + .header(http::header::CONTENT_TYPE, encoder.format_type()) + .body(buf); + + if !params.username.is_empty() { + builder = builder.basic_auth(¶ms.username, params.password.clone()); + } + + match builder.timeout(Duration::from_secs(60)).send().await { + Ok(res) => { + if res.status().as_u16() >= 400 { + error!( + name = params.name, + status = res.status().to_string(), + "push prometheus fail" + ); + } else { + info!(name = params.name, "push prometheus success"); + } + }, + Err(e) => { + error!( + name = params.name, + error = e.to_string(), + "push prometheus fail" + ); + }, + }; + Ok(()) +} + /// Create a new prometheus push service pub fn new_prometheus_push_service( name: &str, url: &str, p: Arc, -) -> Result { +) -> Result<(String, SimpleServiceTaskFuture)> { let mut info = Url::parse(url).map_err(|e| Error::Url { source: e })?; let username = info.username().to_string(); @@ -239,70 +295,25 @@ pub fn new_prometheus_push_service( url = url.replace(HOST_NAME_TAG, get_hostname()); } - let push = PrometheusPush { + let params = PrometheusPushParams { name: name.to_string(), url, username, password, p, }; - Ok(CommonServiceTask::new(interval, push)) -} - -struct PrometheusPush { - name: String, - url: String, - p: Arc, - username: String, - password: Option, -} - -#[async_trait] -impl ServiceTask for PrometheusPush { - async fn run(&self) -> Option { - // http push metrics - let encoder = ProtobufEncoder::new(); - let mut buf = Vec::new(); - - for mf in self.p.gather() { - let _ = encoder.encode(&[mf], &mut buf); - } - let client = reqwest::Client::new(); - let mut builder = client - .post(&self.url) - .header(http::header::CONTENT_TYPE, encoder.format_type()) - .body(buf); - - if !self.username.is_empty() { - builder = builder.basic_auth(&self.username, self.password.clone()); - } - - match builder.timeout(Duration::from_secs(60)).send().await { - Ok(res) => { - if res.status().as_u16() >= 400 { - error!( - name = self.name, - status = res.status().to_string(), - "push prometheus fail" - ); - } else { - info!(name = self.name, "push prometheus success"); - } - }, - Err(e) => { - error!( - name = self.name, - error = e.to_string(), - "push prometheus fail" - ); - }, - } - - None - } - fn description(&self) -> String { - "PrometheusPush".to_string() - } + let offset = (interval.as_secs() / 60) as u32; + + let task: SimpleServiceTaskFuture = Box::new(move |count: u32| { + Box::pin({ + let value = params.clone(); + async move { + let value = value.clone(); + do_push(count, offset, value).await + } + }) + }); + Ok(("prometheusPush".to_string(), task)) } fn new_int_counter(server: &str, name: &str, help: &str) -> Result {