From b5b57f188f2a65b2dbdc49616463437abbb8e073 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 2 Jul 2024 11:11:40 +0200 Subject: [PATCH 1/5] Hold sep. Mutexes for pending `intercepted_msgs`/`peer_connected` events This is a minor refactor that will allow us to access the individual event queue Mutexes separately, allowing us to drop the locks earlier when processing them individually. --- lightning/src/onion_message/messenger.rs | 49 ++++++++++++------------ 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index 28f1bc79253..75d92610ae5 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -261,12 +261,8 @@ pub struct OnionMessenger< async_payments_handler: APH, custom_handler: CMH, intercept_messages_for_offline_peers: bool, - pending_events: Mutex, -} - -struct PendingEvents { - intercepted_msgs: Vec, - peer_connecteds: Vec, + pending_intercepted_msgs_events: Mutex>, + pending_peer_connected_events: Mutex>, } /// [`OnionMessage`]s buffered to be sent. @@ -1095,10 +1091,8 @@ where async_payments_handler, custom_handler, intercept_messages_for_offline_peers, - pending_events: Mutex::new(PendingEvents { - intercepted_msgs: Vec::new(), - peer_connecteds: Vec::new(), - }), + pending_intercepted_msgs_events: Mutex::new(Vec::new()), + pending_peer_connected_events: Mutex::new(Vec::new()), } } @@ -1316,14 +1310,15 @@ where fn enqueue_intercepted_event(&self, event: Event) { const MAX_EVENTS_BUFFER_SIZE: usize = (1 << 10) * 256; - let mut pending_events = self.pending_events.lock().unwrap(); - let total_buffered_bytes: usize = - pending_events.intercepted_msgs.iter().map(|ev| ev.serialized_length()).sum(); + let mut pending_intercepted_msgs_events = + self.pending_intercepted_msgs_events.lock().unwrap(); + let total_buffered_bytes: usize = pending_intercepted_msgs_events.iter() + .map(|ev| ev.serialized_length()).sum(); if total_buffered_bytes >= MAX_EVENTS_BUFFER_SIZE { log_trace!(self.logger, "Dropping event {:?}: buffer full", event); return } - pending_events.intercepted_msgs.push(event); + pending_intercepted_msgs_events.push(event); } /// Processes any events asynchronously using the given handler. @@ -1339,9 +1334,12 @@ where let mut intercepted_msgs = Vec::new(); let mut peer_connecteds = Vec::new(); { - let mut pending_events = self.pending_events.lock().unwrap(); - core::mem::swap(&mut pending_events.intercepted_msgs, &mut intercepted_msgs); - core::mem::swap(&mut pending_events.peer_connecteds, &mut peer_connecteds); + let mut pending_intercepted_msgs_events = + self.pending_intercepted_msgs_events.lock().unwrap(); + let mut pending_peer_connected_events = + self.pending_peer_connected_events.lock().unwrap(); + core::mem::swap(&mut *pending_intercepted_msgs_events, &mut intercepted_msgs); + core::mem::swap(&mut *pending_peer_connected_events, &mut peer_connecteds); } let mut futures = Vec::with_capacity(intercepted_msgs.len()); @@ -1417,18 +1415,19 @@ where } let mut events = Vec::new(); { - let mut pending_events = self.pending_events.lock().unwrap(); + let mut pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap(); + let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap(); #[cfg(debug_assertions)] { - for ev in pending_events.intercepted_msgs.iter() { + for ev in pending_intercepted_msgs_events.iter() { if let Event::OnionMessageIntercepted { .. } = ev {} else { panic!(); } } - for ev in pending_events.peer_connecteds.iter() { + for ev in pending_peer_connected_events.iter() { if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); } } } - core::mem::swap(&mut pending_events.intercepted_msgs, &mut events); - events.append(&mut pending_events.peer_connecteds); - pending_events.peer_connecteds.shrink_to(10); // Limit total heap usage + core::mem::swap(&mut *pending_intercepted_msgs_events, &mut events); + events.append(&mut pending_peer_connected_events); + pending_peer_connected_events.shrink_to(10); // Limit total heap usage } for ev in events { handler.handle_event(ev); @@ -1558,7 +1557,9 @@ where .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new())) .mark_connected(); if self.intercept_messages_for_offline_peers { - self.pending_events.lock().unwrap().peer_connecteds.push( + let mut pending_peer_connected_events = + self.pending_peer_connected_events.lock().unwrap(); + pending_peer_connected_events.push( Event::OnionMessagePeerConnected { peer_node_id: *their_node_id } ); } From 018908fe9e23f3994ea1a015cdb955708ac5fdc1 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Mon, 15 Apr 2024 10:35:52 +0200 Subject: [PATCH 2/5] Make event handling fallible Previously, we would require our users to handle all events successfully inline or panic will trying to do so. If they would exit the `EventHandler` any other way we'd forget about the event and wouldn't replay them after restart. Here, we implement fallible event handling, allowing the user to return `Err(())` which signals to our event providers they should abort event processing and replay any unhandled events later (i.e., in the next invocation). --- lightning-background-processor/src/lib.rs | 81 +++-- lightning-invoice/src/utils.rs | 1 + lightning/src/chain/chainmonitor.rs | 23 +- lightning/src/chain/channelmonitor.rs | 51 ++- lightning/src/events/mod.rs | 25 +- lightning/src/ln/channelmanager.rs | 293 ++++++++++-------- .../src/onion_message/functional_tests.rs | 2 +- lightning/src/onion_message/messenger.rs | 20 +- lightning/src/util/async_poll.rs | 63 +++- 9 files changed, 346 insertions(+), 213 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 940d1b029e7..10f5ada505e 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -26,6 +26,8 @@ use lightning::chain::chainmonitor::{ChainMonitor, Persist}; use lightning::events::EventHandler; #[cfg(feature = "std")] use lightning::events::EventsProvider; +#[cfg(feature = "futures")] +use lightning::events::ReplayEvent; use lightning::events::{Event, PathFailure}; use lightning::ln::channelmanager::AChannelManager; @@ -583,6 +585,7 @@ use futures_util::{dummy_waker, Selector, SelectorOutput}; /// could setup `process_events_async` like this: /// ``` /// # use lightning::io; +/// # use lightning::events::ReplayEvent; /// # use std::sync::{Arc, RwLock}; /// # use std::sync::atomic::{AtomicBool, Ordering}; /// # use std::time::SystemTime; @@ -600,7 +603,7 @@ use futures_util::{dummy_waker, Selector, SelectorOutput}; /// # } /// # struct EventHandler {} /// # impl EventHandler { -/// # async fn handle_event(&self, _: lightning::events::Event) {} +/// # async fn handle_event(&self, _: lightning::events::Event) -> Result<(), ReplayEvent> { Ok(()) } /// # } /// # #[derive(Eq, PartialEq, Clone, Hash)] /// # struct SocketDescriptor {} @@ -698,7 +701,7 @@ pub async fn process_events_async< G: 'static + Deref> + Send + Sync, L: 'static + Deref + Send + Sync, P: 'static + Deref + Send + Sync, - EventHandlerFuture: core::future::Future, + EventHandlerFuture: core::future::Future>, EventHandler: Fn(Event) -> EventHandlerFuture, PS: 'static + Deref + Send, M: 'static @@ -751,12 +754,16 @@ where if update_scorer(scorer, &event, duration_since_epoch) { log_trace!(logger, "Persisting scorer after update"); if let Err(e) = persister.persist_scorer(&scorer) { - log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) + log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e); + // We opt not to abort early on persistence failure here as persisting + // the scorer is non-critical and we still hope that it will have + // resolved itself when it is potentially critical in event handling + // below. } } } } - event_handler(event).await; + event_handler(event).await }) }; define_run_body!( @@ -913,7 +920,7 @@ impl BackgroundProcessor { } } } - event_handler.handle_event(event); + event_handler.handle_event(event) }; define_run_body!( persister, @@ -1757,7 +1764,7 @@ mod tests { // Initiate the background processors to watch each node. let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); - let event_handler = |_: _| {}; + let event_handler = |_: _| Ok(()); let bg_processor = BackgroundProcessor::start( persister, event_handler, @@ -1847,7 +1854,7 @@ mod tests { let (_, nodes) = create_nodes(1, "test_timer_tick_called"); let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); - let event_handler = |_: _| {}; + let event_handler = |_: _| Ok(()); let bg_processor = BackgroundProcessor::start( persister, event_handler, @@ -1889,7 +1896,7 @@ mod tests { let persister = Arc::new( Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"), ); - let event_handler = |_: _| {}; + let event_handler = |_: _| Ok(()); let bg_processor = BackgroundProcessor::start( persister, event_handler, @@ -1924,7 +1931,7 @@ mod tests { let bp_future = super::process_events_async( persister, - |_: _| async {}, + |_: _| async { Ok(()) }, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), @@ -1957,7 +1964,7 @@ mod tests { let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test")); - let event_handler = |_: _| {}; + let event_handler = |_: _| Ok(()); let bg_processor = BackgroundProcessor::start( persister, event_handler, @@ -1986,7 +1993,7 @@ mod tests { let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test")); - let event_handler = |_: _| {}; + let event_handler = |_: _| Ok(()); let bg_processor = BackgroundProcessor::start( persister, event_handler, @@ -2021,13 +2028,16 @@ mod tests { // Set up a background event handler for FundingGenerationReady events. let (funding_generation_send, funding_generation_recv) = std::sync::mpsc::sync_channel(1); let (channel_pending_send, channel_pending_recv) = std::sync::mpsc::sync_channel(1); - let event_handler = move |event: Event| match event { - Event::FundingGenerationReady { .. } => funding_generation_send - .send(handle_funding_generation_ready!(event, channel_value)) - .unwrap(), - Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(), - Event::ChannelReady { .. } => {}, - _ => panic!("Unexpected event: {:?}", event), + let event_handler = move |event: Event| { + match event { + Event::FundingGenerationReady { .. } => funding_generation_send + .send(handle_funding_generation_ready!(event, channel_value)) + .unwrap(), + Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(), + Event::ChannelReady { .. } => {}, + _ => panic!("Unexpected event: {:?}", event), + } + Ok(()) }; let bg_processor = BackgroundProcessor::start( @@ -2082,11 +2092,14 @@ mod tests { // Set up a background event handler for SpendableOutputs events. let (sender, receiver) = std::sync::mpsc::sync_channel(1); - let event_handler = move |event: Event| match event { - Event::SpendableOutputs { .. } => sender.send(event).unwrap(), - Event::ChannelReady { .. } => {}, - Event::ChannelClosed { .. } => {}, - _ => panic!("Unexpected event: {:?}", event), + let event_handler = move |event: Event| { + match event { + Event::SpendableOutputs { .. } => sender.send(event).unwrap(), + Event::ChannelReady { .. } => {}, + Event::ChannelClosed { .. } => {}, + _ => panic!("Unexpected event: {:?}", event), + } + Ok(()) }; let persister = Arc::new(Persister::new(data_dir)); let bg_processor = BackgroundProcessor::start( @@ -2220,7 +2233,7 @@ mod tests { let (_, nodes) = create_nodes(2, "test_scorer_persistence"); let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); - let event_handler = |_: _| {}; + let event_handler = |_: _| Ok(()); let bg_processor = BackgroundProcessor::start( persister, event_handler, @@ -2315,7 +2328,7 @@ mod tests { let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender)); - let event_handler = |_: _| {}; + let event_handler = |_: _| Ok(()); let background_processor = BackgroundProcessor::start( persister, event_handler, @@ -2350,7 +2363,7 @@ mod tests { let (exit_sender, exit_receiver) = tokio::sync::watch::channel(()); let bp_future = super::process_events_async( persister, - |_: _| async {}, + |_: _| async { Ok(()) }, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), @@ -2492,12 +2505,15 @@ mod tests { #[test] fn test_payment_path_scoring() { let (sender, receiver) = std::sync::mpsc::sync_channel(1); - let event_handler = move |event: Event| match event { - Event::PaymentPathFailed { .. } => sender.send(event).unwrap(), - Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(), - Event::ProbeSuccessful { .. } => sender.send(event).unwrap(), - Event::ProbeFailed { .. } => sender.send(event).unwrap(), - _ => panic!("Unexpected event: {:?}", event), + let event_handler = move |event: Event| { + match event { + Event::PaymentPathFailed { .. } => sender.send(event).unwrap(), + Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(), + Event::ProbeSuccessful { .. } => sender.send(event).unwrap(), + Event::ProbeFailed { .. } => sender.send(event).unwrap(), + _ => panic!("Unexpected event: {:?}", event), + } + Ok(()) }; let (_, nodes) = create_nodes(1, "test_payment_path_scoring"); @@ -2543,6 +2559,7 @@ mod tests { Event::ProbeFailed { .. } => sender_ref.send(event).await.unwrap(), _ => panic!("Unexpected event: {:?}", event), } + Ok(()) } }; diff --git a/lightning-invoice/src/utils.rs b/lightning-invoice/src/utils.rs index 00b49c371ea..fa301a8dc06 100644 --- a/lightning-invoice/src/utils.rs +++ b/lightning-invoice/src/utils.rs @@ -1391,6 +1391,7 @@ mod test { } else { other_events.borrow_mut().push(event); } + Ok(()) }; nodes[fwd_idx].node.process_pending_events(&forward_event_handler); nodes[fwd_idx].node.process_pending_events(&forward_event_handler); diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index e6bb9d90778..93e1dae6ce3 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -33,8 +33,7 @@ use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance use crate::chain::transaction::{OutPoint, TransactionData}; use crate::ln::types::ChannelId; use crate::sign::ecdsa::EcdsaChannelSigner; -use crate::events; -use crate::events::{Event, EventHandler}; +use crate::events::{self, Event, EventHandler, ReplayEvent}; use crate::util::logger::{Logger, WithContext}; use crate::util::errors::APIError; use crate::util::wakers::{Future, Notifier}; @@ -533,7 +532,7 @@ where C::Target: chain::Filter, pub fn get_and_clear_pending_events(&self) -> Vec { use crate::events::EventsProvider; let events = core::cell::RefCell::new(Vec::new()); - let event_handler = |event: events::Event| events.borrow_mut().push(event); + let event_handler = |event: events::Event| Ok(events.borrow_mut().push(event)); self.process_pending_events(&event_handler); events.into_inner() } @@ -544,7 +543,7 @@ where C::Target: chain::Filter, /// See the trait-level documentation of [`EventsProvider`] for requirements. /// /// [`EventsProvider`]: crate::events::EventsProvider - pub async fn process_pending_events_async Future>( + pub async fn process_pending_events_async>, H: Fn(Event) -> Future>( &self, handler: H ) { // Sadly we can't hold the monitors read lock through an async call. Thus we have to do a @@ -552,8 +551,13 @@ where C::Target: chain::Filter, let mons_to_process = self.monitors.read().unwrap().keys().cloned().collect::>(); for funding_txo in mons_to_process { let mut ev; - super::channelmonitor::process_events_body!( - self.monitors.read().unwrap().get(&funding_txo).map(|m| &m.monitor), ev, handler(ev).await); + match super::channelmonitor::process_events_body!( + self.monitors.read().unwrap().get(&funding_txo).map(|m| &m.monitor), ev, handler(ev).await) { + Ok(()) => {}, + Err(ReplayEvent ()) => { + self.event_notifier.notify(); + } + } } } @@ -880,7 +884,12 @@ impl(&self, handler: H) where H::Target: EventHandler { for monitor_state in self.monitors.read().unwrap().values() { - monitor_state.monitor.process_pending_events(&handler); + match monitor_state.monitor.process_pending_events(&handler) { + Ok(()) => {}, + Err(ReplayEvent ()) => { + self.event_notifier.notify(); + } + } } } } diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 13f2ff044a2..5ecea825100 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -51,7 +51,7 @@ use crate::chain::Filter; use crate::util::logger::{Logger, Record}; use crate::util::ser::{Readable, ReadableArgs, RequiredWrapper, MaybeReadable, UpgradableRequired, Writer, Writeable, U48}; use crate::util::byte_utils; -use crate::events::{ClosureReason, Event, EventHandler}; +use crate::events::{ClosureReason, Event, EventHandler, ReplayEvent}; use crate::events::bump_transaction::{AnchorDescriptor, BumpTransactionEvent}; #[allow(unused_imports)] @@ -1159,34 +1159,53 @@ impl Writeable for ChannelMonitorImpl { macro_rules! _process_events_body { ($self_opt: expr, $event_to_handle: expr, $handle_event: expr) => { loop { + let mut handling_res = Ok(()); let (pending_events, repeated_events); if let Some(us) = $self_opt { let mut inner = us.inner.lock().unwrap(); if inner.is_processing_pending_events { - break; + break handling_res; } inner.is_processing_pending_events = true; pending_events = inner.pending_events.clone(); repeated_events = inner.get_repeated_events(); - } else { break; } - let num_events = pending_events.len(); + } else { break handling_res; } - for event in pending_events.into_iter().chain(repeated_events.into_iter()) { + let mut num_handled_events = 0; + for event in pending_events { $event_to_handle = event; - $handle_event; + match $handle_event { + Ok(()) => num_handled_events += 1, + Err(e) => { + // If we encounter an error we stop handling events and make sure to replay + // any unhandled events on the next invocation. + handling_res = Err(e); + break; + } + } + } + + if handling_res.is_ok() { + for event in repeated_events { + // For repeated events we ignore any errors as they will be replayed eventually + // anyways. + $event_to_handle = event; + let _ = $handle_event; + } } if let Some(us) = $self_opt { let mut inner = us.inner.lock().unwrap(); - inner.pending_events.drain(..num_events); + inner.pending_events.drain(..num_handled_events); inner.is_processing_pending_events = false; - if !inner.pending_events.is_empty() { - // If there's more events to process, go ahead and do so. + if handling_res.is_ok() && !inner.pending_events.is_empty() { + // If there's more events to process and we didn't fail so far, go ahead and do + // so. continue; } } - break; + break handling_res; } } } @@ -1498,21 +1517,23 @@ impl ChannelMonitor { /// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in /// order to handle these events. /// + /// Will return a [`ReplayEvent`] error if event handling failed and should eventually be retried. + /// /// [`SpendableOutputs`]: crate::events::Event::SpendableOutputs /// [`BumpTransaction`]: crate::events::Event::BumpTransaction - pub fn process_pending_events(&self, handler: &H) where H::Target: EventHandler { + pub fn process_pending_events(&self, handler: &H) -> Result<(), ReplayEvent> where H::Target: EventHandler { let mut ev; - process_events_body!(Some(self), ev, handler.handle_event(ev)); + process_events_body!(Some(self), ev, handler.handle_event(ev)) } /// Processes any events asynchronously. /// /// See [`Self::process_pending_events`] for more information. - pub async fn process_pending_events_async Future>( + pub async fn process_pending_events_async>, H: Fn(Event) -> Future>( &self, handler: &H - ) { + ) -> Result<(), ReplayEvent> { let mut ev; - process_events_body!(Some(self), ev, { handler(ev).await }); + process_events_body!(Some(self), ev, { handler(ev).await }) } #[cfg(test)] diff --git a/lightning/src/events/mod.rs b/lightning/src/events/mod.rs index acee931138f..e51a0972cdc 100644 --- a/lightning/src/events/mod.rs +++ b/lightning/src/events/mod.rs @@ -2300,8 +2300,12 @@ pub trait MessageSendEventsProvider { /// /// In order to ensure no [`Event`]s are lost, implementors of this trait will persist [`Event`]s /// and replay any unhandled events on startup. An [`Event`] is considered handled when -/// [`process_pending_events`] returns, thus handlers MUST fully handle [`Event`]s and persist any -/// relevant changes to disk *before* returning. +/// [`process_pending_events`] returns `Ok(())`, thus handlers MUST fully handle [`Event`]s and +/// persist any relevant changes to disk *before* returning `Ok(())`. In case of an error (e.g., +/// persistence failure) implementors should return `Err(ReplayEvent())`, signalling to the +/// [`EventsProvider`] to replay unhandled events on the next invocation (generally immediately). +/// Note that some events might not be replayed, please refer to the documentation for +/// the individual [`Event`] variants for more detail. /// /// Further, because an application may crash between an [`Event`] being handled and the /// implementor of this trait being re-serialized, [`Event`] handling must be idempotent - in @@ -2328,26 +2332,33 @@ pub trait EventsProvider { fn process_pending_events(&self, handler: H) where H::Target: EventHandler; } +/// An error type that may be returned to LDK in order to safely abort event handling if it can't +/// currently succeed (e.g., due to a persistence failure). +/// +/// LDK will ensure the event is persisted and will eventually be replayed. +#[derive(Clone, Copy, Debug)] +pub struct ReplayEvent(); + /// A trait implemented for objects handling events from [`EventsProvider`]. /// /// An async variation also exists for implementations of [`EventsProvider`] that support async /// event handling. The async event handler should satisfy the generic bounds: `F: -/// core::future::Future, H: Fn(Event) -> F`. +/// core::future::Future>, H: Fn(Event) -> F`. pub trait EventHandler { /// Handles the given [`Event`]. /// /// See [`EventsProvider`] for details that must be considered when implementing this method. - fn handle_event(&self, event: Event); + fn handle_event(&self, event: Event) -> Result<(), ReplayEvent>; } -impl EventHandler for F where F: Fn(Event) { - fn handle_event(&self, event: Event) { +impl EventHandler for F where F: Fn(Event) -> Result<(), ReplayEvent> { + fn handle_event(&self, event: Event) -> Result<(), ReplayEvent> { self(event) } } impl EventHandler for Arc { - fn handle_event(&self, event: Event) { + fn handle_event(&self, event: Event) -> Result<(), ReplayEvent> { self.deref().handle_event(event) } } diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index b14b6e60877..561053d85b4 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -41,7 +41,7 @@ use crate::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, Fee use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, WithChannelMonitor, ChannelMonitorUpdateStep, HTLC_FAIL_BACK_BUFFER, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY, MonitorEvent, CLOSED_CHANNEL_UPDATE_ID}; use crate::chain::transaction::{OutPoint, TransactionData}; use crate::events; -use crate::events::{Event, EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination, PaymentFailureReason}; +use crate::events::{Event, EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination, PaymentFailureReason, ReplayEvent}; // Since this struct is returned in `list_channels` methods, expose it here in case users want to // construct one themselves. use crate::ln::inbound_payment; @@ -1395,35 +1395,38 @@ where /// } /// /// // On the event processing thread once the peer has responded -/// channel_manager.process_pending_events(&|event| match event { -/// Event::FundingGenerationReady { -/// temporary_channel_id, counterparty_node_id, channel_value_satoshis, output_script, -/// user_channel_id, .. -/// } => { -/// assert_eq!(user_channel_id, 42); -/// let funding_transaction = wallet.create_funding_transaction( -/// channel_value_satoshis, output_script -/// ); -/// match channel_manager.funding_transaction_generated( -/// &temporary_channel_id, &counterparty_node_id, funding_transaction -/// ) { -/// Ok(()) => println!("Funding channel {}", temporary_channel_id), -/// Err(e) => println!("Error funding channel {}: {:?}", temporary_channel_id, e), -/// } -/// }, -/// Event::ChannelPending { channel_id, user_channel_id, former_temporary_channel_id, .. } => { -/// assert_eq!(user_channel_id, 42); -/// println!( -/// "Channel {} now {} pending (funding transaction has been broadcasted)", channel_id, -/// former_temporary_channel_id.unwrap() -/// ); -/// }, -/// Event::ChannelReady { channel_id, user_channel_id, .. } => { -/// assert_eq!(user_channel_id, 42); -/// println!("Channel {} ready", channel_id); -/// }, -/// // ... -/// # _ => {}, +/// channel_manager.process_pending_events(&|event| { +/// match event { +/// Event::FundingGenerationReady { +/// temporary_channel_id, counterparty_node_id, channel_value_satoshis, output_script, +/// user_channel_id, .. +/// } => { +/// assert_eq!(user_channel_id, 42); +/// let funding_transaction = wallet.create_funding_transaction( +/// channel_value_satoshis, output_script +/// ); +/// match channel_manager.funding_transaction_generated( +/// &temporary_channel_id, &counterparty_node_id, funding_transaction +/// ) { +/// Ok(()) => println!("Funding channel {}", temporary_channel_id), +/// Err(e) => println!("Error funding channel {}: {:?}", temporary_channel_id, e), +/// } +/// }, +/// Event::ChannelPending { channel_id, user_channel_id, former_temporary_channel_id, .. } => { +/// assert_eq!(user_channel_id, 42); +/// println!( +/// "Channel {} now {} pending (funding transaction has been broadcasted)", channel_id, +/// former_temporary_channel_id.unwrap() +/// ); +/// }, +/// Event::ChannelReady { channel_id, user_channel_id, .. } => { +/// assert_eq!(user_channel_id, 42); +/// println!("Channel {} ready", channel_id); +/// }, +/// // ... +/// # _ => {}, +/// } +/// Ok(()) /// }); /// # } /// ``` @@ -1447,28 +1450,31 @@ where /// # fn example(channel_manager: T) { /// # let channel_manager = channel_manager.get_cm(); /// # let error_message = "Channel force-closed"; -/// channel_manager.process_pending_events(&|event| match event { -/// Event::OpenChannelRequest { temporary_channel_id, counterparty_node_id, .. } => { -/// if !is_trusted(counterparty_node_id) { -/// match channel_manager.force_close_without_broadcasting_txn( -/// &temporary_channel_id, &counterparty_node_id, error_message.to_string() -/// ) { -/// Ok(()) => println!("Rejecting channel {}", temporary_channel_id), -/// Err(e) => println!("Error rejecting channel {}: {:?}", temporary_channel_id, e), +/// channel_manager.process_pending_events(&|event| { +/// match event { +/// Event::OpenChannelRequest { temporary_channel_id, counterparty_node_id, .. } => { +/// if !is_trusted(counterparty_node_id) { +/// match channel_manager.force_close_without_broadcasting_txn( +/// &temporary_channel_id, &counterparty_node_id, error_message.to_string() +/// ) { +/// Ok(()) => println!("Rejecting channel {}", temporary_channel_id), +/// Err(e) => println!("Error rejecting channel {}: {:?}", temporary_channel_id, e), +/// } +/// return Ok(()); /// } -/// return; -/// } /// -/// let user_channel_id = 43; -/// match channel_manager.accept_inbound_channel( -/// &temporary_channel_id, &counterparty_node_id, user_channel_id -/// ) { -/// Ok(()) => println!("Accepting channel {}", temporary_channel_id), -/// Err(e) => println!("Error accepting channel {}: {:?}", temporary_channel_id, e), -/// } -/// }, -/// // ... -/// # _ => {}, +/// let user_channel_id = 43; +/// match channel_manager.accept_inbound_channel( +/// &temporary_channel_id, &counterparty_node_id, user_channel_id +/// ) { +/// Ok(()) => println!("Accepting channel {}", temporary_channel_id), +/// Err(e) => println!("Error accepting channel {}: {:?}", temporary_channel_id, e), +/// } +/// }, +/// // ... +/// # _ => {}, +/// } +/// Ok(()) /// }); /// # } /// ``` @@ -1497,13 +1503,16 @@ where /// } /// /// // On the event processing thread -/// channel_manager.process_pending_events(&|event| match event { -/// Event::ChannelClosed { channel_id, user_channel_id, .. } => { -/// assert_eq!(user_channel_id, 42); -/// println!("Channel {} closed", channel_id); -/// }, -/// // ... -/// # _ => {}, +/// channel_manager.process_pending_events(&|event| { +/// match event { +/// Event::ChannelClosed { channel_id, user_channel_id, .. } => { +/// assert_eq!(user_channel_id, 42); +/// println!("Channel {} closed", channel_id); +/// }, +/// // ... +/// # _ => {}, +/// } +/// Ok(()) /// }); /// # } /// ``` @@ -1553,30 +1562,33 @@ where /// }; /// /// // On the event processing thread -/// channel_manager.process_pending_events(&|event| match event { -/// Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose { -/// PaymentPurpose::Bolt11InvoicePayment { payment_preimage: Some(payment_preimage), .. } => { -/// assert_eq!(payment_hash, known_payment_hash); -/// println!("Claiming payment {}", payment_hash); -/// channel_manager.claim_funds(payment_preimage); -/// }, -/// PaymentPurpose::Bolt11InvoicePayment { payment_preimage: None, .. } => { -/// println!("Unknown payment hash: {}", payment_hash); +/// channel_manager.process_pending_events(&|event| { +/// match event { +/// Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose { +/// PaymentPurpose::Bolt11InvoicePayment { payment_preimage: Some(payment_preimage), .. } => { +/// assert_eq!(payment_hash, known_payment_hash); +/// println!("Claiming payment {}", payment_hash); +/// channel_manager.claim_funds(payment_preimage); +/// }, +/// PaymentPurpose::Bolt11InvoicePayment { payment_preimage: None, .. } => { +/// println!("Unknown payment hash: {}", payment_hash); +/// }, +/// PaymentPurpose::SpontaneousPayment(payment_preimage) => { +/// assert_ne!(payment_hash, known_payment_hash); +/// println!("Claiming spontaneous payment {}", payment_hash); +/// channel_manager.claim_funds(payment_preimage); +/// }, +/// // ... +/// # _ => {}, /// }, -/// PaymentPurpose::SpontaneousPayment(payment_preimage) => { -/// assert_ne!(payment_hash, known_payment_hash); -/// println!("Claiming spontaneous payment {}", payment_hash); -/// channel_manager.claim_funds(payment_preimage); +/// Event::PaymentClaimed { payment_hash, amount_msat, .. } => { +/// assert_eq!(payment_hash, known_payment_hash); +/// println!("Claimed {} msats", amount_msat); /// }, /// // ... -/// # _ => {}, -/// }, -/// Event::PaymentClaimed { payment_hash, amount_msat, .. } => { -/// assert_eq!(payment_hash, known_payment_hash); -/// println!("Claimed {} msats", amount_msat); -/// }, -/// // ... -/// # _ => {}, +/// # _ => {}, +/// } +/// Ok(()) /// }); /// # } /// ``` @@ -1619,11 +1631,14 @@ where /// ); /// /// // On the event processing thread -/// channel_manager.process_pending_events(&|event| match event { -/// Event::PaymentSent { payment_hash, .. } => println!("Paid {}", payment_hash), -/// Event::PaymentFailed { payment_hash, .. } => println!("Failed paying {}", payment_hash), -/// // ... -/// # _ => {}, +/// channel_manager.process_pending_events(&|event| { +/// match event { +/// Event::PaymentSent { payment_hash, .. } => println!("Paid {}", payment_hash), +/// Event::PaymentFailed { payment_hash, .. } => println!("Failed paying {}", payment_hash), +/// // ... +/// # _ => {}, +/// } +/// Ok(()) /// }); /// # } /// ``` @@ -1657,23 +1672,25 @@ where /// let bech32_offer = offer.to_string(); /// /// // On the event processing thread -/// channel_manager.process_pending_events(&|event| match event { -/// Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose { -/// PaymentPurpose::Bolt12OfferPayment { payment_preimage: Some(payment_preimage), .. } => { -/// println!("Claiming payment {}", payment_hash); -/// channel_manager.claim_funds(payment_preimage); +/// channel_manager.process_pending_events(&|event| { +/// match event { +/// Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose { +/// PaymentPurpose::Bolt12OfferPayment { payment_preimage: Some(payment_preimage), .. } => { +/// println!("Claiming payment {}", payment_hash); +/// channel_manager.claim_funds(payment_preimage); +/// }, +/// PaymentPurpose::Bolt12OfferPayment { payment_preimage: None, .. } => { +/// println!("Unknown payment hash: {}", payment_hash); +/// } +/// # _ => {}, /// }, -/// PaymentPurpose::Bolt12OfferPayment { payment_preimage: None, .. } => { -/// println!("Unknown payment hash: {}", payment_hash); +/// Event::PaymentClaimed { payment_hash, amount_msat, .. } => { +/// println!("Claimed {} msats", amount_msat); /// }, /// // ... -/// # _ => {}, -/// }, -/// Event::PaymentClaimed { payment_hash, amount_msat, .. } => { -/// println!("Claimed {} msats", amount_msat); -/// }, -/// // ... -/// # _ => {}, +/// # _ => {}, +/// } +/// Ok(()) /// }); /// # Ok(()) /// # } @@ -1719,12 +1736,15 @@ where /// ); /// /// // On the event processing thread -/// channel_manager.process_pending_events(&|event| match event { -/// Event::PaymentSent { payment_id: Some(payment_id), .. } => println!("Paid {}", payment_id), -/// Event::PaymentFailed { payment_id, .. } => println!("Failed paying {}", payment_id), -/// Event::InvoiceRequestFailed { payment_id, .. } => println!("Failed paying {}", payment_id), -/// // ... -/// # _ => {}, +/// channel_manager.process_pending_events(&|event| { +/// match event { +/// Event::PaymentSent { payment_id: Some(payment_id), .. } => println!("Paid {}", payment_id), +/// Event::PaymentFailed { payment_id, .. } => println!("Failed paying {}", payment_id), +/// Event::InvoiceRequestFailed { payment_id, .. } => println!("Failed paying {}", payment_id), +/// // ... +/// # _ => {}, +/// } +/// Ok(()) /// }); /// # } /// ``` @@ -1779,11 +1799,14 @@ where /// ); /// /// // On the event processing thread -/// channel_manager.process_pending_events(&|event| match event { -/// Event::PaymentSent { payment_id: Some(payment_id), .. } => println!("Paid {}", payment_id), -/// Event::PaymentFailed { payment_id, .. } => println!("Failed paying {}", payment_id), -/// // ... -/// # _ => {}, +/// channel_manager.process_pending_events(&|event| { +/// match event { +/// Event::PaymentSent { payment_id: Some(payment_id), .. } => println!("Paid {}", payment_id), +/// Event::PaymentFailed { payment_id, .. } => println!("Failed paying {}", payment_id), +/// // ... +/// # _ => {}, +/// } +/// Ok(()) /// }); /// # Ok(()) /// # } @@ -1809,18 +1832,19 @@ where /// }; /// /// // On the event processing thread -/// channel_manager.process_pending_events(&|event| match event { -/// Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose { -/// PaymentPurpose::Bolt12RefundPayment { payment_preimage: Some(payment_preimage), .. } => { -/// assert_eq!(payment_hash, known_payment_hash); -/// println!("Claiming payment {}", payment_hash); -/// channel_manager.claim_funds(payment_preimage); -/// }, -/// PaymentPurpose::Bolt12RefundPayment { payment_preimage: None, .. } => { -/// println!("Unknown payment hash: {}", payment_hash); -/// }, -/// // ... -/// # _ => {}, +/// channel_manager.process_pending_events(&|event| { +/// match event { +/// Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose { +/// PaymentPurpose::Bolt12RefundPayment { payment_preimage: Some(payment_preimage), .. } => { +/// assert_eq!(payment_hash, known_payment_hash); +/// println!("Claiming payment {}", payment_hash); +/// channel_manager.claim_funds(payment_preimage); +/// }, +/// PaymentPurpose::Bolt12RefundPayment { payment_preimage: None, .. } => { +/// println!("Unknown payment hash: {}", payment_hash); +/// }, +/// // ... +/// # _ => {}, /// }, /// Event::PaymentClaimed { payment_hash, amount_msat, .. } => { /// assert_eq!(payment_hash, known_payment_hash); @@ -1828,6 +1852,8 @@ where /// }, /// // ... /// # _ => {}, +/// } +/// Ok(()) /// }); /// # } /// ``` @@ -2831,8 +2857,9 @@ macro_rules! handle_new_monitor_update { macro_rules! process_events_body { ($self: expr, $event_to_handle: expr, $handle_event: expr) => { + let mut handling_failed = false; let mut processed_all_events = false; - while !processed_all_events { + while !handling_failed && !processed_all_events { if $self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() { return; } @@ -2856,24 +2883,34 @@ macro_rules! process_events_body { } let pending_events = $self.pending_events.lock().unwrap().clone(); - let num_events = pending_events.len(); if !pending_events.is_empty() { result = NotifyOption::DoPersist; } let mut post_event_actions = Vec::new(); + let mut num_handled_events = 0; for (event, action_opt) in pending_events { $event_to_handle = event; - $handle_event; - if let Some(action) = action_opt { - post_event_actions.push(action); + match $handle_event { + Ok(()) => { + if let Some(action) = action_opt { + post_event_actions.push(action); + } + num_handled_events += 1; + } + Err(_e) => { + // If we encounter an error we stop handling events and make sure to replay + // any unhandled events on the next invocation. + handling_failed = true; + break; + } } } { let mut pending_events = $self.pending_events.lock().unwrap(); - pending_events.drain(..num_events); + pending_events.drain(..num_handled_events); processed_all_events = pending_events.is_empty(); // Note that `push_pending_forwards_ev` relies on `pending_events_processor` being // updated here with the `pending_events` lock acquired. @@ -9240,7 +9277,7 @@ where #[cfg(any(test, feature = "_test_utils"))] pub fn get_and_clear_pending_events(&self) -> Vec { let events = core::cell::RefCell::new(Vec::new()); - let event_handler = |event: events::Event| events.borrow_mut().push(event); + let event_handler = |event: events::Event| Ok(events.borrow_mut().push(event)); self.process_pending_events(&event_handler); events.into_inner() } @@ -9347,7 +9384,7 @@ where /// using the given event handler. /// /// See the trait-level documentation of [`EventsProvider`] for requirements. - pub async fn process_pending_events_async Future>( + pub async fn process_pending_events_async>, H: Fn(Event) -> Future>( &self, handler: H ) { let mut ev; diff --git a/lightning/src/onion_message/functional_tests.rs b/lightning/src/onion_message/functional_tests.rs index 371b4f5879d..16e62bf33f4 100644 --- a/lightning/src/onion_message/functional_tests.rs +++ b/lightning/src/onion_message/functional_tests.rs @@ -307,7 +307,7 @@ fn disconnect_peers(node_a: &MessengerNode, node_b: &MessengerNode) { fn release_events(node: &MessengerNode) -> Vec { let events = core::cell::RefCell::new(Vec::new()); - node.messenger.process_pending_events(&|e| events.borrow_mut().push(e)); + node.messenger.process_pending_events(&|e| Ok(events.borrow_mut().push(e))); events.into_inner() } diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index 75d92610ae5..3a94d6c26de 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -18,7 +18,7 @@ use bitcoin::secp256k1::{self, PublicKey, Scalar, Secp256k1, SecretKey}; use crate::blinded_path::{BlindedPath, IntroductionNode, NextMessageHop, NodeIdLookUp}; use crate::blinded_path::message::{advance_path_by_one, ForwardNode, ForwardTlvs, MessageContext, OffersContext, ReceiveTlvs}; use crate::blinded_path::utils; -use crate::events::{Event, EventHandler, EventsProvider}; +use crate::events::{Event, EventHandler, EventsProvider, ReplayEvent}; use crate::sign::{EntropySource, NodeSigner, Recipient}; use crate::ln::features::{InitFeatures, NodeFeatures}; use crate::ln::msgs::{self, OnionMessage, OnionMessageHandler, SocketAddress}; @@ -31,6 +31,7 @@ use super::packet::OnionMessageContents; use super::packet::ParsedOnionMessageContents; use super::offers::OffersMessageHandler; use super::packet::{BIG_PACKET_HOP_DATA_LEN, ForwardControlTlvs, Packet, Payload, ReceiveControlTlvs, SMALL_PACKET_HOP_DATA_LEN}; +use crate::util::async_poll::{MultiResultFuturePoller, ResultFuture}; use crate::util::logger::{Logger, WithContext}; use crate::util::ser::Writeable; @@ -1328,7 +1329,7 @@ where /// have an ordering requirement. /// /// See the trait-level documentation of [`EventsProvider`] for requirements. - pub async fn process_pending_events_async + core::marker::Unpin, H: Fn(Event) -> Future>( + pub async fn process_pending_events_async> + core::marker::Unpin, H: Fn(Event) -> Future>( &self, handler: H ) { let mut intercepted_msgs = Vec::new(); @@ -1346,26 +1347,29 @@ where for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() { if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient { if let Some(addresses) = addresses.take() { - futures.push(Some(handler(Event::ConnectionNeeded { node_id: *node_id, addresses }))); + let future = ResultFuture::Pending(handler(Event::ConnectionNeeded { node_id: *node_id, addresses })); + futures.push(future); } } } for ev in intercepted_msgs { if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); } - futures.push(Some(handler(ev))); + let future = ResultFuture::Pending(handler(ev)); + futures.push(future); } // Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds - crate::util::async_poll::MultiFuturePoller(futures).await; + MultiResultFuturePoller::new(futures).await; if peer_connecteds.len() <= 1 { for event in peer_connecteds { handler(event).await; } } else { let mut futures = Vec::new(); for event in peer_connecteds { - futures.push(Some(handler(event))); + let future = ResultFuture::Pending(handler(event)); + futures.push(future); } - crate::util::async_poll::MultiFuturePoller(futures).await; + MultiResultFuturePoller::new(futures).await; } } } @@ -1409,7 +1413,7 @@ where for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() { if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient { if let Some(addresses) = addresses.take() { - handler.handle_event(Event::ConnectionNeeded { node_id: *node_id, addresses }); + let _ = handler.handle_event(Event::ConnectionNeeded { node_id: *node_id, addresses }); } } } diff --git a/lightning/src/util/async_poll.rs b/lightning/src/util/async_poll.rs index 7a368af7bae..c18ada73a47 100644 --- a/lightning/src/util/async_poll.rs +++ b/lightning/src/util/async_poll.rs @@ -15,29 +15,62 @@ use core::marker::Unpin; use core::pin::Pin; use core::task::{Context, Poll}; -pub(crate) struct MultiFuturePoller + Unpin>(pub Vec>); +pub(crate) enum ResultFuture>, E: Copy + Unpin> { + Pending(F), + Ready(Result<(), E>), +} + +pub(crate) struct MultiResultFuturePoller< + F: Future> + Unpin, + E: Copy + Unpin, +> { + futures_state: Vec>, +} -impl + Unpin> Future for MultiFuturePoller { - type Output = (); - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { +impl> + Unpin, E: Copy + Unpin> MultiResultFuturePoller { + pub fn new(futures_state: Vec>) -> Self { + Self { futures_state } + } +} + +impl> + Unpin, E: Copy + Unpin> Future + for MultiResultFuturePoller +{ + type Output = Vec>; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { let mut have_pending_futures = false; - for fut_option in self.get_mut().0.iter_mut() { - let mut fut = match fut_option.take() { - None => continue, - Some(fut) => fut, - }; - match Pin::new(&mut fut).poll(cx) { - Poll::Ready(()) => {}, - Poll::Pending => { - have_pending_futures = true; - *fut_option = Some(fut); + let futures_state = &mut self.get_mut().futures_state; + for state in futures_state.iter_mut() { + match state { + ResultFuture::Pending(ref mut fut) => match Pin::new(fut).poll(cx) { + Poll::Ready(res) => { + *state = ResultFuture::Ready(res); + }, + Poll::Pending => { + have_pending_futures = true; + }, }, + ResultFuture::Ready(_) => continue, } } + if have_pending_futures { Poll::Pending } else { - Poll::Ready(()) + let results = futures_state + .drain(..) + .filter_map(|e| match e { + ResultFuture::Ready(res) => Some(res), + ResultFuture::Pending(_) => { + debug_assert!( + false, + "All futures are expected to be ready if none are pending" + ); + None + }, + }) + .collect(); + Poll::Ready(results) } } } From f5cea0e57a72f3218eda355f7f67326756e85820 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 2 Jul 2024 12:04:33 +0200 Subject: [PATCH 3/5] Handle fallible events in `OnionMessenger` Previously, we would just fire-and-forget in `OnionMessenger`'s event handling. Since we now introduced the possibility of event handling failures, we here adapt the event handling logic to retain any events which we failed to handle to have them replayed upon the next invocation of `process_pending_events`/`process_pending_events_async`. --- lightning/src/onion_message/messenger.rs | 124 ++++++++++++++++------- 1 file changed, 87 insertions(+), 37 deletions(-) diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index 3a94d6c26de..7c7cd261089 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -37,6 +37,7 @@ use crate::util::ser::Writeable; use core::fmt; use core::ops::Deref; +use core::sync::atomic::{AtomicBool, Ordering}; use crate::io; use crate::sync::Mutex; use crate::prelude::*; @@ -264,6 +265,7 @@ pub struct OnionMessenger< intercept_messages_for_offline_peers: bool, pending_intercepted_msgs_events: Mutex>, pending_peer_connected_events: Mutex>, + pending_events_processor: AtomicBool, } /// [`OnionMessage`]s buffered to be sent. @@ -1018,6 +1020,28 @@ where } } +macro_rules! drop_handled_events_and_abort { ($self: expr, $res: expr, $offset: expr, $event_queue: expr) => { + // We want to make sure to cleanly abort upon event handling failure. To this end, we drop all + // successfully handled events from the given queue, reset the events processing flag, and + // return, to have the events eventually replayed upon next invocation. + { + let mut queue_lock = $event_queue.lock().unwrap(); + + // We skip `$offset` result entries to reach the ones relevant for the given `$event_queue`. + let mut res_iter = $res.iter().skip($offset); + + // Keep all events which previously error'd *or* any that have been added since we dropped + // the Mutex before. + queue_lock.retain(|_| res_iter.next().map_or(true, |r| r.is_err())); + + if $res.iter().any(|r| r.is_err()) { + // We failed handling some events. Return to have them eventually replayed. + $self.pending_events_processor.store(false, Ordering::Release); + return; + } + } +}} + impl OnionMessenger where @@ -1094,6 +1118,7 @@ where intercept_messages_for_offline_peers, pending_intercepted_msgs_events: Mutex::new(Vec::new()), pending_peer_connected_events: Mutex::new(Vec::new()), + pending_events_processor: AtomicBool::new(false), } } @@ -1332,45 +1357,60 @@ where pub async fn process_pending_events_async> + core::marker::Unpin, H: Fn(Event) -> Future>( &self, handler: H ) { - let mut intercepted_msgs = Vec::new(); - let mut peer_connecteds = Vec::new(); - { - let mut pending_intercepted_msgs_events = - self.pending_intercepted_msgs_events.lock().unwrap(); - let mut pending_peer_connected_events = - self.pending_peer_connected_events.lock().unwrap(); - core::mem::swap(&mut *pending_intercepted_msgs_events, &mut intercepted_msgs); - core::mem::swap(&mut *pending_peer_connected_events, &mut peer_connecteds); + if self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() { + return; } - let mut futures = Vec::with_capacity(intercepted_msgs.len()); - for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() { - if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient { - if let Some(addresses) = addresses.take() { - let future = ResultFuture::Pending(handler(Event::ConnectionNeeded { node_id: *node_id, addresses })); - futures.push(future); + { + let intercepted_msgs = self.pending_intercepted_msgs_events.lock().unwrap().clone(); + let mut futures = Vec::with_capacity(intercepted_msgs.len()); + for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() { + if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient { + if let Some(addresses) = addresses.take() { + let future = ResultFuture::Pending(handler(Event::ConnectionNeeded { node_id: *node_id, addresses })); + futures.push(future); + } } } - } - for ev in intercepted_msgs { - if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); } - let future = ResultFuture::Pending(handler(ev)); - futures.push(future); - } - // Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds - MultiResultFuturePoller::new(futures).await; + // The offset in the `futures` vec at which `intercepted_msgs` start. We don't bother + // replaying `ConnectionNeeded` events. + let intercepted_msgs_offset = futures.len(); - if peer_connecteds.len() <= 1 { - for event in peer_connecteds { handler(event).await; } - } else { - let mut futures = Vec::new(); - for event in peer_connecteds { - let future = ResultFuture::Pending(handler(event)); + for ev in intercepted_msgs { + if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); } + let future = ResultFuture::Pending(handler(ev)); futures.push(future); } - MultiResultFuturePoller::new(futures).await; + // Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds + let res = MultiResultFuturePoller::new(futures).await; + drop_handled_events_and_abort!(self, res, intercepted_msgs_offset, self.pending_intercepted_msgs_events); } + + { + let peer_connecteds = self.pending_peer_connected_events.lock().unwrap().clone(); + let num_peer_connecteds = peer_connecteds.len(); + if num_peer_connecteds <= 1 { + for event in peer_connecteds { + if handler(event).await.is_ok() { + self.pending_peer_connected_events.lock().unwrap().drain(..num_peer_connecteds); + } else { + // We failed handling the event. Return to have it eventually replayed. + self.pending_events_processor.store(false, Ordering::Release); + return; + } + } + } else { + let mut futures = Vec::new(); + for event in peer_connecteds { + let future = ResultFuture::Pending(handler(event)); + futures.push(future); + } + let res = MultiResultFuturePoller::new(futures).await; + drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events); + } + } + self.pending_events_processor.store(false, Ordering::Release); } } @@ -1410,6 +1450,10 @@ where CMH::Target: CustomOnionMessageHandler, { fn process_pending_events(&self, handler: H) where H::Target: EventHandler { + if self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() { + return; + } + for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() { if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient { if let Some(addresses) = addresses.take() { @@ -1417,10 +1461,13 @@ where } } } - let mut events = Vec::new(); + let intercepted_msgs; + let peer_connecteds; { - let mut pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap(); + let pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap(); + intercepted_msgs = pending_intercepted_msgs_events.clone(); let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap(); + peer_connecteds = pending_peer_connected_events.clone(); #[cfg(debug_assertions)] { for ev in pending_intercepted_msgs_events.iter() { if let Event::OnionMessageIntercepted { .. } = ev {} else { panic!(); } @@ -1429,13 +1476,16 @@ where if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); } } } - core::mem::swap(&mut *pending_intercepted_msgs_events, &mut events); - events.append(&mut pending_peer_connected_events); pending_peer_connected_events.shrink_to(10); // Limit total heap usage } - for ev in events { - handler.handle_event(ev); - } + + let res = intercepted_msgs.into_iter().map(|ev| handler.handle_event(ev)).collect::>(); + drop_handled_events_and_abort!(self, res, 0, self.pending_intercepted_msgs_events); + + let res = peer_connecteds.into_iter().map(|ev| handler.handle_event(ev)).collect::>(); + drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events); + + self.pending_events_processor.store(false, Ordering::Release); } } From 8599bc9784e572b4f5c77d1f6855bf8d705196c1 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Mon, 8 Jul 2024 14:51:03 +0200 Subject: [PATCH 4/5] Add simple test for event replaying --- lightning-background-processor/src/lib.rs | 53 ++++++++++++++++++++++- 1 file changed, 52 insertions(+), 1 deletion(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 10f5ada505e..314de746508 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -1019,10 +1019,13 @@ mod tests { use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey}; use bitcoin::transaction::Version; use bitcoin::{Amount, ScriptBuf, Txid}; + use core::sync::atomic::{AtomicBool, Ordering}; use lightning::chain::channelmonitor::ANTI_REORG_DELAY; use lightning::chain::transaction::OutPoint; use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter}; - use lightning::events::{Event, MessageSendEvent, MessageSendEventsProvider, PathFailure}; + use lightning::events::{ + Event, MessageSendEvent, MessageSendEventsProvider, PathFailure, ReplayEvent, + }; use lightning::ln::channelmanager; use lightning::ln::channelmanager::{ ChainParameters, PaymentId, BREAKDOWN_TIMEOUT, MIN_CLTV_EXPIRY_DELTA, @@ -2228,6 +2231,54 @@ mod tests { } } + #[test] + fn test_event_handling_failures_are_replayed() { + let (_, nodes) = create_nodes(2, "test_event_handling_failures_are_replayed"); + let channel_value = 100000; + let data_dir = nodes[0].kv_store.get_data_dir(); + let persister = Arc::new(Persister::new(data_dir.clone())); + + let (first_event_send, first_event_recv) = std::sync::mpsc::sync_channel(1); + let (second_event_send, second_event_recv) = std::sync::mpsc::sync_channel(1); + let should_fail_event_handling = Arc::new(AtomicBool::new(true)); + let event_handler = move |event: Event| { + if let Ok(true) = should_fail_event_handling.compare_exchange( + true, + false, + Ordering::Acquire, + Ordering::Relaxed, + ) { + first_event_send.send(event).unwrap(); + return Err(ReplayEvent()); + } + + second_event_send.send(event).unwrap(); + Ok(()) + }; + + let bg_processor = BackgroundProcessor::start( + persister, + event_handler, + nodes[0].chain_monitor.clone(), + nodes[0].node.clone(), + Some(nodes[0].messenger.clone()), + nodes[0].no_gossip_sync(), + nodes[0].peer_manager.clone(), + nodes[0].logger.clone(), + Some(nodes[0].scorer.clone()), + ); + + begin_open_channel!(nodes[0], nodes[1], channel_value); + assert_eq!( + first_event_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE)), + second_event_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE)) + ); + + if !std::thread::panicking() { + bg_processor.stop().unwrap(); + } + } + #[test] fn test_scorer_persistence() { let (_, nodes) = create_nodes(2, "test_scorer_persistence"); From e617a394e8c4a6339be48688c0a0a531f778fdbb Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Mon, 8 Jul 2024 15:43:18 +0200 Subject: [PATCH 5/5] Document `Failure Behavior and Persistence` for every event type --- lightning/src/events/mod.rs | 103 +++++++++++++++++++++++++++++++++++- 1 file changed, 102 insertions(+), 1 deletion(-) diff --git a/lightning/src/events/mod.rs b/lightning/src/events/mod.rs index e51a0972cdc..0e24e8e82f7 100644 --- a/lightning/src/events/mod.rs +++ b/lightning/src/events/mod.rs @@ -551,6 +551,10 @@ pub enum Event { /// Note that *all inputs* in the funding transaction must spend SegWit outputs or your /// counterparty can steal your funds! /// + /// # Failure Behavior and Persistence + /// This event will eventually be replayed after failures-to-handle (i.e., the event handler + /// returning `Err(ReplayEvent ())`), but won't be persisted across restarts. + /// /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager /// [`ChannelManager::funding_transaction_generated`]: crate::ln::channelmanager::ChannelManager::funding_transaction_generated FundingGenerationReady { @@ -608,6 +612,10 @@ pub enum Event { /// # Note /// This event used to be called `PaymentReceived` in LDK versions 0.0.112 and earlier. /// + /// # Failure Behavior and Persistence + /// This event will eventually be replayed after failures-to-handle (i.e., the event handler + /// returning `Err(ReplayEvent ())`) and will be persisted across restarts. + /// /// [`ChannelManager::claim_funds`]: crate::ln::channelmanager::ChannelManager::claim_funds /// [`ChannelManager::claim_funds_with_known_custom_tlvs`]: crate::ln::channelmanager::ChannelManager::claim_funds_with_known_custom_tlvs /// [`FailureCode::InvalidOnionPayload`]: crate::ln::channelmanager::FailureCode::InvalidOnionPayload @@ -677,6 +685,10 @@ pub enum Event { /// [`ChannelManager::claim_funds`] twice for the same [`Event::PaymentClaimable`] you may get /// multiple `PaymentClaimed` events. /// + /// # Failure Behavior and Persistence + /// This event will eventually be replayed after failures-to-handle (i.e., the event handler + /// returning `Err(ReplayEvent ())`) and will be persisted across restarts. + /// /// [`ChannelManager::claim_funds`]: crate::ln::channelmanager::ChannelManager::claim_funds PaymentClaimed { /// The node that received the payment. @@ -716,6 +728,10 @@ pub enum Event { /// This event will not be generated for onion message forwards; only for sends including /// replies. Handlers should connect to the node otherwise any buffered messages may be lost. /// + /// # Failure Behavior and Persistence + /// This event will eventually be replayed after failures-to-handle (i.e., the event handler + /// returning `Err(ReplayEvent ())`), but won't be persisted across restarts. + /// /// [`OnionMessage`]: msgs::OnionMessage /// [`MessageRouter`]: crate::onion_message::messenger::MessageRouter /// [`Destination`]: crate::onion_message::messenger::Destination @@ -730,6 +746,10 @@ pub enum Event { /// or was explicitly abandoned by [`ChannelManager::abandon_payment`]. This may be for an /// [`InvoiceRequest`] sent for an [`Offer`] or for a [`Refund`] that hasn't been redeemed. /// + /// # Failure Behavior and Persistence + /// This event will eventually be replayed after failures-to-handle (i.e., the event handler + /// returning `Err(ReplayEvent ())`) and will be persisted across restarts. + /// /// [`ChannelManager::abandon_payment`]: crate::ln::channelmanager::ChannelManager::abandon_payment /// [`InvoiceRequest`]: crate::offers::invoice_request::InvoiceRequest /// [`Offer`]: crate::offers::offer::Offer @@ -746,6 +766,10 @@ pub enum Event { /// [`ChannelManager::abandon_payment`] to abandon the associated payment. See those docs for /// further details. /// + /// # Failure Behavior and Persistence + /// This event will eventually be replayed after failures-to-handle (i.e., the event handler + /// returning `Err(ReplayEvent ())`) and will be persisted across restarts. + /// /// [`InvoiceRequest`]: crate::offers::invoice_request::InvoiceRequest /// [`Refund`]: crate::offers::refund::Refund /// [`UserConfig::manually_handle_bolt12_invoices`]: crate::util::config::UserConfig::manually_handle_bolt12_invoices @@ -768,6 +792,10 @@ pub enum Event { /// /// Note for MPP payments: in rare cases, this event may be preceded by a `PaymentPathFailed` /// event. In this situation, you SHOULD treat this payment as having succeeded. + /// + /// # Failure Behavior and Persistence + /// This event will eventually be replayed after failures-to-handle (i.e., the event handler + /// returning `Err(ReplayEvent ())`) and will be persisted across restarts. PaymentSent { /// The `payment_id` passed to [`ChannelManager::send_payment`]. /// @@ -806,6 +834,10 @@ pub enum Event { /// received and processed. In this case, the [`Event::PaymentFailed`] event MUST be ignored, /// and the payment MUST be treated as having succeeded. /// + /// # Failure Behavior and Persistence + /// This event will eventually be replayed after failures-to-handle (i.e., the event handler + /// returning `Err(ReplayEvent ())`) and will be persisted across restarts. + /// /// [`Retry`]: crate::ln::channelmanager::Retry /// [`ChannelManager::abandon_payment`]: crate::ln::channelmanager::ChannelManager::abandon_payment PaymentFailed { @@ -825,6 +857,10 @@ pub enum Event { /// /// Always generated after [`Event::PaymentSent`] and thus useful for scoring channels. See /// [`Event::PaymentSent`] for obtaining the payment preimage. + /// + /// # Failure Behavior and Persistence + /// This event will eventually be replayed after failures-to-handle (i.e., the event handler + /// returning `Err(ReplayEvent ())`) and will be persisted across restarts. PaymentPathSuccessful { /// The `payment_id` passed to [`ChannelManager::send_payment`]. /// @@ -850,6 +886,10 @@ pub enum Event { /// See [`ChannelManager::abandon_payment`] for giving up on this payment before its retries have /// been exhausted. /// + /// # Failure Behavior and Persistence + /// This event will eventually be replayed after failures-to-handle (i.e., the event handler + /// returning `Err(ReplayEvent ())`) and will be persisted across restarts. + /// /// [`ChannelManager::abandon_payment`]: crate::ln::channelmanager::ChannelManager::abandon_payment PaymentPathFailed { /// The `payment_id` passed to [`ChannelManager::send_payment`]. @@ -889,6 +929,10 @@ pub enum Event { error_data: Option>, }, /// Indicates that a probe payment we sent returned successful, i.e., only failed at the destination. + /// + /// # Failure Behavior and Persistence + /// This event will eventually be replayed after failures-to-handle (i.e., the event handler + /// returning `Err(ReplayEvent ())`) and will be persisted across restarts. ProbeSuccessful { /// The id returned by [`ChannelManager::send_probe`]. /// @@ -902,6 +946,10 @@ pub enum Event { path: Path, }, /// Indicates that a probe payment we sent failed at an intermediary node on the path. + /// + /// # Failure Behavior and Persistence + /// This event will eventually be replayed after failures-to-handle (i.e., the event handler + /// returning `Err(ReplayEvent ())`) and will be persisted across restarts. ProbeFailed { /// The id returned by [`ChannelManager::send_probe`]. /// @@ -923,6 +971,10 @@ pub enum Event { /// Used to indicate that [`ChannelManager::process_pending_htlc_forwards`] should be called at /// a time in the future. /// + /// # Failure Behavior and Persistence + /// This event will eventually be replayed after failures-to-handle (i.e., the event handler + /// returning `Err(ReplayEvent ())`) and will be regenerated after restarts. + /// /// [`ChannelManager::process_pending_htlc_forwards`]: crate::ln::channelmanager::ChannelManager::process_pending_htlc_forwards PendingHTLCsForwardable { /// The minimum amount of time that should be waited prior to calling @@ -939,6 +991,10 @@ pub enum Event { /// [`ChannelManager::fail_intercepted_htlc`] MUST be called in response to this event. See /// their docs for more information. /// + /// # Failure Behavior and Persistence + /// This event will eventually be replayed after failures-to-handle (i.e., the event handler + /// returning `Err(ReplayEvent ())`) and will be persisted across restarts. + /// /// [`ChannelManager::get_intercept_scid`]: crate::ln::channelmanager::ChannelManager::get_intercept_scid /// [`UserConfig::accept_intercept_htlcs`]: crate::util::config::UserConfig::accept_intercept_htlcs /// [`ChannelManager::forward_intercepted_htlc`]: crate::ln::channelmanager::ChannelManager::forward_intercepted_htlc @@ -974,6 +1030,10 @@ pub enum Event { /// You may hand them to the [`OutputSweeper`] utility which will store and (re-)generate spending /// transactions for you. /// + /// # Failure Behavior and Persistence + /// This event will eventually be replayed after failures-to-handle (i.e., the event handler + /// returning `Err(ReplayEvent ())`) and will be persisted across restarts. + /// /// [`OutputSweeper`]: crate::util::sweep::OutputSweeper SpendableOutputs { /// The outputs which you should store as spendable by you. @@ -985,6 +1045,10 @@ pub enum Event { }, /// This event is generated when a payment has been successfully forwarded through us and a /// forwarding fee earned. + /// + /// # Failure Behavior and Persistence + /// This event will eventually be replayed after failures-to-handle (i.e., the event handler + /// returning `Err(ReplayEvent ())`) and will be persisted across restarts. PaymentForwarded { /// The channel id of the incoming channel between the previous node and us. /// @@ -1046,6 +1110,10 @@ pub enum Event { /// This event is emitted when the funding transaction has been signed and is broadcast to the /// network. For 0conf channels it will be immediately followed by the corresponding /// [`Event::ChannelReady`] event. + /// + /// # Failure Behavior and Persistence + /// This event will eventually be replayed after failures-to-handle (i.e., the event handler + /// returning `Err(ReplayEvent ())`) and will be persisted across restarts. ChannelPending { /// The `channel_id` of the channel that is pending confirmation. channel_id: ChannelId, @@ -1075,6 +1143,10 @@ pub enum Event { /// be used. This event is emitted either when the funding transaction has been confirmed /// on-chain, or, in case of a 0conf channel, when both parties have confirmed the channel /// establishment. + /// + /// # Failure Behavior and Persistence + /// This event will eventually be replayed after failures-to-handle (i.e., the event handler + /// returning `Err(ReplayEvent ())`) and will be persisted across restarts. ChannelReady { /// The `channel_id` of the channel that is ready. channel_id: ChannelId, @@ -1101,6 +1173,10 @@ pub enum Event { /// /// [`ChannelManager::accept_inbound_channel`]: crate::ln::channelmanager::ChannelManager::accept_inbound_channel /// [`UserConfig::manually_accept_inbound_channels`]: crate::util::config::UserConfig::manually_accept_inbound_channels + /// + /// # Failure Behavior and Persistence + /// This event will eventually be replayed after failures-to-handle (i.e., the event handler + /// returning `Err(ReplayEvent ())`) and will be persisted across restarts. ChannelClosed { /// The `channel_id` of the channel which has been closed. Note that on-chain transactions /// resolving the channel are likely still awaiting confirmation. @@ -1135,6 +1211,10 @@ pub enum Event { /// inputs for another purpose. /// /// This event is not guaranteed to be generated for channels that are closed due to a restart. + /// + /// # Failure Behavior and Persistence + /// This event will eventually be replayed after failures-to-handle (i.e., the event handler + /// returning `Err(ReplayEvent ())`) and will be persisted across restarts. DiscardFunding { /// The channel_id of the channel which has been closed. channel_id: ChannelId, @@ -1150,6 +1230,10 @@ pub enum Event { /// The event is only triggered when a new open channel request is received and the /// [`UserConfig::manually_accept_inbound_channels`] config flag is set to true. /// + /// # Failure Behavior and Persistence + /// This event will eventually be replayed after failures-to-handle (i.e., the event handler + /// returning `Err(ReplayEvent ())`) and will be persisted across restarts. + /// /// [`ChannelManager::accept_inbound_channel`]: crate::ln::channelmanager::ChannelManager::accept_inbound_channel /// [`ChannelManager::force_close_without_broadcasting_txn`]: crate::ln::channelmanager::ChannelManager::force_close_without_broadcasting_txn /// [`UserConfig::manually_accept_inbound_channels`]: crate::util::config::UserConfig::manually_accept_inbound_channels @@ -1206,6 +1290,10 @@ pub enum Event { /// /// This event, however, does not get generated if an HTLC fails to meet the forwarding /// requirements (i.e. insufficient fees paid, or a CLTV that is too soon). + /// + /// # Failure Behavior and Persistence + /// This event will eventually be replayed after failures-to-handle (i.e., the event handler + /// returning `Err(ReplayEvent ())`) and will be persisted across restarts. HTLCHandlingFailed { /// The channel over which the HTLC was received. prev_channel_id: ChannelId, @@ -1219,6 +1307,10 @@ pub enum Event { /// [`ChannelHandshakeConfig::negotiate_anchors_zero_fee_htlc_tx`] config flag is set to true. /// It is limited to the scope of channels with anchor outputs. /// + /// # Failure Behavior and Persistence + /// This event will eventually be replayed after failures-to-handle (i.e., the event handler + /// returning `Err(ReplayEvent ())`), but will only be regenerated as needed after restarts. + /// /// [`ChannelHandshakeConfig::negotiate_anchors_zero_fee_htlc_tx`]: crate::util::config::ChannelHandshakeConfig::negotiate_anchors_zero_fee_htlc_tx BumpTransaction(BumpTransactionEvent), /// We received an onion message that is intended to be forwarded to a peer @@ -1226,6 +1318,10 @@ pub enum Event { /// `OnionMessenger` was initialized with /// [`OnionMessenger::new_with_offline_peer_interception`], see its docs. /// + /// # Failure Behavior and Persistence + /// This event will eventually be replayed after failures-to-handle (i.e., the event handler + /// returning `Err(ReplayEvent ())`), but won't be persisted across restarts. + /// /// [`OnionMessenger::new_with_offline_peer_interception`]: crate::onion_message::messenger::OnionMessenger::new_with_offline_peer_interception OnionMessageIntercepted { /// The node id of the offline peer. @@ -1239,6 +1335,10 @@ pub enum Event { /// initialized with /// [`OnionMessenger::new_with_offline_peer_interception`], see its docs. /// + /// # Failure Behavior and Persistence + /// This event will eventually be replayed after failures-to-handle (i.e., the event handler + /// returning `Err(ReplayEvent ())`), but won't be persisted across restarts. + /// /// [`OnionMessenger::new_with_offline_peer_interception`]: crate::onion_message::messenger::OnionMessenger::new_with_offline_peer_interception OnionMessagePeerConnected { /// The node id of the peer we just connected to, who advertises support for @@ -2335,7 +2435,8 @@ pub trait EventsProvider { /// An error type that may be returned to LDK in order to safely abort event handling if it can't /// currently succeed (e.g., due to a persistence failure). /// -/// LDK will ensure the event is persisted and will eventually be replayed. +/// Depending on the type, LDK may ensure the event is persisted and will eventually be replayed. +/// Please refer to the documentation of each [`Event`] variant for more details. #[derive(Clone, Copy, Debug)] pub struct ReplayEvent();