From bc10ca359f985a4284bdef169ab4ac4b39453a54 Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Tue, 17 Oct 2023 16:05:21 +0200 Subject: [PATCH 1/3] Fix --- consensus/polybft/mocks_test.go | 6 ++++++ consensus/polybft/polybft.go | 14 ++++++++++++- e2e-polybft/e2e/consensus_test.go | 34 ++++++++++++++++++++++++++----- syncer/syncer.go | 16 +++++++++++++++ syncer/types.go | 2 ++ 5 files changed, 66 insertions(+), 6 deletions(-) diff --git a/consensus/polybft/mocks_test.go b/consensus/polybft/mocks_test.go index c2e8c6424b..3ca22f86ea 100644 --- a/consensus/polybft/mocks_test.go +++ b/consensus/polybft/mocks_test.go @@ -395,6 +395,12 @@ func (tp *syncerMock) Sync(func(*types.FullBlock) bool) error { return args.Error(0) } +func (tp *syncerMock) IsSyncingWithPeer() bool { + args := tp.Called() + + return args.Bool(0) +} + func init() { // setup custom hash header func setupHeaderHashFunc() diff --git a/consensus/polybft/polybft.go b/consensus/polybft/polybft.go index 0b85c59661..5496e7f8ec 100644 --- a/consensus/polybft/polybft.go +++ b/consensus/polybft/polybft.go @@ -599,12 +599,24 @@ func (p *Polybft) startConsensusProtocol() { if ev.Source == "syncer" && ev.NewChain[0].Number >= p.blockchain.CurrentHeader().Number { p.logger.Info("sync block notification received", "block height", ev.NewChain[0].Number, "current height", p.blockchain.CurrentHeader().Number) - syncerBlockCh <- struct{}{} + + select { + case syncerBlockCh <- struct{}{}: + default: + } } } } }() + // wait until he stops syncing + p.logger.Info("waiting to stop syncing so that we can try to join consensus if node is a validator") + + for p.syncer.IsSyncingWithPeer() { + } + + p.logger.Info("node synced up on start. Trying to join consensus if validator") + var ( sequenceCh <-chan struct{} stopSequence func() diff --git a/e2e-polybft/e2e/consensus_test.go b/e2e-polybft/e2e/consensus_test.go index c7e6d82bd1..0599a4d564 100644 --- a/e2e-polybft/e2e/consensus_test.go +++ b/e2e-polybft/e2e/consensus_test.go @@ -1,6 +1,7 @@ package e2e import ( + "bytes" "fmt" "math/big" "path" @@ -59,22 +60,40 @@ func TestE2E_Consensus_Basic_WithNonValidators(t *testing.T) { }) t.Run("sync protocol, drop single validator node", func(t *testing.T) { + validatorSrv := cluster.Servers[0] + validatorAcc, err := sidechain.GetAccountFromDir(validatorSrv.DataDir()) + require.NoError(t, err) + // query the current block number, as it is a starting point for the test - currentBlockNum, err := cluster.Servers[0].JSONRPC().Eth().BlockNumber() + currentBlockNum, err := validatorSrv.JSONRPC().Eth().BlockNumber() require.NoError(t, err) + // wait for 2 epochs to elapse, before we stop the node + require.NoError(t, cluster.WaitForBlock(currentBlockNum+2*epochSize, 2*time.Minute)) + // stop one node - node := cluster.Servers[0] - node.Stop() + validatorSrv.Stop() + + // check what is the current block on the running nodes + currentBlockNum, err = cluster.Servers[1].JSONRPC().Eth().BlockNumber() + require.NoError(t, err) // wait for 2 epochs to elapse, so that rest of the network progresses require.NoError(t, cluster.WaitForBlock(currentBlockNum+2*epochSize, 2*time.Minute)) // start the node again - node.Start() + validatorSrv.Start() // wait 2 more epochs to elapse and make sure that stopped node managed to catch up require.NoError(t, cluster.WaitForBlock(currentBlockNum+4*epochSize, 2*time.Minute)) + + // wait until the validator mines one block to check if he is back in consensus + require.NoError(t, cluster.WaitUntil(3*time.Minute, 2*time.Second, func() bool { + latestBlock, err := cluster.Servers[0].JSONRPC().Eth().GetBlockByNumber(ethgo.Latest, false) + require.NoError(t, err) + + return bytes.Equal(validatorAcc.Address().Bytes(), latestBlock.Miner.Bytes()) + })) }) t.Run("sync protocol, drop single non-validator node", func(t *testing.T) { @@ -93,7 +112,12 @@ func TestE2E_Consensus_Basic_WithNonValidators(t *testing.T) { node.Start() // wait 2 more epochs to elapse and make sure that stopped node managed to catch up - require.NoError(t, cluster.WaitForBlock(currentBlockNum+4*epochSize, 2*time.Minute)) + blockToWait := currentBlockNum + 4*epochSize + require.NoError(t, cluster.WaitForBlock(blockToWait, 2*time.Minute)) + + latestBlockOnDroppedNode, err := node.JSONRPC().Eth().GetBlockByNumber(ethgo.Latest, false) + require.NoError(t, err) + require.GreaterOrEqual(t, latestBlockOnDroppedNode.Number, blockToWait) }) } diff --git a/syncer/syncer.go b/syncer/syncer.go index 5c20b52624..c00773471f 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -3,6 +3,7 @@ package syncer import ( "errors" "fmt" + "sync/atomic" "time" "github.com/0xPolygon/polygon-edge/helper/progress" @@ -38,6 +39,8 @@ type syncer struct { // Channel to notify Sync that a new status arrived newStatusCh chan struct{} + + isSyncing atomic.Bool } func NewSyncer( @@ -219,6 +222,9 @@ func (s *syncer) bulkSyncWithPeer(peerID peer.ID, peerLatestBlock uint64, return 0, false, err } + s.setIsSyncing(true) + defer s.setIsSyncing(false) + // Create a blockchain subscription for the sync progression and start tracking subscription := s.blockchain.SubscribeEvents() s.syncProgression.StartProgression(localLatest+1, subscription) @@ -272,6 +278,16 @@ func (s *syncer) bulkSyncWithPeer(peerID peer.ID, peerLatestBlock uint64, } } +// setIsSyncing updates the isSyncing field +func (s *syncer) setIsSyncing(isSyncing bool) { + s.isSyncing.Store(isSyncing) +} + +// IsSyncingWithPeer indicates if node is syncing with peer +func (s *syncer) IsSyncingWithPeer() bool { + return s.isSyncing.Load() +} + func updateMetrics(fullBlock *types.FullBlock) { metrics.SetGauge([]string{syncerMetrics, "tx_num"}, float32(len(fullBlock.Block.Transactions))) metrics.SetGauge([]string{syncerMetrics, "receipts_num"}, float32(len(fullBlock.Receipts))) diff --git a/syncer/types.go b/syncer/types.go index 9e3f81f36e..cba42c4b94 100644 --- a/syncer/types.go +++ b/syncer/types.go @@ -70,6 +70,8 @@ type Syncer interface { HasSyncPeer() bool // Sync starts routine to sync blocks Sync(func(*types.FullBlock) bool) error + + IsSyncingWithPeer() bool } type Progression interface { From 415e050d9932c198f0336503f0a6ab1ff6e22bc1 Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Thu, 2 Nov 2023 09:48:00 +0100 Subject: [PATCH 2/3] Comments fix --- consensus/polybft/mocks_test.go | 2 +- consensus/polybft/polybft.go | 2 +- syncer/syncer.go | 17 +++-------------- syncer/types.go | 4 ++-- 4 files changed, 7 insertions(+), 18 deletions(-) diff --git a/consensus/polybft/mocks_test.go b/consensus/polybft/mocks_test.go index 3ca22f86ea..de1434ea51 100644 --- a/consensus/polybft/mocks_test.go +++ b/consensus/polybft/mocks_test.go @@ -395,7 +395,7 @@ func (tp *syncerMock) Sync(func(*types.FullBlock) bool) error { return args.Error(0) } -func (tp *syncerMock) IsSyncingWithPeer() bool { +func (tp *syncerMock) IsSyncing() bool { args := tp.Called() return args.Bool(0) diff --git a/consensus/polybft/polybft.go b/consensus/polybft/polybft.go index 5496e7f8ec..f9a46b63f5 100644 --- a/consensus/polybft/polybft.go +++ b/consensus/polybft/polybft.go @@ -612,7 +612,7 @@ func (p *Polybft) startConsensusProtocol() { // wait until he stops syncing p.logger.Info("waiting to stop syncing so that we can try to join consensus if node is a validator") - for p.syncer.IsSyncingWithPeer() { + for p.syncer.IsSyncing() { } p.logger.Info("node synced up on start. Trying to join consensus if validator") diff --git a/syncer/syncer.go b/syncer/syncer.go index c00773471f..a7aa1ec922 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -3,7 +3,6 @@ package syncer import ( "errors" "fmt" - "sync/atomic" "time" "github.com/0xPolygon/polygon-edge/helper/progress" @@ -39,8 +38,6 @@ type syncer struct { // Channel to notify Sync that a new status arrived newStatusCh chan struct{} - - isSyncing atomic.Bool } func NewSyncer( @@ -222,9 +219,6 @@ func (s *syncer) bulkSyncWithPeer(peerID peer.ID, peerLatestBlock uint64, return 0, false, err } - s.setIsSyncing(true) - defer s.setIsSyncing(false) - // Create a blockchain subscription for the sync progression and start tracking subscription := s.blockchain.SubscribeEvents() s.syncProgression.StartProgression(localLatest+1, subscription) @@ -278,14 +272,9 @@ func (s *syncer) bulkSyncWithPeer(peerID peer.ID, peerLatestBlock uint64, } } -// setIsSyncing updates the isSyncing field -func (s *syncer) setIsSyncing(isSyncing bool) { - s.isSyncing.Store(isSyncing) -} - -// IsSyncingWithPeer indicates if node is syncing with peer -func (s *syncer) IsSyncingWithPeer() bool { - return s.isSyncing.Load() +// IsSyncing indicates if node is syncing with peer +func (s *syncer) IsSyncing() bool { + return s.GetSyncProgression() != nil } func updateMetrics(fullBlock *types.FullBlock) { diff --git a/syncer/types.go b/syncer/types.go index cba42c4b94..a6f3f7a4c0 100644 --- a/syncer/types.go +++ b/syncer/types.go @@ -70,8 +70,8 @@ type Syncer interface { HasSyncPeer() bool // Sync starts routine to sync blocks Sync(func(*types.FullBlock) bool) error - - IsSyncingWithPeer() bool + // Indicates if syncer is syncing with the best peer + IsSyncing() bool } type Progression interface { From df57c2562725741e0a56de5b0f394c0be89c0716 Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Thu, 2 Nov 2023 10:26:22 +0100 Subject: [PATCH 3/3] Comments fix --- syncer/types.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/syncer/types.go b/syncer/types.go index a6f3f7a4c0..c8643d08ea 100644 --- a/syncer/types.go +++ b/syncer/types.go @@ -70,7 +70,7 @@ type Syncer interface { HasSyncPeer() bool // Sync starts routine to sync blocks Sync(func(*types.FullBlock) bool) error - // Indicates if syncer is syncing with the best peer + // IsSyncing indicates if syncer is syncing with the best peer IsSyncing() bool }