Skip to content

Commit

Permalink
refactor: adjust location of server
Browse files Browse the repository at this point in the history
  • Loading branch information
vicanso committed Dec 6, 2024
1 parent 073c6c2 commit 43fb471
Show file tree
Hide file tree
Showing 13 changed files with 148 additions and 36 deletions.
29 changes: 28 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ clap = { version = "4.5.21", features = [
cookie = "0.18.1"
crc32fast = "1.4.2"
crossbeam-channel = "0.5.13"
derivative = "2.2.0"
derive_more = { version = "1.0.0", default-features = false, features = [
"debug",
] }
dhat = { version = "0.3.3", optional = true }
diff = "0.1.13"
dirs = "5.0.1"
Expand Down
1 change: 1 addition & 0 deletions src/config/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ pub struct LocationConf {
pub weight: Option<u16>,
pub plugins: Option<Vec<String>>,
pub client_max_body_size: Option<ByteSize>,
pub max_processing: Option<i32>,
pub includes: Option<Vec<String>>,
pub grpc_web: Option<bool>,
pub remark: Option<String>,
Expand Down
21 changes: 11 additions & 10 deletions src/logger/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use walkdir::WalkDir;
static GZIP_EXT: &str = "gz";
static ZSTD_EXT: &str = "zst";

fn zstd_compress(file: &Path, level: u8) -> Result<u64, Box<dyn Error>> {
fn zstd_compress(file: &Path, level: u8) -> Result<(u64, u64), Box<dyn Error>> {
let level = if level == 0 { 9 } else { level as i32 };
let zst_file = file.with_extension(ZSTD_EXT);
let mut original_file = fs::File::open(file)?;
Expand All @@ -44,13 +44,14 @@ fn zstd_compress(file: &Path, level: u8) -> Result<u64, Box<dyn Error>> {
.create_new(true)
.open(&zst_file)?;

let mut encoder = zstd::stream::Encoder::new(file, level)?;
let size = io::copy(&mut original_file, &mut encoder)?;
let mut encoder = zstd::stream::Encoder::new(&file, level)?;
let original_size = io::copy(&mut original_file, &mut encoder)?;
encoder.finish()?;
Ok(size)
let size = file.metadata().map(|item| item.size()).unwrap_or_default();
Ok((size, original_size))
}

fn gzip_compress(file: &Path, level: u8) -> Result<u64, Box<dyn Error>> {
fn gzip_compress(file: &Path, level: u8) -> Result<(u64, u64), Box<dyn Error>> {
let gzip_file = file.with_extension(GZIP_EXT);
let mut original_file = fs::File::open(file)?;
let file = fs::OpenOptions::new()
Expand All @@ -63,10 +64,11 @@ fn gzip_compress(file: &Path, level: u8) -> Result<u64, Box<dyn Error>> {
} else {
Compression::new(level as u32)
};
let mut encoder = GzEncoder::new(file, level);
let size = io::copy(&mut original_file, &mut encoder)?;
let mut encoder = GzEncoder::new(&file, level);
let original_size = io::copy(&mut original_file, &mut encoder)?;
encoder.finish()?;
Ok(size)
let size = file.metadata().map(|item| item.size()).unwrap_or_default();
Ok((size, original_size))
}

pub struct LogCompressionTask {
Expand Down Expand Up @@ -113,7 +115,6 @@ impl ServiceTask for LogCompressionTask {
if accessed > access_before {
continue;
}
let original_size = metadata.size();
let start = SystemTime::now();
let result = if self.compression == "gzip" {
gzip_compress(entry.path(), self.level)
Expand All @@ -125,7 +126,7 @@ impl ServiceTask for LogCompressionTask {
Err(e) => {
error!(err = e.to_string(), file, "compress log fail");
},
Ok(size) => {
Ok((size, original_size)) => {
let elapsed = format!("{}ms", util::elapsed_ms(start));
info!(
file,
Expand Down
66 changes: 55 additions & 11 deletions src/proxy/location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::config::{LocationConf, PluginStep};
use crate::http_extra::{convert_header_value, convert_headers, HttpHeader};
use crate::plugin::get_plugin;
use crate::state::State;
use crate::util;
use crate::util::{self, get_content_length};
use ahash::AHashMap;
use arc_swap::ArcSwap;
use once_cell::sync::Lazy;
Expand All @@ -25,7 +25,7 @@ use pingora::proxy::Session;
use regex::Regex;
use snafu::{ResultExt, Snafu};
use std::collections::HashMap;
use std::sync::atomic::{AtomicI32, AtomicU64};
use std::sync::atomic::{AtomicI32, AtomicU64, Ordering};
use std::sync::Arc;
use substring::Substring;
use tracing::{debug, error};
Expand All @@ -36,6 +36,10 @@ pub enum Error {
Invalid { message: String },
#[snafu(display("Regex value: {value}, {source}"))]
Regex { value: String, source: regex::Error },
#[snafu(display("Too Many Requests, max:{max}"))]
TooManyRequest { max: i32 },
#[snafu(display("Request Entity Too Large, max:{max}"))]
BodyTooLarge { max: usize },
}
type Result<T, E = Error> = std::result::Result<T, E>;

Expand Down Expand Up @@ -138,17 +142,18 @@ fn new_host_selector(host: &str) -> Result<HostSelector> {
pub struct Location {
pub name: String,
pub key: String,
pub upstream: String,
path: String,
path_selector: PathSelector,
hosts: Vec<HostSelector>,
reg_rewrite: Option<(Regex, String)>,
proxy_add_headers: Option<Vec<HttpHeader>>,
proxy_set_headers: Option<Vec<HttpHeader>>,
plugins: Option<Vec<String>>,
pub accepted: AtomicU64,
pub processing: AtomicI32,
pub upstream: String,
pub grpc_web: bool,
accepted: AtomicU64,
processing: AtomicI32,
max_processing: i32,
grpc_web: bool,
client_max_body_size: usize,
}

Expand Down Expand Up @@ -206,6 +211,7 @@ impl Location {
plugins: conf.plugins.clone(),
accepted: AtomicU64::new(0),
processing: AtomicI32::new(0),
max_processing: conf.max_processing.unwrap_or_default(),
grpc_web: conf.grpc_web.unwrap_or_default(),
proxy_add_headers: format_headers(&conf.proxy_add_headers)?,
proxy_set_headers: format_headers(&conf.proxy_set_headers)?,
Expand All @@ -218,6 +224,45 @@ impl Location {

Ok(location)
}
/// Returns `true` if gRPC-web handling is enabled for this location.
/// Mirrors the `grpc_web` option from `LocationConf`; the server uses it
/// to decide whether to initialize the gRPC downstream module per request.
#[inline]
pub fn enable_grpc(&self) -> bool {
    self.grpc_web
}
#[inline]
pub fn validate_content_length(
&self,
header: &RequestHeader,
) -> Result<()> {
if self.client_max_body_size == 0 {
return Ok(());
}
if get_content_length(header).unwrap_or_default()
> self.client_max_body_size
{
return Err(Error::BodyTooLarge {
max: self.client_max_body_size,
});
}

Ok(())
}
/// Add processing and accepted count of location.
#[inline]
pub fn add_processing(&self) -> Result<(u64, i32)> {
let accepted = self.accepted.fetch_add(1, Ordering::Relaxed) + 1;
let processing = self.processing.fetch_add(1, Ordering::Relaxed) + 1;
if self.max_processing != 0 && processing > self.max_processing {
return Err(Error::TooManyRequest {
max: self.max_processing,
});
}
Ok((accepted, processing))
}
/// Decrement the in-flight `processing` gauge for this location.
/// Pairs with `add_processing`; invoked when a request finishes
/// (the server calls it from its request-completion handling).
#[inline]
pub fn sub_processing(&self) {
    self.processing.fetch_sub(1, Ordering::Relaxed);
}
/// Return `true` if the host and path match location.
#[inline]
pub fn matched(
Expand Down Expand Up @@ -262,15 +307,14 @@ impl Location {
/// If the size in a request exceeds the configured value, the 413 (Request Entity Too Large) error
/// is returned to the client.
#[inline]
pub fn client_body_size_limit(&self, ctx: &State) -> pingora::Result<()> {
pub fn client_body_size_limit(&self, ctx: &State) -> Result<()> {
if self.client_max_body_size == 0 {
return Ok(());
}
if ctx.payload_size > self.client_max_body_size {
return Err(util::new_internal_error(
413,
"Request Entity Too Large".to_string(),
));
return Err(Error::BodyTooLarge {
max: self.client_max_body_size,
});
}
Ok(())
}
Expand Down
25 changes: 18 additions & 7 deletions src/proxy/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,11 @@ impl ProxyHttp for Server {
}
}
if let Some(location) = &ctx.location {
if location.grpc_web {
location
.validate_content_length(header)
.map_err(|e| util::new_internal_error(413, e.to_string()))?;

if location.enable_grpc() {
// Initialize gRPC module for this request
let grpc = session
.downstream_modules_ctx
Expand All @@ -552,10 +556,15 @@ impl ProxyHttp for Server {
grpc.init();
}

ctx.location_accepted =
location.accepted.fetch_add(1, Ordering::Relaxed) + 1;
ctx.location_processing =
location.processing.fetch_add(1, Ordering::Relaxed) + 1;
match location.add_processing() {
Ok((accepted, processing)) => {
ctx.location_accepted = accepted;
ctx.location_processing = processing;
},
Err(e) => {
return Err(util::new_internal_error(429, e.to_string()));
},
};
let _ = location
.clone()
.handle_request_plugin(PluginStep::EarlyRequest, session, ctx)
Expand Down Expand Up @@ -762,7 +771,9 @@ impl ProxyHttp for Server {
if let Some(buf) = body {
ctx.payload_size += buf.len();
if let Some(location) = &ctx.location {
location.client_body_size_limit(ctx)?;
location.client_body_size_limit(ctx).map_err(|e| {
util::new_internal_error(413, e.to_string())
})?;
}
}
Ok(())
Expand Down Expand Up @@ -1088,7 +1099,7 @@ impl ProxyHttp for Server {
end_request();
self.processing.fetch_sub(1, Ordering::Relaxed);
if let Some(location) = &ctx.location {
location.processing.fetch_sub(1, Ordering::Relaxed);
location.sub_processing();
if let Some(up) = get_upstream(&location.upstream) {
ctx.upstream_processing = Some(up.completed());
}
Expand Down
9 changes: 4 additions & 5 deletions src/proxy/upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::util;
use ahash::AHashMap;
use arc_swap::ArcSwap;
use async_trait::async_trait;
use derivative::Derivative;
use derive_more::Debug;
use futures_util::FutureExt;
use once_cell::sync::Lazy;
use pingora::lb::selection::{Consistent, RoundRobin};
Expand Down Expand Up @@ -78,16 +78,15 @@ impl Tracing for UpstreamPeerTracer {
}
}

#[derive(Derivative)]
#[derivative(Debug)]
#[derive(Debug)]
pub struct Upstream {
pub name: String,
pub key: String,
hash: String,
hash_key: String,
tls: bool,
sni: String,
#[derivative(Debug = "ignore")]
#[debug("lb")]
lb: SelectionLb,
connection_timeout: Option<Duration>,
total_connection_timeout: Option<Duration>,
Expand Down Expand Up @@ -291,7 +290,7 @@ impl Upstream {
tracer,
processing: AtomicI32::new(0),
};
debug!("new upstream: {up:?}");
debug!(name = up.name, "new upstream: {up:?}");
Ok(up)
}

Expand Down
14 changes: 14 additions & 0 deletions src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,20 @@ pub fn get_cache_key(prefix: &str, method: &str, uri: &Uri) -> CacheKey {
CacheKey::new(namespace, uri.to_string(), "")
}

/// Parse the `Content-Length` header of an http request.
///
/// Returns `None` when the header is absent, is not valid visible ASCII,
/// or does not parse as a `usize` — identical to the behavior of parsing
/// an empty string on conversion failure.
pub fn get_content_length(header: &RequestHeader) -> Option<usize> {
    header
        .headers
        .get(http::header::CONTENT_LENGTH)
        .and_then(|value| value.to_str().ok())
        .and_then(|value| value.parse::<usize>().ok())
}

#[inline]
pub fn get_latency(value: &Option<u64>) -> Option<u64> {
let current = now().as_millis() as u64;
Expand Down
2 changes: 2 additions & 0 deletions web/src/i18n/en.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ export default {
weightPlaceholder: "Input the weight of location",
clientMaxBodySize: "Client Max Body Size",
clientMaxBodySizePlaceholder: "Input the max body size(e.g. 1mb)",
maxProcessing: "Max Processing Requests",
maxProcessingPlaceholder: "Input the max processing request count",
plugins: "Plugins",
pluginsPlaceholder: "Select the plugins for location",
grpcWeb: "Grpc Web",
Expand Down
2 changes: 2 additions & 0 deletions web/src/i18n/zh.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ export default {
weightPlaceholder: "输入location的权重",
clientMaxBodySize: "请求实体限制大小",
clientMaxBodySizePlaceholder: "输入请求实体限制大小(如1mb)",
maxProcessing: "最大正在处理请求数",
maxProcessingPlaceholder: "输入限制的最大正在处理请求数",
plugins: "插件列表",
pluginsPlaceholder: "选择location使用的相关插件",
grpcWeb: "Grpc Web",
Expand Down
8 changes: 8 additions & 0 deletions web/src/pages/Locations.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,14 @@ export default function Locations() {
span: 3,
category: ExFormItemCategory.TEXT,
},
{
name: "max_processing",
label: locationI18n("maxProcessing"),
placeholder: locationI18n("maxProcessingPlaceholder"),
defaultValue: locationConfig.max_processing,
span: 3,
category: ExFormItemCategory.NUMBER,
},
{
name: "plugins",
label: locationI18n("plugins"),
Expand Down
Loading

0 comments on commit 43fb471

Please sign in to comment.