diff --git a/internal/service/indexer/l2/indexer.go b/internal/service/indexer/l2/indexer.go index 3a77a25..648cc24 100644 --- a/internal/service/indexer/l2/indexer.go +++ b/internal/service/indexer/l2/indexer.go @@ -105,7 +105,9 @@ func (s *server) run(ctx context.Context) (err error) { blockNumberCurrent := s.checkpoint.BlockNumber + 1 // Get current block (header and transactions). - block, err := s.ethereumClient.BlockByNumber(ctx, new(big.Int).SetUint64(blockNumberCurrent)) + blockNumberBigInt := new(big.Int).SetUint64(blockNumberCurrent) + + block, err := s.ethereumClient.BlockByNumber(ctx, blockNumberBigInt) if err != nil { return fmt.Errorf("get block %d: %w", blockNumberCurrent, err) } @@ -116,23 +118,42 @@ func (s *server) run(ctx context.Context) (err error) { return fmt.Errorf("get receipts for block %d: %w", block.NumberU64(), err) } - if err := s.index(ctx, block, receipts); err != nil { + possibleBillingEpoch, err := s.index(ctx, block, receipts) + if err != nil { return fmt.Errorf("index block %d: %w", blockNumberCurrent, err) } + + // Exec pp billing flow + go s.execPPBillingFlow(blockNumberBigInt, possibleBillingEpoch) + } +} + +func (s *server) execPPBillingFlow(blockNumber *big.Int, epoch *big.Int) { + // Step 2: check if is last batch of this epoch (use go routine to prevent possible transaction stuck) + if blockNumber == nil || epoch == nil { + // Invalid + return + } + + zap.L().Debug("close epoch check start", zap.Uint64("epoch", epoch.Uint64())) + + if err := s.closeEpoch(context.Background(), blockNumber, epoch); err != nil { + zap.L().Error("close epoch check failed", zap.Uint64("epoch", epoch.Uint64()), zap.Error(err)) } } -func (s *server) index(ctx context.Context, block *types.Block, receipts types.Receipts) error { +func (s *server) index(ctx context.Context, block *types.Block, receipts types.Receipts) (*big.Int, error) { // Begin a database transaction for the block. databaseTransaction, err := s.databaseClient.Begin(ctx) if err != nil { - return fmt.Errorf("begin database transaction: %w", err) + return nil, fmt.Errorf("begin database transaction: %w", err) } defer lo.Try(databaseTransaction.Rollback) - if err = s.processIndex(ctx, block, receipts, databaseTransaction); err != nil { - return fmt.Errorf("process index: %w", err) + possibleBillingEpoch, err := s.processIndex(ctx, block, receipts, databaseTransaction) + if err != nil { + return nil, fmt.Errorf("process index: %w", err) } // Update and save checkpoint to memory and database. @@ -140,19 +161,21 @@ func (s *server) index(ctx context.Context, block *types.Block, receipts types.R s.checkpoint.BlockNumber = block.NumberU64() if err := databaseTransaction.SaveCheckpoint(ctx, s.checkpoint); err != nil { - return fmt.Errorf("save checkpoint: %w", err) + return nil, fmt.Errorf("save checkpoint: %w", err) } if databaseTransaction.Commit() != nil { - return fmt.Errorf("commit database transaction: %w", err) + return nil, fmt.Errorf("commit database transaction: %w", err) } - return nil + return possibleBillingEpoch, nil } -func (s *server) processIndex(ctx context.Context, block *types.Block, receipts types.Receipts, databaseTransaction database.Client) error { +func (s *server) processIndex(ctx context.Context, block *types.Block, receipts types.Receipts, databaseTransaction database.Client) (*big.Int, error) { header := block.Header() + var possibleBillingEpoch *big.Int // = nil + for _, receipt := range receipts { // Discard all contract creation transactions. tx := block.Transaction(receipt.TxHash) @@ -176,30 +199,39 @@ func (s *server) processIndex(ctx context.Context, block *types.Block, receipts continue } - err := s.processLog(ctx, block, receipt, databaseTransaction, log, header, index) // WHY IS THIS LINTER SO ANNOYING + possibleBillingEpochThisLog, err := s.processLog(ctx, block, receipt, databaseTransaction, log, header, index) // WHY IS THIS LINTER SO ANNOYING if err != nil { - return err + return nil, err + } + + if possibleBillingEpochThisLog != nil { + possibleBillingEpoch = possibleBillingEpochThisLog } } } - return nil + return possibleBillingEpoch, nil } -func (s *server) processLog(ctx context.Context, block *types.Block, receipt *types.Receipt, databaseTransaction database.Client, log *types.Log, header *types.Header, index int) error { +func (s *server) processLog(ctx context.Context, block *types.Block, receipt *types.Receipt, databaseTransaction database.Client, log *types.Log, header *types.Header, index int) (*big.Int, error) { + var ( + possibleBillingEpoch *big.Int // = nil + err error + ) + switch log.Address { case l2.ContractMap[s.chainID.Uint64()].AddressBillingProxy: - if err := s.indexBillingLog(ctx, header, block.Transaction(log.TxHash), receipt, log, index, databaseTransaction); err != nil { - return fmt.Errorf("index billing log %s %d: %w", log.TxHash, log.Index, err) + if err = s.indexBillingLog(ctx, header, block.Transaction(log.TxHash), receipt, log, index, databaseTransaction); err != nil { + return nil, fmt.Errorf("index billing log %s %d: %w", log.TxHash, log.Index, err) } case l2.ContractMap[s.chainID.Uint64()].AddressStakingProxy: - if err := s.indexStakingLog(ctx, header, block.Transaction(log.TxHash), receipt, log, index, databaseTransaction); err != nil { - return fmt.Errorf("index staking log %s %d: %w", log.TxHash, log.Index, err) + if possibleBillingEpoch, err = s.indexStakingLog(ctx, header, block.Transaction(log.TxHash), receipt, log, index, databaseTransaction); err != nil { + return nil, fmt.Errorf("index staking log %s %d: %w", log.TxHash, log.Index, err) } } - return nil + return possibleBillingEpoch, nil } func NewServer(ctx context.Context, databaseClient database.Client, controlClient *control.StateClientWriter, redisClient *redis.Client, config *Config, billingConfig *config.Billing, settlerConfig *config.Settler) (service.Server, error) { diff --git a/internal/service/indexer/l2/indexer_staking.go b/internal/service/indexer/l2/indexer_staking.go index 6af8c16..0eef85a 100644 --- a/internal/service/indexer/l2/indexer_staking.go +++ b/internal/service/indexer/l2/indexer_staking.go @@ -5,29 +5,26 @@ import ( "fmt" "math/big" - "github.com/ethereum/go-ethereum/accounts/abi/bind" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/rss3-network/payment-processor/contract/l2" "github.com/rss3-network/payment-processor/internal/database" - "github.com/rss3-network/payment-processor/internal/service/indexer/constants" "github.com/rss3-network/payment-processor/schema" "go.uber.org/zap" ) -func (s *server) indexStakingLog(ctx context.Context, header *types.Header, _ *types.Transaction, _ *types.Receipt, log *types.Log, _ int, databaseTransaction database.Client) error { +func (s *server) indexStakingLog(ctx context.Context, header *types.Header, _ *types.Transaction, _ *types.Receipt, log *types.Log, _ int, databaseTransaction database.Client) (*big.Int, error) { switch eventHash := log.Topics[0]; eventHash { case l2.EventHashStakingRewardDistributed: return s.indexStakingDistributeRewardsLog(ctx, header, log, databaseTransaction) default: - return nil + return nil, nil } } -func (s *server) indexStakingDistributeRewardsLog(ctx context.Context, header *types.Header, log *types.Log, _ database.Client) error { +func (s *server) indexStakingDistributeRewardsLog(ctx context.Context, _ *types.Header, log *types.Log, databaseTransaction database.Client) (*big.Int, error) { stakingDistributeRewardsEvent, err := s.contractStaking.ParseRewardDistributed(*log) if err != nil { - return fmt.Errorf("parse RewardDistributed event: %w", err) + return nil, fmt.Errorf("parse RewardDistributed event: %w", err) } // The workflow here is: @@ -55,7 +52,7 @@ func (s *server) indexStakingDistributeRewardsLog(ctx context.Context, header *t // Save all node request count // Since closeEpoch depends on this, // here we use databaseClient directly rather than insert transaction - err = s.databaseClient.SaveNodeRequestCount(ctx, &schema.NodeRequestRecord{ + err = databaseTransaction.SaveNodeRequestCount(ctx, &schema.NodeRequestRecord{ NodeAddress: nodeAddr, Epoch: stakingDistributeRewardsEvent.Epoch, RequestCount: stakingDistributeRewardsEvent.RequestCounts[i], @@ -69,142 +66,9 @@ func (s *server) indexStakingDistributeRewardsLog(ctx context.Context, header *t zap.Error(err), ) - return fmt.Errorf("save node %s request count %d: %w", nodeAddr.Hex(), stakingDistributeRewardsEvent.RequestCounts[i].Int64(), err) + return nil, fmt.Errorf("save node %s request count %d: %w", nodeAddr.Hex(), stakingDistributeRewardsEvent.RequestCounts[i].Int64(), err) } } - // Step 2: check if is last batch of this epoch (use go routine to prevent possible transaction stuck) - go func() { - zap.L().Debug("close epoch check start", zap.Uint64("epoch", stakingDistributeRewardsEvent.Epoch.Uint64())) - - err := s.closeEpoch(context.Background(), header.Number, stakingDistributeRewardsEvent.Epoch) - if err != nil { - zap.L().Error("close epoch check failed", zap.Uint64("epoch", stakingDistributeRewardsEvent.Epoch.Uint64()), zap.Error(err)) - } - }() - - return nil // No error -} - -func (s *server) closeEpoch(ctx context.Context, blockNumber *big.Int, epoch *big.Int) error { - isStillProceeding, err := s.contractStaking.IsSettlementPhase(&bind.CallOpts{ - Context: ctx, - BlockNumber: blockNumber, - }) - - if err != nil { - return fmt.Errorf("failed to check is settlement phase: %w", err) - } - - if isStillProceeding { - // Not last batch, return - return nil - } // else is last batch, start proceed - - // 2.1. Set mutex lock - isMutexLockSuccessful, err := s.redisClient.SetNX(ctx, constants.EpochMutexLockKey, 1, constants.EpochMutexExpiration).Result() - - if err != nil { - return fmt.Errorf("failed to set mutex lock with redis: %w", err) - } - - if !isMutexLockSuccessful { - // A process already running, skip - return nil - } - - // Defer release mutex lock - defer s.redisClient.Del(ctx, constants.EpochMutexLockKey) - - s.closeEpochExec(ctx, epoch) // This cannot retry when error happens, so just report errors to slack rather than retry it - - return nil -} - -func (s *server) closeEpochExec(ctx context.Context, epoch *big.Int) { - // 2.2-3. billing - zap.L().Debug("closeEpochExec: 2.2-3. billing") - - totalCollected, err := s.billingFlow(ctx, epoch) - if err != nil { - zap.L().Error("failed to execute closeEpochExec: 2.2-3. billing", zap.Error(err)) - s.ReportFailedTransactionToSlack(err, nil, "closeEpochExec: 2.2-3. billing", nil, nil) - - return - } - - zap.L().Debug("billing flow total collect", zap.String("token", totalCollected.String())) - - if totalCollected.Cmp(big.NewInt(0)) == 0 { - // No request fees collect in this epoch, skip - zap.L().Info("no request fees collect in this epoch, skip") - - return - } - - // 2.4. calc request percentage - zap.L().Debug("closeEpochExec: 2.4. calc request percentage") - - allNodes, err := s.databaseClient.FindNodeRequestRewardsByEpoch(ctx, epoch) - if err != nil { - zap.L().Error("failed to execute closeEpochExec: 2.4. calc request percentage", zap.Error(err)) - s.ReportFailedTransactionToSlack(err, nil, "closeEpochExec: 2.4. calc request percentage", nil, nil) - - return - } - - if len(allNodes) == 0 { - zap.L().Debug("No active nodes in current epoch, skip") - return - } - - zap.L().Debug("All nodes found, start contribution calc", zap.Uint64("epoch", epoch.Uint64()), zap.Any("nodes", allNodes)) - - // Sum all requests count - totalRequestCount := big.NewInt(0) - - for _, node := range allNodes { - totalRequestCount.Add(totalRequestCount, node.RequestCount) - } - - if totalRequestCount.Cmp(big.NewInt(0)) == 0 { - // No requests happened in this epoch, skip - zap.L().Info("no requests happened in this epoch, skip") - - return - } - - // Calculate reward per request - rewardPerRequest := new(big.Int).Quo(totalCollected, totalRequestCount) - zap.L().Info( - "epoch reward per request", - zap.Uint64("epoch", epoch.Uint64()), - zap.String("totalRewards", totalCollected.String()), - zap.String("totalRequests", totalRequestCount.String()), - zap.String("rewardPerRequest", rewardPerRequest.String()), - ) - - // Calculate reward for nodes - rewardNodesAddress := []common.Address{} - rewardNodesAmount := []*big.Int{} - - for _, node := range allNodes { - // Calculate reward per node - reward := new(big.Int).Mul(rewardPerRequest, node.RequestCount) - - // Save into database - err = s.databaseClient.SetNodeRequestRewards(ctx, epoch, node.NodeAddress, reward) - if err != nil { - // Error, but no need to abort - zap.L().Error("update node request rewards", zap.String("address", node.NodeAddress.String()), zap.String("amount", reward.String()), zap.Any("node", node), zap.Error(err)) - } - - rewardNodesAddress = append(rewardNodesAddress, node.NodeAddress) - rewardNodesAmount = append(rewardNodesAmount, reward) - } - - // 2.5. billing: distribute request rewards - zap.L().Debug("closeEpochExec: 2.5. billing: distribute request rewards") - - s.distributeRequestRewards(ctx, rewardNodesAddress, rewardNodesAmount) + return stakingDistributeRewardsEvent.Epoch, nil // No error } diff --git a/internal/service/indexer/l2/payment_processor.go b/internal/service/indexer/l2/payment_processor.go new file mode 100644 index 0000000..56faeb2 --- /dev/null +++ b/internal/service/indexer/l2/payment_processor.go @@ -0,0 +1,135 @@ +package l2 + +import ( + "context" + "fmt" + "math/big" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/rss3-network/payment-processor/internal/service/indexer/constants" + "go.uber.org/zap" +) + +func (s *server) closeEpoch(ctx context.Context, blockNumber *big.Int, epoch *big.Int) error { + isStillProceeding, err := s.contractStaking.IsSettlementPhase(&bind.CallOpts{ + Context: ctx, + BlockNumber: blockNumber, + }) + + if err != nil { + return fmt.Errorf("failed to check is settlement phase: %w", err) + } + + if isStillProceeding { + // Not last batch, return + return nil + } // else is last batch, start proceed + + // 2.1. Set mutex lock + isMutexLockSuccessful, err := s.redisClient.SetNX(ctx, constants.EpochMutexLockKey, 1, constants.EpochMutexExpiration).Result() + + if err != nil { + return fmt.Errorf("failed to set mutex lock with redis: %w", err) + } + + if !isMutexLockSuccessful { + // A process already running, skip + return nil + } + + // Defer release mutex lock + defer s.redisClient.Del(ctx, constants.EpochMutexLockKey) + + s.closeEpochExec(ctx, epoch) // This cannot retry when error happens, so just report errors to slack rather than retry it + + return nil +} + +func (s *server) closeEpochExec(ctx context.Context, epoch *big.Int) { + // 2.2-3. billing + zap.L().Debug("closeEpochExec: 2.2-3. billing") + + totalCollected, err := s.billingFlow(ctx, epoch) + if err != nil { + zap.L().Error("failed to execute closeEpochExec: 2.2-3. billing", zap.Error(err)) + s.ReportFailedTransactionToSlack(err, nil, "closeEpochExec: 2.2-3. billing", nil, nil) + + return + } + + zap.L().Debug("billing flow total collect", zap.String("token", totalCollected.String())) + + if totalCollected.Cmp(big.NewInt(0)) == 0 { + // No request fees collect in this epoch, skip + zap.L().Info("no request fees collect in this epoch, skip") + + return + } + + // 2.4. calc request percentage + zap.L().Debug("closeEpochExec: 2.4. calc request percentage") + + allNodes, err := s.databaseClient.FindNodeRequestRewardsByEpoch(ctx, epoch) + if err != nil { + zap.L().Error("failed to execute closeEpochExec: 2.4. calc request percentage", zap.Error(err)) + s.ReportFailedTransactionToSlack(err, nil, "closeEpochExec: 2.4. calc request percentage", nil, nil) + + return + } + + if len(allNodes) == 0 { + zap.L().Debug("No active nodes in current epoch, skip") + return + } + + zap.L().Debug("All nodes found, start contribution calc", zap.Uint64("epoch", epoch.Uint64()), zap.Any("nodes", allNodes)) + + // Sum all requests count + totalRequestCount := big.NewInt(0) + + for _, node := range allNodes { + totalRequestCount.Add(totalRequestCount, node.RequestCount) + } + + if totalRequestCount.Cmp(big.NewInt(0)) == 0 { + // No requests happened in this epoch, skip + zap.L().Info("no requests happened in this epoch, skip") + + return + } + + // Calculate reward per request + rewardPerRequest := new(big.Int).Quo(totalCollected, totalRequestCount) + zap.L().Info( + "epoch reward per request", + zap.Uint64("epoch", epoch.Uint64()), + zap.String("totalRewards", totalCollected.String()), + zap.String("totalRequests", totalRequestCount.String()), + zap.String("rewardPerRequest", rewardPerRequest.String()), + ) + + // Calculate reward for nodes + rewardNodesAddress := []common.Address{} + rewardNodesAmount := []*big.Int{} + + for _, node := range allNodes { + // Calculate reward per node + reward := new(big.Int).Mul(rewardPerRequest, node.RequestCount) + + // Save into database + err = s.databaseClient.SetNodeRequestRewards(ctx, epoch, node.NodeAddress, reward) + if err != nil { + // Error, but no need to abort + zap.L().Error("update node request rewards", zap.String("address", node.NodeAddress.String()), zap.String("amount", reward.String()), zap.Any("node", node), zap.Error(err)) + } + + rewardNodesAddress = append(rewardNodesAddress, node.NodeAddress) + rewardNodesAmount = append(rewardNodesAmount, reward) + } + + // 2.5. billing: distribute request rewards + zap.L().Debug("closeEpochExec: 2.5. billing: distribute request rewards") + + s.distributeRequestRewards(ctx, rewardNodesAddress, rewardNodesAmount) +}