Skip to content

Commit

Permalink
More cleanup / breaking changes to event bus, timer, ws and executor …
Browse files Browse the repository at this point in the history
…traits
  • Loading branch information
ivmarkov committed Oct 7, 2023
1 parent d723bd9 commit 0822b00
Show file tree
Hide file tree
Showing 9 changed files with 177 additions and 151 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* Breaking change: Upgraded to `embedded-io` 0.5 and `embedded-io-async` 0.5
* Upgraded `strum` and `strum-macros` to 0.25
* OTA: New method: `Ota::finish` that allows to postpone/avoid setting the updated partition as a boot one
* TimerService: `TimerService::timer` now takes `&self` instead of `&mut self`
* TimerService: `TimerService::timer` now takes `&self` instead of `&mut self` - for both blocking and async traits
* Breaking change: TimerService: scoped handler: the timer callback now only needs to live as long as the `TimerService::Timer` associated type. Therefore, `TimerService::Timer` is now lifetimed: `TimerService::Timer<'a>`
* Breaking change: TimerService: `TimerService::Timer` now borrows from `TimerService`. Therefore, that's another reason why `TimerService::Timer` is now lifetimed: `TimerService::Timer<'a>`
* Breaking change: EventBus: scoped handler: the subscription callback now only needs to live as long as the `EventBus::Subscription` associated type. Therefore, `EventBus::Subscription` is now lifetimed: `EventBus::Subscription<'a>`
* Breaking change: EventBus: `EventBus::Subscription` now borrows from `EventBus`. Therefore, that's another reason why `EventBus::Subscription` is now lifetimed: `EventBus::Subscription<'a>`
* Breaking change: ws::Acceptor: the blocking as well as the async version now return `Connection` / `Sender` / `Receiver` instances which borrow from `Acceptor`
* Breaking change: Unblocker: scoped handler: the callback now only needs to live as long as the `Unblocker::UnblockFuture` associated type. Therefore, `Unblocker::UnblockFuture` is now lifetimed: `Unblocker::UnblockFuture<'a, ...>`
* Breaking change: Unblocker: `Unblocker::UnblockFuture` now borrows from `Unblocker::UnblockFuture`. Therefore, that's another reason why `Unblocker::UnblockFuture` is now lifetimed: `Unblocker::UnblockFuture<'a, ...>`
* Breaking change: OTA: GAT `Ota::Update` now parametric over lifetime and no longer returned by `&mut` ref
* Breaking change: OTA: `OtaUpdate::abort` and `OtaUpdate::complete` now take `self` instead of `&mut self`
* Breaking change: MQTT: GAT `Connection::Message` now parametric over lifetime
Expand Down
68 changes: 38 additions & 30 deletions src/event_bus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@ where
}

pub trait EventBus<P>: ErrorType {
type Subscription<'a>;
type Subscription<'a>
where
Self: 'a;

fn subscribe<'a, F>(&self, callback: F) -> Result<Self::Subscription<'a>, Self::Error>
fn subscribe<'a, F>(&'a self, callback: F) -> Result<Self::Subscription<'a>, Self::Error>
where
F: FnMut(&P) + Send + 'a;
}
Expand All @@ -58,9 +60,9 @@ impl<'e, P, E> EventBus<P> for &'e E
where
E: EventBus<P>,
{
type Subscription<'a> = E::Subscription<'a>;
type Subscription<'a> = E::Subscription<'a> where Self: 'a;

fn subscribe<'a, F>(&self, callback: F) -> Result<Self::Subscription<'a>, Self::Error>
fn subscribe<'a, F>(&'a self, callback: F) -> Result<Self::Subscription<'a>, Self::Error>
where
F: FnMut(&P) + Send + 'a,
{
Expand All @@ -72,9 +74,9 @@ impl<'e, P, E> EventBus<P> for &'e mut E
where
E: EventBus<P>,
{
type Subscription<'a> = E::Subscription<'a>;
type Subscription<'a> = E::Subscription<'a> where Self: 'a;

fn subscribe<'a, F>(&self, callback: F) -> Result<Self::Subscription<'a>, Self::Error>
fn subscribe<'a, F>(&'a self, callback: F) -> Result<Self::Subscription<'a>, Self::Error>
where
F: FnMut(&P) + Send + 'a,
{
Expand All @@ -83,29 +85,31 @@ where
}

pub trait PostboxProvider<P>: ErrorType {
type Postbox: Postbox<P, Error = Self::Error>;
type Postbox<'a>: Postbox<P, Error = Self::Error>
where
Self: 'a;

fn postbox(&self) -> Result<Self::Postbox, Self::Error>;
fn postbox(&self) -> Result<Self::Postbox<'_>, Self::Error>;
}

impl<'a, P, PP> PostboxProvider<P> for &'a mut PP
impl<'p, P, PP> PostboxProvider<P> for &'p mut PP
where
PP: PostboxProvider<P>,
{
type Postbox = PP::Postbox;
type Postbox<'a> = PP::Postbox<'a> where Self: 'a;

fn postbox(&self) -> Result<Self::Postbox, Self::Error> {
fn postbox(&self) -> Result<Self::Postbox<'_>, Self::Error> {
(**self).postbox()
}
}

impl<'a, P, PP> PostboxProvider<P> for &'a PP
impl<'p, P, PP> PostboxProvider<P> for &'p PP
where
PP: PostboxProvider<P>,
{
type Postbox = PP::Postbox;
type Postbox<'a> = PP::Postbox<'a> where Self: 'a;

fn postbox(&self) -> Result<Self::Postbox, Self::Error> {
fn postbox(&self) -> Result<Self::Postbox<'_>, Self::Error> {
(*self).postbox()
}
}
Expand Down Expand Up @@ -174,58 +178,62 @@ pub mod asynch {
}

pub trait EventBus<P>: ErrorType {
type Subscription: Receiver<Result = P>;
type Subscription<'a>: Receiver<Result = P>
where
Self: 'a;

fn subscribe(&self) -> Result<Self::Subscription, Self::Error>;
async fn subscribe(&self) -> Result<Self::Subscription<'_>, Self::Error>;
}

impl<E, P> EventBus<P> for &mut E
where
E: EventBus<P>,
{
type Subscription = E::Subscription;
type Subscription<'a> = E::Subscription<'a> where Self: 'a;

fn subscribe(&self) -> Result<Self::Subscription, Self::Error> {
(**self).subscribe()
async fn subscribe(&self) -> Result<Self::Subscription<'_>, Self::Error> {
(**self).subscribe().await
}
}

impl<E, P> EventBus<P> for &E
where
E: EventBus<P>,
{
type Subscription = E::Subscription;
type Subscription<'a> = E::Subscription<'a> where Self: 'a;

fn subscribe(&self) -> Result<Self::Subscription, Self::Error> {
(**self).subscribe()
async fn subscribe(&self) -> Result<Self::Subscription<'_>, Self::Error> {
(**self).subscribe().await
}
}

pub trait PostboxProvider<P>: ErrorType {
type Postbox: Sender<Data = P>;
type Postbox<'a>: Sender<Data = P>
where
Self: 'a;

fn postbox(&self) -> Result<Self::Postbox, Self::Error>;
async fn postbox(&self) -> Result<Self::Postbox<'_>, Self::Error>;
}

impl<PB, P> PostboxProvider<P> for &mut PB
where
PB: PostboxProvider<P>,
{
type Postbox = PB::Postbox;
type Postbox<'a> = PB::Postbox<'a> where Self: 'a;

fn postbox(&self) -> Result<Self::Postbox, Self::Error> {
(**self).postbox()
async fn postbox(&self) -> Result<Self::Postbox<'_>, Self::Error> {
(**self).postbox().await
}
}

impl<PB, P> PostboxProvider<P> for &PB
where
PB: PostboxProvider<P>,
{
type Postbox = PB::Postbox;
type Postbox<'a> = PB::Postbox<'a> where Self: 'a;

fn postbox(&self) -> Result<Self::Postbox, Self::Error> {
(**self).postbox()
async fn postbox(&self) -> Result<Self::Postbox<'_>, Self::Error> {
(**self).postbox().await
}
}
}
55 changes: 26 additions & 29 deletions src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,57 +17,54 @@ pub mod asynch {
}
}

//#[cfg(feature = "nightly")]
mod unblocker {
use core::future::Future;

// Keep it GAT based for now so that it builds with stable Rust
// and therefore `crate::utils::asyncify` can also build with stable Rust
pub trait Unblocker {
type UnblockFuture<T>: Future<Output = T> + Send
type UnblockFuture<'a, F, T>: Future<Output = T> + Send
where
T: Send;
Self: 'a,
F: Send + 'a,
T: Send + 'a;

fn unblock<F, T>(&self, f: F) -> Self::UnblockFuture<T>
fn unblock<'a, F, T>(&'a self, f: F) -> Self::UnblockFuture<'a, F, T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static;
F: FnOnce() -> T + Send + 'a,
T: Send + 'a;
}

impl<U> Unblocker for &U
where
U: Unblocker,
{
type UnblockFuture<T>
= U::UnblockFuture<T> where T: Send;
type UnblockFuture<'a, F, T>
= U::UnblockFuture<'a, F, T> where Self: 'a, F: Send + 'a, T: Send + 'a;

fn unblock<F, T>(&self, f: F) -> Self::UnblockFuture<T>
fn unblock<'a, F, T>(&'a self, f: F) -> Self::UnblockFuture<'a, F, T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
F: FnOnce() -> T + Send + 'a,
T: Send + 'a,
{
(*self).unblock(f)
}
}

// pub trait Unblocker {
// async fn unblock<F, T>(&self, f: F) -> T
// where
// F: FnOnce() -> T + Send + 'static,
// T: Send + 'static;
// }
impl<U> Unblocker for &mut U
where
U: Unblocker,
{
type UnblockFuture<'a, F, T>
= U::UnblockFuture<'a, F, T> where Self: 'a, F: Send + 'a, T: Send + 'a;

// impl<U> Unblocker for &U
// where
// U: Unblocker,
// {
// async fn unblock<F, T>(&self, f: F) -> T
// where
// F: FnOnce() -> T + Send + 'static,
// T: Send + 'static,
// {
// (*self).unblock(f).await
// }
// }
fn unblock<'a, F, T>(&'a self, f: F) -> Self::UnblockFuture<'a, F, T>
where
F: FnOnce() -> T + Send + 'a,
T: Send + 'a,
{
(**self).unblock(f)
}
}
}
}
10 changes: 5 additions & 5 deletions src/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ pub mod asynch {
where
Self: 'a;

fn timer(&self) -> Result<Self::Timer<'_>, Self::Error>;
async fn timer(&self) -> Result<Self::Timer<'_>, Self::Error>;
}

impl<T> TimerService for &T
Expand All @@ -173,8 +173,8 @@ pub mod asynch {
{
type Timer<'a> = T::Timer<'a> where Self: 'a;

fn timer(&self) -> Result<Self::Timer<'_>, Self::Error> {
(*self).timer()
async fn timer(&self) -> Result<Self::Timer<'_>, Self::Error> {
(*self).timer().await
}
}

Expand All @@ -184,8 +184,8 @@ pub mod asynch {
{
type Timer<'a> = T::Timer<'a> where Self: 'a;

fn timer(&self) -> Result<Self::Timer<'_>, Self::Error> {
(**self).timer()
async fn timer(&self) -> Result<Self::Timer<'_>, Self::Error> {
(**self).timer().await
}
}
}
39 changes: 14 additions & 25 deletions src/utils/asyncify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,54 +66,43 @@ mod blocking_unblocker {
pub struct BlockingUnblocker(());

impl BlockingUnblocker {
pub fn unblock<F, T>(&self, f: F) -> BlockingFuture<T>
pub fn unblock<'a, F, T>(&'a self, f: F) -> BlockingFuture<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
F: FnOnce() -> T + Send + 'a,
T: Send + 'a,
{
BlockingFuture::new(f)
}
}

impl crate::executor::asynch::Unblocker for BlockingUnblocker {
type UnblockFuture<T> = BlockingFuture<T> where T: Send;
type UnblockFuture<'a, F, T> = BlockingFuture<'a, T> where Self: 'a, F: Send + 'a, T: Send + 'a;

fn unblock<F, T>(&self, f: F) -> Self::UnblockFuture<T>
fn unblock<'a, F, T>(&'a self, f: F) -> Self::UnblockFuture<'a, F, T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
F: FnOnce() -> T + Send + 'a,
T: Send + 'a,
{
BlockingUnblocker::unblock(self, f)
}
}

// #[cfg(feature = "nightly")]
// impl crate::executor::asynch::Unblocker for BlockingUnblocker {
// async fn unblock<F, T>(&self, f: F) -> T
// where
// F: FnOnce() -> T + Send + 'static,
// T: Send + 'static,
// {
// BlockingUnblocker::unblock(self, f).await
// }
// }

pub fn blocking_unblocker() -> BlockingUnblocker {
BlockingUnblocker(())
}

pub struct BlockingFuture<T> {
pub struct BlockingFuture<'a, T> {
// TODO: Need to box or else we get rustc error:
// "type parameter `F` is part of concrete type but not used in parameter list for the `impl Trait` type alias"
computation: Option<Box<dyn FnOnce() -> T + Send + 'static>>,
computation: Option<Box<dyn FnOnce() -> T + Send + 'a>>,
_result: PhantomData<fn() -> T>,
}

impl<T> BlockingFuture<T> {
impl<'a, T> BlockingFuture<'a, T> {
fn new<F>(computation: F) -> Self
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
F: FnOnce() -> T + Send + 'a,
T: Send + 'a,
{
Self {
computation: Some(Box::new(computation)),
Expand All @@ -122,9 +111,9 @@ mod blocking_unblocker {
}
}

impl<T> Future for BlockingFuture<T>
impl<'a, T> Future for BlockingFuture<'a, T>
where
T: Send,
T: Send + 'a,
{
type Output = T;

Expand Down
Loading

0 comments on commit 0822b00

Please sign in to comment.