Skip to content

Commit

Permalink
Initial reactor complete
Browse files Browse the repository at this point in the history
Cloning the Vm was the correct step. This design isn't very useful
without captures, as it requires passing values through a spawn
function.
  • Loading branch information
ecton committed Aug 1, 2024
2 parents 5838b41 + 0622a74 commit 49997bb
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 54 deletions.
181 changes: 127 additions & 54 deletions src/runtime/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use std::{
any::Any,
collections::VecDeque,
fmt::Debug,
future::Future,
marker::PhantomData,
num::NonZeroUsize,
Expand All @@ -18,16 +19,13 @@ use std::{
use alot::{LotId, Lots};
use flume::{Receiver, Sender, TryRecvError};
use kempt::{Map, Set};
use parking_lot::Mutex;
use parking_lot::{Condvar, Mutex};
use refuse::{CollectionGuard, Trace};

use crate::{
compiler::{self, syntax::Ranged, Compiler},
runtime::{
symbol::SymbolRef,
value::{RustFunction, RustType},
},
vm::{Code, ExecutionError, Fault, Function, Register, Vm},
runtime::value::RustType,
vm::{Code, ExecutionError, Fault, Vm},
};

use super::value::{CustomType, Value};
Expand Down Expand Up @@ -230,12 +228,98 @@ impl Wake for ReactorTaskWaker {
}
}

#[derive(Default, Clone)]
struct ResultHandle(Arc<ResultHandleData>);

impl ResultHandle {
fn send(&self, result: Result<Value, Value>) {
let mut data = self.0.locked.lock();
data.result = Some(result);
for waker in data.wakers.drain(..) {
waker.wake();
}
drop(data);
self.0.sync.notify_all();
}

fn recv(&self) -> Result<Value, Value> {
let mut data = self.0.locked.lock();
loop {
if let Some(result) = &data.result {
return result.clone();
} else {
self.0.sync.wait(&mut data);
}
}
}

fn recv_async(&self) -> ResultHandleFuture<'_> {
ResultHandleFuture(self)
}
}

impl Debug for ResultHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut f = f.debug_tuple("ResultHandle");

if let Some(data) = self.0.locked.try_lock() {
f.field(&data.result);
}

f.finish()
}
}

impl Trace for ResultHandle {
const MAY_CONTAIN_REFERENCES: bool = true;

fn trace(&self, tracer: &mut refuse::Tracer) {
let data = self.0.locked.lock();
match &data.result {
Some(Ok(v) | Err(v)) => v.trace(tracer),
_ => {}
}
}
}

#[derive(Debug)]
struct ResultHandleFuture<'a>(&'a ResultHandle);

impl Future for ResultHandleFuture<'_> {
type Output = Result<Value, Value>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut data = self.0 .0.locked.lock();
if let Some(result) = &data.result {
Poll::Ready(result.clone())
} else {
let will_wake = data.wakers.iter().any(|w| w.will_wake(cx.waker()));
if !will_wake {
data.wakers.push(cx.waker().clone());
}
Poll::Pending
}
}
}

#[derive(Default)]
struct ResultHandleData {
sync: Condvar,
locked: Mutex<ResultHandleResult>,
}

#[derive(Default)]
struct ResultHandleResult {
result: Option<Result<Value, Value>>,
wakers: Vec<Waker>,
}

struct ReactorTask {
vm: Vm,
waker: Waker,
global_id: usize,
executing: bool,
result: Sender<Result<Value, Value>>,
result: ResultHandle,
}

#[derive(Default)]
Expand All @@ -247,7 +331,7 @@ struct ReactorTasks {
}

