Skip to content

Commit

Permalink
chore: check for billing after index flow
Browse files Browse the repository at this point in the history
not sure if this is the final solution
  • Loading branch information
Candinya committed Jul 23, 2024
1 parent a1e7892 commit fc91c80
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 162 deletions.
70 changes: 51 additions & 19 deletions internal/service/indexer/l2/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ func (s *server) run(ctx context.Context) (err error) {
blockNumberCurrent := s.checkpoint.BlockNumber + 1

// Get current block (header and transactions).
block, err := s.ethereumClient.BlockByNumber(ctx, new(big.Int).SetUint64(blockNumberCurrent))
blockNumberBigInt := new(big.Int).SetUint64(blockNumberCurrent)

block, err := s.ethereumClient.BlockByNumber(ctx, blockNumberBigInt)
if err != nil {
return fmt.Errorf("get block %d: %w", blockNumberCurrent, err)
}
Expand All @@ -116,43 +118,64 @@ func (s *server) run(ctx context.Context) (err error) {
return fmt.Errorf("get receipts for block %d: %w", block.NumberU64(), err)
}

if err := s.index(ctx, block, receipts); err != nil {
possibleBillingEpoch, err := s.index(ctx, block, receipts)
if err != nil {
return fmt.Errorf("index block %d: %w", blockNumberCurrent, err)
}

// Exec pp billing flow
go s.execPPBillingFlow(blockNumberBigInt, possibleBillingEpoch)
}
}

func (s *server) execPPBillingFlow(blockNumber *big.Int, epoch *big.Int) {
// Step 2: check if is last batch of this epoch (use go routine to prevent possible transaction stuck)
if blockNumber == nil || epoch == nil {
// Invalid
return
}

zap.L().Debug("close epoch check start", zap.Uint64("epoch", epoch.Uint64()))

if err := s.closeEpoch(context.Background(), blockNumber, epoch); err != nil {
zap.L().Error("close epoch check failed", zap.Uint64("epoch", epoch.Uint64()), zap.Error(err))
}
}

