Skip to content

Commit

Permalink
Merge pull request #1993 from PlatONnetwork/hotfix-1973
Browse files Browse the repository at this point in the history
1.3.1
  • Loading branch information
benbaley authored Nov 30, 2022
2 parents 3407a1a + 8a5b3bf commit be7ed4c
Show file tree
Hide file tree
Showing 12 changed files with 88 additions and 34 deletions.
4 changes: 4 additions & 0 deletions consensus/bft_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,10 @@ func (bm *BftMock) Close() error {
return nil
}

func (bm *BftMock) Stop() error {
return nil
}

// ConsensusNodes returns the current consensus node address list.
func (bm *BftMock) ConsensusNodes() ([]discover.NodeID, error) {
return nil, nil
Expand Down
23 changes: 14 additions & 9 deletions consensus/cbft/cbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,6 @@ func (cbft *Cbft) ReceiveMessage(msg *ctypes.MsgInfo) error {
}

err := cbft.recordMessage(msg)
//cbft.log.Debug("Record message", "type", fmt.Sprintf("%T", msg.Msg), "msgHash", msg.Msg.MsgHash(), "duration", time.Since(begin))
if err != nil {
cbft.log.Warn("ReceiveMessage failed", "err", err)
return err
Expand All @@ -337,7 +336,6 @@ func (cbft *Cbft) ReceiveMessage(msg *ctypes.MsgInfo) error {
// Repeat filtering on consensus messages.
// First check.
if cbft.network.ContainsHistoryMessageHash(msg.Msg.MsgHash()) {
//cbft.log.Trace("Processed message for ReceiveMessage, no need to process", "msgHash", msg.Msg.MsgHash())
cbft.forgetMessage(msg.PeerID)
return nil
}
Expand Down Expand Up @@ -667,8 +665,6 @@ func (cbft *Cbft) VerifyHeader(chain consensus.ChainReader, header *types.Header

// VerifyHeaders is used to verify the validity of block headers in batch.
func (cbft *Cbft) VerifyHeaders(chain consensus.ChainReader, headers []*types.Header, seals []bool) (chan<- struct{}, <-chan error) {
//cbft.log.Trace("Verify headers", "total", len(headers))

abort := make(chan struct{})
results := make(chan error, len(headers))

Expand All @@ -689,7 +685,6 @@ func (cbft *Cbft) VerifyHeaders(chain consensus.ChainReader, headers []*types.He
// VerifySeal implements consensus.Engine, checking whether the signature contained
// in the header satisfies the consensus protocol requirements.
func (cbft *Cbft) VerifySeal(chain consensus.ChainReader, header *types.Header) error {
//cbft.log.Trace("Verify seal", "hash", header.Hash(), "number", header.Number)
if header.Number.Uint64() == 0 {
return ErrorUnKnowBlock
}
Expand Down Expand Up @@ -777,7 +772,7 @@ func (cbft *Cbft) OnSeal(block *types.Block, results chan<- *types.Block, stop <
ViewNumber: cbft.state.ViewNumber(),
Block: block,
BlockIndex: cbft.state.NextViewBlockIndex(),
ProposalIndex: uint32(me.Index),
ProposalIndex: me.Index,
}

// Next index is equal zero, This view does not produce a block.
Expand Down Expand Up @@ -1088,10 +1083,20 @@ func (cbft *Cbft) Close() error {
}
close(cbft.exitCh)
})
cbft.bridge.Close()
return nil
}

// Stop turns off the consensus asyncExecutor and fetcher.
func (cbft *Cbft) Stop() error {
cbft.log.Info("Stop cbft consensus")
if cbft.asyncExecutor != nil {
cbft.asyncExecutor.Stop()
}
cbft.bridge.Close()
if cbft.fetcher != nil {
cbft.fetcher.Stop()
}
cbft.blockCacheWriter.Stop()
return nil
}

Expand Down Expand Up @@ -1533,7 +1538,7 @@ func (cbft *Cbft) verifyConsensusMsg(msg ctypes.ConsensusMsg) (*cbfttypes.Valida
switch cm := msg.(type) {
case *protocols.PrepareBlock:
proposer := cbft.currentProposer()
if uint32(proposer.Index) != msg.NodeIndex() {
if proposer.Index != msg.NodeIndex() {
return nil, fmt.Errorf("current proposer index:%d, prepare block author index:%d", proposer.Index, msg.NodeIndex())
}
// BlockNum equal 1, the parent's block is genesis, doesn't has prepareQC
Expand Down Expand Up @@ -1727,7 +1732,7 @@ func (cbft *Cbft) verifyPrepareQC(oriNum uint64, oriHash common.Hash, qc *ctypes
}
// check if the corresponding block QC
if oriNum != qc.BlockNumber || oriHash != qc.BlockHash {
return authFailedError{
return handleError{
err: fmt.Errorf("verify prepare qc failed,not the corresponding qc,oriNum:%d,oriHash:%s,qcNum:%d,qcHash:%s",
oriNum, oriHash.String(), qc.BlockNumber, qc.BlockHash.String())}
}
Expand Down
3 changes: 1 addition & 2 deletions consensus/cbft/cbft_byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the PlatON-Go library. If not, see <http://www.gnu.org/licenses/>.


package cbft

import (
Expand Down Expand Up @@ -344,7 +343,7 @@ func TestPB06(t *testing.T) {
p := newPrepareBlock(nodes[0].engine.state.Epoch(), nodes[0].engine.state.ViewNumber(), qcBlock.Hash(), qcBlock.NumberU64()+1, blockIndex, proposalIndex, lockQC, viewChangeQC, nodes[proposalIndex].engine.config.Option.BlsPriKey, false, nodes[0], t)
err := nodes[0].engine.OnPrepareBlock("id", p)

_, ok := err.(authFailedError)
_, ok := err.(handleError)
assert.True(t, ok)
if ok {
assert.True(t, strings.HasPrefix(err.Error(), MismatchedPrepareQC))
Expand Down
11 changes: 6 additions & 5 deletions consensus/cbft/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the PlatON-Go library. If not, see <http://www.gnu.org/licenses/>.


package executor

import (
"errors"
"fmt"

"github.com/PlatONnetwork/PlatON-Go/common"
"github.com/PlatONnetwork/PlatON-Go/core/types"
Expand Down Expand Up @@ -101,11 +100,13 @@ func (exe *AsyncExecutor) ExecuteStatus() <-chan *BlockExecuteStatus {
// If execute channel if full, will return a error.
func (exe *AsyncExecutor) newTask(block *types.Block, parent *types.Block) error {
select {
case <-exe.closed:
return fmt.Errorf("asyncExecutor is stoped")
case exe.executeTasks <- &executeTask{parent: parent, block: block}:
return nil
default:
// FIXME: blocking if channel is full?
return errors.New("execute task queue is full")
//default:
// // FIXME: blocking if channel is full?
// return errors.New("execute task queue is full")
}
}

Expand Down
1 change: 0 additions & 1 deletion consensus/cbft/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the PlatON-Go library. If not, see <http://www.gnu.org/licenses/>.


package fetcher

import (
Expand Down
5 changes: 2 additions & 3 deletions consensus/cbft/sync_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the PlatON-Go library. If not, see <http://www.gnu.org/licenses/>.


package cbft

import (
Expand Down Expand Up @@ -112,7 +111,7 @@ func (cbft *Cbft) fetchBlock(id string, hash common.Hash, number uint64, qc *cty
return
}
if err := cbft.OnInsertQCBlock([]*types.Block{block}, []*ctypes.QuorumCert{blockList.QC[i]}); err != nil {
cbft.log.Error("Insert block failed", "error", err)
cbft.log.Warn("Insert block failed", "error", err)
asyncCallErr = err
}
wg.Done()
Expand Down Expand Up @@ -909,4 +908,4 @@ func (cbft *Cbft) SyncBlockQuorumCert(id string, blockNumber uint64, blockHash c
cbft.log.Debug("Send GetBlockQuorumCert", "peer", id, "msg", msg.String())
}

}
}
4 changes: 4 additions & 0 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type BlockCacheWriter interface {
Execute(block *types.Block, parent *types.Block) error
ClearCache(block *types.Block)
WriteBlock(block *types.Block) error
Stop()
}

// Engine is an algorithm agnostic consensus engine.
Expand Down Expand Up @@ -134,8 +135,11 @@ type Engine interface {
// Close terminates any background threads maintained by the consensus engine.
Close() error

Stop() error

// Pause consensus
Pause()

// Resume consensus
Resume()

Expand Down
3 changes: 3 additions & 0 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ type BlockChain struct {
// procInterrupt must be atomically called
procInterrupt int32 // interrupt signaler for block processing
wg sync.WaitGroup // chain processing wait group for shutting down
executeWG sync.WaitGroup // execute block processing wait group for shutting down

engine consensus.Engine
processor Processor // block processor interface
Expand Down Expand Up @@ -859,6 +860,8 @@ func (bc *BlockChain) Stop() {
if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) {
return
}

bc.executeWG.Wait()
// Unsubscribe all subscriptions registered from blockchain
bc.scope.Close()
close(bc.quit)
Expand Down
56 changes: 47 additions & 9 deletions core/blockchain_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ type BlockChainCache struct {
stateDBMu sync.RWMutex
receiptsMu sync.RWMutex

executing sync.Mutex
executed sync.Map
executed sync.Map

executeCh chan *executeTask
quit chan struct{}
}

type stateDBCache struct {
Expand All @@ -61,6 +63,12 @@ type receiptsCache struct {
blockNum uint64
}

type executeTask struct {
parent *types.Block
block *types.Block
result chan error
}

func (pbc *BlockChainCache) CurrentBlock() *types.Block {
block := pbc.Engine().CurrentBlock()
if block != nil {
Expand Down Expand Up @@ -106,6 +114,11 @@ func NewBlockChainCache(blockChain *BlockChain) *BlockChainCache {
pbc.BlockChain = blockChain
pbc.stateDBCache = make(map[common.Hash]*stateDBCache)
pbc.receiptsCache = make(map[common.Hash]*receiptsCache)
pbc.executeCh = make(chan *executeTask)
pbc.quit = make(chan struct{})

pbc.executeWG.Add(1)
go pbc.executeLoop()

return pbc
}
Expand Down Expand Up @@ -229,6 +242,7 @@ func (bcc *BlockChainCache) MakeStateDBByHeader(header *types.Header) (*state.St
// Create a StateDB instance from the blockchain based on stateRoot
return state, nil
}
log.Error("Make stateDB err")
return nil, errMakeStateDB
}

Expand Down Expand Up @@ -260,6 +274,32 @@ func (bcc *BlockChainCache) ClearCache(block *types.Block) {
}

func (bcc *BlockChainCache) Execute(block *types.Block, parent *types.Block) error {
result := make(chan error)

select {
case <-bcc.quit:
return fmt.Errorf("blockChainCache is stopped")
case bcc.executeCh <- &executeTask{parent: parent, block: block, result: result}:
return <-result
}
}

func (bcc *BlockChainCache) executeLoop() {
defer bcc.executeWG.Done()

for {
select {
case task := <-bcc.executeCh:
err := bcc.executeBlock(task.block, task.parent)
task.result <- err
case <-bcc.quit:
log.Info("Stop cbft blockChainCache")
return
}
}
}

func (bcc *BlockChainCache) executeBlock(block *types.Block, parent *types.Block) error {
executed := func() bool {
if number, ok := bcc.executed.Load(block.Header().SealHash()); ok && number.(uint64) == block.Number().Uint64() {
log.Debug("Block has executed", "number", block.Number(), "hash", block.Hash(), "parentNumber", parent.Number(), "parentHash", parent.Hash())
Expand All @@ -272,14 +312,7 @@ func (bcc *BlockChainCache) Execute(block *types.Block, parent *types.Block) err
return nil
}

bcc.executing.Lock()
defer bcc.executing.Unlock()
if executed() {
return nil
}

log.Debug("Start execute block", "hash", block.Hash(), "number", block.Number(), "sealHash", block.Header().SealHash())

state, err := bcc.MakeStateDB(parent)
if err != nil {
log.Error("BlockChainCache MakeStateDB failed", "err", err)
Expand Down Expand Up @@ -346,6 +379,11 @@ func (bcc *BlockChainCache) WriteBlock(block *types.Block) error {
return nil
}

func (bcc *BlockChainCache) Stop() {
log.Info("Stopping blockChain cache")
close(bcc.quit)
}

type sealHashNumber struct {
number uint64
hash common.Hash
Expand Down
5 changes: 4 additions & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ type Ethereum struct {
func New(stack *node.Node, config *Config) (*Ethereum, error) {
// Ensure configuration values are compatible and sane
if config.SyncMode == downloader.LightSync {
return nil, errors.New("can't run eth.PlatON in light sync mode, use les.LightPlatON")
return nil, errors.New("can't run PlatON in light sync mode, use les.LightPlatON")
}
if !config.SyncMode.IsValid() {
return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode)
Expand Down Expand Up @@ -596,6 +596,9 @@ func (s *Ethereum) Stop() error {
s.protocolManager.Stop()

// Then stop everything else.
// Only the operations related to block execution are stopped here
// and engine.Close cannot be called directly because it has a dependency on the following modules
s.engine.Stop()
s.bloomIndexer.Close()
close(s.closeBloomHandler)
s.txPool.Stop()
Expand Down
5 changes: 2 additions & 3 deletions miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
package miner

import (
"github.com/PlatONnetwork/PlatON-Go/common/hexutil"
"math/big"
"sync/atomic"
"time"

"github.com/PlatONnetwork/PlatON-Go/common/hexutil"

"github.com/PlatONnetwork/PlatON-Go/consensus"
"github.com/PlatONnetwork/PlatON-Go/core"
"github.com/PlatONnetwork/PlatON-Go/core/state"
Expand Down Expand Up @@ -52,7 +53,6 @@ type Config struct {
type Miner struct {
mux *event.TypeMux
worker *worker
eth Backend
engine consensus.Engine
exitCh chan struct{}

Expand All @@ -64,7 +64,6 @@ func New(eth Backend, config *Config, chainConfig *params.ChainConfig, miningCon
engine consensus.Engine, isLocalBlock func(block *types.Block) bool,
blockChainCache *core.BlockChainCache, vmTimeout uint64) *Miner {
miner := &Miner{
eth: eth,
mux: mux,
engine: engine,
exitCh: make(chan struct{}),
Expand Down
2 changes: 1 addition & 1 deletion params/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const (
//These versions are meaning the current code version.
VersionMajor = 1 // Major version component of the current release
VersionMinor = 3 // Minor version component of the current release
VersionPatch = 0 // Patch version component of the current release
VersionPatch = 1 // Patch version component of the current release
VersionMeta = "unstable" // Version metadata to append to the version string

//CAUTION: DO NOT MODIFY THIS ONCE THE CHAIN HAS BEEN INITIALIZED!!!
Expand Down

0 comments on commit be7ed4c

Please sign in to comment.