Skip to content

Commit

Permalink
refactor: make background service more simplified
Browse files Browse the repository at this point in the history
  • Loading branch information
vicanso committed Dec 18, 2024
1 parent a75af2a commit be8652d
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 152 deletions.
2 changes: 1 addition & 1 deletion src/acme/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ mod lets_encrypt;
mod validity_checker;

pub use lets_encrypt::{handle_lets_encrypt, new_lets_encrypt_service};
pub use validity_checker::new_tls_validity_service;
pub use validity_checker::new_certificate_validity_service;

#[cfg(test)]
mod tests {
Expand Down
70 changes: 22 additions & 48 deletions src/acme/validity_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,11 @@

use super::{Certificate, LOG_CATEGORY};
use crate::proxy::get_certificate_info_list;
use crate::service::{CommonServiceTask, ServiceTask};
use crate::service::SimpleServiceTaskFuture;
use crate::util;
use crate::webhook;
use async_trait::async_trait;
use std::time::Duration;
use tracing::warn;

struct ValidityChecker {
time_offset: i64,
}

// Verify the validity period of tls certificate,
// include not after and not before.
fn validity_check(
Expand Down Expand Up @@ -57,51 +51,31 @@ fn validity_check(
Ok(())
}

#[async_trait]
impl ServiceTask for ValidityChecker {
async fn run(&self) -> Option<bool> {
let certificate_info_list = get_certificate_info_list();
if let Err(message) =
validity_check(&certificate_info_list, self.time_offset)
{
// certificate will be expired
warn!(category = LOG_CATEGORY, message);
webhook::send(webhook::SendNotificationParams {
level: webhook::NotificationLevel::Warn,
category: webhook::NotificationCategory::TlsValidity,
msg: message,
..Default::default()
});
}
None
async fn do_validity_check(count: u32) -> Result<(), String> {
// Add 1 every loop
let offset = 24 * 60;

This comment has been minimized.

Copy link
@atinm

atinm Dec 18, 2024

@vicanso I think you will want to run the check every 12 hours but check if the certificate will expire in 24 hours so that you don't have expired certificates that expire before the next time your run the check.

This comment has been minimized.

Copy link
@vicanso

vicanso Dec 19, 2024

Author Owner

This is just for regular certificate verification, which is checked once a day. If the certificate will be expired 7 days later, a webhook notification is called.
Not for self signed certificate.

if count % offset != 0 {
return Ok(());
}
fn description(&self) -> String {
let mut names = vec![];
for (name, _) in get_certificate_info_list().iter() {
if !names.contains(name) {
names.push(name.clone());
}
}

let offset_human: humantime::Duration =
Duration::from_secs(self.time_offset as u64).into();
format!("ValidityChecker: {names:?}, {offset_human}")
let certificate_info_list = get_certificate_info_list();
let time_offset = 7 * 24 * 3600_i64;
if let Err(message) = validity_check(&certificate_info_list, time_offset) {
// certificate will be expired
warn!(category = LOG_CATEGORY, message);
webhook::send(webhook::SendNotificationParams {
level: webhook::NotificationLevel::Warn,
category: webhook::NotificationCategory::TlsValidity,
msg: message,
..Default::default()
});
}
Ok(())
}

/// Create a tls certificate validity checker service,
/// if the certificate will be expired or not valid,
/// it will send webhook notificateion message.
pub fn new_tls_validity_service() -> CommonServiceTask {
let checker = ValidityChecker {
// cert will be expired 7 days later
time_offset: 7 * 24 * 3600_i64,
};
CommonServiceTask::new(
// check interval: one day
Duration::from_secs(24 * 60 * 60),
checker,
)
pub fn new_certificate_validity_service() -> (String, SimpleServiceTaskFuture) {
let task: SimpleServiceTaskFuture =
Box::new(|count: u32| Box::pin(do_validity_check(count)));
("validityChecker".to_string(), task)
}

#[cfg(test)]
Expand Down
88 changes: 35 additions & 53 deletions src/cache/http_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,10 @@
// limitations under the License.

use super::{file, Error, Result, PAGE_SIZE};
use crate::service::CommonServiceTask;
use crate::service::ServiceTask;
use crate::config::get_current_config;
use crate::service::SimpleServiceTaskFuture;
use async_trait::async_trait;
use bytes::Buf;
use bytes::BufMut;
use bytes::Bytes;
use bytes::BytesMut;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use pingora::cache::key::CacheHashKey;
use pingora::cache::key::CompactCacheKey;
use pingora::cache::storage::{HandleHit, HandleMiss};
Expand Down Expand Up @@ -115,45 +112,41 @@ pub trait HttpCacheStorage: Sync + Send {
}
}

struct CacheStorageClearTask {
storage: Box<dyn HttpCacheStorage>,
}

pub fn new_file_storage_clear_service(dir: &str) -> Option<CommonServiceTask> {
let Ok(c) = file::new_file_cache(dir) else {
return None;
async fn do_file_storage_clear(count: u32) -> Result<(), String> {
// Add 1 every loop
let offset = 60;
if count % offset != 0 {
return Ok(());
}
let Some(dir) = &get_current_config().basic.cache_directory else {
return Ok(());
};
let Ok(storage) = file::new_file_cache(dir) else {
return Ok(());
};
Some(CommonServiceTask::new(
Duration::from_secs(3600),
CacheStorageClearTask {
storage: Box::new(c),
},
))
}

#[async_trait]
impl ServiceTask for CacheStorageClearTask {
async fn run(&self) -> Option<bool> {
let Some(access_before) =
SystemTime::now().checked_sub(Duration::from_secs(24 * 3600))
else {
return Some(false);
};

let Ok((success, fail)) = self.storage.clear(access_before).await
else {
return Some(false);
};
if success < 0 {
return Some(true);
}
info!(success, fail, "cache storage clear");
let Some(access_before) =
SystemTime::now().checked_sub(Duration::from_secs(24 * 3600))
else {
return Ok(());
};

Some(false)
}
fn description(&self) -> String {
"CacheStorageClear".to_string()
let Ok((success, fail)) = storage.clear(access_before).await else {
return Ok(());
};
if success < 0 {
return Ok(());
}
info!(success, fail, "cache storage clear");
Ok(())
}

pub fn new_file_storage_clear_service(
) -> Option<(String, SimpleServiceTaskFuture)> {
let _ = get_current_config().basic.cache_directory.as_ref()?;
let task: SimpleServiceTaskFuture =
Box::new(|count: u32| Box::pin(do_file_storage_clear(count)));
Some(("cacheStorageClear".to_string(), task))
}

pub struct HttpCache {
Expand Down Expand Up @@ -388,16 +381,12 @@ impl Storage for HttpCache {

#[cfg(test)]
mod tests {
use super::{
new_file_storage_clear_service, CompleteHit, HttpCacheStorage,
ObjectMissHandler,
};
use super::{CompleteHit, HttpCacheStorage, ObjectMissHandler};
use crate::cache::tiny::new_tiny_ufo_cache;
use bytes::{Bytes, BytesMut};
use pingora::cache::storage::{HitHandler, MissHandler};
use pretty_assertions::assert_eq;
use std::sync::Arc;
use tempfile::TempDir;

#[tokio::test]
async fn test_complete_hit() {
Expand Down Expand Up @@ -442,11 +431,4 @@ mod tests {
let data = cache.get(key).await.unwrap().unwrap();
assert_eq!("Hello World!", std::str::from_utf8(&data.body).unwrap());
}

#[tokio::test]
async fn test_file_storage_clear_service() {
let dir = TempDir::new().unwrap();
let dir = dir.into_path().to_string_lossy().to_string();
let _ = new_file_storage_clear_service(&dir).unwrap();
}
}
6 changes: 3 additions & 3 deletions src/config/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1529,19 +1529,19 @@ basic

let result = conf.remove("plugin", "stats");
assert_eq!(
"Invalid error proxy plugin(stats) is in used",
"Invalid error proxy plugin(stats) is in used by location(lo)",
result.err().unwrap().to_string()
);

let result = conf.remove("upstream", "charts");
assert_eq!(
"Invalid error upstream(charts) is in used",
"Invalid error upstream(charts) is in used by location(lo)",
result.err().unwrap().to_string()
);

let result = conf.remove("location", "lo");
assert_eq!(
"Invalid error location(lo) is in used",
"Invalid error location(lo) is in used by server(test)",
result.err().unwrap().to_string()
);

Expand Down
29 changes: 16 additions & 13 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::acme::{new_lets_encrypt_service, new_tls_validity_service};
use crate::acme::{new_certificate_validity_service, new_lets_encrypt_service};
use crate::cache::new_file_storage_clear_service;
use crate::config::ETCD_PROTOCOL;
use crate::service::{new_auto_restart_service, new_observer_service};
Expand All @@ -28,6 +28,7 @@ use proxy::{
new_self_signed_cert_validity_service, new_upstream_health_check_task,
Server, ServerConf,
};
use service::new_simple_service_task;
use state::{get_admin_addr, get_start_time, set_admin_addr};
use std::collections::HashMap;
use std::error::Error;
Expand Down Expand Up @@ -440,12 +441,6 @@ fn run() -> Result<(), Box<dyn Error>> {
));
}

if let Some(dir) = &conf.basic.cache_directory {
if let Some(task) = new_file_storage_clear_service(dir) {
my_server.add_service(background_service("StorageClear", task));
}
}

if let Err(e) = plugin::try_init_plugins(&conf.plugins) {
error!(error = e.to_string(), "init plugins fail",);
}
Expand Down Expand Up @@ -553,15 +548,23 @@ fn run() -> Result<(), Box<dyn Error>> {
));
}
}
let mut simple_tasks = vec![
new_certificate_validity_service(),
new_self_signed_cert_validity_service(),
];
if let Some(task) = new_file_storage_clear_service() {
simple_tasks.push(task);
}

my_server.add_service(background_service(
"TlsValidity",
new_tls_validity_service(),
));
my_server.add_service(background_service(
"SelfSignedStale",
new_self_signed_cert_validity_service(),
"SimpleTask",
new_simple_service_task(
"simpleTask",
Duration::from_secs(60),
simple_tasks,
),
));

my_server.add_service(background_service(
"UpstreamHc",
new_upstream_health_check_task(Duration::from_secs(10)),
Expand Down
60 changes: 28 additions & 32 deletions src/proxy/dynamic_certificate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use crate::acme::Certificate;
use crate::config::CertificateConf;
use crate::service::{CommonServiceTask, ServiceTask};
use crate::service::SimpleServiceTaskFuture;
use crate::{util, webhook};
use ahash::AHashMap;
use arc_swap::ArcSwap;
Expand All @@ -29,7 +29,6 @@ use snafu::Snafu;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
use substring::Substring;
use tracing::{debug, error, info};

Expand All @@ -56,40 +55,37 @@ type SelfSignedCertKey = AHashMap<String, Arc<SelfSignedCert>>;
static SELF_SIGNED_CERT_KEY_MAP: Lazy<ArcSwap<SelfSignedCertKey>> =
Lazy::new(|| ArcSwap::from_pointee(AHashMap::new()));

struct SelfSiginedStaleCertChecker {}

#[async_trait]
impl ServiceTask for SelfSiginedStaleCertChecker {
async fn run(&self) -> Option<bool> {
let mut m = AHashMap::new();
for (k, v) in SELF_SIGNED_CERT_KEY_MAP.load().iter() {
let count = v.count.load(Ordering::Relaxed);
let stale = v.stale.load(Ordering::Relaxed);
if stale && count == 0 {
continue;
}
if count == 0 {
v.stale.store(true, Ordering::Relaxed);
} else {
v.stale.store(false, Ordering::Relaxed);
v.count.store(0, Ordering::Relaxed);
}
m.insert(k.to_string(), v.clone());
}
SELF_SIGNED_CERT_KEY_MAP.store(Arc::new(m));
Some(false)
async fn do_self_signed_cert_validity(count: u32) -> Result<(), String> {
// Add 1 every loop
let offset = 24 * 60;
if count % offset != 0 {
return Ok(());
}
fn description(&self) -> String {
"Self signed certificate stale checker".to_string()
let mut m = AHashMap::new();
for (k, v) in SELF_SIGNED_CERT_KEY_MAP.load().iter() {
let count = v.count.load(Ordering::Relaxed);
let stale = v.stale.load(Ordering::Relaxed);
if stale && count == 0 {
continue;
}
if count == 0 {
v.stale.store(true, Ordering::Relaxed);
} else {
v.stale.store(false, Ordering::Relaxed);
v.count.store(0, Ordering::Relaxed);
}
m.insert(k.to_string(), v.clone());
}
SELF_SIGNED_CERT_KEY_MAP.store(Arc::new(m));
Ok(())
}

pub fn new_self_signed_cert_validity_service() -> CommonServiceTask {
CommonServiceTask::new(
// check interval: one day
Duration::from_secs(24 * 60 * 60),
SelfSiginedStaleCertChecker {},
)
pub fn new_self_signed_cert_validity_service(
) -> (String, SimpleServiceTaskFuture) {
let task: SimpleServiceTaskFuture =
Box::new(|count: u32| Box::pin(do_self_signed_cert_validity(count)));

("selfSignedCertificateStale".to_string(), task)
}

// https://letsencrypt.org/certificates/
Expand Down
Loading

0 comments on commit be8652d

Please sign in to comment.