Skip to content

Commit

Permalink
refactor: merge prometheus push service to simple background service
Browse files Browse the repository at this point in the history
  • Loading branch information
vicanso committed Dec 22, 2024
1 parent 6100d36 commit 3c27551
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 67 deletions.
5 changes: 1 addition & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,10 +530,7 @@ fn run() -> Result<(), Box<dyn Error>> {
ps.enable_lets_encrypt();
}
if let Some(service) = ps.get_prometheus_push_service() {
my_server.add_service(background_service(
"prometheus push service",
service,
));
simple_tasks.push(service);
}
let services = ps.run(&my_server.configuration)?;
my_server.add_service(services.lb);
Expand Down
6 changes: 4 additions & 2 deletions src/proxy/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::http_extra::{HttpResponse, HTTP_HEADER_NAME_X_REQUEST_ID};
use crate::otel;
use crate::plugin::{get_plugin, ADMIN_SERVER_PLUGIN};
use crate::proxy::location::get_location;
use crate::service::CommonServiceTask;
use crate::service::SimpleServiceTaskFuture;
#[cfg(feature = "full")]
use crate::state::OtelTracer;
use crate::state::{accept_request, end_request};
Expand Down Expand Up @@ -233,7 +233,9 @@ impl Server {
self.lets_encrypt_enabled = true;
}
/// Get the prometheus push service
pub fn get_prometheus_push_service(&self) -> Option<CommonServiceTask> {
pub fn get_prometheus_push_service(
&self,
) -> Option<(String, SimpleServiceTaskFuture)> {
if !self.prometheus_push_mode {
return None;
}
Expand Down
133 changes: 72 additions & 61 deletions src/state/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@
// limitations under the License.

use super::{get_hostname, get_process_system_info, Error, Result, State};
use crate::service::{CommonServiceTask, ServiceTask};
use crate::service::SimpleServiceTaskFuture;
use crate::util;
use async_trait::async_trait;
use humantime::parse_duration;
use once_cell::sync::Lazy;
use pingora::proxy::Session;
Expand Down Expand Up @@ -213,12 +212,69 @@ impl Prometheus {
}
}

#[derive(Clone)]
struct PrometheusPushParams {
name: String,
url: String,
p: Arc<Prometheus>,
username: String,
password: Option<String>,
}

async fn do_push(
count: u32,
offset: u32,
params: PrometheusPushParams,
) -> Result<(), String> {
if count % offset != 0 {
return Ok(());
}
// http push metrics
let encoder = ProtobufEncoder::new();
let mut buf = Vec::new();

for mf in params.p.gather() {
let _ = encoder.encode(&[mf], &mut buf);
}
let client = reqwest::Client::new();
let mut builder = client
.post(&params.url)
.header(http::header::CONTENT_TYPE, encoder.format_type())
.body(buf);

if !params.username.is_empty() {
builder = builder.basic_auth(&params.username, params.password.clone());
}

match builder.timeout(Duration::from_secs(60)).send().await {
Ok(res) => {
if res.status().as_u16() >= 400 {
error!(
name = params.name,
status = res.status().to_string(),
"push prometheus fail"
);
} else {
info!(name = params.name, "push prometheus success");
}
},
Err(e) => {
error!(
name = params.name,
error = e.to_string(),
"push prometheus fail"
);
},
};
Ok(())
}

/// Create a new prometheus push service
pub fn new_prometheus_push_service(
name: &str,
url: &str,
p: Arc<Prometheus>,
) -> Result<CommonServiceTask> {
) -> Result<(String, SimpleServiceTaskFuture)> {
let mut info = Url::parse(url).map_err(|e| Error::Url { source: e })?;

let username = info.username().to_string();
Expand All @@ -239,70 +295,25 @@ pub fn new_prometheus_push_service(
url = url.replace(HOST_NAME_TAG, get_hostname());
}

let push = PrometheusPush {
let params = PrometheusPushParams {
name: name.to_string(),
url,
username,
password,
p,
};
Ok(CommonServiceTask::new(interval, push))
}

struct PrometheusPush {
name: String,
url: String,
p: Arc<Prometheus>,
username: String,
password: Option<String>,
}

#[async_trait]
impl ServiceTask for PrometheusPush {
async fn run(&self) -> Option<bool> {
// http push metrics
let encoder = ProtobufEncoder::new();
let mut buf = Vec::new();

for mf in self.p.gather() {
let _ = encoder.encode(&[mf], &mut buf);
}
let client = reqwest::Client::new();
let mut builder = client
.post(&self.url)
.header(http::header::CONTENT_TYPE, encoder.format_type())
.body(buf);

if !self.username.is_empty() {
builder = builder.basic_auth(&self.username, self.password.clone());
}

match builder.timeout(Duration::from_secs(60)).send().await {
Ok(res) => {
if res.status().as_u16() >= 400 {
error!(
name = self.name,
status = res.status().to_string(),
"push prometheus fail"
);
} else {
info!(name = self.name, "push prometheus success");
}
},
Err(e) => {
error!(
name = self.name,
error = e.to_string(),
"push prometheus fail"
);
},
}

None
}
fn description(&self) -> String {
"PrometheusPush".to_string()
}
let offset = (interval.as_secs() / 60) as u32;

let task: SimpleServiceTaskFuture = Box::new(move |count: u32| {
Box::pin({
let value = params.clone();
async move {
let value = value.clone();
do_push(count, offset, value).await
}
})
});
Ok(("prometheusPush".to_string(), task))
}

fn new_int_counter(server: &str, name: &str, help: &str) -> Result<IntCounter> {
Expand Down

0 comments on commit 3c27551

Please sign in to comment.