From 82b5a03bd7b8d3e0924c88d2f709094414eda69b Mon Sep 17 00:00:00 2001 From: jpelkonen <38443921+jpelkonen@users.noreply.github.com> Date: Wed, 10 Apr 2024 06:37:01 -0500 Subject: [PATCH] assets: add support for configurable bufsize for FileHandler (#649) * assets: add support for configurable bufsize for FileHandler The current implementation is using the blocksize as buffer size. In many use cases, the size is too small and limiting throughput. This commit adds an new option, buffer_size in the FileOptions, which, when set, overrides the default value. Also included is a cargo criterion benchmark, which can be run with ``` cargo bench --bench file_handler ``` The benchmark shows major performance improvements with larger buffers when serving large files. Signed-off-by: Janne Pelkonen * Fix formatting Signed-off-by: Janne Pelkonen --------- Signed-off-by: Janne Pelkonen --- gotham/Cargo.toml | 7 ++ gotham/benches/file_handler.rs | 152 +++++++++++++++++++++++++++++++ gotham/src/handler/assets/mod.rs | 15 ++- 3 files changed, 172 insertions(+), 2 deletions(-) create mode 100644 gotham/benches/file_handler.rs diff --git a/gotham/Cargo.toml b/gotham/Cargo.toml index 4ecc8cd8..734040f0 100644 --- a/gotham/Cargo.toml +++ b/gotham/Cargo.toml @@ -55,8 +55,15 @@ tokio-rustls = { version = "0.23", optional = true } uuid = { version = "1.0", features = ["v4"] } [dev-dependencies] +criterion = { version = "0.5.1", features = ["cargo_bench_support", "plotters", "rayon", "async_futures", "async_tokio"] } futures-executor = "0.3.14" +reqwest = "0.12.2" +tempfile = "3.10.1" tokio = { version = "1.11.0", features = ["macros", "test-util"] } [package.metadata.docs.rs] all-features = true + +[[bench]] +name = "file_handler" +harness = false diff --git a/gotham/benches/file_handler.rs b/gotham/benches/file_handler.rs new file mode 100644 index 00000000..41aba248 --- /dev/null +++ b/gotham/benches/file_handler.rs @@ -0,0 +1,152 @@ +use std::{ + collections::HashMap, + fs::File, + io::{BufWriter, Write}, + net::{SocketAddr, ToSocketAddrs}, + sync::atomic::{AtomicU64, Ordering::Relaxed}, + time::{Duration, SystemTime}, +}; + +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; +use futures_util::future; +use gotham::{ + bind_server, + handler::FileOptions, + router::{ + build_simple_router, + builder::{DefineSingleRoute, DrawRoutes}, + }, +}; +use tempfile::TempDir; +use tokio::{ + net::TcpListener, + runtime::{self, Runtime}, +}; + +struct BenchServer { + runtime: Runtime, + addr: SocketAddr, + #[allow(dead_code)] + tmp: TempDir, + // sizes of test files + sizes: Vec, + buf_paths: HashMap>, +} + +impl BenchServer { + fn new() -> anyhow::Result { + let tmp = TempDir::new()?; + // temporary datafiles + let sizes = [10, 17, 24] + .iter() + .filter_map(|sz| { + let size = 1 << sz; + mk_tmp(&tmp, size).ok() + }) + .collect(); + let buf_paths = HashMap::from([ + ("default".to_string(), None), + ("128k".to_string(), Some(1 << 17)), + ]); + + let router = build_simple_router(|route| { + for (path, sz) in &buf_paths { + let mut opts = FileOptions::from(tmp.path().to_owned()); + if let Some(size) = sz { + opts.with_buffer_size(*size); + } + route + .get(format!("/{path}/*").as_str()) + .to_dir(opts.to_owned()) + } + }); + let runtime = runtime::Builder::new_multi_thread() + .worker_threads(num_cpus::get()) + .thread_name("file_handler-bench") + .enable_all() + .build() + .unwrap(); + // build server manually so that we can capture the actual port instead of 0 + let addr: std::net::SocketAddr = "127.0.0.1:0".to_socket_addrs().unwrap().next().unwrap(); + let listener = runtime.block_on(TcpListener::bind(addr)).unwrap(); + // use any free port + let addr = listener.local_addr().unwrap(); + let _ = runtime.spawn(async move { + bind_server(listener, router, future::ok).await; + }); + std::thread::sleep(Duration::from_millis(100)); + Ok(Self { + runtime, + addr, + tmp, + sizes, + buf_paths, + }) + } +} + +fn mk_tmp(tmp: &TempDir, size: u64) -> anyhow::Result { + let filename = tmp.path().join(format!("{size}")); + let file = File::create(filename)?; + let mut w = BufWriter::with_capacity(2 << 16, file); + // pseudo random data: time stamp as bytes + let ts_data = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_nanos() + .to_le_bytes(); + for _ in (0..size).step_by(ts_data.len()) { + w.write_all(&ts_data)?; + } + Ok(size) +} + +pub fn filehandler_benchmark(c: &mut Criterion) { + let server = BenchServer::new().unwrap(); + + let runtime = server.runtime; + let client = reqwest::Client::builder().build().unwrap(); + let counter = AtomicU64::new(0); + let failed = AtomicU64::new(0); + + for file_size in server.sizes { + let mut group = c.benchmark_group("server_bench"); + group.throughput(Throughput::Bytes(file_size)); + for (path, buf_size) in &server.buf_paths { + let url = format!("http://{}/{path}/{file_size}", server.addr); + let req = client.get(url).build().unwrap(); + group.bench_with_input( + BenchmarkId::new( + "test_file_handler", + format!("filesize: {file_size}, bufsize: {buf_size:?}"), + ), + &req, + |b, req| { + b.to_async(&runtime).iter(|| async { + let r = client.execute(req.try_clone().unwrap()).await; + counter.fetch_add(1, Relaxed); + match r { + Err(_) => { + failed.fetch_add(1, Relaxed); + } + Ok(res) => { + // sanity check: did we get what was expected? + assert_eq!(res.content_length().unwrap(), file_size); + let _ = res.bytes().await.unwrap(); + } + } + }); + }, + ); + } + } + println!("Errors {}/{}", failed.load(Relaxed), counter.load(Relaxed)); +} + +criterion_group! { + name = file_handler; + config = Criterion::default().measurement_time(Duration::from_millis(10_000)).warm_up_time(Duration::from_millis(10)); + targets = filehandler_benchmark +} + +criterion_main!(file_handler); diff --git a/gotham/src/handler/assets/mod.rs b/gotham/src/handler/assets/mod.rs index 018f273a..e20293df 100644 --- a/gotham/src/handler/assets/mod.rs +++ b/gotham/src/handler/assets/mod.rs @@ -75,6 +75,7 @@ pub struct FileOptions { cache_control: String, gzip: bool, brotli: bool, + buffer_size: Option, } impl FileOptions { @@ -88,6 +89,7 @@ impl FileOptions { cache_control: "public".to_string(), gzip: false, brotli: false, + buffer_size: None, } } @@ -111,6 +113,13 @@ impl FileOptions { self } + /// Sets the maximum buffer size to be used when serving the file. + /// If unset, the default maximum buffer size corresponding to file system block size will be used. + pub fn with_buffer_size(&mut self, buf_sz: usize) -> &mut Self { + self.buffer_size = Some(buf_sz); + self + } + /// Clones `self` to return an owned value for passing to a handler. pub fn build(&mut self) -> Self { self.clone() @@ -215,7 +224,9 @@ fn create_file_response(options: FileOptions, state: State) -> Pin (len, range_start), Err(e) => { @@ -229,7 +240,7 @@ fn create_file_response(options: FileOptions, state: State) -> Pin