From 51f52830db9b63d5c3ba83dba56dd7c0d1244892 Mon Sep 17 00:00:00 2001 From: Jonathan Johnson Date: Sat, 10 Aug 2024 09:47:41 -0700 Subject: [PATCH] Reactor panic safety --- muse-reactor/src/lib.rs | 27 +++++++++++++++++++++------ muse-reactor/src/tests.rs | 26 +++++++++++++++++++++++++- 2 files changed, 46 insertions(+), 7 deletions(-) diff --git a/muse-reactor/src/lib.rs b/muse-reactor/src/lib.rs index f9be1ae..cbb667c 100644 --- a/muse-reactor/src/lib.rs +++ b/muse-reactor/src/lib.rs @@ -72,6 +72,7 @@ use std::fmt::Debug; use std::future::Future; use std::marker::PhantomData; use std::num::NonZeroUsize; +use std::panic::{self, AssertUnwindSafe}; use std::pin::Pin; use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; use std::sync::Arc; @@ -516,30 +517,30 @@ where let pinned_future = Pin::new(&mut future); let mut context = Context::from_waker(&task.waker); - match pinned_future.poll(&mut context) { - Poll::Ready(Ok(result)) => { + match panic::catch_unwind(AssertUnwindSafe(|| pinned_future.poll(&mut context))) { + Ok(Poll::Ready(Ok(result))) => { drop(future); let result = root_result(Ok(result), &mut vm_context); drop(vm_context); tasks.complete_running_task(result, &mut self.budgets); } - Poll::Ready(Err(ExecutionError::Exception(err))) => { + Ok(Poll::Ready(Err(ExecutionError::Exception(err)))) => { drop(future); let result = root_result(Err(err), &mut vm_context); drop(vm_context); tasks.complete_running_task(result, &mut self.budgets); } - Poll::Ready(Err(ExecutionError::Waiting)) | Poll::Pending => { + Ok(Poll::Ready(Err(ExecutionError::Waiting)) | Poll::Pending) => { task.executing = false; tasks.executing.pop_front(); } - Poll::Ready(Err(ExecutionError::Timeout)) => { + Ok(Poll::Ready(Err(ExecutionError::Timeout))) => { // Task is still executing, but took longer than its // time slice. Keep it in queue for the next iteration // of the loop. tasks.executing.rotate_left(1); } - Poll::Ready(Err(ExecutionError::NoBudget)) => { + Ok(Poll::Ready(Err(ExecutionError::NoBudget))) => { if let Some(budget) = task .budget_pool .and_then(|pool_id| self.budgets.get_mut(&pool_id)) @@ -578,6 +579,20 @@ where tasks.complete_running_task(result, &mut self.budgets); } } + Err(mut panic) => { + drop(future); + let result = root_result( + Err(Value::from(SymbolRef::from("panic"))), + &mut vm_context, + ); + drop(vm_context); + tasks.complete_running_task(result, &mut self.budgets); + while let Err(new_panic) = + panic::catch_unwind(AssertUnwindSafe(move || drop(panic))) + { + panic = new_panic; + } + } } } diff --git a/muse-reactor/src/tests.rs b/muse-reactor/src/tests.rs index 652e38a..9b28746 100644 --- a/muse-reactor/src/tests.rs +++ b/muse-reactor/src/tests.rs @@ -3,7 +3,8 @@ use std::time::{Duration, Instant}; use muse_lang::compiler::syntax::Ranged; use muse_lang::compiler::{self}; -use muse_lang::runtime::value::{Primitive, RootedValue}; +use muse_lang::runtime::symbol::Symbol; +use muse_lang::runtime::value::{Primitive, RootedValue, RustFunction, Value}; use muse_lang::vm::Vm; use refuse::CollectionGuard; use tracing_subscriber::filter::LevelFilter; @@ -177,3 +178,26 @@ fn pool_cancellation() { other => unreachable!("unexpected result: {other:?}"), } } + +#[test] +fn task_panic() { + let reactor = Reactor::build() + .new_vm( + |guard: &mut CollectionGuard<'_>, _reactor: &ReactorHandle| { + let vm = Vm::new(&guard); + vm.declare( + "panics", + Value::dynamic(RustFunction::new(|_vm, _arity| panic!()), &guard), + guard, + )?; + Ok(vm) + }, + ) + .finish(); + let task = reactor.spawn_source("panics()").unwrap(); + let error = task.join().unwrap_err(); + match error { + TaskError::Exception(exc) if exc == RootedValue::from(Symbol::from("panic")) => {} + other => unreachable!("Unexpected result: {other:?}"), + } +}