impl ReactorTasks {
fn push(&mut self, global_id: usize, vm: Vm, result: Sender<Result<Value, Value>>) -> LotId {
fn push(&mut self, global_id: usize, vm: Vm, result: ResultHandle) -> LotId {
let id = self.all.push(ReactorTask {
vm,
waker: Waker::from(Arc::new(ReactorTaskWaker {
Expand All @@ -267,8 +351,8 @@ impl ReactorTasks {
fn complete_running_task(&mut self, result: Result<Value, Value>) {
let task_id = self.executing.pop_front().expect("no running task");
let task = self.all.remove(task_id).expect("task missing");
println!("Finished task {} with result {result:?}", task.global_id);
let _result = task.result.send(result);
self.registered.remove(&task.global_id);
}

fn wake_woken(&mut self) {
Expand All @@ -281,7 +365,6 @@ impl ReactorTasks {
continue;
};
if !task.executing {
println!("Waking {woken}");
task.executing = true;
self.executing.push_back(id)
}
Expand All @@ -290,18 +373,19 @@ impl ReactorTasks {
}
}

#[derive(Debug)]
#[derive(Trace)]
pub struct TaskHandle {
result: Receiver<Result<Value, Value>>,
global_id: usize,
result: ResultHandle,
}

impl TaskHandle {
pub fn join(&self) -> Option<Result<Value, Value>> {
self.result.recv().ok()
pub fn join(&self) -> Result<Value, Value> {
self.result.recv()
}

pub async fn join_async(&self) -> Option<Result<Value, Value>> {
self.result.recv_async().await.ok()
pub async fn join_async(&self) -> Result<Value, Value> {
self.result.recv_async().await
}
}

Expand All @@ -314,35 +398,22 @@ impl CustomType for TaskHandle {
let mut context = Context::from_waker(&waker);
let mut future = this.result.recv_async();
match Pin::new(&mut future).poll(&mut context) {
Poll::Ready(Ok(result)) => result.map_err(Fault::Exception),
Poll::Ready(Err(_)) => Err(Fault::Exception(Value::Symbol(
SymbolRef::from("result-already-read"),
))),
Poll::Ready(result) => result.map_err(Fault::Exception),
Poll::Pending => Err(Fault::Waiting),
}
// if let Some(body) = this.bodies.get(&arity).or_else(|| {
// this.varg_bodies
// .iter()
// .rev()
// .find_map(|va| (va.key() <= &arity).then_some(&va.value))
// }) {
// let module = this.module.ok_or(Fault::NotAModule)?;
// vm.execute_function(body, &this, module)
// } else {
// Err(Fault::IncorrectNumberOfArguments)
// }
}
})
});
&TYPE
}
}

impl Trace for TaskHandle {
const MAY_CONTAIN_REFERENCES: bool = true;

fn trace(&self, tracer: &mut refuse::Tracer) {
// todo!()
impl Debug for TaskHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TaskHandle")
.field("global_id", &self.global_id)
.field("result", &self.result)
.finish()
}
}

Expand All @@ -356,8 +427,11 @@ pub struct ReactorHandle<Work = NoWork> {

impl<Work> ReactorHandle<Work> {
fn spawn_spawnable(&self, spawnable: Spawnable<Work>) -> Result<TaskHandle, ReactorShutdown> {
let (command, result) = Command::new(&self.data.next_task_id, spawnable);
let handle = TaskHandle { result };
let command = Command::new(&self.data.next_task_id, spawnable);
let handle = TaskHandle {
result: command.result.clone(),
global_id: command.id,
};
self.data
.sender
.send(command)
Expand Down Expand Up @@ -414,20 +488,18 @@ struct HandleData<Work> {
struct Command<Work> {
id: usize,
kind: Spawnable<Work>,
result: Sender<Result<Value, Value>>,
result: ResultHandle,
}

impl<Work> Command<Work> {
fn new(ids: &AtomicUsize, kind: Spawnable<Work>) -> (Self, Receiver<Result<Value, Value>>) {
let (sender, receiver) = flume::bounded(1);
(
Self {
id: ids.fetch_add(1, Ordering::Acquire),
kind,
result: sender,
},
receiver,
)
fn new(ids: &AtomicUsize, kind: Spawnable<Work>) -> Self {
let result = ResultHandle::default();

Self {
id: ids.fetch_add(1, Ordering::Acquire),
kind,
result,
}
}
}

Expand Down Expand Up @@ -525,13 +597,14 @@ impl WorkUnit for NoWork {
fn works() {
let reactor = Reactor::new();
let task = reactor.spawn_source("1 + 2").unwrap();
let result = task.join().unwrap().unwrap();
let result = task.join().unwrap();
assert_eq!(result, Value::Int(3));
}

#[test]
fn spawning() {
use crate::vm::{Arity, VmContext};
use crate::runtime::value::RustFunction;
use crate::vm::{Arity, Function, Register, VmContext};

fn add_spawn_function(vm: Vm, guard: &mut CollectionGuard<'_>, reactor: &ReactorHandle) -> Vm {
let reactor = reactor.clone();
Expand All @@ -541,9 +614,9 @@ fn spawning() {
RustFunction::new(move |ctx: &mut VmContext<'_, '_>, arity: Arity| {
if arity == 2 {
println!("Spawn called with {:?}", ctx[Register(0)]);
let vm = ctx.cloned_vm();
vm.set_register(Register(0), ctx[Register(0)]);
let f = ctx[Register(1)].as_rooted::<Function>(ctx.guard()).unwrap();
let vm =
add_spawn_function(Vm::new(ctx.guard()), ctx.guard_mut(), &reactor);
vm.prepare_call(&f, Arity(1), &mut CollectionGuard::acquire())
.unwrap();
Ok(Value::dynamic(reactor.spawn(vm).unwrap(), ctx.guard()))
Expand Down Expand Up @@ -582,6 +655,6 @@ fn spawning() {
",
)
.unwrap();
let result = task.join().unwrap().unwrap();
let result = task.join().unwrap();
assert_eq!(result, Value::Int(5 + 4 + 3 + 2 + 1));
}
1 change: 1 addition & 0 deletions src/runtime/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ pub struct RuntimeEnum {
}

impl RuntimeEnum {
#[cfg(feature = "dispatched")]
pub(crate) fn to_bitcode_type(&self, guard: &CollectionGuard) -> BitcodeEnum {
BitcodeEnum {
name: self.ty.name.clone(),
Expand Down
4 changes: 4 additions & 0 deletions src/runtime/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ use crate::runtime::string::MuseString;
use crate::runtime::symbol::{Symbol, SymbolList, SymbolRef};
use crate::vm::{Arity, ExecutionError, Fault, VmContext};

#[cfg(feature = "dispatched")]
use crate::runtime::types::{RuntimeEnum, RuntimeStruct};
#[cfg(feature = "dispatched")]
use crate::vm::bitcode::{BitcodeFunction, ValueOrSource};
#[cfg(feature = "dispatched")]
use crate::vm::Function;

/// A Muse virtual machine value.
Expand Down Expand Up @@ -1081,6 +1084,7 @@ impl Value {
}
}

#[cfg(feature = "dispatched")]
pub(crate) fn as_source(&self, guard: &CollectionGuard<'_>) -> ValueOrSource {
match self {
Value::Nil => ValueOrSource::Nil,
Expand Down
Loading

0 comments on commit 49997bb

Please sign in to comment.