Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Beat [2/4]: implement blockbeat #8894

Merged
merged 19 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
0f8a524
chainio: introduce `chainio` to handle block synchronization
yyforyongyu Jun 27, 2024
e263ece
chainio: implement `Blockbeat`
yyforyongyu Jun 27, 2024
373b795
chainio: add helper methods to dispatch beats
yyforyongyu Oct 31, 2024
797c10e
chainio: add `BlockbeatDispatcher` to dispatch blockbeats
yyforyongyu Jun 27, 2024
5d91b5e
chainio: add partial implementation of `Consumer` interface
yyforyongyu Oct 17, 2024
269e37c
multi: implement `Consumer` on subsystems
yyforyongyu Oct 29, 2024
09ad818
sweep: remove block subscription in `UtxoSweeper` and `TxPublisher`
yyforyongyu Jun 4, 2024
6a6872a
sweep: remove redundant notifications during shutdown
yyforyongyu Nov 18, 2024
eed75c7
contractcourt: remove `waitForHeight` in resolvers
yyforyongyu Jun 4, 2024
627065c
contractcourt: remove block subscription in chain arbitrator
yyforyongyu Oct 29, 2024
bb494da
contractcourt: remove block subscription in channel arbitrator
yyforyongyu Oct 29, 2024
2c153b5
contractcourt: remove the `immediate` param used in `Resolve`
yyforyongyu Jun 4, 2024
6daea9c
contractcourt: start channel arbitrator with blockbeat
yyforyongyu Oct 29, 2024
48dd2ea
multi: start consumers with a starting blockbeat
yyforyongyu Oct 29, 2024
197f1e8
lnd: add new method `startLowLevelServices`
yyforyongyu Oct 17, 2024
07872fc
lnd: start `blockbeatDispatcher` and register consumers
yyforyongyu Oct 17, 2024
8bd6153
contractcourt: fix linter `funlen`
yyforyongyu Oct 29, 2024
b63855d
multi: improve loggings
yyforyongyu May 22, 2024
9ab2e53
chainio: use `errgroup` to limit num of goroutines
yyforyongyu Nov 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions chainio/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# Chainio
yyforyongyu marked this conversation as resolved.
Show resolved Hide resolved

`chainio` is a package designed to provide blockchain data access to various
subsystems within `lnd`. When a new block is received, it is encapsulated in a
`Blockbeat` object and disseminated to all registered consumers. Consumers may
receive these updates either concurrently or sequentially, based on their
registration configuration, ensuring that each subsystem maintains a
synchronized view of the current block state.

The main components include:

- `Blockbeat`: An interface that provides information about the block.

- `Consumer`: An interface that specifies how subsystems handle the blockbeat.

- `BlockbeatDispatcher`: The core service responsible for receiving each block
and distributing it to all consumers.

Additionally, the `BeatConsumer` struct provides a partial implementation of
the `Consumer` interface. This struct helps reduce code duplication, allowing
subsystems to avoid re-implementing the `ProcessBlock` method and provides a
commonly used `NotifyBlockProcessed` method.


### Register a Consumer

Consumers within the same queue are notified **sequentially**, while all queues
are notified **concurrently**. A queue consists of a slice of consumers, which
are notified in left-to-right order. Developers are responsible for determining
dependencies in block consumption across subsystems: independent subsystems
should be notified concurrently, whereas dependent subsystems should be
notified sequentially.

To notify the consumers concurrently, put them in different queues,
```go
// consumer1 and consumer2 will be notified concurrently.
queue1 := []chainio.Consumer{consumer1}
blockbeatDispatcher.RegisterQueue(consumer1)

queue2 := []chainio.Consumer{consumer2}
blockbeatDispatcher.RegisterQueue(consumer2)
```

To notify the consumers sequentially, put them in the same queue,
```go
// consumers will be notified sequentially via,
// consumer1 -> consumer2 -> consumer3
queue := []chainio.Consumer{
consumer1,
consumer2,
consumer3,
}
blockbeatDispatcher.RegisterQueue(queue)
```

### Implement the `Consumer` Interface

