From 9eaea70b749ba5340facda3862ca2574fb90d39e Mon Sep 17 00:00:00 2001 From: ziggie Date: Wed, 24 Jan 2024 00:23:10 +0000 Subject: [PATCH] chain: PrunedBlockDispatcher bugfix. In case we fail the request via the neutriono block peer fetcher we make sure we fail all dependant `GetBlock` calls and remove this block request completely so that a new request for this same block hash can be made. --- chain/bitcoind_conn.go | 28 ++++- chain/pruned_block_dispatcher.go | 81 ++++++++++++-- chain/pruned_block_dispatcher_test.go | 151 ++++++++++++++++++++++---- 3 files changed, 226 insertions(+), 34 deletions(-) diff --git a/chain/bitcoind_conn.go b/chain/bitcoind_conn.go index 88edf1b05a..44d0fdda2d 100644 --- a/chain/bitcoind_conn.go +++ b/chain/bitcoind_conn.go @@ -469,10 +469,16 @@ func (c *BitcoindConn) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, error) { return nil, err } + // cancelChan is needed in case a block request with the same hash is + // already registered and fails via the returned errChan. Because we + // don't register a new request in this case this cancelChan is used + // to signal the failure of the dependant block request. + cancelChan := make(chan error, 1) + // Now that we know the block has been pruned for sure, request it from // our backend peers. blockChan, errChan := c.prunedBlockDispatcher.Query( - []*chainhash.Hash{hash}, + []*chainhash.Hash{hash}, cancelChan, ) for { @@ -482,12 +488,32 @@ func (c *BitcoindConn) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, error) { case err := <-errChan: if err != nil { + // An error was returned for this block request. + // We have to make sure we remove the blockhash + // from the list of queried blocks. + // Moreover because in case a block is requested + // more than onced no redundant block requests + // are registered but rather a reply channel is + // added to the pending block request. This + // means we need to cancel all dependent + // `GetBlock` calls via the cancel channel when + // the fetching of the block was NOT successful. + c.prunedBlockDispatcher.CancelRequest( + *hash, err, + ) + return nil, err } // errChan fired before blockChan with a nil error, wait // for the block now. + // The cancelChan is only used when there is already a pending + // block request for this hash and that block request fails via + // the error channel above. + case err := <-cancelChan: + return nil, err + case <-c.quit: return nil, ErrBitcoindClientShuttingDown } diff --git a/chain/pruned_block_dispatcher.go b/chain/pruned_block_dispatcher.go index 9c2d5e6168..3eb9d26c84 100644 --- a/chain/pruned_block_dispatcher.go +++ b/chain/pruned_block_dispatcher.go @@ -143,7 +143,14 @@ type PrunedBlockDispatcher struct { // NOTE: The blockMtx lock must always be held when accessing this // field. blocksQueried map[chainhash.Hash][]chan *wire.MsgBlock - blockMtx sync.Mutex + + // blockQueryCancel signals the cancelation of a `GetBlock` request. + // + // NOTE: The blockMtx lock must always be held when accessing this + // field. + blockQueryCancel map[chainhash.Hash][]chan<- error + + blockMtx sync.Mutex // currentPeers represents the set of peers we're currently connected // to. Each peer found here will have a worker spawned within the @@ -198,12 +205,13 @@ func NewPrunedBlockDispatcher(cfg *PrunedBlockDispatcherConfig) ( NewWorker: query.NewWorker, Ranking: query.NewPeerRanking(), }), - blocksQueried: make(map[chainhash.Hash][]chan *wire.MsgBlock), - currentPeers: make(map[string]*peer.Peer), - bannedPeers: make(map[string]struct{}), - peersConnected: peersConnected, - timeSource: blockchain.NewMedianTime(), - quit: make(chan struct{}), + blocksQueried: make(map[chainhash.Hash][]chan *wire.MsgBlock), + blockQueryCancel: make(map[chainhash.Hash][]chan<- error), + currentPeers: make(map[string]*peer.Peer), + bannedPeers: make(map[string]struct{}), + peersConnected: peersConnected, + timeSource: blockchain.NewMedianTime(), + quit: make(chan struct{}), }, nil } @@ -502,9 +510,10 @@ func (d *PrunedBlockDispatcher) banPeer(peer string) { // Query submits a request to query the information of the given blocks. func (d *PrunedBlockDispatcher) Query(blocks []*chainhash.Hash, + cancelChan chan<- error, opts ...query.QueryOption) (<-chan *wire.MsgBlock, <-chan error) { - reqs, blockChan, err := d.newRequest(blocks) + reqs, blockChan, err := d.newRequest(blocks, cancelChan) if err != nil { errChan := make(chan error, 1) errChan <- err @@ -515,14 +524,18 @@ func (d *PrunedBlockDispatcher) Query(blocks []*chainhash.Hash, if len(reqs) > 0 { errChan = d.workManager.Query(reqs, opts...) } + return blockChan, errChan } // newRequest construct a new query request for the given blocks to submit to // the internal workManager. A channel is also returned through which the // requested blocks are sent through. -func (d *PrunedBlockDispatcher) newRequest(blocks []*chainhash.Hash) ( - []*query.Request, <-chan *wire.MsgBlock, error) { +// +// NOTE: The cancelChan must be a buffered. +func (d *PrunedBlockDispatcher) newRequest(blocks []*chainhash.Hash, + cancelChan chan<- error) ([]*query.Request, <-chan *wire.MsgBlock, + error) { // Make sure the channel is buffered enough to handle all blocks. blockChan := make(chan *wire.MsgBlock, len(blocks)) @@ -550,6 +563,10 @@ func (d *PrunedBlockDispatcher) newRequest(blocks []*chainhash.Hash) ( } else { log.Debugf("Received new request for pending query of "+ "block %v", *block) + + d.blockQueryCancel[*block] = append( + d.blockQueryCancel[*block], cancelChan, + ) } d.blocksQueried[*block] = append( @@ -678,3 +695,47 @@ func (d *PrunedBlockDispatcher) handleResp(req, resp wire.Message, return progress } + +// CancelRequest removes all information regarding a failed block request. +// When for example the Peer disconnects or runs in a timeout we make sure +// that all related information is deleted and a new request for this block +// can be registered. Moreover will also cancel all depending goroutines. +func (d *PrunedBlockDispatcher) CancelRequest(blockHash chainhash.Hash, + err error) { + + // failDependant is a helper function which fails all dependant + // goroutines via their cancel channels. + failDependant := func(cancelChans []chan<- error) { + defer d.wg.Done() + + for _, cancel := range cancelChans { + select { + case cancel <- err: + case <-d.quit: + return + } + } + } + + d.blockMtx.Lock() + + // Before removing the block hash we get the cancelChans which were + // registered for block requests which already had an ongoing pending + // request registered. + cancelChans, ok := d.blockQueryCancel[blockHash] + + // Remove all data related to this block request to make sure the same + // block can be registered again in the future. + delete(d.blocksQueried, blockHash) + delete(d.blockQueryCancel, blockHash) + + d.blockMtx.Unlock() + + // In case there are goroutines depending on this block request we + // make sure we cancel them. + // We do this in goroutine to not block the initial request. + if ok { + d.wg.Add(1) + go failDependant(cancelChans) + } +} diff --git a/chain/pruned_block_dispatcher_test.go b/chain/pruned_block_dispatcher_test.go index b7f801d8b5..24f32ed7fa 100644 --- a/chain/pruned_block_dispatcher_test.go +++ b/chain/pruned_block_dispatcher_test.go @@ -15,6 +15,7 @@ import ( "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/peer" "github.com/btcsuite/btcd/wire" + "github.com/lightninglabs/neutrino/query" "github.com/lightningnetwork/lnd/ticker" "github.com/stretchr/testify/require" ) @@ -258,12 +259,18 @@ func (h *prunedBlockDispatcherHarness) newPeer() *peer.Peer { } // query requests the given blocks from the PrunedBlockDispatcher. -func (h *prunedBlockDispatcherHarness) query(blocks []*chainhash.Hash) ( - <-chan *wire.MsgBlock, <-chan error) { +func (h *prunedBlockDispatcherHarness) query(blocks []*chainhash.Hash, + opts ...query.QueryOption) ( + <-chan *wire.MsgBlock, <-chan error, <-chan error) { h.t.Helper() - blockChan, errChan := h.dispatcher.Query(blocks) + // cancelChan will receive an error msg in case the dependant block + // request fails. This is used for block requests already have a pending + // request registered and this request fails. + cancelChan := make(chan error, 1) + + blockChan, errChan := h.dispatcher.Query(blocks, cancelChan, opts...) select { case err := <-errChan: require.NoError(h.t, err) @@ -274,7 +281,7 @@ func (h *prunedBlockDispatcherHarness) query(blocks []*chainhash.Hash) ( h.blocksQueried[*block]++ } - return blockChan, errChan + return blockChan, errChan, cancelChan } // disablePeerReplies prevents the query peer from replying. @@ -363,7 +370,7 @@ func (h *prunedBlockDispatcherHarness) assertPeerQueried() { // assertPeerReplied asserts that the query peer replies with a block the // PrunedBlockDispatcher queried for. func (h *prunedBlockDispatcherHarness) assertPeerReplied( - blockChan <-chan *wire.MsgBlock, errChan <-chan error, + blockChan <-chan *wire.MsgBlock, errChan, cancelChan <-chan error, expectCompletionSignal bool) { h.t.Helper() @@ -385,12 +392,14 @@ func (h *prunedBlockDispatcherHarness) assertPeerReplied( delete(h.blocksQueried, blockHash) } + case err := <-errChan: + h.t.Fatalf("received unexpected error send: %v", err) + + case err := <-cancelChan: + h.t.Fatalf("received unexpected cancel request with error: %v", + err) + case <-time.After(5 * time.Second): - select { - case err := <-errChan: - h.t.Fatalf("received unexpected error send: %v", err) - default: - } h.t.Fatal("expected reply from peer") } @@ -406,6 +415,37 @@ func (h *prunedBlockDispatcherHarness) assertPeerReplied( } } +// assertPeerReplied asserts that the query peer replies with a block the +// PrunedBlockDispatcher queried for. +func (h *prunedBlockDispatcherHarness) assertPeerFailed( + blockChan <-chan *wire.MsgBlock, errChan, cancelChan <-chan error, + expectedErr error) { + + h.t.Helper() + + select { + case <-blockChan: + h.t.Fatalf("expected no reply from peer") + + case err := <-errChan: + require.ErrorIs(h.t, err, expectedErr) + for _, hash := range h.hashes { + h.dispatcher.CancelRequest(*hash, err) + // The corresponding block is deleted from the request + // queue in `CancelRequest` so we delete it from the + // harness as well. + delete(h.blocksQueried, *hash) + } + + case err := <-cancelChan: + require.ErrorIs(h.t, err, expectedErr) + + case <-time.After(5 * time.Second): + h.t.Fatal("expected reply from peer") + } + +} + // assertNoPeerDialed asserts that the PrunedBlockDispatcher hasn't established // a new peer connection. func (h *prunedBlockDispatcherHarness) assertNoPeerDialed() { @@ -420,15 +460,21 @@ func (h *prunedBlockDispatcherHarness) assertNoPeerDialed() { // assertNoReply asserts that the peer hasn't replied to a query. func (h *prunedBlockDispatcherHarness) assertNoReply( - blockChan <-chan *wire.MsgBlock, errChan <-chan error) { + blockChan <-chan *wire.MsgBlock, errChan, cancelChan <-chan error) { h.t.Helper() select { case block := <-blockChan: h.t.Fatalf("received unexpected block %v", block.BlockHash()) + case err := <-errChan: h.t.Fatalf("received unexpected error send: %v", err) + + case err := <-cancelChan: + h.t.Fatalf("received unexpected cancel request with error: %v", + err) + case <-time.After(2 * time.Second): } } @@ -449,16 +495,69 @@ func TestPrunedBlockDispatcherQuerySameBlock(t *testing.T) { // Queue all the block requests one by one. blockChans := make([]<-chan *wire.MsgBlock, 0, numRequests) errChans := make([]<-chan error, 0, numRequests) + cancelChans := make([]<-chan error, 0, numRequests) + + for i := 0; i < numRequests; i++ { + blockChan, errChan, cancelChan := h.query(h.hashes) + blockChans = append(blockChans, blockChan) + errChans = append(errChans, errChan) + cancelChans = append(cancelChans, cancelChan) + + } + + // We should only see one query. + h.assertPeerQueried() + for i := 0; i < numRequests; i++ { + h.assertPeerReplied(blockChans[i], errChans[i], cancelChans[i], + i == 0) + } +} + +// TestPrunedBlockDispatcherQuerySameBlock tests that client requests for the +// same block result in only fetching the block once while pending. +func TestPrunedBlockDispatcherQueryFailSameBlock(t *testing.T) { + t.Parallel() + + const numBlocks = 1 + const numPeers = 5 + const numRequests = numBlocks * numPeers + + h := newNetworkBlockTestHarness(t, numBlocks, numPeers, numPeers) + h.start() + defer h.stop() + + // Queue all the block requests one by one. + blockChans := make([]<-chan *wire.MsgBlock, 0, numRequests) + errChans := make([]<-chan error, 0, numRequests) + cancelChans := make([]<-chan error, 0, numRequests) + + // We want to force a timeout. + h.disablePeerReplies() + for i := 0; i < numRequests; i++ { - blockChan, errChan := h.query(h.hashes) + // The default retry is 2 and is defined in the neutrino + // package. We want to fail the request therefore we use 1. + // Moreover the default timeout of a single request is 2 seconds + // and currently not configurable so we have to make sure when + // asserting not not timeout before. + blockChan, errChan, cancelChan := h.query( + h.hashes, query.NumRetries(1), + ) blockChans = append(blockChans, blockChan) errChans = append(errChans, errChan) + cancelChans = append(cancelChans, cancelChan) } // We should only see one query. h.assertPeerQueried() for i := 0; i < numRequests; i++ { - h.assertPeerReplied(blockChans[i], errChans[i], i == 0) + h.assertPeerFailed(blockChans[i], errChans[i], cancelChans[i], + query.ErrQueryTimeout) + } + + // + for _, block := range h.hashes { + delete(h.blocksQueried, *block) } } @@ -476,7 +575,7 @@ func TestPrunedBlockDispatcherMultipleGetData(t *testing.T) { defer h.stop() // Request all blocks. - blockChan, errChan := h.query(h.hashes) + blockChan, errChan, cancelChan := h.query(h.hashes) // Since we have more blocks than can fit in a single GetData message, // we should expect multiple queries. For each query, we should expect @@ -491,7 +590,8 @@ func TestPrunedBlockDispatcherMultipleGetData(t *testing.T) { for j := 0; j < maxRequestInvs; j++ { expectCompletionSignal := blocksRecvd == numBlocks-1 h.assertPeerReplied( - blockChan, errChan, expectCompletionSignal, + blockChan, errChan, cancelChan, + expectCompletionSignal, ) blocksRecvd++ @@ -517,16 +617,21 @@ func TestPrunedBlockDispatcherMultipleQueryPeers(t *testing.T) { // Queue all the block requests one by one. blockChans := make([]<-chan *wire.MsgBlock, 0, numBlocks) errChans := make([]<-chan error, 0, numBlocks) + cancelChans := make([]<-chan error, 0, numBlocks) + for i := 0; i < numBlocks; i++ { - blockChan, errChan := h.query(h.hashes[i : i+1]) + blockChan, errChan, cancelChan := h.query(h.hashes[i : i+1]) blockChans = append(blockChans, blockChan) errChans = append(errChans, errChan) + cancelChans = append(cancelChans, cancelChan) + } // We should see one query per block. for i := 0; i < numBlocks; i++ { h.assertPeerQueried() - h.assertPeerReplied(blockChans[i], errChans[i], true) + h.assertPeerReplied(blockChans[i], errChans[i], cancelChans[i], + true) } } @@ -544,7 +649,7 @@ func TestPrunedBlockDispatcherPeerPoller(t *testing.T) { h.assertNoPeerDialed() // We'll then query for a block. - blockChan, errChan := h.query(h.hashes) + blockChan, errChan, cancelChan := h.query(h.hashes) // Refresh our peers. This would dial some peers, but we don't have any // yet. @@ -563,7 +668,7 @@ func TestPrunedBlockDispatcherPeerPoller(t *testing.T) { // Disconnect our peer and re-enable replies. h.disconnectPeer(peer, false) h.enablePeerReplies() - h.assertNoReply(blockChan, errChan) + h.assertNoReply(blockChan, errChan, cancelChan) // Force a refresh once again. Since the peer has disconnected, a new // connection should be made and the peer should be queried again. @@ -579,7 +684,7 @@ func TestPrunedBlockDispatcherPeerPoller(t *testing.T) { // Now that we know we've connected to the peer, we should be able to // receive their response. - h.assertPeerReplied(blockChan, errChan, true) + h.assertPeerReplied(blockChan, errChan, cancelChan, true) } // TestPrunedBlockDispatcherInvalidBlock ensures that validation is performed on @@ -597,9 +702,9 @@ func TestPrunedBlockDispatcherInvalidBlock(t *testing.T) { // We'll then query for a block. We shouldn't see a response as the // block should have failed validation. - blockChan, errChan := h.query(h.hashes) + blockChan, errChan, cancelChan := h.query(h.hashes) h.assertPeerQueried() - h.assertNoReply(blockChan, errChan) + h.assertNoReply(blockChan, errChan, cancelChan) // Since the peer sent us an invalid block, they should have been // disconnected and banned. Refreshing our peers shouldn't result in a @@ -618,7 +723,7 @@ func TestPrunedBlockDispatcherInvalidBlock(t *testing.T) { h.refreshPeers() h.assertPeerDialed() h.assertPeerQueried() - h.assertPeerReplied(blockChan, errChan, true) + h.assertPeerReplied(blockChan, errChan, cancelChan, true) } func TestSatisfiesRequiredServices(t *testing.T) {