Skip to content

Commit

Permalink
Merge pull request onflow#5969 from AndriiDiachuk/choose-execution-no…
Browse files Browse the repository at this point in the history
…des-preferred-EN-ids-fix

[Access] chooseExecutionNodes fix
  • Loading branch information
peterargue authored Jun 10, 2024
2 parents 952cbe4 + 2d76ad3 commit 36575f8
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 27 deletions.
76 changes: 62 additions & 14 deletions engine/access/rpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ func executionNodesForBlockID(
}
executorIDs = executorIdentities.NodeIDs()
} else {
// try to find atleast minExecutionNodesCnt execution node ids from the execution receipts for the given blockID
// try to find at least minExecutionNodesCnt execution node ids from the execution receipts for the given blockID
for attempt := 0; attempt < maxAttemptsForExecutionReceipt; attempt++ {
executorIDs, err = findAllExecutionNodes(blockID, executionReceipts, log)
if err != nil {
Expand Down Expand Up @@ -539,34 +539,82 @@ func findAllExecutionNodes(
func chooseExecutionNodes(state protocol.State, executorIDs flow.IdentifierList) (flow.IdentitySkeletonList, error) {
allENs, err := state.Final().Identities(filter.HasRole[flow.Identity](flow.RoleExecution))
if err != nil {
return nil, fmt.Errorf("failed to retreive all execution IDs: %w", err)
return nil, fmt.Errorf("failed to retrieve all execution IDs: %w", err)
}

// first try and choose from the preferred EN IDs
var chosenIDs flow.IdentityList
// choose from preferred EN IDs
if len(preferredENIdentifiers) > 0 {
// find the preferred execution node IDs which have executed the transaction
chosenIDs = allENs.Filter(filter.And(filter.HasNodeID[flow.Identity](preferredENIdentifiers...),
filter.HasNodeID[flow.Identity](executorIDs...)))
if len(chosenIDs) > 0 {
return chosenIDs.ToSkeleton(), nil
}
chosenIDs := chooseFromPreferredENIDs(allENs, executorIDs)
return chosenIDs.ToSkeleton(), nil
}

// if no preferred EN ID is found, then choose from the fixed EN IDs
if len(fixedENIdentifiers) > 0 {
// choose fixed ENs which have executed the transaction
chosenIDs = allENs.Filter(filter.And(
chosenIDs := allENs.Filter(filter.And(
filter.HasNodeID[flow.Identity](fixedENIdentifiers...),
filter.HasNodeID[flow.Identity](executorIDs...)))
filter.HasNodeID[flow.Identity](executorIDs...),
))
if len(chosenIDs) > 0 {
return chosenIDs.ToSkeleton(), nil
}
// if no such ENs are found then just choose all fixed ENs
// if no such ENs are found, then just choose all fixed ENs
chosenIDs = allENs.Filter(filter.HasNodeID[flow.Identity](fixedENIdentifiers...))
return chosenIDs.ToSkeleton(), nil
}

// If no preferred or fixed ENs have been specified, then return all executor IDs i.e. no preference at all
// if no preferred or fixed ENs have been specified, then return all executor IDs i.e., no preference at all
return allENs.Filter(filter.HasNodeID[flow.Identity](executorIDs...)).ToSkeleton(), nil
}

// chooseFromPreferredENIDs finds the subset of execution nodes if preferred execution nodes are defined.
// If preferredENIdentifiers is set and there are less than maxNodesCnt nodes selected, than the list is padded up to
// maxNodesCnt nodes using the following order:
// 1. Use any EN with a receipt.
// 2. Use any preferred node not already selected.
// 3. Use any EN not already selected.
func chooseFromPreferredENIDs(allENs flow.IdentityList, executorIDs flow.IdentifierList) flow.IdentityList {
var chosenIDs flow.IdentityList

// filter for both preferred and executor IDs
chosenIDs = allENs.Filter(filter.And(
filter.HasNodeID[flow.Identity](preferredENIdentifiers...),
filter.HasNodeID[flow.Identity](executorIDs...),
))

if len(chosenIDs) >= maxNodesCnt {
return chosenIDs
}

// function to add nodes to chosenIDs if they are not already included
addIfNotExists := func(candidates flow.IdentityList) {
for _, en := range candidates {
_, exists := chosenIDs.ByNodeID(en.NodeID)
if !exists {
chosenIDs = append(chosenIDs, en)
if len(chosenIDs) >= maxNodesCnt {
return
}
}
}
}

// add any EN with a receipt
receiptENs := allENs.Filter(filter.HasNodeID[flow.Identity](executorIDs...))
addIfNotExists(receiptENs)
if len(chosenIDs) >= maxNodesCnt {
return chosenIDs
}

// add any preferred node not already selected
preferredENs := allENs.Filter(filter.HasNodeID[flow.Identity](preferredENIdentifiers...))
addIfNotExists(preferredENs)
if len(chosenIDs) >= maxNodesCnt {
return chosenIDs
}

// add any EN not already selected
addIfNotExists(allENs)

return chosenIDs
}
49 changes: 36 additions & 13 deletions engine/access/rpc/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1744,14 +1744,6 @@ func (suite *Suite) TestExecutionNodesForBlockID() {
suite.Run("no preferred or fixed ENs", func() {
testExecutionNodesForBlockID(nil, nil, allExecutionNodes)
})
// if only preferred ENs are specified, the ExecutionNodesForBlockID function should
// return the preferred ENs list
suite.Run("two preferred ENs with zero fixed EN", func() {
// mark the first two ENs as preferred
preferredENs := allExecutionNodes[0:2]
expectedList := preferredENs
testExecutionNodesForBlockID(preferredENs, nil, expectedList)
})
// if only fixed ENs are specified, the ExecutionNodesForBlockID function should
// return the fixed ENs list
suite.Run("two fixed ENs with zero preferred EN", func() {
Expand All @@ -1760,14 +1752,22 @@ func (suite *Suite) TestExecutionNodesForBlockID() {
expectedList := fixedENs
testExecutionNodesForBlockID(nil, fixedENs, expectedList)
})
// if only preferred ENs are specified, the ExecutionNodesForBlockID function should
// return the preferred ENs list
suite.Run("two preferred ENs with zero fixed EN", func() {
// mark the first two ENs as preferred
preferredENs := allExecutionNodes[0:2]
expectedList := allExecutionNodes[0:maxNodesCnt]
testExecutionNodesForBlockID(preferredENs, nil, expectedList)
})
// if both are specified, the ExecutionNodesForBlockID function should
// return the preferred ENs list
suite.Run("four fixed ENs of which two are preferred ENs", func() {
// mark the first four ENs as fixed
fixedENs := allExecutionNodes[0:5]
// mark the first two of the fixed ENs as preferred ENs
preferredENs := fixedENs[0:2]
expectedList := preferredENs
expectedList := fixedENs[0:maxNodesCnt]
testExecutionNodesForBlockID(preferredENs, fixedENs, expectedList)
})
// if both are specified, but the preferred ENs don't match the ExecutorIDs in the ER,
Expand All @@ -1777,7 +1777,8 @@ func (suite *Suite) TestExecutionNodesForBlockID() {
fixedENs := allExecutionNodes[0:2]
// specify two ENs not specified in the ERs as preferred
preferredENs := unittest.IdentityListFixture(2, unittest.WithRole(flow.RoleExecution))
expectedList := fixedENs
// add one more node ID besides of the fixed ENs list cause expected length of the list should be maxNodesCnt
expectedList := append(fixedENs, allExecutionNodes[2])
testExecutionNodesForBlockID(preferredENs, fixedENs, expectedList)
})
// if execution receipts are not yet available, the ExecutionNodesForBlockID function should retry twice
Expand All @@ -1791,9 +1792,32 @@ func (suite *Suite) TestExecutionNodesForBlockID() {
currentAttempt = 0
// mark the first two ENs as preferred
preferredENs := allExecutionNodes[0:2]
expectedList := preferredENs
expectedList := allExecutionNodes[0:maxNodesCnt]
testExecutionNodesForBlockID(preferredENs, nil, expectedList)
})
// if preferredENIdentifiers was set and there are less than maxNodesCnt nodes selected than check the order
// of adding ENs ids
suite.Run("add nodes in the correct order", func() {
// mark the first EN as preferred
preferredENIdentifiers = allExecutionNodes[0:1].NodeIDs()
// mark the fourth EN with receipt
executorIDs := allExecutionNodes[3:4].NodeIDs()

receiptNodes := allExecutionNodes[3:4] // any EN with a receipt
preferredNodes := allExecutionNodes[0:1] // preferred EN node not already selected
additionalNode := allExecutionNodes[1:2] // any EN not already selected

expectedOrder := flow.IdentityList{
receiptNodes[0],
preferredNodes[0],
additionalNode[0],
}

chosenIDs := chooseFromPreferredENIDs(allExecutionNodes, executorIDs)

require.ElementsMatch(suite.T(), chosenIDs, expectedOrder)
require.Equal(suite.T(), len(chosenIDs), maxNodesCnt)
})
}

// TestGetTransactionResultEventEncodingVersion tests the GetTransactionResult function with different event encoding versions.
Expand Down Expand Up @@ -2025,8 +2049,7 @@ func (suite *Suite) TestNodeCommunicator() {
// Simulate closed circuit breaker error
suite.execClient.
On("GetTransactionResultsByBlockID", ctx, exeEventReq).
Return(nil, gobreaker.ErrOpenState).
Once()
Return(nil, gobreaker.ErrOpenState)

result, err := backend.GetTransactionResultsByBlockID(ctx, blockId, entitiesproto.EventEncodingVersion_JSON_CDC_V0)
suite.Assert().Nil(result)
Expand Down

0 comments on commit 36575f8

Please sign in to comment.