Implementing the `Consumer` interface is straightforward. Below is an example
of how
[`sweep.TxPublisher`](https://github.com/lightningnetwork/lnd/blob/5cec466fad44c582a64cfaeb91f6d5fd302fcf85/sweep/fee_bumper.go#L310)
implements this interface.

To start, embed the partial implementation `chainio.BeatConsumer`, which
already provides the `ProcessBlock` implementation and commonly used
`NotifyBlockProcessed` method, and exposes `BlockbeatChan` for the consumer to
receive blockbeats.

```go
type TxPublisher struct {
started atomic.Bool
stopped atomic.Bool

chainio.BeatConsumer

...
```

We should also remember to initialize this `BeatConsumer`,

```go
...
// Mount the block consumer.
tp.BeatConsumer = chainio.NewBeatConsumer(tp.quit, tp.Name())
```

Finally, in the main event loop, read from `BlockbeatChan`, process the
received blockbeat, and, crucially, call `tp.NotifyBlockProcessed` to inform
the blockbeat dispatcher that processing is complete.

```go
for {
select {
case beat := <-tp.BlockbeatChan:
// Consume this blockbeat, usually it means updating the subsystem
// using the new block data.

// Notify we've processed the block.
tp.NotifyBlockProcessed(beat, nil)

...
```

### Existing Queues

Currently, we have a single queue of consumers dedicated to handling force
closures. This queue includes `ChainArbitrator`, `UtxoSweeper`, and
`TxPublisher`, with `ChainArbitrator` managing two internal consumers:
ziggie1984 marked this conversation as resolved.
Show resolved Hide resolved
`chainWatcher` and `ChannelArbitrator`. The blockbeat flows sequentially
through the chain as follows: `ChainArbitrator => chainWatcher =>
ChannelArbitrator => UtxoSweeper => TxPublisher`. The following diagram
illustrates the flow within the public subsystems.

```mermaid
sequenceDiagram
autonumber
participant bb as BlockBeat
participant cc as ChainArb
participant us as UtxoSweeper
participant tp as TxPublisher

note left of bb: 0. received block x,<br>dispatching...

note over bb,cc: 1. send block x to ChainArb,<br>wait for its done signal
bb->>cc: block x
rect rgba(165, 0, 85, 0.8)
critical signal processed
cc->>bb: processed block
option Process error or timeout
bb->>bb: error and exit
end
end

note over bb,us: 2. send block x to UtxoSweeper, wait for its done signal
bb->>us: block x
rect rgba(165, 0, 85, 0.8)
critical signal processed
us->>bb: processed block
option Process error or timeout
bb->>bb: error and exit
end
end

note over bb,tp: 3. send block x to TxPublisher, wait for its done signal
bb->>tp: block x
rect rgba(165, 0, 85, 0.8)
critical signal processed
tp->>bb: processed block
option Process error or timeout
bb->>bb: error and exit
end
end
```
55 changes: 55 additions & 0 deletions chainio/blockbeat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package chainio

import (
"fmt"

"github.com/btcsuite/btclog/v2"
"github.com/lightningnetwork/lnd/build"
"github.com/lightningnetwork/lnd/chainntnfs"
)

// Beat implements the Blockbeat interface. It contains the block epoch and a
// customized logger.
//
// TODO(yy): extend this to check for confirmation status - which serves as the
// single source of truth, to avoid the potential race between receiving blocks
ziggie1984 marked this conversation as resolved.
Show resolved Hide resolved
// and `GetTransactionDetails/RegisterSpendNtfn/RegisterConfirmationsNtfn`.
type Beat struct {
// epoch is the current block epoch the blockbeat is aware of.
epoch chainntnfs.BlockEpoch

// log is the customized logger for the blockbeat which prints the
// block height.
log btclog.Logger
}

// Compile-time check to ensure Beat satisfies the Blockbeat interface.
var _ Blockbeat = (*Beat)(nil)

// NewBeat creates a new beat with the specified block epoch and a customized
// logger.
func NewBeat(epoch chainntnfs.BlockEpoch) *Beat {
b := &Beat{
epoch: epoch,
}

// Create a customized logger for the blockbeat.
logPrefix := fmt.Sprintf("Height[%6d]:", b.Height())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we can use the advantages of structured logging here already @ellemouton ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

discussed offline and the conclusion here was that we should do this as a follow up just to avoid all the rebase conflicts

(although.. @yyforyongyu - do the follow-ups actually touch this main beat logic? cause if not then there might not be any conflicts to worry about)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but yeah - agreed that just having a height=x kv pair is nicer at the end of the day. Another reason to pass a context through where we can here.

but yeah - if too many merge conflicts, then a follow up is all good

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I also wanna try it - does it mean we don't need the prefix logger anymore? Is it possible we could have something like this being logged?

2024-11-19 10:30:29.467 [DBG] CHIO prefix_log.go:51: Height[871039]: Notifying queue=1 with 3 consumers

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should go with structured logging from the beginning, this might also faciliated the structure and we do not need a logger object in the blockbeat.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's tricky cause we need to do proper context usage first which is hard to contain to just a single system...

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it mean we don't need the prefix logger anymore

yeah we can eventually remove the prefix logger. I think we just need to get used to the k=v pairs at the end & of the log line that we can filter on instead of the prefix. Prefix i think should just be the subsystem.

b.log = build.NewPrefixLog(logPrefix, clog)
ziggie1984 marked this conversation as resolved.
Show resolved Hide resolved

return b
}

// Height returns the height of the block epoch.
//
// NOTE: Part of the Blockbeat interface.
func (b *Beat) Height() int32 {
return b.epoch.Height
}

// logger returns the logger for the blockbeat.
//
// NOTE: Part of the private blockbeat interface.
func (b *Beat) logger() btclog.Logger {
return b.log
}
28 changes: 28 additions & 0 deletions chainio/blockbeat_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package chainio

import (
"errors"
"testing"

"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/stretchr/testify/require"
)

var errDummy = errors.New("dummy error")

// TestNewBeat tests the NewBeat and Height functions.
func TestNewBeat(t *testing.T) {
t.Parallel()

// Create a testing epoch.
epoch := chainntnfs.BlockEpoch{
Height: 1,
}

// Create the beat and check the internal state.
beat := NewBeat(epoch)
require.Equal(t, epoch, beat.epoch)

// Check the height function.
require.Equal(t, epoch.Height, beat.Height())
}
113 changes: 113 additions & 0 deletions chainio/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package chainio

// BeatConsumer defines a supplementary component that should be used by
// subsystems which implement the `Consumer` interface. It partially implements
// the `Consumer` interface by providing the method `ProcessBlock` such that
// subsystems don't need to re-implement it.
//
// While inheritance is not commonly used in Go, subsystems embedding this
// struct cannot pass the interface check for `Consumer` because the `Name`
// method is not implemented, which gives us a "mortise and tenon" structure.
// In addition to reducing code duplication, this design allows `ProcessBlock`
// to work on the concrete type `Beat` to access its internal states.
type BeatConsumer struct {
// BlockbeatChan is a channel to receive blocks from Blockbeat. The
// received block contains the best known height and the txns confirmed
// in this block.
BlockbeatChan chan Blockbeat

// name is the name of the consumer which embeds the BlockConsumer.
name string

// quit is a channel that closes when the BlockConsumer is shutting
// down.
//
// NOTE: this quit channel should be mounted to the same quit channel
// used by the subsystem.
quit chan struct{}

// errChan is a buffered chan that receives an error returned from
// processing this block.
errChan chan error
}

// NewBeatConsumer creates a new BlockConsumer.
func NewBeatConsumer(quit chan struct{}, name string) BeatConsumer {
ziggie1984 marked this conversation as resolved.
Show resolved Hide resolved
// Refuse to start `lnd` if the quit channel is not initialized. We
// treat this case as if we are facing a nil pointer dereference, as
// there's no point to return an error here, which will cause the node
// to fail to be started anyway.
if quit == nil {
panic("quit channel is nil")
}

b := BeatConsumer{
BlockbeatChan: make(chan Blockbeat),
name: name,
errChan: make(chan error, 1),
quit: quit,
}

return b
}

// ProcessBlock takes a blockbeat and sends it to the consumer's blockbeat
// channel. It will send it to the subsystem's BlockbeatChan, and block until
// the processed result is received from the subsystem. The subsystem must call
// `NotifyBlockProcessed` after it has finished processing the block.
//
// NOTE: part of the `chainio.Consumer` interface.
func (b *BeatConsumer) ProcessBlock(beat Blockbeat) error {
// Update the current height.
beat.logger().Tracef("set current height for [%s]", b.name)

select {
// Send the beat to the blockbeat channel. It's expected that the
// consumer will read from this channel and process the block. Once
// processed, it should return the error or nil to the beat.Err chan.
case b.BlockbeatChan <- beat:
beat.logger().Tracef("Sent blockbeat to [%s]", b.name)

case <-b.quit:
beat.logger().Debugf("[%s] received shutdown before sending "+
"beat", b.name)

return nil
}

// Check the consumer's err chan. We expect the consumer to call
// `beat.NotifyBlockProcessed` to send the error back here.
select {
case err := <-b.errChan:
beat.logger().Debugf("[%s] processed beat: err=%v", b.name, err)

return err

case <-b.quit:
beat.logger().Debugf("[%s] received shutdown", b.name)
}

return nil
}

// NotifyBlockProcessed signals that the block has been processed. It takes the
// blockbeat being processed and an error resulted from processing it. This
// error is then sent back to the consumer's err chan to unblock
// `ProcessBlock`.
//
// NOTE: This method must be called by the subsystem after it has finished
// processing the block.
func (b *BeatConsumer) NotifyBlockProcessed(beat Blockbeat, err error) {
// Update the current height.
beat.logger().Debugf("[%s]: notifying beat processed", b.name)

select {
case b.errChan <- err:
beat.logger().Debugf("[%s]: notified beat processed, err=%v",
b.name, err)

case <-b.quit:
beat.logger().Debugf("[%s] received shutdown before notifying "+
"beat processed", b.name)
}
}
Loading
Loading