func (s *server) index(ctx context.Context, block *types.Block, receipts types.Receipts) error {
func (s *server) index(ctx context.Context, block *types.Block, receipts types.Receipts) (*big.Int, error) {
// Begin a database transaction for the block.
databaseTransaction, err := s.databaseClient.Begin(ctx)
if err != nil {
return fmt.Errorf("begin database transaction: %w", err)
return nil, fmt.Errorf("begin database transaction: %w", err)
}

defer lo.Try(databaseTransaction.Rollback)

if err = s.processIndex(ctx, block, receipts, databaseTransaction); err != nil {
return fmt.Errorf("process index: %w", err)
possibleBillingEpoch, err := s.processIndex(ctx, block, receipts, databaseTransaction)
if err != nil {
return nil, fmt.Errorf("process index: %w", err)
}

// Update and save checkpoint to memory and database.
s.checkpoint.BlockHash = block.Hash()
s.checkpoint.BlockNumber = block.NumberU64()

if err := databaseTransaction.SaveCheckpoint(ctx, s.checkpoint); err != nil {
return fmt.Errorf("save checkpoint: %w", err)
return nil, fmt.Errorf("save checkpoint: %w", err)
}

if databaseTransaction.Commit() != nil {
return fmt.Errorf("commit database transaction: %w", err)
return nil, fmt.Errorf("commit database transaction: %w", err)
}

return nil
return possibleBillingEpoch, nil
}

func (s *server) processIndex(ctx context.Context, block *types.Block, receipts types.Receipts, databaseTransaction database.Client) error {
func (s *server) processIndex(ctx context.Context, block *types.Block, receipts types.Receipts, databaseTransaction database.Client) (*big.Int, error) {
header := block.Header()

var possibleBillingEpoch *big.Int // = nil

for _, receipt := range receipts {
// Discard all contract creation transactions.
tx := block.Transaction(receipt.TxHash)
Expand All @@ -176,30 +199,39 @@ func (s *server) processIndex(ctx context.Context, block *types.Block, receipts
continue
}

err := s.processLog(ctx, block, receipt, databaseTransaction, log, header, index) // WHY IS THIS LINTER SO ANNOYING
possibleBillingEpochThisLog, err := s.processLog(ctx, block, receipt, databaseTransaction, log, header, index) // WHY IS THIS LINTER SO ANNOYING

if err != nil {
return err
return nil, err
}

if possibleBillingEpochThisLog != nil {
possibleBillingEpoch = possibleBillingEpochThisLog
}
}
}

return nil
return possibleBillingEpoch, nil
}

func (s *server) processLog(ctx context.Context, block *types.Block, receipt *types.Receipt, databaseTransaction database.Client, log *types.Log, header *types.Header, index int) error {
func (s *server) processLog(ctx context.Context, block *types.Block, receipt *types.Receipt, databaseTransaction database.Client, log *types.Log, header *types.Header, index int) (*big.Int, error) {
var (
possibleBillingEpoch *big.Int // = nil
err error
)

switch log.Address {
case l2.ContractMap[s.chainID.Uint64()].AddressBillingProxy:
if err := s.indexBillingLog(ctx, header, block.Transaction(log.TxHash), receipt, log, index, databaseTransaction); err != nil {
return fmt.Errorf("index billing log %s %d: %w", log.TxHash, log.Index, err)
if err = s.indexBillingLog(ctx, header, block.Transaction(log.TxHash), receipt, log, index, databaseTransaction); err != nil {
return nil, fmt.Errorf("index billing log %s %d: %w", log.TxHash, log.Index, err)
}
case l2.ContractMap[s.chainID.Uint64()].AddressStakingProxy:
if err := s.indexStakingLog(ctx, header, block.Transaction(log.TxHash), receipt, log, index, databaseTransaction); err != nil {
return fmt.Errorf("index staking log %s %d: %w", log.TxHash, log.Index, err)
if possibleBillingEpoch, err = s.indexStakingLog(ctx, header, block.Transaction(log.TxHash), receipt, log, index, databaseTransaction); err != nil {
return nil, fmt.Errorf("index staking log %s %d: %w", log.TxHash, log.Index, err)
}
}

return nil
return possibleBillingEpoch, nil
}

func NewServer(ctx context.Context, databaseClient database.Client, controlClient *control.StateClientWriter, redisClient *redis.Client, config *Config, billingConfig *config.Billing, settlerConfig *config.Settler) (service.Server, error) {
Expand Down
150 changes: 7 additions & 143 deletions internal/service/indexer/l2/indexer_staking.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,26 @@ import (
"fmt"
"math/big"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/rss3-network/payment-processor/contract/l2"
"github.com/rss3-network/payment-processor/internal/database"
"github.com/rss3-network/payment-processor/internal/service/indexer/constants"
"github.com/rss3-network/payment-processor/schema"
"go.uber.org/zap"
)

func (s *server) indexStakingLog(ctx context.Context, header *types.Header, _ *types.Transaction, _ *types.Receipt, log *types.Log, _ int, databaseTransaction database.Client) error {
func (s *server) indexStakingLog(ctx context.Context, header *types.Header, _ *types.Transaction, _ *types.Receipt, log *types.Log, _ int, databaseTransaction database.Client) (*big.Int, error) {
switch eventHash := log.Topics[0]; eventHash {
case l2.EventHashStakingRewardDistributed:
return s.indexStakingDistributeRewardsLog(ctx, header, log, databaseTransaction)
default:
return nil
return nil, nil
}
}

func (s *server) indexStakingDistributeRewardsLog(ctx context.Context, header *types.Header, log *types.Log, _ database.Client) error {
func (s *server) indexStakingDistributeRewardsLog(ctx context.Context, _ *types.Header, log *types.Log, databaseTransaction database.Client) (*big.Int, error) {
stakingDistributeRewardsEvent, err := s.contractStaking.ParseRewardDistributed(*log)
if err != nil {
return fmt.Errorf("parse RewardDistributed event: %w", err)
return nil, fmt.Errorf("parse RewardDistributed event: %w", err)
}

// The workflow here is:
Expand Down Expand Up @@ -55,7 +52,7 @@ func (s *server) indexStakingDistributeRewardsLog(ctx context.Context, header *t
// Save all node request count
// Since closeEpoch depends on this,
// here we use databaseClient directly rather than insert transaction
err = s.databaseClient.SaveNodeRequestCount(ctx, &schema.NodeRequestRecord{
err = databaseTransaction.SaveNodeRequestCount(ctx, &schema.NodeRequestRecord{
NodeAddress: nodeAddr,
Epoch: stakingDistributeRewardsEvent.Epoch,
RequestCount: stakingDistributeRewardsEvent.RequestCounts[i],
Expand All @@ -69,142 +66,9 @@ func (s *server) indexStakingDistributeRewardsLog(ctx context.Context, header *t
zap.Error(err),
)

return fmt.Errorf("save node %s request count %d: %w", nodeAddr.Hex(), stakingDistributeRewardsEvent.RequestCounts[i].Int64(), err)
return nil, fmt.Errorf("save node %s request count %d: %w", nodeAddr.Hex(), stakingDistributeRewardsEvent.RequestCounts[i].Int64(), err)
}
}

// Step 2: check if is last batch of this epoch (use go routine to prevent possible transaction stuck)
go func() {
zap.L().Debug("close epoch check start", zap.Uint64("epoch", stakingDistributeRewardsEvent.Epoch.Uint64()))

err := s.closeEpoch(context.Background(), header.Number, stakingDistributeRewardsEvent.Epoch)
if err != nil {
zap.L().Error("close epoch check failed", zap.Uint64("epoch", stakingDistributeRewardsEvent.Epoch.Uint64()), zap.Error(err))
}
}()

return nil // No error
}

func (s *server) closeEpoch(ctx context.Context, blockNumber *big.Int, epoch *big.Int) error {
isStillProceeding, err := s.contractStaking.IsSettlementPhase(&bind.CallOpts{
Context: ctx,
BlockNumber: blockNumber,
})

if err != nil {
return fmt.Errorf("failed to check is settlement phase: %w", err)
}

if isStillProceeding {
// Not last batch, return
return nil
} // else is last batch, start proceed

// 2.1. Set mutex lock
isMutexLockSuccessful, err := s.redisClient.SetNX(ctx, constants.EpochMutexLockKey, 1, constants.EpochMutexExpiration).Result()

if err != nil {
return fmt.Errorf("failed to set mutex lock with redis: %w", err)
}

if !isMutexLockSuccessful {
// A process already running, skip
return nil
}

// Defer release mutex lock
defer s.redisClient.Del(ctx, constants.EpochMutexLockKey)

s.closeEpochExec(ctx, epoch) // This cannot retry when error happens, so just report errors to slack rather than retry it

return nil
}

func (s *server) closeEpochExec(ctx context.Context, epoch *big.Int) {
// 2.2-3. billing
zap.L().Debug("closeEpochExec: 2.2-3. billing")

totalCollected, err := s.billingFlow(ctx, epoch)
if err != nil {
zap.L().Error("failed to execute closeEpochExec: 2.2-3. billing", zap.Error(err))
s.ReportFailedTransactionToSlack(err, nil, "closeEpochExec: 2.2-3. billing", nil, nil)

return
}

zap.L().Debug("billing flow total collect", zap.String("token", totalCollected.String()))

if totalCollected.Cmp(big.NewInt(0)) == 0 {
// No request fees collect in this epoch, skip
zap.L().Info("no request fees collect in this epoch, skip")

return
}

// 2.4. calc request percentage
zap.L().Debug("closeEpochExec: 2.4. calc request percentage")

allNodes, err := s.databaseClient.FindNodeRequestRewardsByEpoch(ctx, epoch)
if err != nil {
zap.L().Error("failed to execute closeEpochExec: 2.4. calc request percentage", zap.Error(err))
s.ReportFailedTransactionToSlack(err, nil, "closeEpochExec: 2.4. calc request percentage", nil, nil)

return
}

if len(allNodes) == 0 {
zap.L().Debug("No active nodes in current epoch, skip")
return
}

zap.L().Debug("All nodes found, start contribution calc", zap.Uint64("epoch", epoch.Uint64()), zap.Any("nodes", allNodes))

// Sum all requests count
totalRequestCount := big.NewInt(0)

for _, node := range allNodes {
totalRequestCount.Add(totalRequestCount, node.RequestCount)
}

if totalRequestCount.Cmp(big.NewInt(0)) == 0 {
// No requests happened in this epoch, skip
zap.L().Info("no requests happened in this epoch, skip")

return
}

// Calculate reward per request
rewardPerRequest := new(big.Int).Quo(totalCollected, totalRequestCount)
zap.L().Info(
"epoch reward per request",
zap.Uint64("epoch", epoch.Uint64()),
zap.String("totalRewards", totalCollected.String()),
zap.String("totalRequests", totalRequestCount.String()),
zap.String("rewardPerRequest", rewardPerRequest.String()),
)

// Calculate reward for nodes
rewardNodesAddress := []common.Address{}
rewardNodesAmount := []*big.Int{}

for _, node := range allNodes {
// Calculate reward per node
reward := new(big.Int).Mul(rewardPerRequest, node.RequestCount)

// Save into database
err = s.databaseClient.SetNodeRequestRewards(ctx, epoch, node.NodeAddress, reward)
if err != nil {
// Error, but no need to abort
zap.L().Error("update node request rewards", zap.String("address", node.NodeAddress.String()), zap.String("amount", reward.String()), zap.Any("node", node), zap.Error(err))
}

rewardNodesAddress = append(rewardNodesAddress, node.NodeAddress)
rewardNodesAmount = append(rewardNodesAmount, reward)
}

// 2.5. billing: distribute request rewards
zap.L().Debug("closeEpochExec: 2.5. billing: distribute request rewards")

s.distributeRequestRewards(ctx, rewardNodesAddress, rewardNodesAmount)
return stakingDistributeRewardsEvent.Epoch, nil // No error
}
Loading

0 comments on commit fc91c80

Please sign in to comment.