Skip to content

Commit

Permalink
fix: make sure to query socket for other operators in the quorum
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Jan 3, 2025
1 parent 36b9280 commit 0eddf4e
Showing 1 changed file with 63 additions and 36 deletions.
99 changes: 63 additions & 36 deletions core/eth/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,25 @@ func (cs *ChainState) GetOperatorStateByOperator(ctx context.Context, blockNumbe
return nil, err
}

if _, ok := cs.SocketMap[operator]; !ok {
socket, err := cs.Tx.GetOperatorSocket(ctx, operator)
if err != nil {
return nil, err
}
cs.SocketMap[operator] = &socket
// for _, state := range operatorsByQuorum {
// for _, op := range state {
// if _, ok := cs.SocketMap[op.OperatorID]; !ok {
// socket, err := cs.Tx.GetOperatorSocket(ctx, op.OperatorID)
// if err != nil {
// return nil, err
// }
// cs.socketMu.Lock()
// cs.SocketMap[op.OperatorID] = &socket
// cs.socketMu.Unlock()
// }

// }
// }

err = cs.refreshSocketMap(ctx, operatorsByQuorum)
if err != nil {
return nil, err
}

return getOperatorState(operatorsByQuorum, uint32(blockNumber), cs.SocketMap)
}

Expand All @@ -62,21 +73,26 @@ func (cs *ChainState) GetOperatorState(ctx context.Context, blockNumber uint, qu
return nil, err
}

// for all operators in operatorsByQuorum, check if the socket is in the map
missingOperatorIds := make([]core.OperatorID, 0)
for _, quorum := range operatorsByQuorum {
for _, operator := range quorum {
missingOperatorIds = append(missingOperatorIds, operator.OperatorID)
}
}

if err := cs.buildSocketMap(ctx, missingOperatorIds); err != nil {
return nil, err
}
// Index for recent socket updates
if err := cs.indexSocketMap(ctx); err != nil {
// // for all operators in operatorsByQuorum, check if the socket is in the map
// missingOperatorIds := make([]core.OperatorID, 0)
// for _, quorum := range operatorsByQuorum {
// for _, operator := range quorum {
// missingOperatorIds = append(missingOperatorIds, operator.OperatorID)
// }
// }

// if err := cs.buildSocketMap(ctx, missingOperatorIds); err != nil {
// return nil, err
// }
// // Index for recent socket updates
// if err := cs.indexSocketMap(ctx); err != nil {
// return nil, err
// }
err = cs.refreshSocketMap(ctx, operatorsByQuorum)
if err != nil {
return nil, err
}

return getOperatorState(operatorsByQuorum, uint32(blockNumber), cs.SocketMap)
}

Expand Down Expand Up @@ -114,20 +130,21 @@ func (cs *ChainState) buildSocketMap(ctx context.Context, operatorIds []core.Ope
}

cs.socketMu.Lock()
defer cs.socketMu.Unlock()
for operatorID, socket := range socketMap {
cs.SocketMap[operatorID] = socket
}
cs.socketMu.Unlock()

return nil
}

// indexSocketMap preloads the socket map for the default quorums at the current block.
// indexSocketMap indexes a block range of socket update events
func (cs *ChainState) indexSocketMap(ctx context.Context) error {
currentBlockNumber, err := cs.GetCurrentBlockNumber()
if err != nil {
return err
}

registryCoordinator, err := cs.Tx.RegistryCoordinator(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -176,38 +193,48 @@ func (cs *ChainState) indexSocketMap(ctx context.Context) error {
}

var socket string
if method.Name == "registerOperator" {
if method.Name == "registerOperator" || method.Name == "registerOperatorWithChurn" {
socket = inputs[1].(string)
} else if method.Name == "updateOperatorSocket" {
socket = inputs[0].(string)
} else {
return fmt.Errorf("unknown method filtered for socket update event: %s", method.Name)
}
// operatorAddr := gcommon.BytesToAddress(log.Topics[1].Bytes())
operatorID := core.OperatorID(log.Topics[1].Bytes())
// fmt.Println("operator id", operatorID)
// fmt.Println("operator addr", operatorAddr)
// fmt.Println("operator addr hex", operatorAddr.Hex())
// operatorId, err := core.OperatorIDFromHex(operatorAddr.Hex())
// if err != nil {
// fmt.Println(err)
// return err
// }

socketUpdates = append(socketUpdates, &socketUpdateParams{
Socket: socket,
OperatorID: operatorID,
})
}

cs.socketMu.Lock()
defer cs.socketMu.Unlock()
for _, socketUpdate := range socketUpdates {
cs.SocketMap[core.OperatorID(socketUpdate.OperatorID)] = &socketUpdate.Socket
cs.SocketMap[socketUpdate.OperatorID] = &socketUpdate.Socket
}

cs.socketPrevBlockNumber = uint32(currentBlockNumber)
cs.socketMu.Unlock()

return nil
}

// refreshSocketMap refresh the socket map for the given operators by quorums at the current block.
func (cs *ChainState) refreshSocketMap(ctx context.Context, operatorsByQuorum core.OperatorStakes) error {
// for all operators in operatorsByQuorum, check if the socket is in the map
missingOperatorIds := make([]core.OperatorID, 0)
for _, quorum := range operatorsByQuorum {
for _, operator := range quorum {
missingOperatorIds = append(missingOperatorIds, operator.OperatorID)
}
}

if err := cs.buildSocketMap(ctx, missingOperatorIds); err != nil {
return err
}

// Index for recent socket updates
if err := cs.indexSocketMap(ctx); err != nil {
return err
}
return nil
}

Expand Down

0 comments on commit 0eddf4e

Please sign in to comment.