Skip to content

Commit

Permalink
Merge pull request #3193 from tnull/2024-07-2995-followups
Browse files Browse the repository at this point in the history
#2995 followups
  • Loading branch information
tnull authored Aug 22, 2024
2 parents dced69d + 53a616b commit 0d7ae86
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 18 deletions.
4 changes: 2 additions & 2 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2321,8 +2321,8 @@ mod tests {

begin_open_channel!(nodes[0], nodes[1], channel_value);
assert_eq!(
first_event_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE)),
second_event_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
first_event_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE)).unwrap(),
second_event_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE)).unwrap()
);

if !std::thread::panicking() {
Expand Down
5 changes: 3 additions & 2 deletions lightning/src/events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -776,8 +776,9 @@ pub enum Event {
/// replies. Handlers should connect to the node otherwise any buffered messages may be lost.
///
/// # Failure Behavior and Persistence
/// This event will eventually be replayed after failures-to-handle (i.e., the event handler
/// returning `Err(ReplayEvent ())`), but won't be persisted across restarts.
/// This event won't be replayed after failures-to-handle
/// (i.e., the event handler returning `Err(ReplayEvent ())`), and also won't be persisted
/// across restarts.
///
/// [`OnionMessage`]: msgs::OnionMessage
/// [`MessageRouter`]: crate::onion_message::messenger::MessageRouter
Expand Down
67 changes: 53 additions & 14 deletions lightning/src/onion_message/messenger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1047,21 +1047,25 @@ where
}
}

macro_rules! drop_handled_events_and_abort { ($self: expr, $res: expr, $offset: expr, $event_queue: expr) => {
macro_rules! drop_handled_events_and_abort { ($self: expr, $res_iter: expr, $event_queue: expr) => {
// We want to make sure to cleanly abort upon event handling failure. To this end, we drop all
// successfully handled events from the given queue, reset the events processing flag, and
// return, to have the events eventually replayed upon next invocation.
{
let mut queue_lock = $event_queue.lock().unwrap();

// We skip `$offset` result entries to reach the ones relevant for the given `$event_queue`.
let mut res_iter = $res.iter().skip($offset);

// Keep all events which previously error'd *or* any that have been added since we dropped
// the Mutex before.
queue_lock.retain(|_| res_iter.next().map_or(true, |r| r.is_err()));
let mut any_error = false;
queue_lock.retain(|_| {
$res_iter.next().map_or(true, |r| {
let is_err = r.is_err();
any_error |= is_err;
is_err
})
});

if $res.iter().any(|r| r.is_err()) {
if any_error {
// We failed handling some events. Return to have them eventually replayed.
$self.pending_events_processor.store(false, Ordering::Release);
$self.event_notifier.notify();
Expand Down Expand Up @@ -1426,7 +1430,8 @@ where
}
// Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
let res = MultiResultFuturePoller::new(futures).await;
drop_handled_events_and_abort!(self, res, intercepted_msgs_offset, self.pending_intercepted_msgs_events);
let mut res_iter = res.iter().skip(intercepted_msgs_offset);
drop_handled_events_and_abort!(self, res_iter, self.pending_intercepted_msgs_events);
}

{
Expand All @@ -1449,7 +1454,8 @@ where
futures.push(future);
}
let res = MultiResultFuturePoller::new(futures).await;
drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events);
let mut res_iter = res.iter();
drop_handled_events_and_abort!(self, res_iter, self.pending_peer_connected_events);
}
}
self.pending_events_processor.store(false, Ordering::Release);
Expand Down Expand Up @@ -1508,7 +1514,7 @@ where
{
let pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap();
intercepted_msgs = pending_intercepted_msgs_events.clone();
let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap();
let pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap();
peer_connecteds = pending_peer_connected_events.clone();
#[cfg(debug_assertions)] {
for ev in pending_intercepted_msgs_events.iter() {
Expand All @@ -1518,14 +1524,47 @@ where
if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); }
}
}
pending_peer_connected_events.shrink_to(10); // Limit total heap usage
}

let res = intercepted_msgs.into_iter().map(|ev| handler.handle_event(ev)).collect::<Vec<_>>();
drop_handled_events_and_abort!(self, res, 0, self.pending_intercepted_msgs_events);
let mut handling_intercepted_msgs_failed = false;
let mut num_handled_intercepted_events = 0;
for ev in intercepted_msgs {
match handler.handle_event(ev) {
Ok(()) => num_handled_intercepted_events += 1,
Err(ReplayEvent ()) => {
handling_intercepted_msgs_failed = true;
break;
}
}
}

{
let mut pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap();
pending_intercepted_msgs_events.drain(..num_handled_intercepted_events);
}

let res = peer_connecteds.into_iter().map(|ev| handler.handle_event(ev)).collect::<Vec<_>>();
drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events);
if handling_intercepted_msgs_failed {
self.pending_events_processor.store(false, Ordering::Release);
self.event_notifier.notify();
return;
}

let mut num_handled_peer_connecteds = 0;
for ev in peer_connecteds {
match handler.handle_event(ev) {
Ok(()) => num_handled_peer_connecteds += 1,
Err(ReplayEvent ()) => {
self.event_notifier.notify();
break;
}
}
}

{
let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap();
pending_peer_connected_events.drain(..num_handled_peer_connecteds);
pending_peer_connected_events.shrink_to(10); // Limit total heap usage
}

self.pending_events_processor.store(false, Ordering::Release);
}
Expand Down

0 comments on commit 0d7ae86

Please sign in to comment.