diff --git a/Cargo.lock b/Cargo.lock index eb13ab4b76a9..1774de8755d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11991,6 +11991,7 @@ version = "1.0.0" dependencies = [ "always-assert", "assert_matches", + "cfg-if", "futures", "futures-timer", "hex-literal", @@ -12047,6 +12048,7 @@ name = "polkadot-node-core-pvf-common" version = "1.0.0" dependencies = [ "assert_matches", + "cfg-if", "cpu-time", "futures", "landlock", @@ -12088,6 +12090,7 @@ dependencies = [ name = "polkadot-node-core-pvf-prepare-worker" version = "1.0.0" dependencies = [ + "cfg-if", "futures", "libc", "parity-scale-codec", diff --git a/polkadot/node/core/pvf/Cargo.toml b/polkadot/node/core/pvf/Cargo.toml index 478d1952d9d9..27f4df117e57 100644 --- a/polkadot/node/core/pvf/Cargo.toml +++ b/polkadot/node/core/pvf/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true [dependencies] always-assert = "0.1" +cfg-if = "1.0" futures = "0.3.21" futures-timer = "3.0.2" gum = { package = "tracing-gum", path = "../../gum" } diff --git a/polkadot/node/core/pvf/common/Cargo.toml b/polkadot/node/core/pvf/common/Cargo.toml index 621f7e24f72b..0f7308396d80 100644 --- a/polkadot/node/core/pvf/common/Cargo.toml +++ b/polkadot/node/core/pvf/common/Cargo.toml @@ -7,6 +7,7 @@ edition.workspace = true license.workspace = true [dependencies] +cfg-if = "1.0" cpu-time = "1.0.0" futures = "0.3.21" gum = { package = "tracing-gum", path = "../../../gum" } diff --git a/polkadot/node/core/pvf/common/src/error.rs b/polkadot/node/core/pvf/common/src/error.rs index 6eb0d9b7df42..6fdd06057c8b 100644 --- a/polkadot/node/core/pvf/common/src/error.rs +++ b/polkadot/node/core/pvf/common/src/error.rs @@ -44,7 +44,17 @@ pub enum PrepareError { /// The response from the worker is received, but the file cannot be renamed (moved) to the /// final destination location. This state is reported by the validation host (not by the /// worker). - RenameTmpFileErr(String), + RenameTmpFileErr { + err: String, + // Unfortunately `PathBuf` doesn't implement `Encode`/`Decode`, so we do a fallible + // conversion to `Option`. + src: Option, + dest: Option, + }, + /// The response from the worker is received, but the worker cache could not be cleared. The + /// worker has to be killed to avoid jobs having access to data from other jobs. This state is + /// reported by the validation host (not by the worker). + ClearWorkerDir(String), } impl PrepareError { @@ -58,7 +68,11 @@ impl PrepareError { use PrepareError::*; match self { Prevalidation(_) | Preparation(_) | Panic(_) => true, - TimedOut | IoErr(_) | CreateTmpFileErr(_) | RenameTmpFileErr(_) => false, + TimedOut | + IoErr(_) | + CreateTmpFileErr(_) | + RenameTmpFileErr { .. } | + ClearWorkerDir(_) => false, // Can occur due to issues with the PVF, but also due to local errors. RuntimeConstruction(_) => false, } @@ -76,7 +90,9 @@ impl fmt::Display for PrepareError { TimedOut => write!(f, "prepare: timeout"), IoErr(err) => write!(f, "prepare: io error while receiving response: {}", err), CreateTmpFileErr(err) => write!(f, "prepare: error creating tmp file: {}", err), - RenameTmpFileErr(err) => write!(f, "prepare: error renaming tmp file: {}", err), + RenameTmpFileErr { err, src, dest } => + write!(f, "prepare: error renaming tmp file ({:?} -> {:?}): {}", src, dest, err), + ClearWorkerDir(err) => write!(f, "prepare: error clearing worker cache: {}", err), } } } @@ -89,8 +105,17 @@ impl fmt::Display for PrepareError { pub enum InternalValidationError { /// Some communication error occurred with the host. HostCommunication(String), + /// Host could not create a hard link to the artifact path. + CouldNotCreateLink(String), /// Could not find or open compiled artifact file. CouldNotOpenFile(String), + /// Host could not clear the worker cache after a job. + CouldNotClearWorkerDir { + err: String, + // Unfortunately `PathBuf` doesn't implement `Encode`/`Decode`, so we do a fallible + // conversion to `Option`. + path: Option, + }, /// An error occurred in the CPU time monitor thread. Should be totally unrelated to /// validation. CpuTimeMonitorThread(String), @@ -104,8 +129,18 @@ impl fmt::Display for InternalValidationError { match self { HostCommunication(err) => write!(f, "validation: some communication error occurred with the host: {}", err), + CouldNotCreateLink(err) => write!( + f, + "validation: host could not create a hard link to the artifact path: {}", + err + ), CouldNotOpenFile(err) => write!(f, "validation: could not find or open compiled artifact file: {}", err), + CouldNotClearWorkerDir { err, path } => write!( + f, + "validation: host could not clear the worker cache ({:?}) after a job: {}", + path, err + ), CpuTimeMonitorThread(err) => write!(f, "validation: an error occurred in the CPU time monitor thread: {}", err), NonDeterministicPrepareError(err) => write!(f, "validation: prepare: {}", err), diff --git a/polkadot/node/core/pvf/common/src/execute.rs b/polkadot/node/core/pvf/common/src/execute.rs index 399b847791a9..b89ab089af1c 100644 --- a/polkadot/node/core/pvf/common/src/execute.rs +++ b/polkadot/node/core/pvf/common/src/execute.rs @@ -29,7 +29,7 @@ pub struct Handshake { } /// The response from an execution job on the worker. -#[derive(Encode, Decode)] +#[derive(Debug, Encode, Decode)] pub enum Response { /// The job completed successfully. Ok { diff --git a/polkadot/node/core/pvf/common/src/lib.rs b/polkadot/node/core/pvf/common/src/lib.rs index c358ad6e134d..53c287ea9709 100644 --- a/polkadot/node/core/pvf/common/src/lib.rs +++ b/polkadot/node/core/pvf/common/src/lib.rs @@ -22,6 +22,7 @@ pub mod executor_intf; pub mod prepare; pub mod pvf; pub mod worker; +pub mod worker_dir; pub use cpu_time::ProcessTime; @@ -30,8 +31,11 @@ pub use sp_tracing; const LOG_TARGET: &str = "parachain::pvf-common"; -use std::mem; -use tokio::io::{self, AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _}; +use std::{ + io::{Read, Write}, + mem, +}; +use tokio::io; #[cfg(feature = "test-utils")] pub mod tests { @@ -41,20 +45,31 @@ pub mod tests { pub const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(30); } -/// Write some data prefixed by its length into `w`. -pub async fn framed_send(w: &mut (impl AsyncWrite + Unpin), buf: &[u8]) -> io::Result<()> { +/// Status of security features on the current system. +#[derive(Debug, Clone, Default)] +pub struct SecurityStatus { + /// Whether the landlock features we use are fully available on this system. + pub can_enable_landlock: bool, + // Whether we are able to unshare the user namespace and change the filesystem root. + pub can_unshare_user_namespace_and_change_root: bool, +} + +/// Write some data prefixed by its length into `w`. Sync version of `framed_send` to avoid +/// dependency on tokio. +pub fn framed_send_blocking(w: &mut (impl Write + Unpin), buf: &[u8]) -> io::Result<()> { let len_buf = buf.len().to_le_bytes(); - w.write_all(&len_buf).await?; - w.write_all(buf).await?; + w.write_all(&len_buf)?; + w.write_all(buf)?; Ok(()) } -/// Read some data prefixed by its length from `r`. -pub async fn framed_recv(r: &mut (impl AsyncRead + Unpin)) -> io::Result> { +/// Read some data prefixed by its length from `r`. Sync version of `framed_recv` to avoid +/// dependency on tokio. +pub fn framed_recv_blocking(r: &mut (impl Read + Unpin)) -> io::Result> { let mut len_buf = [0u8; mem::size_of::()]; - r.read_exact(&mut len_buf).await?; + r.read_exact(&mut len_buf)?; let len = usize::from_le_bytes(len_buf); let mut buf = vec![0; len]; - r.read_exact(&mut buf).await?; + r.read_exact(&mut buf)?; Ok(buf) } diff --git a/polkadot/node/core/pvf/common/src/worker/mod.rs b/polkadot/node/core/pvf/common/src/worker/mod.rs index bcdf882f300c..59973f6cbbc6 100644 --- a/polkadot/node/core/pvf/common/src/worker/mod.rs +++ b/polkadot/node/core/pvf/common/src/worker/mod.rs @@ -18,16 +18,18 @@ pub mod security; -use crate::LOG_TARGET; +use crate::{worker_dir, SecurityStatus, LOG_TARGET}; use cpu_time::ProcessTime; use futures::never::Never; use std::{ any::Any, + fmt, + os::unix::net::UnixStream, path::PathBuf, sync::mpsc::{Receiver, RecvTimeoutError}, time::Duration, }; -use tokio::{io, net::UnixStream, runtime::Runtime}; +use tokio::{io, runtime::Runtime}; /// Use this macro to declare a `fn main() {}` that will create an executable that can be used for /// spawning the desired worker. @@ -41,10 +43,15 @@ macro_rules! decl_worker_main { } fn main() { + #[cfg(target_os = "linux")] + use $crate::worker::security; + // TODO: Remove this dependency, and `pub use sp_tracing` in `lib.rs`. // See . $crate::sp_tracing::try_init_simple(); + let worker_pid = std::process::id(); + let args = std::env::args().collect::>(); if args.len() == 1 { print_help($expected_command); @@ -60,10 +67,43 @@ macro_rules! decl_worker_main { println!("{}", $worker_version); return }, + + "--check-can-enable-landlock" => { + #[cfg(target_os = "linux")] + let status = if security::landlock::check_is_fully_enabled() { 0 } else { -1 }; + #[cfg(not(target_os = "linux"))] + let status = -1; + std::process::exit(status) + }, + "--check-can-unshare-user-namespace-and-change-root" => { + #[cfg(target_os = "linux")] + let status = if let Err(err) = security::unshare_user_namespace_and_change_root( + $crate::worker::WorkerKind::CheckPivotRoot, + worker_pid, + // We're not accessing any files, so we can try to pivot_root in the temp + // dir without conflicts with other processes. + &std::env::temp_dir(), + ) { + // Write the error to stderr, log it on the host-side. + eprintln!("{}", err); + -1 + } else { + 0 + }; + #[cfg(not(target_os = "linux"))] + let status = { + // Write the error to stderr, log it on the host-side. + eprintln!("not available on macos"); + -1 + }; + std::process::exit(status) + }, + "test-sleep" => { std::thread::sleep(std::time::Duration::from_secs(5)); return }, + subcommand => { // Must be passed for compatibility with the single-binary test workers. if subcommand != $expected_command { @@ -75,18 +115,39 @@ macro_rules! decl_worker_main { }, } + let mut worker_dir_path = None; let mut node_version = None; - let mut socket_path: &str = ""; + let mut can_enable_landlock = false; + let mut can_unshare_user_namespace_and_change_root = false; - for i in (2..args.len()).step_by(2) { + let mut i = 2; + while i < args.len() { match args[i].as_ref() { - "--socket-path" => socket_path = args[i + 1].as_str(), - "--node-impl-version" => node_version = Some(args[i + 1].as_str()), + "--worker-dir-path" => { + worker_dir_path = Some(args[i + 1].as_str()); + i += 1 + }, + "--node-impl-version" => { + node_version = Some(args[i + 1].as_str()); + i += 1 + }, + "--can-enable-landlock" => can_enable_landlock = true, + "--can-unshare-user-namespace-and-change-root" => + can_unshare_user_namespace_and_change_root = true, arg => panic!("Unexpected argument found: {}", arg), } + i += 1; } + let worker_dir_path = + worker_dir_path.expect("the --worker-dir-path argument is required"); + + let worker_dir_path = std::path::Path::new(worker_dir_path).to_owned(); + let security_status = $crate::SecurityStatus { + can_enable_landlock, + can_unshare_user_namespace_and_change_root, + }; - $entrypoint(&socket_path, node_version, Some($worker_version)); + $entrypoint(worker_dir_path, node_version, Some($worker_version), security_status); } }; } @@ -95,61 +156,181 @@ macro_rules! decl_worker_main { /// child process. pub const JOB_TIMEOUT_OVERHEAD: Duration = Duration::from_millis(50); -/// Interprets the given bytes as a path. Returns `None` if the given bytes do not constitute a -/// a proper utf-8 string. -pub fn bytes_to_path(bytes: &[u8]) -> Option { - std::str::from_utf8(bytes).ok().map(PathBuf::from) +#[derive(Debug, Clone, Copy)] +pub enum WorkerKind { + Prepare, + Execute, + CheckPivotRoot, +} + +impl fmt::Display for WorkerKind { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Prepare => write!(f, "prepare"), + Self::Execute => write!(f, "execute"), + Self::CheckPivotRoot => write!(f, "check pivot root"), + } + } } // The worker version must be passed in so that we accurately get the version of the worker, and not // the version that this crate was compiled with. pub fn worker_event_loop( - debug_id: &'static str, - socket_path: &str, + worker_kind: WorkerKind, + #[cfg_attr(not(target_os = "linux"), allow(unused_mut))] mut worker_dir_path: PathBuf, node_version: Option<&str>, worker_version: Option<&str>, + #[cfg_attr(not(target_os = "linux"), allow(unused_variables))] security_status: &SecurityStatus, mut event_loop: F, ) where - F: FnMut(UnixStream) -> Fut, + F: FnMut(UnixStream, PathBuf) -> Fut, Fut: futures::Future>, { let worker_pid = std::process::id(); - gum::debug!(target: LOG_TARGET, %worker_pid, "starting pvf worker ({})", debug_id); + gum::debug!( + target: LOG_TARGET, + %worker_pid, + ?worker_dir_path, + ?security_status, + "starting pvf worker ({})", + worker_kind + ); // Check for a mismatch between the node and worker versions. if let (Some(node_version), Some(worker_version)) = (node_version, worker_version) { if node_version != worker_version { gum::error!( target: LOG_TARGET, + %worker_kind, %worker_pid, %node_version, %worker_version, "Node and worker version mismatch, node needs restarting, forcing shutdown", ); kill_parent_node_in_emergency(); - let err = io::Error::new(io::ErrorKind::Unsupported, "Version mismatch"); - worker_shutdown_message(debug_id, worker_pid, err); + worker_shutdown_message(worker_kind, worker_pid, "Version mismatch"); return } } - remove_env_vars(debug_id); + // Make sure that we can read the worker dir path, and log its contents. + let entries = || -> Result, io::Error> { + std::fs::read_dir(&worker_dir_path)? + .map(|res| res.map(|e| e.file_name())) + .collect() + }(); + match entries { + Ok(entries) => + gum::trace!(target: LOG_TARGET, %worker_pid, ?worker_dir_path, "content of worker dir: {:?}", entries), + Err(err) => { + gum::error!( + target: LOG_TARGET, + %worker_kind, + %worker_pid, + ?worker_dir_path, + "Could not read worker dir: {}", + err.to_string() + ); + worker_shutdown_message(worker_kind, worker_pid, &err.to_string()); + return + }, + } + + // Connect to the socket. + let socket_path = worker_dir::socket(&worker_dir_path); + let stream = || -> std::io::Result { + let stream = UnixStream::connect(&socket_path)?; + // Remove the socket here. We don't also need to do this on the host-side; on failed + // rendezvous, the host will delete the whole worker dir. + std::fs::remove_file(&socket_path)?; + Ok(stream) + }(); + let stream = match stream { + Ok(s) => s, + Err(err) => { + gum::error!( + target: LOG_TARGET, + %worker_kind, + %worker_pid, + "{}", + err + ); + worker_shutdown_message(worker_kind, worker_pid, &err.to_string()); + return + }, + }; + + // Enable some security features. + { + // Call based on whether we can change root. Error out if it should work but fails. + // + // NOTE: This should not be called in a multi-threaded context (i.e. inside the tokio + // runtime). `unshare(2)`: + // + // > CLONE_NEWUSER requires that the calling process is not threaded. + #[cfg(target_os = "linux")] + if security_status.can_unshare_user_namespace_and_change_root { + if let Err(err) = security::unshare_user_namespace_and_change_root( + worker_kind, + worker_pid, + &worker_dir_path, + ) { + // The filesystem may be in an inconsistent state, bail out. + gum::error!( + target: LOG_TARGET, + %worker_kind, + %worker_pid, + ?worker_dir_path, + "Could not change root to be the worker cache path: {}", + err + ); + worker_shutdown_message(worker_kind, worker_pid, &err); + return + } + worker_dir_path = std::path::Path::new("/").to_owned(); + } + + #[cfg(target_os = "linux")] + if security_status.can_enable_landlock { + let landlock_status = + security::landlock::enable_for_worker(worker_kind, worker_pid, &worker_dir_path); + if !matches!(landlock_status, Ok(landlock::RulesetStatus::FullyEnforced)) { + // We previously were able to enable, so this should never happen. + // + // TODO: Make this a real error in secure-mode. See: + // + gum::error!( + target: LOG_TARGET, + %worker_kind, + %worker_pid, + "could not fully enable landlock: {:?}. This should not happen, please report to the Polkadot devs", + landlock_status + ); + } + } + + if !security::check_env_vars_were_cleared(worker_kind, worker_pid) { + let err = "not all env vars were cleared when spawning the process"; + gum::error!( + target: LOG_TARGET, + %worker_kind, + %worker_pid, + "{}", + err + ); + worker_shutdown_message(worker_kind, worker_pid, err); + return + } + } // Run the main worker loop. let rt = Runtime::new().expect("Creates tokio runtime. If this panics the worker will die and the host will detect that and deal with it."); let err = rt - .block_on(async move { - let stream = UnixStream::connect(socket_path).await?; - let _ = tokio::fs::remove_file(socket_path).await; - - let result = event_loop(stream).await; - - result - }) + .block_on(event_loop(stream, worker_dir_path)) // It's never `Ok` because it's `Ok(Never)`. .unwrap_err(); - worker_shutdown_message(debug_id, worker_pid, err); + worker_shutdown_message(worker_kind, worker_pid, &err.to_string()); // We don't want tokio to wait for the tasks to finish. We want to bring down the worker as fast // as possible and not wait for stalled validation to finish. This isn't strictly necessary now, @@ -157,51 +338,9 @@ pub fn worker_event_loop( rt.shutdown_background(); } -/// Delete all env vars to prevent malicious code from accessing them. -fn remove_env_vars(debug_id: &'static str) { - for (key, value) in std::env::vars_os() { - // TODO: *theoretically* the value (or mere presence) of `RUST_LOG` can be a source of - // randomness for malicious code. In the future we can remove it also and log in the host; - // see . - if key == "RUST_LOG" { - continue - } - - // In case of a key or value that would cause [`env::remove_var` to - // panic](https://doc.rust-lang.org/std/env/fn.remove_var.html#panics), we first log a - // warning and then proceed to attempt to remove the env var. - let mut err_reasons = vec![]; - let (key_str, value_str) = (key.to_str(), value.to_str()); - if key.is_empty() { - err_reasons.push("key is empty"); - } - if key_str.is_some_and(|s| s.contains('=')) { - err_reasons.push("key contains '='"); - } - if key_str.is_some_and(|s| s.contains('\0')) { - err_reasons.push("key contains null character"); - } - if value_str.is_some_and(|s| s.contains('\0')) { - err_reasons.push("value contains null character"); - } - if !err_reasons.is_empty() { - gum::warn!( - target: LOG_TARGET, - %debug_id, - ?key, - ?value, - "Attempting to remove badly-formatted env var, this may cause the PVF worker to crash. Please remove it yourself. Reasons: {:?}", - err_reasons - ); - } - - std::env::remove_var(key); - } -} - /// Provide a consistent message on worker shutdown. -fn worker_shutdown_message(debug_id: &'static str, worker_pid: u32, err: io::Error) { - gum::debug!(target: LOG_TARGET, %worker_pid, "quitting pvf worker ({}): {:?}", debug_id, err); +fn worker_shutdown_message(worker_kind: WorkerKind, worker_pid: u32, err: &str) { + gum::debug!(target: LOG_TARGET, %worker_pid, "quitting pvf worker ({}): {}", worker_kind, err); } /// Loop that runs in the CPU time monitor thread on prepare and execute jobs. Continuously wakes up @@ -305,7 +444,7 @@ pub mod thread { Arc::new((Mutex::new(WaitOutcome::Pending), Condvar::new())) } - /// Runs a worker thread. Will first enable security features, and afterwards notify the threads + /// Runs a worker thread. Will run the requested function, and afterwards notify the threads /// waiting on the condvar. Catches panics during execution and resumes the panics after /// triggering the condvar, so that the waiting thread is notified on panics. /// diff --git a/polkadot/node/core/pvf/common/src/worker/security.rs b/polkadot/node/core/pvf/common/src/worker/security.rs index 6c5f96e0b5db..b7abf028f941 100644 --- a/polkadot/node/core/pvf/common/src/worker/security.rs +++ b/polkadot/node/core/pvf/common/src/worker/security.rs @@ -17,30 +17,189 @@ //! Functionality for securing workers. //! //! This is needed because workers are used to compile and execute untrusted code (PVFs). +//! +//! We currently employ the following security measures: +//! +//! - Restrict filesystem +//! - Use Landlock to remove all unnecessary FS access rights. +//! - Unshare the user and mount namespaces. +//! - Change the root directory to a worker-specific temporary directory. +//! - Remove env vars + +use crate::{worker::WorkerKind, LOG_TARGET}; + +/// Unshare the user namespace and change root to be the artifact directory. +/// +/// NOTE: This should not be called in a multi-threaded context. `unshare(2)`: +/// "CLONE_NEWUSER requires that the calling process is not threaded." +#[cfg(target_os = "linux")] +pub fn unshare_user_namespace_and_change_root( + worker_kind: WorkerKind, + worker_pid: u32, + worker_dir_path: &std::path::Path, +) -> Result<(), String> { + use std::{env, ffi::CString, os::unix::ffi::OsStrExt, path::Path, ptr}; + + // The following was copied from the `cstr_core` crate. + // + // TODO: Remove this once this is stable: https://github.com/rust-lang/rust/issues/105723 + #[inline] + #[doc(hidden)] + const fn cstr_is_valid(bytes: &[u8]) -> bool { + if bytes.is_empty() || bytes[bytes.len() - 1] != 0 { + return false + } + + let mut index = 0; + while index < bytes.len() - 1 { + if bytes[index] == 0 { + return false + } + index += 1; + } + true + } -/// To what degree landlock is enabled. It's a separate struct from `RulesetStatus` because that is -/// only available on Linux, plus this has a nicer name. -pub enum LandlockStatus { - FullyEnforced, - PartiallyEnforced, - NotEnforced, - /// Thread panicked, we don't know what the status is. - Unavailable, + macro_rules! cstr { + ($e:expr) => {{ + const STR: &[u8] = concat!($e, "\0").as_bytes(); + const STR_VALID: bool = cstr_is_valid(STR); + let _ = [(); 0 - (!(STR_VALID) as usize)]; + #[allow(unused_unsafe)] + unsafe { + core::ffi::CStr::from_bytes_with_nul_unchecked(STR) + } + }} + } + + gum::debug!( + target: LOG_TARGET, + %worker_kind, + %worker_pid, + ?worker_dir_path, + "unsharing the user namespace and calling pivot_root", + ); + + let worker_dir_path_c = CString::new(worker_dir_path.as_os_str().as_bytes()) + .expect("on unix; the path will never contain 0 bytes; qed"); + + // Wrapper around all the work to prevent repetitive error handling. + // + // # Errors + // + // It's the caller's responsibility to call `Error::last_os_error`. Note that that alone does + // not give the context of which call failed, so we return a &str error. + || -> Result<(), &'static str> { + // SAFETY: We pass null-terminated C strings and use the APIs as documented. In fact, steps + // (2) and (3) are adapted from the example in pivot_root(2), with the additional + // change described in the `pivot_root(".", ".")` section. + unsafe { + // 1. `unshare` the user and the mount namespaces. + if libc::unshare(libc::CLONE_NEWUSER | libc::CLONE_NEWNS) < 0 { + return Err("unshare user and mount namespaces") + } + + // 2. Setup mounts. + // + // Ensure that new root and its parent mount don't have shared propagation (which would + // cause pivot_root() to return an error), and prevent propagation of mount events to + // the initial mount namespace. + if libc::mount( + ptr::null(), + cstr!("/").as_ptr(), + ptr::null(), + libc::MS_REC | libc::MS_PRIVATE, + ptr::null(), + ) < 0 + { + return Err("mount MS_PRIVATE") + } + // Ensure that the new root is a mount point. + let additional_flags = + if let WorkerKind::Execute | WorkerKind::CheckPivotRoot = worker_kind { + libc::MS_RDONLY + } else { + 0 + }; + if libc::mount( + worker_dir_path_c.as_ptr(), + worker_dir_path_c.as_ptr(), + ptr::null(), // ignored when MS_BIND is used + libc::MS_BIND | + libc::MS_REC | libc::MS_NOEXEC | + libc::MS_NODEV | libc::MS_NOSUID | + libc::MS_NOATIME | additional_flags, + ptr::null(), // ignored when MS_BIND is used + ) < 0 + { + return Err("mount MS_BIND") + } + + // 3. `pivot_root` to the artifact directory. + if libc::chdir(worker_dir_path_c.as_ptr()) < 0 { + return Err("chdir to worker dir path") + } + if libc::syscall(libc::SYS_pivot_root, cstr!(".").as_ptr(), cstr!(".").as_ptr()) < 0 { + return Err("pivot_root") + } + if libc::umount2(cstr!(".").as_ptr(), libc::MNT_DETACH) < 0 { + return Err("umount the old root mount point") + } + } + + Ok(()) + }() + .map_err(|err_ctx| { + let err = std::io::Error::last_os_error(); + format!("{}: {}", err_ctx, err) + })?; + + // Do some assertions. + if env::current_dir().map_err(|err| err.to_string())? != Path::new("/") { + return Err("expected current dir after pivot_root to be `/`".into()) + } + env::set_current_dir("..").map_err(|err| err.to_string())?; + if env::current_dir().map_err(|err| err.to_string())? != Path::new("/") { + return Err("expected not to be able to break out of new root by doing `..`".into()) + } + + Ok(()) } -impl LandlockStatus { - #[cfg(target_os = "linux")] - pub fn from_ruleset_status(ruleset_status: ::landlock::RulesetStatus) -> Self { - use ::landlock::RulesetStatus::*; - match ruleset_status { - FullyEnforced => LandlockStatus::FullyEnforced, - PartiallyEnforced => LandlockStatus::PartiallyEnforced, - NotEnforced => LandlockStatus::NotEnforced, +/// Require env vars to have been removed when spawning the process, to prevent malicious code from +/// accessing them. +pub fn check_env_vars_were_cleared(worker_kind: WorkerKind, worker_pid: u32) -> bool { + let mut ok = true; + + for (key, value) in std::env::vars_os() { + // TODO: *theoretically* the value (or mere presence) of `RUST_LOG` can be a source of + // randomness for malicious code. In the future we can remove it also and log in the host; + // see . + if key == "RUST_LOG" { + continue + } + // An exception for MacOS. This is not a secure platform anyway, so we let it slide. + #[cfg(target_os = "macos")] + if key == "__CF_USER_TEXT_ENCODING" { + continue } + + gum::error!( + target: LOG_TARGET, + %worker_kind, + %worker_pid, + ?key, + ?value, + "env var was present that should have been removed", + ); + + ok = false; } + + ok } -/// The [landlock] docs say it best: +/// The [landlock] docs say it best: /// /// > "Landlock is a security feature available since Linux 5.13. The goal is to enable to restrict /// ambient rights (e.g., global filesystem access) for a set of processes by creating safe security @@ -52,14 +211,21 @@ impl LandlockStatus { /// [landlock]: https://docs.rs/landlock/latest/landlock/index.html #[cfg(target_os = "linux")] pub mod landlock { - use landlock::{Access, AccessFs, Ruleset, RulesetAttr, RulesetError, RulesetStatus, ABI}; + pub use landlock::RulesetStatus; + + use crate::{worker::WorkerKind, LOG_TARGET}; + use landlock::*; + use std::{ + fmt, + path::{Path, PathBuf}, + }; /// Landlock ABI version. We use ABI V1 because: /// /// 1. It is supported by our reference kernel version. /// 2. Later versions do not (yet) provide additional security. /// - /// # Versions (June 2023) + /// # Versions (as of June 2023) /// /// - Polkadot reference kernel version: 5.16+ /// - ABI V1: 5.13 - introduces landlock, including full restrictions on file reads @@ -83,46 +249,103 @@ pub mod landlock { /// supports it or if it introduces some new feature that is beneficial to security. pub const LANDLOCK_ABI: ABI = ABI::V1; - // TODO: - /// Returns to what degree landlock is enabled with the given ABI on the current Linux - /// environment. - pub fn get_status() -> Result> { - match std::thread::spawn(|| try_restrict_thread()).join() { - Ok(Ok(status)) => Ok(status), - Ok(Err(ruleset_err)) => Err(ruleset_err.into()), - Err(_err) => Err("a panic occurred in try_restrict_thread".into()), + #[derive(Debug)] + pub enum TryRestrictError { + InvalidExceptionPath(PathBuf), + RulesetError(RulesetError), + } + + impl From for TryRestrictError { + fn from(err: RulesetError) -> Self { + Self::RulesetError(err) } } - /// Based on the given `status`, returns a single bool indicating whether the given landlock - /// ABI is fully enabled on the current Linux environment. - pub fn status_is_fully_enabled( - status: &Result>, - ) -> bool { - matches!(status, Ok(RulesetStatus::FullyEnforced)) + impl fmt::Display for TryRestrictError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::InvalidExceptionPath(path) => write!(f, "invalid exception path: {:?}", path), + Self::RulesetError(err) => write!(f, "ruleset error: {}", err.to_string()), + } + } } + impl std::error::Error for TryRestrictError {} + + /// Try to enable landlock for the given kind of worker. + pub fn enable_for_worker( + worker_kind: WorkerKind, + worker_pid: u32, + worker_dir_path: &Path, + ) -> Result> { + let exceptions: Vec<(PathBuf, BitFlags)> = match worker_kind { + WorkerKind::Prepare => { + vec![(worker_dir_path.to_owned(), AccessFs::WriteFile.into())] + }, + WorkerKind::Execute => { + vec![(worker_dir_path.to_owned(), AccessFs::ReadFile.into())] + }, + WorkerKind::CheckPivotRoot => + panic!("this should only be passed for checking pivot_root; qed"), + }; + + gum::debug!( + target: LOG_TARGET, + %worker_kind, + %worker_pid, + ?worker_dir_path, + "enabling landlock with exceptions: {:?}", + exceptions, + ); + + Ok(try_restrict(exceptions)?) + } + + // TODO: /// Runs a check for landlock and returns a single bool indicating whether the given landlock /// ABI is fully enabled on the current Linux environment. pub fn check_is_fully_enabled() -> bool { - status_is_fully_enabled(&get_status()) + let status_from_thread: Result> = + match std::thread::spawn(|| try_restrict(std::iter::empty::<(PathBuf, AccessFs)>())) + .join() + { + Ok(Ok(status)) => Ok(status), + Ok(Err(ruleset_err)) => Err(ruleset_err.into()), + Err(_err) => Err("a panic occurred in try_restrict".into()), + }; + + matches!(status_from_thread, Ok(RulesetStatus::FullyEnforced)) } - /// Tries to restrict the current thread with the following landlock access controls: + /// Tries to restrict the current thread (should only be called in a process' main thread) with + /// the following landlock access controls: /// - /// 1. all global filesystem access - /// 2. ... more may be supported in the future. + /// 1. all global filesystem access restricted, with optional exceptions + /// 2. ... more sandbox types (e.g. networking) may be supported in the future. /// /// If landlock is not supported in the current environment this is simply a noop. /// /// # Returns /// /// The status of the restriction (whether it was fully, partially, or not-at-all enforced). - pub fn try_restrict_thread() -> Result { - let status = Ruleset::new() - .handle_access(AccessFs::from_all(LANDLOCK_ABI))? - .create()? - .restrict_self()?; + fn try_restrict(fs_exceptions: I) -> Result + where + I: IntoIterator, + P: AsRef, + A: Into>, + { + let mut ruleset = + Ruleset::new().handle_access(AccessFs::from_all(LANDLOCK_ABI))?.create()?; + for (fs_path, access_bits) in fs_exceptions { + let paths = &[fs_path.as_ref().to_owned()]; + let mut rules = path_beneath_rules(paths, access_bits).peekable(); + if rules.peek().is_none() { + // `path_beneath_rules` silently ignores missing paths, so check for it manually. + return Err(TryRestrictError::InvalidExceptionPath(fs_path.as_ref().to_owned())) + } + ruleset = ruleset.add_rules(rules)?; + } + let status = ruleset.restrict_self()?; Ok(status.ruleset) } @@ -132,55 +355,114 @@ pub mod landlock { use std::{fs, io::ErrorKind, thread}; #[test] - fn restricted_thread_cannot_access_fs() { + fn restricted_thread_cannot_read_file() { // TODO: This would be nice: . if !check_is_fully_enabled() { return } // Restricted thread cannot read from FS. - let handle = thread::spawn(|| { - // Write to a tmp file, this should succeed before landlock is applied. - let text = "foo"; - let tmpfile = tempfile::NamedTempFile::new().unwrap(); - let path = tmpfile.path(); - fs::write(path, text).unwrap(); - let s = fs::read_to_string(path).unwrap(); - assert_eq!(s, text); - - let status = try_restrict_thread().unwrap(); - if !matches!(status, RulesetStatus::FullyEnforced) { - panic!("Ruleset should be enforced since we checked if landlock is enabled"); - } - - // Try to read from the tmp file after landlock. - let result = fs::read_to_string(path); - assert!(matches!( - result, - Err(err) if matches!(err.kind(), ErrorKind::PermissionDenied) - )); - }); + let handle = + thread::spawn(|| { + // Create, write, and read two tmp files. This should succeed before any + // landlock restrictions are applied. + const TEXT: &str = "foo"; + let tmpfile1 = tempfile::NamedTempFile::new().unwrap(); + let path1 = tmpfile1.path(); + let tmpfile2 = tempfile::NamedTempFile::new().unwrap(); + let path2 = tmpfile2.path(); + + fs::write(path1, TEXT).unwrap(); + let s = fs::read_to_string(path1).unwrap(); + assert_eq!(s, TEXT); + fs::write(path2, TEXT).unwrap(); + let s = fs::read_to_string(path2).unwrap(); + assert_eq!(s, TEXT); + + // Apply Landlock with a read exception for only one of the files. + let status = try_restrict(vec![(path1, AccessFs::ReadFile)]); + if !matches!(status, Ok(RulesetStatus::FullyEnforced)) { + panic!("Ruleset should be enforced since we checked if landlock is enabled: {:?}", status); + } + + // Try to read from both files, only tmpfile1 should succeed. + let result = fs::read_to_string(path1); + assert!(matches!( + result, + Ok(s) if s == TEXT + )); + let result = fs::read_to_string(path2); + assert!(matches!( + result, + Err(err) if matches!(err.kind(), ErrorKind::PermissionDenied) + )); + + // Apply Landlock for all files. + let status = try_restrict(std::iter::empty::<(PathBuf, AccessFs)>()); + if !matches!(status, Ok(RulesetStatus::FullyEnforced)) { + panic!("Ruleset should be enforced since we checked if landlock is enabled: {:?}", status); + } + + // Try to read from tmpfile1 after landlock, it should fail. + let result = fs::read_to_string(path1); + assert!(matches!( + result, + Err(err) if matches!(err.kind(), ErrorKind::PermissionDenied) + )); + }); assert!(handle.join().is_ok()); + } + + #[test] + fn restricted_thread_cannot_write_file() { + // TODO: This would be nice: . + if !check_is_fully_enabled() { + return + } // Restricted thread cannot write to FS. - let handle = thread::spawn(|| { - let text = "foo"; - let tmpfile = tempfile::NamedTempFile::new().unwrap(); - let path = tmpfile.path(); - - let status = try_restrict_thread().unwrap(); - if !matches!(status, RulesetStatus::FullyEnforced) { - panic!("Ruleset should be enforced since we checked if landlock is enabled"); - } - - // Try to write to the tmp file after landlock. - let result = fs::write(path, text); - assert!(matches!( - result, - Err(err) if matches!(err.kind(), ErrorKind::PermissionDenied) - )); - }); + let handle = + thread::spawn(|| { + // Create and write two tmp files. This should succeed before any landlock + // restrictions are applied. + const TEXT: &str = "foo"; + let tmpfile1 = tempfile::NamedTempFile::new().unwrap(); + let path1 = tmpfile1.path(); + let tmpfile2 = tempfile::NamedTempFile::new().unwrap(); + let path2 = tmpfile2.path(); + + fs::write(path1, TEXT).unwrap(); + fs::write(path2, TEXT).unwrap(); + + // Apply Landlock with a write exception for only one of the files. + let status = try_restrict(vec![(path1, AccessFs::WriteFile)]); + if !matches!(status, Ok(RulesetStatus::FullyEnforced)) { + panic!("Ruleset should be enforced since we checked if landlock is enabled: {:?}", status); + } + + // Try to write to both files, only tmpfile1 should succeed. + let result = fs::write(path1, TEXT); + assert!(matches!(result, Ok(_))); + let result = fs::write(path2, TEXT); + assert!(matches!( + result, + Err(err) if matches!(err.kind(), ErrorKind::PermissionDenied) + )); + + // Apply Landlock for all files. + let status = try_restrict(std::iter::empty::<(PathBuf, AccessFs)>()); + if !matches!(status, Ok(RulesetStatus::FullyEnforced)) { + panic!("Ruleset should be enforced since we checked if landlock is enabled: {:?}", status); + } + + // Try to write to tmpfile1 after landlock, it should fail. + let result = fs::write(path1, TEXT); + assert!(matches!( + result, + Err(err) if matches!(err.kind(), ErrorKind::PermissionDenied) + )); + }); assert!(handle.join().is_ok()); } diff --git a/polkadot/node/core/pvf/common/src/worker_dir.rs b/polkadot/node/core/pvf/common/src/worker_dir.rs new file mode 100644 index 000000000000..c2610a4d1128 --- /dev/null +++ b/polkadot/node/core/pvf/common/src/worker_dir.rs @@ -0,0 +1,35 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Shared functions for getting the known worker files. + +use std::path::{Path, PathBuf}; + +const WORKER_EXECUTE_ARTIFACT_NAME: &str = "artifact"; +const WORKER_PREPARE_TMP_ARTIFACT_NAME: &str = "tmp-artifact"; +const WORKER_SOCKET_NAME: &str = "socket"; + +pub fn execute_artifact(worker_dir_path: &Path) -> PathBuf { + worker_dir_path.join(WORKER_EXECUTE_ARTIFACT_NAME) +} + +pub fn prepare_tmp_artifact(worker_dir_path: &Path) -> PathBuf { + worker_dir_path.join(WORKER_PREPARE_TMP_ARTIFACT_NAME) +} + +pub fn socket(worker_dir_path: &Path) -> PathBuf { + worker_dir_path.join(WORKER_SOCKET_NAME) +} diff --git a/polkadot/node/core/pvf/execute-worker/src/lib.rs b/polkadot/node/core/pvf/execute-worker/src/lib.rs index 36793a5c71ec..9d7bfdf28669 100644 --- a/polkadot/node/core/pvf/execute-worker/src/lib.rs +++ b/polkadot/node/core/pvf/execute-worker/src/lib.rs @@ -16,7 +16,7 @@ //! Contains the logic for executing PVFs. Used by the polkadot-execute-worker binary. -pub use polkadot_node_core_pvf_common::executor_intf::Executor; +pub use polkadot_node_core_pvf_common::{executor_intf::Executor, worker_dir, SecurityStatus}; // NOTE: Initializing logging in e.g. tests will not have an effect in the workers, as they are // separate spawned processes. Run with e.g. `RUST_LOG=parachain::pvf-execute-worker=trace`. @@ -28,22 +28,21 @@ use polkadot_node_core_pvf_common::{ error::InternalValidationError, execute::{Handshake, Response}, executor_intf::NATIVE_STACK_MAX, - framed_recv, framed_send, + framed_recv_blocking, framed_send_blocking, worker::{ - bytes_to_path, cpu_time_monitor_loop, - security::LandlockStatus, - stringify_panic_payload, + cpu_time_monitor_loop, stringify_panic_payload, thread::{self, WaitOutcome}, - worker_event_loop, + worker_event_loop, WorkerKind, }, }; use polkadot_parachain_primitives::primitives::ValidationResult; use std::{ + os::unix::net::UnixStream, path::PathBuf, sync::{mpsc::channel, Arc}, time::Duration, }; -use tokio::{io, net::UnixStream}; +use tokio::io; // Wasmtime powers the Substrate Executor. It compiles the wasm bytecode into native code. // That native code does not create any stacks and just reuses the stack of the thread that @@ -81,8 +80,8 @@ use tokio::{io, net::UnixStream}; /// The stack size for the execute thread. pub const EXECUTE_THREAD_STACK_SIZE: usize = 2 * 1024 * 1024 + NATIVE_STACK_MAX as usize; -async fn recv_handshake(stream: &mut UnixStream) -> io::Result { - let handshake_enc = framed_recv(stream).await?; +fn recv_handshake(stream: &mut UnixStream) -> io::Result { + let handshake_enc = framed_recv_blocking(stream)?; let handshake = Handshake::decode(&mut &handshake_enc[..]).map_err(|_| { io::Error::new( io::ErrorKind::Other, @@ -92,57 +91,58 @@ async fn recv_handshake(stream: &mut UnixStream) -> io::Result { Ok(handshake) } -async fn recv_request(stream: &mut UnixStream) -> io::Result<(PathBuf, Vec, Duration)> { - let artifact_path = framed_recv(stream).await?; - let artifact_path = bytes_to_path(&artifact_path).ok_or_else(|| { - io::Error::new( - io::ErrorKind::Other, - "execute pvf recv_request: non utf-8 artifact path".to_string(), - ) - })?; - let params = framed_recv(stream).await?; - let execution_timeout = framed_recv(stream).await?; +fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec, Duration)> { + let params = framed_recv_blocking(stream)?; + let execution_timeout = framed_recv_blocking(stream)?; let execution_timeout = Duration::decode(&mut &execution_timeout[..]).map_err(|_| { io::Error::new( io::ErrorKind::Other, "execute pvf recv_request: failed to decode duration".to_string(), ) })?; - Ok((artifact_path, params, execution_timeout)) + Ok((params, execution_timeout)) } -async fn send_response(stream: &mut UnixStream, response: Response) -> io::Result<()> { - framed_send(stream, &response.encode()).await +fn send_response(stream: &mut UnixStream, response: Response) -> io::Result<()> { + framed_send_blocking(stream, &response.encode()) } /// The entrypoint that the spawned execute worker should start with. /// /// # Parameters /// -/// The `socket_path` specifies the path to the socket used to communicate with the host. The -/// `node_version`, if `Some`, is checked against the worker version. A mismatch results in -/// immediate worker termination. `None` is used for tests and in other situations when version -/// check is not necessary. +/// - `worker_dir_path`: specifies the path to the worker-specific temporary directory. +/// +/// - `node_version`: if `Some`, is checked against the `worker_version`. A mismatch results in +/// immediate worker termination. `None` is used for tests and in other situations when version +/// check is not necessary. +/// +/// - `worker_version`: see above +/// +/// - `security_status`: contains the detected status of security features. pub fn worker_entrypoint( - socket_path: &str, + worker_dir_path: PathBuf, node_version: Option<&str>, worker_version: Option<&str>, + security_status: SecurityStatus, ) { worker_event_loop( - "execute", - socket_path, + WorkerKind::Execute, + worker_dir_path, node_version, worker_version, - |mut stream| async move { + &security_status, + |mut stream, worker_dir_path| async move { let worker_pid = std::process::id(); + let artifact_path = worker_dir::execute_artifact(&worker_dir_path); - let handshake = recv_handshake(&mut stream).await?; - let executor = Executor::new(handshake.executor_params).map_err(|e| { + let Handshake { executor_params } = recv_handshake(&mut stream)?; + let executor = Executor::new(executor_params).map_err(|e| { io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e)) })?; loop { - let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?; + let (params, execution_timeout) = recv_request(&mut stream)?; gum::debug!( target: LOG_TARGET, %worker_pid, @@ -151,15 +151,13 @@ pub fn worker_entrypoint( ); // Get the artifact bytes. - // - // We do this outside the thread so that we can lock down filesystem access there. - let compiled_artifact_blob = match std::fs::read(artifact_path) { + let compiled_artifact_blob = match std::fs::read(&artifact_path) { Ok(bytes) => bytes, Err(err) => { let response = Response::InternalError( InternalValidationError::CouldNotOpenFile(err.to_string()), ); - send_response(&mut stream, response).await?; + send_response(&mut stream, response)?; continue }, }; @@ -187,22 +185,11 @@ pub fn worker_entrypoint( let execute_thread = thread::spawn_worker_thread_with_stack_size( "execute thread", move || { - // Try to enable landlock. - #[cfg(target_os = "linux")] - let landlock_status = polkadot_node_core_pvf_common::worker::security::landlock::try_restrict_thread() - .map(LandlockStatus::from_ruleset_status) - .map_err(|e| e.to_string()); - #[cfg(not(target_os = "linux"))] - let landlock_status: Result = Ok(LandlockStatus::NotEnforced); - - ( - validate_using_artifact( - &compiled_artifact_blob, - ¶ms, - executor_2, - cpu_time_start, - ), - landlock_status, + validate_using_artifact( + &compiled_artifact_blob, + ¶ms, + executor_2, + cpu_time_start, ) }, Arc::clone(&condvar), @@ -215,24 +202,9 @@ pub fn worker_entrypoint( let response = match outcome { WaitOutcome::Finished => { let _ = cpu_time_monitor_tx.send(()); - let (result, landlock_status) = execute_thread.join().unwrap_or_else(|e| { - ( - Response::Panic(stringify_panic_payload(e)), - Ok(LandlockStatus::Unavailable), - ) - }); - - // Log if landlock threw an error. - if let Err(err) = landlock_status { - gum::warn!( - target: LOG_TARGET, - %worker_pid, - "error enabling landlock: {}", - err - ); - } - - result + execute_thread + .join() + .unwrap_or_else(|e| Response::Panic(stringify_panic_payload(e))) }, // If the CPU thread is not selected, we signal it to end, the join handle is // dropped and the thread will finish in the background. @@ -267,7 +239,13 @@ pub fn worker_entrypoint( ), }; - send_response(&mut stream, response).await?; + gum::trace!( + target: LOG_TARGET, + %worker_pid, + "worker: sending response to host: {:?}", + response + ); + send_response(&mut stream, response)?; } }, ); diff --git a/polkadot/node/core/pvf/prepare-worker/Cargo.toml b/polkadot/node/core/pvf/prepare-worker/Cargo.toml index e7a12cd9a809..886209b78c32 100644 --- a/polkadot/node/core/pvf/prepare-worker/Cargo.toml +++ b/polkadot/node/core/pvf/prepare-worker/Cargo.toml @@ -7,6 +7,7 @@ edition.workspace = true license.workspace = true [dependencies] +cfg-if = "1.0" futures = "0.3.21" gum = { package = "tracing-gum", path = "../../../gum" } libc = "0.2.139" diff --git a/polkadot/node/core/pvf/prepare-worker/src/lib.rs b/polkadot/node/core/pvf/prepare-worker/src/lib.rs index caa7d33df12a..a24f5024722b 100644 --- a/polkadot/node/core/pvf/prepare-worker/src/lib.rs +++ b/polkadot/node/core/pvf/prepare-worker/src/lib.rs @@ -33,25 +33,24 @@ use parity_scale_codec::{Decode, Encode}; use polkadot_node_core_pvf_common::{ error::{PrepareError, PrepareResult}, executor_intf::Executor, - framed_recv, framed_send, + framed_recv_blocking, framed_send_blocking, prepare::{MemoryStats, PrepareJobKind, PrepareStats}, pvf::PvfPrepData, worker::{ - bytes_to_path, cpu_time_monitor_loop, - security::LandlockStatus, - stringify_panic_payload, + cpu_time_monitor_loop, stringify_panic_payload, thread::{self, WaitOutcome}, - worker_event_loop, + worker_event_loop, WorkerKind, }, - ProcessTime, + worker_dir, ProcessTime, SecurityStatus, }; use polkadot_primitives::ExecutorParams; use std::{ + os::unix::net::UnixStream, path::PathBuf, sync::{mpsc::channel, Arc}, time::Duration, }; -use tokio::{io, net::UnixStream}; +use tokio::io; /// Contains the bytes for a successfully compiled artifact. pub struct CompiledArtifact(Vec); @@ -69,36 +68,34 @@ impl AsRef<[u8]> for CompiledArtifact { } } -async fn recv_request(stream: &mut UnixStream) -> io::Result<(PvfPrepData, PathBuf)> { - let pvf = framed_recv(stream).await?; +fn recv_request(stream: &mut UnixStream) -> io::Result { + let pvf = framed_recv_blocking(stream)?; let pvf = PvfPrepData::decode(&mut &pvf[..]).map_err(|e| { io::Error::new( io::ErrorKind::Other, format!("prepare pvf recv_request: failed to decode PvfPrepData: {}", e), ) })?; - let tmp_file = framed_recv(stream).await?; - let tmp_file = bytes_to_path(&tmp_file).ok_or_else(|| { - io::Error::new( - io::ErrorKind::Other, - "prepare pvf recv_request: non utf-8 artifact path".to_string(), - ) - })?; - Ok((pvf, tmp_file)) + Ok(pvf) } -async fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<()> { - framed_send(stream, &result.encode()).await +fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<()> { + framed_send_blocking(stream, &result.encode()) } /// The entrypoint that the spawned prepare worker should start with. /// /// # Parameters /// -/// The `socket_path` specifies the path to the socket used to communicate with the host. The -/// `node_version`, if `Some`, is checked against the worker version. A mismatch results in -/// immediate worker termination. `None` is used for tests and in other situations when version -/// check is not necessary. +/// - `worker_dir_path`: specifies the path to the worker-specific temporary directory. +/// +/// - `node_version`: if `Some`, is checked against the `worker_version`. A mismatch results in +/// immediate worker termination. `None` is used for tests and in other situations when version +/// check is not necessary. +/// +/// - `worker_version`: see above +/// +/// - `security_status`: contains the detected status of security features. /// /// # Flow /// @@ -119,20 +116,23 @@ async fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Re /// 7. Send the result of preparation back to the host. If any error occurred in the above steps, we /// send that in the `PrepareResult`. pub fn worker_entrypoint( - socket_path: &str, + worker_dir_path: PathBuf, node_version: Option<&str>, worker_version: Option<&str>, + security_status: SecurityStatus, ) { worker_event_loop( - "prepare", - socket_path, + WorkerKind::Prepare, + worker_dir_path, node_version, worker_version, - |mut stream| async move { + &security_status, + |mut stream, worker_dir_path| async move { let worker_pid = std::process::id(); + let temp_artifact_dest = worker_dir::prepare_tmp_artifact(&worker_dir_path); loop { - let (pvf, temp_artifact_dest) = recv_request(&mut stream).await?; + let pvf = recv_request(&mut stream)?; gum::debug!( target: LOG_TARGET, %worker_pid, @@ -172,14 +172,6 @@ pub fn worker_entrypoint( let prepare_thread = thread::spawn_worker_thread( "prepare thread", move || { - // Try to enable landlock. - #[cfg(target_os = "linux")] - let landlock_status = polkadot_node_core_pvf_common::worker::security::landlock::try_restrict_thread() - .map(LandlockStatus::from_ruleset_status) - .map_err(|e| e.to_string()); - #[cfg(not(target_os = "linux"))] - let landlock_status: Result = Ok(LandlockStatus::NotEnforced); - #[allow(unused_mut)] let mut result = prepare_artifact(pvf, cpu_time_start); @@ -200,7 +192,7 @@ pub fn worker_entrypoint( }); } - (result, landlock_status) + result }, Arc::clone(&condvar), WaitOutcome::Finished, @@ -213,20 +205,20 @@ pub fn worker_entrypoint( let _ = cpu_time_monitor_tx.send(()); match prepare_thread.join().unwrap_or_else(|err| { - ( - Err(PrepareError::Panic(stringify_panic_payload(err))), - Ok(LandlockStatus::Unavailable), - ) + Err(PrepareError::Panic(stringify_panic_payload(err))) }) { - (Err(err), _) => { + Err(err) => { // Serialized error will be written into the socket. Err(err) }, - (Ok(ok), landlock_status) => { - #[cfg(not(target_os = "linux"))] - let (artifact, cpu_time_elapsed) = ok; - #[cfg(target_os = "linux")] - let (artifact, cpu_time_elapsed, max_rss) = ok; + Ok(ok) => { + cfg_if::cfg_if! { + if #[cfg(target_os = "linux")] { + let (artifact, cpu_time_elapsed, max_rss) = ok; + } else { + let (artifact, cpu_time_elapsed) = ok; + } + } // Stop the memory stats worker and get its observed memory stats. #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] @@ -242,16 +234,6 @@ pub fn worker_entrypoint( max_rss: extract_max_rss_stat(max_rss, worker_pid), }; - // Log if landlock threw an error. - if let Err(err) = landlock_status { - gum::warn!( - target: LOG_TARGET, - %worker_pid, - "error enabling landlock: {}", - err - ); - } - // Write the serialized artifact into a temp file. // // PVF host only keeps artifacts statuses in its memory, @@ -300,7 +282,13 @@ pub fn worker_entrypoint( ), }; - send_response(&mut stream, result).await?; + gum::trace!( + target: LOG_TARGET, + %worker_pid, + "worker: sending response to host: {:?}", + result + ); + send_response(&mut stream, result)?; } }, ); diff --git a/polkadot/node/core/pvf/src/artifacts.rs b/polkadot/node/core/pvf/src/artifacts.rs index dc5921df9688..5a1767af75b7 100644 --- a/polkadot/node/core/pvf/src/artifacts.rs +++ b/polkadot/node/core/pvf/src/artifacts.rs @@ -172,9 +172,10 @@ impl Artifacts { /// /// The recognized artifacts will be filled in the table and unrecognized will be removed. pub async fn new(cache_path: &Path) -> Self { - // Make sure that the cache path directory and all its parents are created. - // First delete the entire cache. Nodes are long-running so this should populate shortly. + // First delete the entire cache. This includes artifacts and any leftover worker dirs (see + // [`WorkerDir`]). Nodes are long-running so this should populate shortly. let _ = tokio::fs::remove_dir_all(cache_path).await; + // Make sure that the cache path directory and all its parents are created. let _ = tokio::fs::create_dir_all(cache_path).await; Self { artifacts: HashMap::new() } @@ -295,7 +296,7 @@ mod tests { #[tokio::test] async fn artifacts_removes_cache_on_startup() { - let fake_cache_path = crate::worker_intf::tmpfile("test-cache").await.unwrap(); + let fake_cache_path = crate::worker_intf::tmppath("test-cache").await.unwrap(); let fake_artifact_path = { let mut p = fake_cache_path.clone(); p.push("wasmtime_0x1234567890123456789012345678901234567890123456789012345678901234"); diff --git a/polkadot/node/core/pvf/src/execute/queue.rs b/polkadot/node/core/pvf/src/execute/queue.rs index acb260e25693..aca604f0de21 100644 --- a/polkadot/node/core/pvf/src/execute/queue.rs +++ b/polkadot/node/core/pvf/src/execute/queue.rs @@ -30,6 +30,7 @@ use futures::{ stream::{FuturesUnordered, StreamExt as _}, Future, FutureExt, }; +use polkadot_node_core_pvf_common::SecurityStatus; use polkadot_primitives::{ExecutorParams, ExecutorParamsHash}; use slotmap::HopSlotMap; use std::{ @@ -139,8 +140,10 @@ struct Queue { // Some variables related to the current session. program_path: PathBuf, + cache_path: PathBuf, spawn_timeout: Duration, node_version: Option, + security_status: SecurityStatus, /// The queue of jobs that are waiting for a worker to pick up. queue: VecDeque, @@ -152,16 +155,20 @@ impl Queue { fn new( metrics: Metrics, program_path: PathBuf, + cache_path: PathBuf, worker_capacity: usize, spawn_timeout: Duration, node_version: Option, + security_status: SecurityStatus, to_queue_rx: mpsc::Receiver, ) -> Self { Self { metrics, program_path, + cache_path, spawn_timeout, node_version, + security_status, to_queue_rx, queue: VecDeque::new(), mux: Mux::new(), @@ -405,9 +412,11 @@ fn spawn_extra_worker(queue: &mut Queue, job: ExecuteJob) { queue.mux.push( spawn_worker_task( queue.program_path.clone(), + queue.cache_path.clone(), job, queue.spawn_timeout, queue.node_version.clone(), + queue.security_status.clone(), ) .boxed(), ); @@ -423,18 +432,22 @@ fn spawn_extra_worker(queue: &mut Queue, job: ExecuteJob) { /// execute other jobs with a compatible execution environment. async fn spawn_worker_task( program_path: PathBuf, + cache_path: PathBuf, job: ExecuteJob, spawn_timeout: Duration, node_version: Option, + security_status: SecurityStatus, ) -> QueueEvent { use futures_timer::Delay; loop { match super::worker_intf::spawn( &program_path, + &cache_path, job.executor_params.clone(), spawn_timeout, node_version.as_deref(), + security_status.clone(), ) .await { @@ -496,17 +509,21 @@ fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) { pub fn start( metrics: Metrics, program_path: PathBuf, + cache_path: PathBuf, worker_capacity: usize, spawn_timeout: Duration, node_version: Option, + security_status: SecurityStatus, ) -> (mpsc::Sender, impl Future) { let (to_queue_tx, to_queue_rx) = mpsc::channel(20); let run = Queue::new( metrics, program_path, + cache_path, worker_capacity, spawn_timeout, node_version, + security_status, to_queue_rx, ) .run(); diff --git a/polkadot/node/core/pvf/src/execute/worker_intf.rs b/polkadot/node/core/pvf/src/execute/worker_intf.rs index d66444a8102d..783c7c7abbc8 100644 --- a/polkadot/node/core/pvf/src/execute/worker_intf.rs +++ b/polkadot/node/core/pvf/src/execute/worker_intf.rs @@ -19,8 +19,8 @@ use crate::{ artifacts::ArtifactPathId, worker_intf::{ - path_to_bytes, spawn_with_program_path, IdleWorker, SpawnErr, WorkerHandle, - JOB_TIMEOUT_WALL_CLOCK_FACTOR, + clear_worker_dir_path, framed_recv, framed_send, spawn_with_program_path, IdleWorker, + SpawnErr, WorkerDir, WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR, }, LOG_TARGET, }; @@ -30,7 +30,7 @@ use parity_scale_codec::{Decode, Encode}; use polkadot_node_core_pvf_common::{ error::InternalValidationError, execute::{Handshake, Response}, - framed_recv, framed_send, + worker_dir, SecurityStatus, }; use polkadot_parachain_primitives::primitives::ValidationResult; use polkadot_primitives::ExecutorParams; @@ -38,21 +38,30 @@ use std::{path::Path, time::Duration}; use tokio::{io, net::UnixStream}; /// Spawns a new worker with the given program path that acts as the worker and the spawn timeout. -/// Sends a handshake message to the worker as soon as it is spawned. /// -/// The program should be able to handle ` execute-worker ` invocation. +/// Sends a handshake message to the worker as soon as it is spawned. pub async fn spawn( program_path: &Path, + cache_path: &Path, executor_params: ExecutorParams, spawn_timeout: Duration, node_version: Option<&str>, + security_status: SecurityStatus, ) -> Result<(IdleWorker, WorkerHandle), SpawnErr> { let mut extra_args = vec!["execute-worker"]; if let Some(node_version) = node_version { extra_args.extend_from_slice(&["--node-impl-version", node_version]); } - let (mut idle_worker, worker_handle) = - spawn_with_program_path("execute", program_path, &extra_args, spawn_timeout).await?; + + let (mut idle_worker, worker_handle) = spawn_with_program_path( + "execute", + program_path, + cache_path, + &extra_args, + spawn_timeout, + security_status, + ) + .await?; send_handshake(&mut idle_worker.stream, Handshake { executor_params }) .await .map_err(|error| { @@ -104,89 +113,151 @@ pub async fn start_work( execution_timeout: Duration, validation_params: Vec, ) -> Outcome { - let IdleWorker { mut stream, pid } = worker; + let IdleWorker { mut stream, pid, worker_dir } = worker; gum::debug!( target: LOG_TARGET, worker_pid = %pid, + ?worker_dir, validation_code_hash = ?artifact.id.code_hash, "starting execute for {}", artifact.path.display(), ); - if let Err(error) = - send_request(&mut stream, &artifact.path, &validation_params, execution_timeout).await - { + with_worker_dir_setup(worker_dir, pid, &artifact.path, |worker_dir| async move { + if let Err(error) = send_request(&mut stream, &validation_params, execution_timeout).await { + gum::warn!( + target: LOG_TARGET, + worker_pid = %pid, + validation_code_hash = ?artifact.id.code_hash, + ?error, + "failed to send an execute request", + ); + return Outcome::IoErr + } + + // We use a generous timeout here. This is in addition to the one in the child process, in + // case the child stalls. We have a wall clock timeout here in the host, but a CPU timeout + // in the child. We want to use CPU time because it varies less than wall clock time under + // load, but the CPU resources of the child can only be measured from the parent after the + // child process terminates. + let timeout = execution_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR; + let response = futures::select! { + response = recv_response(&mut stream).fuse() => { + match response { + Err(error) => { + gum::warn!( + target: LOG_TARGET, + worker_pid = %pid, + validation_code_hash = ?artifact.id.code_hash, + ?error, + "failed to recv an execute response", + ); + return Outcome::IoErr + }, + Ok(response) => { + if let Response::Ok{duration, ..} = response { + if duration > execution_timeout { + // The job didn't complete within the timeout. + gum::warn!( + target: LOG_TARGET, + worker_pid = %pid, + "execute job took {}ms cpu time, exceeded execution timeout {}ms.", + duration.as_millis(), + execution_timeout.as_millis(), + ); + + // Return a timeout error. + return Outcome::HardTimeout; + } + } + + response + }, + } + }, + _ = Delay::new(timeout).fuse() => { + gum::warn!( + target: LOG_TARGET, + worker_pid = %pid, + validation_code_hash = ?artifact.id.code_hash, + "execution worker exceeded lenient timeout for execution, child worker likely stalled", + ); + Response::TimedOut + }, + }; + + match response { + Response::Ok { result_descriptor, duration } => Outcome::Ok { + result_descriptor, + duration, + idle_worker: IdleWorker { stream, pid, worker_dir }, + }, + Response::InvalidCandidate(err) => Outcome::InvalidCandidate { + err, + idle_worker: IdleWorker { stream, pid, worker_dir }, + }, + Response::TimedOut => Outcome::HardTimeout, + Response::Panic(err) => Outcome::Panic { err }, + Response::InternalError(err) => Outcome::InternalError { err }, + } + }) + .await +} + +/// Create a temporary file for an artifact in the worker cache, execute the given future/closure +/// passing the file path in, and clean up the worker cache. +/// +/// Failure to clean up the worker cache results in an error - leaving any files here could be a +/// security issue, and we should shut down the worker. This should be very rare. +async fn with_worker_dir_setup( + worker_dir: WorkerDir, + pid: u32, + artifact_path: &Path, + f: F, +) -> Outcome +where + Fut: futures::Future, + F: FnOnce(WorkerDir) -> Fut, +{ + // Cheaply create a hard link to the artifact. The artifact is always at a known location in the + // worker cache, and the child can't access any other artifacts or gain any information from the + // original filename. + let link_path = worker_dir::execute_artifact(&worker_dir.path); + if let Err(err) = tokio::fs::hard_link(artifact_path, link_path).await { gum::warn!( target: LOG_TARGET, worker_pid = %pid, - validation_code_hash = ?artifact.id.code_hash, - ?error, - "failed to send an execute request", + ?worker_dir, + "failed to clear worker cache after the job: {:?}", + err, ); - return Outcome::IoErr + return Outcome::InternalError { + err: InternalValidationError::CouldNotCreateLink(format!("{:?}", err)), + } } - // We use a generous timeout here. This is in addition to the one in the child process, in - // case the child stalls. We have a wall clock timeout here in the host, but a CPU timeout - // in the child. We want to use CPU time because it varies less than wall clock time under - // load, but the CPU resources of the child can only be measured from the parent after the - // child process terminates. - let timeout = execution_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR; - let response = futures::select! { - response = recv_response(&mut stream).fuse() => { - match response { - Err(error) => { - gum::warn!( - target: LOG_TARGET, - worker_pid = %pid, - validation_code_hash = ?artifact.id.code_hash, - ?error, - "failed to recv an execute response", - ); - return Outcome::IoErr - }, - Ok(response) => { - if let Response::Ok{duration, ..} = response { - if duration > execution_timeout { - // The job didn't complete within the timeout. - gum::warn!( - target: LOG_TARGET, - worker_pid = %pid, - "execute job took {}ms cpu time, exceeded execution timeout {}ms.", - duration.as_millis(), - execution_timeout.as_millis(), - ); - - // Return a timeout error. - return Outcome::HardTimeout; - } - } + let worker_dir_path = worker_dir.path.clone(); + let outcome = f(worker_dir).await; - response - }, - } - }, - _ = Delay::new(timeout).fuse() => { - gum::warn!( - target: LOG_TARGET, - worker_pid = %pid, - validation_code_hash = ?artifact.id.code_hash, - "execution worker exceeded lenient timeout for execution, child worker likely stalled", - ); - Response::TimedOut - }, - }; - - match response { - Response::Ok { result_descriptor, duration } => - Outcome::Ok { result_descriptor, duration, idle_worker: IdleWorker { stream, pid } }, - Response::InvalidCandidate(err) => - Outcome::InvalidCandidate { err, idle_worker: IdleWorker { stream, pid } }, - Response::TimedOut => Outcome::HardTimeout, - Response::Panic(err) => Outcome::Panic { err }, - Response::InternalError(err) => Outcome::InternalError { err }, + // Try to clear the worker dir. + if let Err(err) = clear_worker_dir_path(&worker_dir_path) { + gum::warn!( + target: LOG_TARGET, + worker_pid = %pid, + ?worker_dir_path, + "failed to clear worker cache after the job: {:?}", + err, + ); + return Outcome::InternalError { + err: InternalValidationError::CouldNotClearWorkerDir { + err: format!("{:?}", err), + path: worker_dir_path.to_str().map(String::from), + }, + } } + + outcome } async fn send_handshake(stream: &mut UnixStream, handshake: Handshake) -> io::Result<()> { @@ -195,11 +266,9 @@ async fn send_handshake(stream: &mut UnixStream, handshake: Handshake) -> io::Re async fn send_request( stream: &mut UnixStream, - artifact_path: &Path, validation_params: &[u8], execution_timeout: Duration, ) -> io::Result<()> { - framed_send(stream, path_to_bytes(artifact_path)).await?; framed_send(stream, validation_params).await?; framed_send(stream, &execution_timeout.encode()).await } diff --git a/polkadot/node/core/pvf/src/host.rs b/polkadot/node/core/pvf/src/host.rs index 5290b2760f42..81695829122b 100644 --- a/polkadot/node/core/pvf/src/host.rs +++ b/polkadot/node/core/pvf/src/host.rs @@ -34,6 +34,7 @@ use futures::{ use polkadot_node_core_pvf_common::{ error::{PrepareError, PrepareResult}, pvf::PvfPrepData, + SecurityStatus, }; use polkadot_parachain_primitives::primitives::ValidationResult; use std::{ @@ -202,8 +203,13 @@ impl Config { pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future) { gum::debug!(target: LOG_TARGET, ?config, "starting PVF validation host"); - // Run checks for supported security features once per host startup. - warn_if_no_landlock(); + // Run checks for supported security features once per host startup. Warn here if not enabled. + let security_status = { + let can_enable_landlock = check_landlock(&config.prepare_worker_program_path); + let can_unshare_user_namespace_and_change_root = + check_can_unshare_user_namespace_and_change_root(&config.prepare_worker_program_path); + SecurityStatus { can_enable_landlock, can_unshare_user_namespace_and_change_root } + }; let (to_host_tx, to_host_rx) = mpsc::channel(10); @@ -215,6 +221,7 @@ pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future (ValidationHost, impl Future impl futures::Stream .map(|_| ()) } -/// Check if landlock is supported and emit a warning if not. -fn warn_if_no_landlock() { - #[cfg(target_os = "linux")] - { - use polkadot_node_core_pvf_common::worker::security::landlock; - let status = landlock::get_status(); - if !landlock::status_is_fully_enabled(&status) { - let abi = landlock::LANDLOCK_ABI as u8; +/// Check if we can sandbox the root and emit a warning if not. +/// +/// We do this check by spawning a new process and trying to sandbox it. To get as close as possible +/// to running the check in a worker, we try it... in a worker. The expected return status is 0 on +/// success and -1 on failure. +fn check_can_unshare_user_namespace_and_change_root( + #[cfg_attr(not(target_os = "linux"), allow(unused_variables))] + prepare_worker_program_path: &Path, +) -> bool { + cfg_if::cfg_if! { + if #[cfg(target_os = "linux")] { + let output = std::process::Command::new(prepare_worker_program_path) + .arg("--check-can-unshare-user-namespace-and-change-root") + .output(); + + match output { + Ok(output) if output.status.success() => true, + Ok(output) => { + let stderr = std::str::from_utf8(&output.stderr) + .expect("child process writes a UTF-8 string to stderr; qed") + .trim(); + gum::warn!( + target: LOG_TARGET, + ?prepare_worker_program_path, + // Docs say to always print status using `Display` implementation. + status = %output.status, + %stderr, + "Cannot unshare user namespace and change root, which are Linux-specific kernel security features. Running validation of malicious PVF code has a higher risk of compromising this machine. Consider running with support for unsharing user namespaces for maximum security." + ); + false + }, + Err(err) => { + gum::warn!( + target: LOG_TARGET, + ?prepare_worker_program_path, + "Could not start child process: {}", + err + ); + false + }, + } + } else { gum::warn!( target: LOG_TARGET, - ?status, - %abi, - "Cannot fully enable landlock, a Linux kernel security feature. Running validation of malicious PVF code has a higher risk of compromising this machine. Consider upgrading the kernel version for maximum security." + "Cannot unshare user namespace and change root, which are Linux-specific kernel security features. Running validation of malicious PVF code has a higher risk of compromising this machine. Consider running on Linux with support for unsharing user namespaces for maximum security." ); + false } } +} - #[cfg(not(target_os = "linux"))] - gum::warn!( - target: LOG_TARGET, - "Cannot enable landlock, a Linux kernel security feature. Running validation of malicious PVF code has a higher risk of compromising this machine. Consider running on Linux with landlock support for maximum security." - ); +/// Check if landlock is supported and emit a warning if not. +/// +/// We do this check by spawning a new process and trying to sandbox it. To get as close as possible +/// to running the check in a worker, we try it... in a worker. The expected return status is 0 on +/// success and -1 on failure. +fn check_landlock( + #[cfg_attr(not(target_os = "linux"), allow(unused_variables))] + prepare_worker_program_path: &Path, +) -> bool { + cfg_if::cfg_if! { + if #[cfg(target_os = "linux")] { + match std::process::Command::new(prepare_worker_program_path) + .arg("--check-can-enable-landlock") + .status() + { + Ok(status) if status.success() => true, + Ok(status) => { + let abi = + polkadot_node_core_pvf_common::worker::security::landlock::LANDLOCK_ABI as u8; + gum::warn!( + target: LOG_TARGET, + ?prepare_worker_program_path, + ?status, + %abi, + "Cannot fully enable landlock, a Linux-specific kernel security feature. Running validation of malicious PVF code has a higher risk of compromising this machine. Consider upgrading the kernel version for maximum security." + ); + false + }, + Err(err) => { + gum::warn!( + target: LOG_TARGET, + ?prepare_worker_program_path, + "Could not start child process: {}", + err + ); + false + }, + } + } else { + gum::warn!( + target: LOG_TARGET, + "Cannot enable landlock, a Linux-specific kernel security feature. Running validation of malicious PVF code has a higher risk of compromising this machine. Consider running on Linux with landlock support for maximum security." + ); + false + } + } } #[cfg(test)] diff --git a/polkadot/node/core/pvf/src/lib.rs b/polkadot/node/core/pvf/src/lib.rs index 0e4f2444adf7..1b8d83773813 100644 --- a/polkadot/node/core/pvf/src/lib.rs +++ b/polkadot/node/core/pvf/src/lib.rs @@ -111,6 +111,7 @@ pub use polkadot_node_core_pvf_common::{ error::{InternalValidationError, PrepareError}, prepare::{PrepareJobKind, PrepareStats}, pvf::PvfPrepData, + SecurityStatus, }; /// The log target for this crate. diff --git a/polkadot/node/core/pvf/src/prepare/pool.rs b/polkadot/node/core/pvf/src/prepare/pool.rs index 92aa4896c263..7933b0319a6f 100644 --- a/polkadot/node/core/pvf/src/prepare/pool.rs +++ b/polkadot/node/core/pvf/src/prepare/pool.rs @@ -27,6 +27,7 @@ use futures::{ use polkadot_node_core_pvf_common::{ error::{PrepareError, PrepareResult}, pvf::PvfPrepData, + SecurityStatus, }; use slotmap::HopSlotMap; use std::{ @@ -110,10 +111,12 @@ enum PoolEvent { type Mux = FuturesUnordered>; struct Pool { + // Some variables related to the current session. program_path: PathBuf, cache_path: PathBuf, spawn_timeout: Duration, node_version: Option, + security_status: SecurityStatus, to_pool: mpsc::Receiver, from_pool: mpsc::UnboundedSender, @@ -132,6 +135,7 @@ async fn run( cache_path, spawn_timeout, node_version, + security_status, to_pool, mut from_pool, mut spawned, @@ -160,6 +164,7 @@ async fn run( &cache_path, spawn_timeout, node_version.clone(), + security_status.clone(), &mut spawned, &mut mux, to_pool, @@ -207,6 +212,7 @@ fn handle_to_pool( cache_path: &Path, spawn_timeout: Duration, node_version: Option, + security_status: SecurityStatus, spawned: &mut HopSlotMap, mux: &mut Mux, to_pool: ToPool, @@ -216,7 +222,14 @@ fn handle_to_pool( gum::debug!(target: LOG_TARGET, "spawning a new prepare worker"); metrics.prepare_worker().on_begin_spawn(); mux.push( - spawn_worker_task(program_path.to_owned(), spawn_timeout, node_version).boxed(), + spawn_worker_task( + program_path.to_owned(), + cache_path.to_owned(), + spawn_timeout, + node_version, + security_status, + ) + .boxed(), ); }, ToPool::StartWork { worker, pvf, artifact_path } => { @@ -229,7 +242,6 @@ fn handle_to_pool( worker, idle, pvf, - cache_path.to_owned(), artifact_path, preparation_timer, ) @@ -258,13 +270,23 @@ fn handle_to_pool( async fn spawn_worker_task( program_path: PathBuf, + cache_path: PathBuf, spawn_timeout: Duration, node_version: Option, + security_status: SecurityStatus, ) -> PoolEvent { use futures_timer::Delay; loop { - match worker_intf::spawn(&program_path, spawn_timeout, node_version.as_deref()).await { + match worker_intf::spawn( + &program_path, + &cache_path, + spawn_timeout, + node_version.as_deref(), + security_status.clone(), + ) + .await + { Ok((idle, handle)) => break PoolEvent::Spawn(idle, handle), Err(err) => { gum::warn!(target: LOG_TARGET, "failed to spawn a prepare worker: {:?}", err); @@ -281,11 +303,10 @@ async fn start_work_task( worker: Worker, idle: IdleWorker, pvf: PvfPrepData, - cache_path: PathBuf, artifact_path: PathBuf, _preparation_timer: Option, ) -> PoolEvent { - let outcome = worker_intf::start_work(&metrics, idle, pvf, &cache_path, artifact_path).await; + let outcome = worker_intf::start_work(&metrics, idle, pvf, artifact_path).await; PoolEvent::StartWork(worker, outcome) } @@ -322,14 +343,29 @@ fn handle_mux( ), // Return `Concluded`, but do not kill the worker since the error was on the host // side. - Outcome::RenameTmpFileErr { worker: idle, result: _, err } => + Outcome::RenameTmpFileErr { worker: idle, result: _, err, src, dest } => handle_concluded_no_rip( from_pool, spawned, worker, idle, - Err(PrepareError::RenameTmpFileErr(err)), + Err(PrepareError::RenameTmpFileErr { err, src, dest }), ), + // Could not clear worker cache. Kill the worker so other jobs can't see the data. + Outcome::ClearWorkerDir { err } => { + if attempt_retire(metrics, spawned, worker) { + reply( + from_pool, + FromPool::Concluded { + worker, + rip: true, + result: Err(PrepareError::ClearWorkerDir(err)), + }, + )?; + } + + Ok(()) + }, Outcome::Unreachable => { if attempt_retire(metrics, spawned, worker) { reply(from_pool, FromPool::Rip(worker))?; @@ -434,6 +470,7 @@ pub fn start( cache_path: PathBuf, spawn_timeout: Duration, node_version: Option, + security_status: SecurityStatus, ) -> (mpsc::Sender, mpsc::UnboundedReceiver, impl Future) { let (to_pool_tx, to_pool_rx) = mpsc::channel(10); let (from_pool_tx, from_pool_rx) = mpsc::unbounded(); @@ -444,6 +481,7 @@ pub fn start( cache_path, spawn_timeout, node_version, + security_status, to_pool: to_pool_rx, from_pool: from_pool_tx, spawned: HopSlotMap::with_capacity_and_key(20), diff --git a/polkadot/node/core/pvf/src/prepare/worker_intf.rs b/polkadot/node/core/pvf/src/prepare/worker_intf.rs index 5280ab6b42a2..b66c36044343 100644 --- a/polkadot/node/core/pvf/src/prepare/worker_intf.rs +++ b/polkadot/node/core/pvf/src/prepare/worker_intf.rs @@ -19,17 +19,17 @@ use crate::{ metrics::Metrics, worker_intf::{ - path_to_bytes, spawn_with_program_path, tmpfile_in, IdleWorker, SpawnErr, WorkerHandle, - JOB_TIMEOUT_WALL_CLOCK_FACTOR, + clear_worker_dir_path, framed_recv, framed_send, spawn_with_program_path, IdleWorker, + SpawnErr, WorkerDir, WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR, }, LOG_TARGET, }; use parity_scale_codec::{Decode, Encode}; use polkadot_node_core_pvf_common::{ error::{PrepareError, PrepareResult}, - framed_recv, framed_send, prepare::PrepareStats, pvf::PvfPrepData, + worker_dir, SecurityStatus, }; use sp_core::hexdisplay::HexDisplay; @@ -41,19 +41,33 @@ use tokio::{io, net::UnixStream}; /// Spawns a new worker with the given program path that acts as the worker and the spawn timeout. /// -/// The program should be able to handle ` prepare-worker ` invocation. +/// Sends a handshake message to the worker as soon as it is spawned. pub async fn spawn( program_path: &Path, + cache_path: &Path, spawn_timeout: Duration, node_version: Option<&str>, + security_status: SecurityStatus, ) -> Result<(IdleWorker, WorkerHandle), SpawnErr> { let mut extra_args = vec!["prepare-worker"]; if let Some(node_version) = node_version { extra_args.extend_from_slice(&["--node-impl-version", node_version]); } - spawn_with_program_path("prepare", program_path, &extra_args, spawn_timeout).await + + spawn_with_program_path( + "prepare", + program_path, + cache_path, + &extra_args, + spawn_timeout, + security_status, + ) + .await } +/// Outcome of PVF preparation. +/// +/// If the idle worker token is not returned, it means the worker must be terminated. pub enum Outcome { /// The worker has finished the work assigned to it. Concluded { worker: IdleWorker, result: PrepareResult }, @@ -62,9 +76,19 @@ pub enum Outcome { Unreachable, /// The temporary file for the artifact could not be created at the given cache path. CreateTmpFileErr { worker: IdleWorker, err: String }, - /// The response from the worker is received, but the file cannot be renamed (moved) to the + /// The response from the worker is received, but the tmp file cannot be renamed (moved) to the /// final destination location. - RenameTmpFileErr { worker: IdleWorker, result: PrepareResult, err: String }, + RenameTmpFileErr { + worker: IdleWorker, + result: PrepareResult, + err: String, + // Unfortunately `PathBuf` doesn't implement `Encode`/`Decode`, so we do a fallible + // conversion to `Option`. + src: Option, + dest: Option, + }, + /// The worker cache could not be cleared for the given reason. + ClearWorkerDir { err: String }, /// The worker failed to finish the job until the given deadline. /// /// The worker is no longer usable and should be killed. @@ -84,83 +108,88 @@ pub async fn start_work( metrics: &Metrics, worker: IdleWorker, pvf: PvfPrepData, - cache_path: &Path, artifact_path: PathBuf, ) -> Outcome { - let IdleWorker { stream, pid } = worker; + let IdleWorker { stream, pid, worker_dir } = worker; gum::debug!( target: LOG_TARGET, worker_pid = %pid, + ?worker_dir, "starting prepare for {}", artifact_path.display(), ); - with_tmp_file(stream, pid, cache_path, |tmp_file, mut stream| async move { - let preparation_timeout = pvf.prep_timeout(); - if let Err(err) = send_request(&mut stream, pvf, &tmp_file).await { - gum::warn!( - target: LOG_TARGET, - worker_pid = %pid, - "failed to send a prepare request: {:?}", - err, - ); - return Outcome::Unreachable - } - - // Wait for the result from the worker, keeping in mind that there may be a timeout, the - // worker may get killed, or something along these lines. In that case we should propagate - // the error to the pool. - // - // We use a generous timeout here. This is in addition to the one in the child process, in - // case the child stalls. We have a wall clock timeout here in the host, but a CPU timeout - // in the child. We want to use CPU time because it varies less than wall clock time under - // load, but the CPU resources of the child can only be measured from the parent after the - // child process terminates. - let timeout = preparation_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR; - let result = tokio::time::timeout(timeout, recv_response(&mut stream, pid)).await; - - match result { - // Received bytes from worker within the time limit. - Ok(Ok(prepare_result)) => - handle_response( - metrics, - IdleWorker { stream, pid }, - prepare_result, - pid, - tmp_file, - artifact_path, - preparation_timeout, - ) - .await, - Ok(Err(err)) => { - // Communication error within the time limit. + with_worker_dir_setup( + worker_dir, + stream, + pid, + |tmp_artifact_file, mut stream, worker_dir| async move { + let preparation_timeout = pvf.prep_timeout(); + if let Err(err) = send_request(&mut stream, pvf).await { gum::warn!( target: LOG_TARGET, worker_pid = %pid, - "failed to recv a prepare response: {:?}", + "failed to send a prepare request: {:?}", err, ); - Outcome::IoErr(err.to_string()) - }, - Err(_) => { - // Timed out here on the host. - gum::warn!( - target: LOG_TARGET, - worker_pid = %pid, - "did not recv a prepare response within the time limit", - ); - Outcome::TimedOut - }, - } - }) + return Outcome::Unreachable + } + + // Wait for the result from the worker, keeping in mind that there may be a timeout, the + // worker may get killed, or something along these lines. In that case we should + // propagate the error to the pool. + // + // We use a generous timeout here. This is in addition to the one in the child process, + // in case the child stalls. We have a wall clock timeout here in the host, but a CPU + // timeout in the child. We want to use CPU time because it varies less than wall clock + // time under load, but the CPU resources of the child can only be measured from the + // parent after the child process terminates. + let timeout = preparation_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR; + let result = tokio::time::timeout(timeout, recv_response(&mut stream, pid)).await; + + match result { + // Received bytes from worker within the time limit. + Ok(Ok(prepare_result)) => + handle_response( + metrics, + IdleWorker { stream, pid, worker_dir }, + prepare_result, + pid, + tmp_artifact_file, + artifact_path, + preparation_timeout, + ) + .await, + Ok(Err(err)) => { + // Communication error within the time limit. + gum::warn!( + target: LOG_TARGET, + worker_pid = %pid, + "failed to recv a prepare response: {:?}", + err, + ); + Outcome::IoErr(err.to_string()) + }, + Err(_) => { + // Timed out here on the host. + gum::warn!( + target: LOG_TARGET, + worker_pid = %pid, + "did not recv a prepare response within the time limit", + ); + Outcome::TimedOut + }, + } + }, + ) .await } /// Handles the case where we successfully received response bytes on the host from the child. /// -/// NOTE: Here we know the artifact exists, but is still located in a temporary file which will be -/// cleared by `with_tmp_file`. +/// Here we know the artifact exists, but is still located in a temporary file which will be cleared +/// by [`with_worker_dir_setup`]. async fn handle_response( metrics: &Metrics, worker: IdleWorker, @@ -209,7 +238,13 @@ async fn handle_response( artifact_path.display(), err, ); - Outcome::RenameTmpFileErr { worker, result, err: format!("{:?}", err) } + Outcome::RenameTmpFileErr { + worker, + result, + err: format!("{:?}", err), + src: tmp_file.to_str().map(String::from), + dest: artifact_path.to_str().map(String::from), + } }, }; @@ -220,61 +255,58 @@ async fn handle_response( outcome } -/// Create a temporary file for an artifact at the given cache path and execute the given -/// future/closure passing the file path in. +/// Create a temporary file for an artifact in the worker cache, execute the given future/closure +/// passing the file path in, and clean up the worker cache. /// -/// The function will try best effort to not leave behind the temporary file. -async fn with_tmp_file(stream: UnixStream, pid: u32, cache_path: &Path, f: F) -> Outcome +/// Failure to clean up the worker cache results in an error - leaving any files here could be a +/// security issue, and we should shut down the worker. This should be very rare. +async fn with_worker_dir_setup( + worker_dir: WorkerDir, + stream: UnixStream, + pid: u32, + f: F, +) -> Outcome where Fut: futures::Future, - F: FnOnce(PathBuf, UnixStream) -> Fut, + F: FnOnce(PathBuf, UnixStream, WorkerDir) -> Fut, { - let tmp_file = match tmpfile_in("prepare-artifact-", cache_path).await { - Ok(f) => f, - Err(err) => { - gum::warn!( - target: LOG_TARGET, - worker_pid = %pid, - "failed to create a temp file for the artifact: {:?}", - err, - ); - return Outcome::CreateTmpFileErr { - worker: IdleWorker { stream, pid }, - err: format!("{:?}", err), - } - }, + // Create the tmp file here so that the child doesn't need any file creation rights. This will + // be cleared at the end of this function. + let tmp_file = worker_dir::prepare_tmp_artifact(&worker_dir.path); + if let Err(err) = tokio::fs::File::create(&tmp_file).await { + gum::warn!( + target: LOG_TARGET, + worker_pid = %pid, + ?worker_dir, + "failed to create a temp file for the artifact: {:?}", + err, + ); + return Outcome::CreateTmpFileErr { + worker: IdleWorker { stream, pid, worker_dir }, + err: format!("{:?}", err), + } }; - let outcome = f(tmp_file.clone(), stream).await; + let worker_dir_path = worker_dir.path.clone(); + let outcome = f(tmp_file, stream, worker_dir).await; - // The function called above is expected to move `tmp_file` to a new location upon success. - // However, the function may as well fail and in that case we should remove the tmp file here. - // - // In any case, we try to remove the file here so that there are no leftovers. We only report - // errors that are different from the `NotFound`. - match tokio::fs::remove_file(tmp_file).await { - Ok(()) => (), - Err(err) if err.kind() == std::io::ErrorKind::NotFound => (), - Err(err) => { - gum::warn!( - target: LOG_TARGET, - worker_pid = %pid, - "failed to remove the tmp file: {:?}", - err, - ); - }, + // Try to clear the worker dir. + if let Err(err) = clear_worker_dir_path(&worker_dir_path) { + gum::warn!( + target: LOG_TARGET, + worker_pid = %pid, + ?worker_dir_path, + "failed to clear worker cache after the job: {:?}", + err, + ); + return Outcome::ClearWorkerDir { err: format!("{:?}", err) } } outcome } -async fn send_request( - stream: &mut UnixStream, - pvf: PvfPrepData, - tmp_file: &Path, -) -> io::Result<()> { +async fn send_request(stream: &mut UnixStream, pvf: PvfPrepData) -> io::Result<()> { framed_send(stream, &pvf.encode()).await?; - framed_send(stream, path_to_bytes(tmp_file)).await?; Ok(()) } diff --git a/polkadot/node/core/pvf/src/worker_intf.rs b/polkadot/node/core/pvf/src/worker_intf.rs index 795ad4524443..9825506ba88f 100644 --- a/polkadot/node/core/pvf/src/worker_intf.rs +++ b/polkadot/node/core/pvf/src/worker_intf.rs @@ -20,6 +20,7 @@ use crate::LOG_TARGET; use futures::FutureExt as _; use futures_timer::Delay; use pin_project::pin_project; +use polkadot_node_core_pvf_common::{worker_dir, SecurityStatus}; use rand::Rng; use std::{ fmt, mem, @@ -39,99 +40,106 @@ use tokio::{ pub const JOB_TIMEOUT_WALL_CLOCK_FACTOR: u32 = 4; /// This is publicly exposed only for integration tests. +/// +/// # Parameters +/// +/// - `debug_id`: An identifier for the process (e.g. "execute" or "prepare"). +/// +/// - `program_path`: The path to the program. +/// +/// - `cache_path`: The path to the artifact cache. +/// +/// - `extra_args`: Optional extra CLI arguments to the program. NOTE: Should only contain data +/// required before the handshake, like node/worker versions for the version check. Other data +/// should go through the handshake. +/// +/// - `spawn_timeout`: The amount of time to wait for the child process to spawn. +/// +/// - `security_status`: contains the detected status of security features. #[doc(hidden)] pub async fn spawn_with_program_path( debug_id: &'static str, program_path: impl Into, + cache_path: &Path, extra_args: &[&str], spawn_timeout: Duration, + security_status: SecurityStatus, ) -> Result<(IdleWorker, WorkerHandle), SpawnErr> { let program_path = program_path.into(); - with_transient_socket_path(debug_id, |socket_path| { - let socket_path = socket_path.to_owned(); - let extra_args: Vec = extra_args.iter().map(|arg| arg.to_string()).collect(); - - async move { - let listener = UnixListener::bind(&socket_path).map_err(|err| { + let worker_dir = WorkerDir::new(debug_id, cache_path).await?; + let socket_path = worker_dir::socket(&worker_dir.path); + + let extra_args: Vec = extra_args.iter().map(|arg| arg.to_string()).collect(); + + let listener = UnixListener::bind(&socket_path).map_err(|err| { + gum::warn!( + target: LOG_TARGET, + %debug_id, + ?program_path, + ?extra_args, + ?worker_dir, + ?socket_path, + "cannot bind unix socket: {:?}", + err, + ); + SpawnErr::Bind + })?; + + let handle = WorkerHandle::spawn(&program_path, &extra_args, &worker_dir.path, security_status) + .map_err(|err| { + gum::warn!( + target: LOG_TARGET, + %debug_id, + ?program_path, + ?extra_args, + ?worker_dir.path, + ?socket_path, + "cannot spawn a worker: {:?}", + err, + ); + SpawnErr::ProcessSpawn + })?; + + let worker_dir_path = worker_dir.path.clone(); + futures::select! { + accept_result = listener.accept().fuse() => { + let (stream, _) = accept_result.map_err(|err| { gum::warn!( target: LOG_TARGET, %debug_id, ?program_path, ?extra_args, - "cannot bind unix socket: {:?}", + ?worker_dir_path, + ?socket_path, + "cannot accept a worker: {:?}", err, ); - SpawnErr::Bind + SpawnErr::Accept })?; - - let handle = - WorkerHandle::spawn(&program_path, &extra_args, socket_path).map_err(|err| { - gum::warn!( - target: LOG_TARGET, - %debug_id, - ?program_path, - ?extra_args, - "cannot spawn a worker: {:?}", - err, - ); - SpawnErr::ProcessSpawn - })?; - - futures::select! { - accept_result = listener.accept().fuse() => { - let (stream, _) = accept_result.map_err(|err| { - gum::warn!( - target: LOG_TARGET, - %debug_id, - ?program_path, - ?extra_args, - "cannot accept a worker: {:?}", - err, - ); - SpawnErr::Accept - })?; - Ok((IdleWorker { stream, pid: handle.id() }, handle)) - } - _ = Delay::new(spawn_timeout).fuse() => { - gum::warn!( - target: LOG_TARGET, - %debug_id, - ?program_path, - ?extra_args, - ?spawn_timeout, - "spawning and connecting to socket timed out", - ); - Err(SpawnErr::AcceptTimeout) - } - } + Ok((IdleWorker { stream, pid: handle.id(), worker_dir }, handle)) } - }) - .await -} - -async fn with_transient_socket_path(debug_id: &'static str, f: F) -> Result -where - F: FnOnce(&Path) -> Fut, - Fut: futures::Future> + 'static, -{ - let socket_path = tmpfile(&format!("pvf-host-{}", debug_id)) - .await - .map_err(|_| SpawnErr::TmpFile)?; - let result = f(&socket_path).await; - - // Best effort to remove the socket file. Under normal circumstances the socket will be removed - // by the worker. We make sure that it is removed here, just in case a failed rendezvous. - let _ = tokio::fs::remove_file(socket_path).await; - - result + _ = Delay::new(spawn_timeout).fuse() => { + gum::warn!( + target: LOG_TARGET, + %debug_id, + ?program_path, + ?extra_args, + ?worker_dir_path, + ?socket_path, + ?spawn_timeout, + "spawning and connecting to socket timed out", + ); + Err(SpawnErr::AcceptTimeout) + } + } } -/// Returns a path under the given `dir`. The file name will start with the given prefix. +/// Returns a path under the given `dir`. The path name will start with the given prefix. /// /// There is only a certain number of retries. If exceeded this function will give up and return an /// error. -pub async fn tmpfile_in(prefix: &str, dir: &Path) -> io::Result { - fn tmppath(prefix: &str, dir: &Path) -> PathBuf { +pub async fn tmppath_in(prefix: &str, dir: &Path) -> io::Result { + fn make_tmppath(prefix: &str, dir: &Path) -> PathBuf { use rand::distributions::Alphanumeric; const DESCRIMINATOR_LEN: usize = 10; @@ -143,27 +151,28 @@ pub async fn tmpfile_in(prefix: &str, dir: &Path) -> io::Result { let s = std::str::from_utf8(&buf) .expect("the string is collected from a valid utf-8 sequence; qed"); - let mut file = dir.to_owned(); - file.push(s); - file + let mut path = dir.to_owned(); + path.push(s); + path } const NUM_RETRIES: usize = 50; for _ in 0..NUM_RETRIES { - let candidate_path = tmppath(prefix, dir); - if !candidate_path.exists() { - return Ok(candidate_path) + let tmp_path = make_tmppath(prefix, dir); + if !tmp_path.exists() { + return Ok(tmp_path) } } - Err(io::Error::new(io::ErrorKind::Other, "failed to create a temporary file")) + Err(io::Error::new(io::ErrorKind::Other, "failed to create a temporary path")) } -/// The same as [`tmpfile_in`], but uses [`std::env::temp_dir`] as the directory. -pub async fn tmpfile(prefix: &str) -> io::Result { +/// The same as [`tmppath_in`], but uses [`std::env::temp_dir`] as the directory. +#[cfg(test)] +pub async fn tmppath(prefix: &str) -> io::Result { let temp_dir = PathBuf::from(std::env::temp_dir()); - tmpfile_in(prefix, &temp_dir).await + tmppath_in(prefix, &temp_dir).await } /// A struct that represents an idle worker. @@ -177,13 +186,19 @@ pub struct IdleWorker { /// The identifier of this process. Used to reset the niceness. pub pid: u32, + + /// The temporary per-worker path. We clean up the worker dir between jobs and delete it when + /// the worker dies. + pub worker_dir: WorkerDir, } /// An error happened during spawning a worker process. #[derive(Clone, Debug)] pub enum SpawnErr { - /// Cannot obtain a temporary file location. - TmpFile, + /// Cannot obtain a temporary path location. + TmpPath, + /// An FS error occurred. + Fs(String), /// Cannot bind the socket to the given path. Bind, /// An error happened during accepting a connection to the socket. @@ -219,12 +234,32 @@ impl WorkerHandle { fn spawn( program: impl AsRef, extra_args: &[String], - socket_path: impl AsRef, + worker_dir_path: impl AsRef, + security_status: SecurityStatus, ) -> io::Result { - let mut child = process::Command::new(program.as_ref()) + let security_args = { + let mut args = vec![]; + if security_status.can_enable_landlock { + args.push("--can-enable-landlock".to_string()); + } + if security_status.can_unshare_user_namespace_and_change_root { + args.push("--can-unshare-user-namespace-and-change-root".to_string()); + } + args + }; + + // Clear all env vars from the spawned process. + let mut command = process::Command::new(program.as_ref()); + command.env_clear(); + // Add back any env vars we want to keep. + if let Ok(value) = std::env::var("RUST_LOG") { + command.env("RUST_LOG", value); + } + let mut child = command .args(extra_args) - .arg("--socket-path") - .arg(socket_path.as_ref().as_os_str()) + .arg("--worker-dir-path") + .arg(worker_dir_path.as_ref().as_os_str()) + .args(&security_args) .stdout(std::process::Stdio::piped()) .kill_on_drop(true) .spawn()?; @@ -306,16 +341,6 @@ impl fmt::Debug for WorkerHandle { } } -/// Convert the given path into a byte buffer. -pub fn path_to_bytes(path: &Path) -> &[u8] { - // Ideally, we take the `OsStr` of the path, send that and reconstruct this on the other side. - // However, libstd doesn't provide us with such an option. There are crates out there that - // allow for extraction of a path, but TBH it doesn't seem to be a real issue. - // - // However, should be there reports we can incorporate such a crate here. - path.to_str().expect("non-UTF-8 path").as_bytes() -} - /// Write some data prefixed by its length into `w`. pub async fn framed_send(w: &mut (impl AsyncWrite + Unpin), buf: &[u8]) -> io::Result<()> { let len_buf = buf.len().to_le_bytes(); @@ -333,3 +358,84 @@ pub async fn framed_recv(r: &mut (impl AsyncRead + Unpin)) -> io::Result r.read_exact(&mut buf).await?; Ok(buf) } + +/// A temporary worker dir that contains only files needed by the worker. The worker will change its +/// root (the `/` directory) to this directory; it should have access to no other paths on its +/// filesystem. +/// +/// NOTE: This struct cleans up its associated directory when it is dropped. Therefore it should not +/// implement `Clone`. +/// +/// # File structure +/// +/// The overall file structure for the PVF system is as follows. The `worker-dir-X`s are managed by +/// this struct. +/// +/// ```nocompile +/// + // +/// - artifact-1 +/// - artifact-2 +/// - [...] +/// - worker-dir-1/ (new `/` for worker-1) +/// + socket (created by host) +/// + tmp-artifact (created by host) (prepare-only) +/// + artifact (link -> artifact-1) (created by host) (execute-only) +/// - worker-dir-2/ (new `/` for worker-2) +/// + [...] +/// ``` +#[derive(Debug)] +pub struct WorkerDir { + pub path: PathBuf, +} + +impl WorkerDir { + /// Creates a new, empty worker dir with a random name in the given cache dir. + pub async fn new(debug_id: &'static str, cache_dir: &Path) -> Result { + let prefix = format!("worker-dir-{}-", debug_id); + let path = tmppath_in(&prefix, cache_dir).await.map_err(|_| SpawnErr::TmpPath)?; + tokio::fs::create_dir(&path) + .await + .map_err(|err| SpawnErr::Fs(err.to_string()))?; + Ok(Self { path }) + } +} + +// Try to clean up the temporary worker dir at the end of the worker's lifetime. It should be wiped +// on startup, but we make a best effort not to leave it around. +impl Drop for WorkerDir { + fn drop(&mut self) { + let _ = std::fs::remove_dir_all(&self.path); + } +} + +// Not async since Rust has trouble with async recursion. There should be few files here anyway. +// +// TODO: A lingering malicious job can still access future files in this dir. See +// for how to fully secure this. +/// Clear the temporary worker dir without deleting it. Not deleting is important because the worker +/// has mounted its own separate filesystem here. +/// +/// Should be called right after a job has finished. We don't want jobs to have access to +/// artifacts from previous jobs. +pub fn clear_worker_dir_path(worker_dir_path: &Path) -> io::Result<()> { + fn remove_dir_contents(path: &Path) -> io::Result<()> { + for entry in std::fs::read_dir(&path)? { + let entry = entry?; + let path = entry.path(); + + if entry.file_type()?.is_dir() { + remove_dir_contents(&path)?; + std::fs::remove_dir(path)?; + } else { + std::fs::remove_file(path)?; + } + } + Ok(()) + } + + // Note the worker dir may not exist anymore because of the worker dying and being cleaned up. + match remove_dir_contents(worker_dir_path) { + Err(err) if matches!(err.kind(), io::ErrorKind::NotFound) => Ok(()), + result => result, + } +} diff --git a/polkadot/node/core/pvf/tests/it/adder.rs b/polkadot/node/core/pvf/tests/it/adder.rs index bad7a66054c9..8bdd09db208a 100644 --- a/polkadot/node/core/pvf/tests/it/adder.rs +++ b/polkadot/node/core/pvf/tests/it/adder.rs @@ -100,7 +100,7 @@ async fn execute_bad_block_on_parent() { let host = TestHost::new(); - let _ret = host + let _err = host .validate_candidate( adder::wasm_binary_unwrap(), ValidationParams { @@ -145,3 +145,37 @@ async fn stress_spawn() { futures::future::join_all((0..100).map(|_| execute(host.clone()))).await; } + +// With one worker, run multiple execution jobs serially. They should not conflict. +#[tokio::test] +async fn execute_can_run_serially() { + let host = std::sync::Arc::new(TestHost::new_with_config(|cfg| { + cfg.execute_workers_max_num = 1; + })); + + async fn execute(host: std::sync::Arc) { + let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) }; + let block_data = BlockData { state: 0, add: 512 }; + let ret = host + .validate_candidate( + adder::wasm_binary_unwrap(), + ValidationParams { + parent_head: GenericHeadData(parent_head.encode()), + block_data: GenericBlockData(block_data.encode()), + relay_parent_number: 1, + relay_parent_storage_root: Default::default(), + }, + Default::default(), + ) + .await + .unwrap(); + + let new_head = HeadData::decode(&mut &ret.head_data.0[..]).unwrap(); + + assert_eq!(new_head.number, 1); + assert_eq!(new_head.parent_hash, parent_head.hash()); + assert_eq!(new_head.post_state, hash_state(512)); + } + + futures::future::join_all((0..5).map(|_| execute(host.clone()))).await; +} diff --git a/polkadot/node/core/pvf/tests/it/main.rs b/polkadot/node/core/pvf/tests/it/main.rs index dc8f00098ec5..f699b5840d8f 100644 --- a/polkadot/node/core/pvf/tests/it/main.rs +++ b/polkadot/node/core/pvf/tests/it/main.rs @@ -18,8 +18,8 @@ use assert_matches::assert_matches; use parity_scale_codec::Encode as _; use polkadot_node_core_pvf::{ - start, Config, InvalidCandidate, Metrics, PrepareJobKind, PvfPrepData, ValidationError, - ValidationHost, JOB_TIMEOUT_WALL_CLOCK_FACTOR, + start, Config, InvalidCandidate, Metrics, PrepareError, PrepareJobKind, PrepareStats, + PvfPrepData, ValidationError, ValidationHost, JOB_TIMEOUT_WALL_CLOCK_FACTOR, }; use polkadot_parachain_primitives::primitives::{BlockData, ValidationParams, ValidationResult}; use polkadot_primitives::ExecutorParams; @@ -70,6 +70,33 @@ impl TestHost { Self { cache_dir, host: Mutex::new(host) } } + async fn precheck_pvf( + &self, + code: &[u8], + executor_params: ExecutorParams, + ) -> Result { + let (result_tx, result_rx) = futures::channel::oneshot::channel(); + + let code = sp_maybe_compressed_blob::decompress(code, 16 * 1024 * 1024) + .expect("Compression works"); + + self.host + .lock() + .await + .precheck_pvf( + PvfPrepData::from_code( + code.into(), + executor_params, + TEST_PREPARATION_TIMEOUT, + PrepareJobKind::Prechecking, + ), + result_tx, + ) + .await + .unwrap(); + result_rx.await.unwrap() + } + async fn validate_candidate( &self, code: &[u8], @@ -291,8 +318,12 @@ async fn deleting_prepared_artifact_does_not_dispute() { { // Get the artifact path (asserting it exists). let mut cache_dir: Vec<_> = std::fs::read_dir(cache_dir).unwrap().collect(); - assert_eq!(cache_dir.len(), 1); - let artifact_path = cache_dir.pop().unwrap().unwrap(); + // Should contain the artifact and the worker dir. + assert_eq!(cache_dir.len(), 2); + let mut artifact_path = cache_dir.pop().unwrap().unwrap(); + if artifact_path.path().is_dir() { + artifact_path = cache_dir.pop().unwrap().unwrap(); + } // Delete the artifact. std::fs::remove_file(artifact_path.path()).unwrap(); @@ -317,3 +348,19 @@ async fn deleting_prepared_artifact_does_not_dispute() { r => panic!("{:?}", r), } } + +// With one worker, run multiple preparation jobs serially. They should not conflict. +#[tokio::test] +async fn prepare_can_run_serially() { + let host = TestHost::new_with_config(|cfg| { + cfg.prepare_workers_hard_max_num = 1; + }); + + let _stats = host + .precheck_pvf(::adder::wasm_binary_unwrap(), Default::default()) + .await + .unwrap(); + + // Prepare a different wasm blob to prevent skipping work. + let _stats = host.precheck_pvf(halt::wasm_binary_unwrap(), Default::default()).await.unwrap(); +} diff --git a/polkadot/node/core/pvf/tests/it/worker_common.rs b/polkadot/node/core/pvf/tests/it/worker_common.rs index 875ae79af097..5379d29556c7 100644 --- a/polkadot/node/core/pvf/tests/it/worker_common.rs +++ b/polkadot/node/core/pvf/tests/it/worker_common.rs @@ -14,8 +14,11 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use polkadot_node_core_pvf::testing::{spawn_with_program_path, SpawnErr}; -use std::time::Duration; +use polkadot_node_core_pvf::{ + testing::{spawn_with_program_path, SpawnErr}, + SecurityStatus, +}; +use std::{env, time::Duration}; fn worker_path(name: &str) -> std::path::PathBuf { let mut worker_path = std::env::current_exe().unwrap(); @@ -33,8 +36,10 @@ async fn spawn_immediate_exit() { let result = spawn_with_program_path( "integration-test", worker_path("polkadot-prepare-worker"), + &env::temp_dir(), &["exit"], Duration::from_secs(2), + SecurityStatus::default(), ) .await; assert!(matches!(result, Err(SpawnErr::AcceptTimeout))); @@ -45,8 +50,10 @@ async fn spawn_timeout() { let result = spawn_with_program_path( "integration-test", worker_path("polkadot-execute-worker"), + &env::temp_dir(), &["test-sleep"], Duration::from_secs(2), + SecurityStatus::default(), ) .await; assert!(matches!(result, Err(SpawnErr::AcceptTimeout))); @@ -57,8 +64,10 @@ async fn should_connect() { let _ = spawn_with_program_path( "integration-test", worker_path("polkadot-prepare-worker"), + &env::temp_dir(), &["prepare-worker"], Duration::from_secs(2), + SecurityStatus::default(), ) .await .unwrap(); diff --git a/polkadot/roadmap/implementers-guide/src/node/utility/pvf-host-and-workers.md b/polkadot/roadmap/implementers-guide/src/node/utility/pvf-host-and-workers.md index bcf01b61f217..6a14a3a013d4 100644 --- a/polkadot/roadmap/implementers-guide/src/node/utility/pvf-host-and-workers.md +++ b/polkadot/roadmap/implementers-guide/src/node/utility/pvf-host-and-workers.md @@ -121,10 +121,10 @@ So what are we actually worried about? Things that come to mind: ### Restricting file-system access -A basic security mechanism is to make sure that any thread directly interfacing -with untrusted code does not have access to the file-system. This provides some -protection against attackers accessing sensitive data or modifying data on the -host machine. +A basic security mechanism is to make sure that any process directly interfacing +with untrusted code does not have unnecessary access to the file-system. This +provides some protection against attackers accessing sensitive data or modifying +data on the host machine. ### Clearing env vars