Skip to content

Commit

Permalink
query optimization: per event check interval
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaozhou committed Jun 6, 2020
1 parent bc936e4 commit 0955b1e
Show file tree
Hide file tree
Showing 17 changed files with 128 additions and 45 deletions.
18 changes: 10 additions & 8 deletions cnode/cooperativewithdraw/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,10 @@ func (p *Processor) monitorOnAllLedgers() {

func (p *Processor) monitorEvent(ledgerContract chain.Contract) {
monitorCfg := &monitor.Config{
EventName: event.CooperativeWithdraw,
Contract: ledgerContract,
StartBlock: p.monitorService.GetCurrentBlockNumber(),
EventName: event.CooperativeWithdraw,
Contract: ledgerContract,
StartBlock: p.monitorService.GetCurrentBlockNumber(),
CheckInterval: p.nodeConfig.GetCheckInterval(event.CooperativeWithdraw),
}
_, err := p.monitorService.Monitor(monitorCfg,
func(id monitor.CallbackID, eLog types.Log) {
Expand All @@ -161,11 +162,12 @@ func (p *Processor) monitorSingleEvent(ledgerContract chain.Contract, reset bool
startBlock := p.monitorService.GetCurrentBlockNumber()
endBlock := new(big.Int).Add(startBlock, big.NewInt(int64(config.CooperativeWithdrawTimeout)))
monitorCfg := &monitor.Config{
EventName: event.CooperativeWithdraw,
Contract: ledgerContract,
StartBlock: startBlock,
EndBlock: endBlock,
Reset: reset,
EventName: event.CooperativeWithdraw,
Contract: ledgerContract,
StartBlock: startBlock,
EndBlock: endBlock,
Reset: reset,
CheckInterval: p.nodeConfig.GetCheckInterval(event.CooperativeWithdraw),
}
_, err := p.monitorService.Monitor(monitorCfg,
func(id monitor.CallbackID, eLog types.Log) {
Expand Down
20 changes: 11 additions & 9 deletions cnode/open_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import (
"github.com/celer-network/goCeler/rpc"
"github.com/celer-network/goCeler/rtconfig"
"github.com/celer-network/goCeler/storage"
"github.com/celer-network/goutils/eth"
"github.com/celer-network/goCeler/utils"
"github.com/celer-network/goutils/eth"
"github.com/celer-network/goutils/log"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
ethcommon "github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -969,9 +969,10 @@ func (p *openChannelProcessor) monitorOnAllLedgers() {

func (p *openChannelProcessor) monitorEvent(ledgerContract chain.Contract) {
monitorCfg := &monitor.Config{
EventName: event.OpenChannel,
Contract: ledgerContract,
StartBlock: p.monitorService.GetCurrentBlockNumber(),
EventName: event.OpenChannel,
Contract: ledgerContract,
StartBlock: p.monitorService.GetCurrentBlockNumber(),
CheckInterval: p.nodeConfig.GetCheckInterval(event.OpenChannel),
}
_, err := p.monitorService.Monitor(monitorCfg,
func(id monitor.CallbackID, eLog types.Log) {
Expand Down Expand Up @@ -1009,11 +1010,12 @@ func (p *openChannelProcessor) monitorSingleEvent(ledgerContract chain.Contract,
startBlock := p.monitorService.GetCurrentBlockNumber()
endBlock := new(big.Int).Add(startBlock, big.NewInt(int64(config.OpenChannelTimeout)))
monitorCfg := &monitor.Config{
EventName: event.OpenChannel,
Contract: ledgerContract,
StartBlock: startBlock,
EndBlock: endBlock,
Reset: reset,
EventName: event.OpenChannel,
Contract: ledgerContract,
StartBlock: startBlock,
EndBlock: endBlock,
Reset: reset,
CheckInterval: p.nodeConfig.GetCheckInterval(event.OpenChannel),
}
_, err := p.monitorService.Monitor(monitorCfg,
func(id monitor.CallbackID, eLog types.Log) {
Expand Down
8 changes: 8 additions & 0 deletions common/cobj/celer_global_node_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type CelerGlobalNodeConfig struct {
routerRegistryContract chain.Contract
ledgers map[ctype.Addr]chain.Contract
chanDAL chanLedgerDAL
checkInterval map[string]uint64 // copy from CProfile
}

func NewCelerGlobalNodeConfig(
Expand Down Expand Up @@ -77,6 +78,7 @@ func NewCelerGlobalNodeConfig(
routerRegistryContract: routerRegistryContract,
ledgers: ledgers,
chanDAL: chanDAL,
checkInterval: profile.CheckInterval,
}
return gnc
}
Expand Down Expand Up @@ -144,3 +146,9 @@ func (config *CelerGlobalNodeConfig) GetLedgerContractOf(cid ctype.CidType) chai
}
return config.ledgers[ledgerAddr]
}

// GetCheckInterval returns interval if set in profile or 0
// monitor will treat 0 as check log every blockIntervalSec
func (cfg *CelerGlobalNodeConfig) GetCheckInterval(eventName string) uint64 {
return cfg.checkInterval[eventName]
}
6 changes: 6 additions & 0 deletions common/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ type ProfileEthereum struct {
Gateway string
ChainId, BlockIntervalSec, BlockDelayNum, DisputeTimeout uint64
Contracts ProfileContracts
// CheckInterval is map of eventname to its check interval for monitor service
// if not set (ie. 0) will check every blockIntervalSec (ie. same as check new block head)
// if specify, key must be one of event.go const string values
// monitor will check every checkInterval * blockIntervalSec
CheckInterval map[string]uint64
}

type ProfileContracts struct {
Expand Down Expand Up @@ -54,6 +59,7 @@ func (pj *ProfileJSON) ToCProfile() *CProfile {
SvrETHAddr: pj.Osp.Address,
SvrRPC: pj.Osp.Host,
ExplorerUrl: pj.Osp.ExplorerUrl,
CheckInterval: pj.Ethereum.CheckInterval, // json.Unmarshal guarantee non-nil map (could be empty)
}
return cp
}
Expand Down
1 change: 1 addition & 0 deletions common/profile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func TestProfileJSON(t *testing.T) {
chkEq(pj.Ethereum.ChainId, uint64(3), t)
chkEq(pj.Ethereum.Contracts.Ledger, "abcdef..", t)
chkEq(pj.Osp.Address, "c5b5..", t)
chkEq(pj.Ethereum.CheckInterval["IntendWithdraw"], uint64(5), t)

cp := pj.ToCProfile()
chkEq(cp.ChainId, int64(3), t)
Expand Down
3 changes: 3 additions & 0 deletions common/profile_test.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
"resolver": "123456..",
"ethPool": "11111..",
"payRegistry": "fedcba.."
},
"checkInterval": {
"IntendWithdraw": 5
}
},
"osp": {
Expand Down
2 changes: 2 additions & 0 deletions common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type CProfile struct {
DisputeTimeout uint64 `json:"disputeTimeout"`
Ledgers map[string]string `json:"ledgers"`
ExplorerUrl string `json:"explorerUrl,omitempty"`
CheckInterval map[string]uint64 `json:"checkInterval,omitempty"`
}

type GlobalNodeConfig interface {
Expand All @@ -59,6 +60,7 @@ type GlobalNodeConfig interface {
GetPayResolverContract() chain.Contract
GetPayRegistryContract() chain.Contract
GetRouterRegistryContract() chain.Contract
GetCheckInterval(string) uint64
}

type StreamWriter interface {
Expand Down
11 changes: 11 additions & 0 deletions deploy/mainnet/profile.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,17 @@
"payRegistry": "791bedaa0dd173142311005bb65b58c284cc948c",
"payResolver": "273456f8fe06f9d58f2480b7aeaa710a4a6abfc1",
"routerRegistry": "2f11656af5d1e9be634a8d00417cc05ebb43fc08"
},
"checkInterval": {
"OpenChannel": 2,
"Deposit": 2,
"CooperativeWithdraw": 3,
"RouterUpdated": 3,
"IntendSettle": 500,
"ConfirmSettle": 500,
"IntendWithdraw": 500,
"ConfirmWithdraw": 500,
"MigrateChannelTo": 500
}
},
"osp": {
Expand Down
11 changes: 11 additions & 0 deletions deploy/ropsten/profile.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,17 @@
"payRegistry": "a12063ab2136abbab09e036a260694fbd5e57982",
"payResolver": "605703f2c26aa67c4e63a27e5ace6ad2862bb53a",
"routerRegistry": "ee56cb265b17969aa7aba5db77a0982473b5fd29"
},
"checkInterval": {
"OpenChannel": 3,
"Deposit": 3,
"CooperativeWithdraw": 3,
"RouterUpdated": 3,
"IntendSettle": 500,
"ConfirmSettle": 500,
"IntendWithdraw": 500,
"ConfirmWithdraw": 500,
"MigrateChannelTo": 500
}
},
"osp": {
Expand Down
14 changes: 8 additions & 6 deletions dispute/dispute_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,10 @@ func (p *Processor) handleIntendSettleEventTx(tx *storage.DALTx, args ...interfa

func (p *Processor) monitorPaymentChannelSettleEvent(ledgerContract chain.Contract) {
monitorCfg := &monitor.Config{
EventName: event.IntendSettle,
Contract: ledgerContract,
StartBlock: p.monitorService.GetCurrentBlockNumber(),
EventName: event.IntendSettle,
Contract: ledgerContract,
StartBlock: p.monitorService.GetCurrentBlockNumber(),
CheckInterval: p.nodeConfig.GetCheckInterval(event.IntendSettle),
}
_, monErr := p.monitorService.Monitor(monitorCfg,
func(id monitor.CallbackID, eLog types.Log) {
Expand Down Expand Up @@ -336,9 +337,10 @@ func (p *Processor) monitorPaymentChannelSettleEvent(ledgerContract chain.Contra
log.Error(monErr)
}
monitorCfg2 := &monitor.Config{
EventName: event.ConfirmSettle,
Contract: ledgerContract,
StartBlock: p.monitorService.GetCurrentBlockNumber(),
EventName: event.ConfirmSettle,
Contract: ledgerContract,
StartBlock: p.monitorService.GetCurrentBlockNumber(),
CheckInterval: p.nodeConfig.GetCheckInterval(event.ConfirmSettle),
}
_, monErr = p.monitorService.Monitor(monitorCfg2,
func(id monitor.CallbackID, eLog types.Log) {
Expand Down
14 changes: 8 additions & 6 deletions dispute/dispute_withdraw.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,10 @@ func (p *Processor) VetoWithdraw(cid ctype.CidType) error {

func (p *Processor) monitorNoncooperativeWithdrawEvent(ledgerContract chain.Contract) {
monitorCfg := &monitor.Config{
EventName: event.IntendWithdraw,
Contract: ledgerContract,
StartBlock: p.monitorService.GetCurrentBlockNumber(),
EventName: event.IntendWithdraw,
Contract: ledgerContract,
StartBlock: p.monitorService.GetCurrentBlockNumber(),
CheckInterval: p.nodeConfig.GetCheckInterval(event.IntendWithdraw),
}
_, monErr := p.monitorService.Monitor(monitorCfg,
func(id monitor.CallbackID, eLog types.Log) {
Expand Down Expand Up @@ -195,9 +196,10 @@ func (p *Processor) monitorNoncooperativeWithdrawEvent(ledgerContract chain.Cont
log.Error(monErr)
}
monitorCfg2 := &monitor.Config{
EventName: event.ConfirmWithdraw,
Contract: ledgerContract,
StartBlock: p.monitorService.GetCurrentBlockNumber(),
EventName: event.ConfirmWithdraw,
Contract: ledgerContract,
StartBlock: p.monitorService.GetCurrentBlockNumber(),
CheckInterval: p.nodeConfig.GetCheckInterval(event.ConfirmWithdraw),
}
_, monErr = p.monitorService.Monitor(monitorCfg2,
func(id monitor.CallbackID, eLog types.Log) {
Expand Down
7 changes: 4 additions & 3 deletions migrate/migrate_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,9 +309,10 @@ func (p *MigrateChannelProcessor) monitorOnDeprecatedLedgers() {
// monitorMigrateChannelEvent monitors onchain event emitted from CelerLedger
func (p *MigrateChannelProcessor) monitorMigrateChannelEvent(contract chain.Contract) {
monitorCfg := &monitor.Config{
EventName: event.MigrateChannelTo,
Contract: contract,
StartBlock: p.monitorService.GetCurrentBlockNumber(),
EventName: event.MigrateChannelTo,
Contract: contract,
StartBlock: p.monitorService.GetCurrentBlockNumber(),
CheckInterval: p.nodeConfig.GetCheckInterval(event.MigrateChannelTo),
}
_, err := p.monitorService.Monitor(monitorCfg,
func(id monitor.CallbackID, eLog types.Log) {
Expand Down
2 changes: 2 additions & 0 deletions monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ func (s *Service) Monitor(cfg *Config, callback func(CallbackID, ethtypes.Log))
if eventToListen.CheckInterval == 0 {
eventToListen.CheckInterval = defaultCheckInterval
}
log.Infof("Starting watch: %s. startBlk: %s, endBlk: %s, blkDelay: %d, checkInterval: %d, reset: %t", watchName,
cfg.StartBlock, cfg.EndBlock, eventToListen.BlockDelay, eventToListen.CheckInterval, cfg.Reset)
id, err := s.MonitorEvent(*eventToListen, cfg.Reset)
if err != nil {
log.Errorf("Cannot register event %s: %s", watchName, err)
Expand Down
9 changes: 5 additions & 4 deletions route/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,11 @@ func (c *Controller) Start() {
// backtrack from one interval before the current block
func (c *Controller) monitorRouterUpdatedEvent() {
monitorCfg := &monitor.Config{
EventName: event.RouterUpdated,
Contract: c.nodeConfig.GetRouterRegistryContract(),
StartBlock: c.calculateStartBlockNumber(),
Reset: true,
EventName: event.RouterUpdated,
Contract: c.nodeConfig.GetRouterRegistryContract(),
StartBlock: c.calculateStartBlockNumber(),
Reset: true,
CheckInterval: c.nodeConfig.GetCheckInterval(event.RouterUpdated),
}
_, err := c.monitorService.Monitor(monitorCfg,
func(id monitor.CallbackID, eLog types.Log) {
Expand Down
12 changes: 12 additions & 0 deletions test/e2e/setup_onchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,18 @@ func SetupOnChain(appMap map[string]ctype.Addr, autofund bool) (*common.ProfileJ
BlockDelayNum: 0,
DisputeTimeout: 10,
Contracts: profileContracts,
CheckInterval: map[string]uint64{
"CooperativeWithdraw": 2,
"Deploy": 2,
"Deposit": 2,
"IntendSettle": 2,
"OpenChannel": 2,
"ConfirmSettle": 2,
"IntendWithdraw": 2,
"ConfirmWithdraw": 2,
"RouterUpdated": 2,
"MigrateChannelTo": 2,
},
}

profileOsp := common.ProfileOsp{
Expand Down
12 changes: 8 additions & 4 deletions watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func (w *Watch) fetchLogEvents() {
// Fetch server-side filtered log events in the target range of
// block numbers. The block delay limit is used to avoid fetching
// recently mined blocks that may still be undone by a chain reorg.
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // https://infura.io/docs/ethereum/json-rpc/eth-getLogs up to 10s
defer cancel()

toBlock := blkNum - w.blkDelay
Expand All @@ -377,7 +377,7 @@ func (w *Watch) fetchLogEvents() {

logs, err := w.service.client.FilterLogs(ctx, w.query)
if err != nil {
log.Tracef("cannot fetch logs: %s: [%d-%d]: %s", w.name, w.fromBlock, toBlock, err)
log.Warnf("cannot fetch logs: %s: [%d-%d]: %s", w.name, w.fromBlock, toBlock, err)
return
}

Expand Down Expand Up @@ -409,9 +409,13 @@ func (w *Watch) fetchLogEvents() {
// Update the next block number to start fetching.
if maxBlock >= w.fromBlock {
w.fromBlock = maxBlock + 1
log.Tracef("added %d logs to queue: %s: next from %d", count, w.name, w.fromBlock)
} else {
// we didn't find any event between fromBlock and toBlock, so we can fast forward
// maybe we should also do this above instead of maxBlock + 1?
w.fromBlock = toBlock
log.Tracef("fast forward %s fromBlock to %d", w.name, w.fromBlock)
}

log.Tracef("added %d logs to queue: %s: next from %d", count, w.name, w.fromBlock)
}

// Return true if the log event ID is strictly greater than the last ID.
Expand Down
23 changes: 18 additions & 5 deletions watcher/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type fakeClient struct {
quit chan bool
blkChan chan int64
blkNum int64
noLog bool // won't return any fakeLog
}

// NewFakeClient creates a fake watch client that return increasing
Expand Down Expand Up @@ -147,12 +148,13 @@ func (fc *fakeClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([
return nil, fmt.Errorf("FromBlock %d > ToBlock %d", from, to)
}

start := int(from) * perBlock
end := int(to+1) * perBlock
for i := start; i < end; i++ {
logs = append(logs, fakeLog(i))
if !fc.noLog {
start := int(from) * perBlock
end := int(to+1) * perBlock
for i := start; i < end; i++ {
logs = append(logs, fakeLog(i))
}
}

return logs, nil
}

Expand Down Expand Up @@ -225,6 +227,17 @@ func TestWatcher(t *testing.T) {
// before exiting the test to increase code coverage of the
// watcher shutdown code (goroutines exiting).
w.Close()
// test noLog case
client.noLog = true
w2, err := ws.NewWatch("foo", query, 2, 1, true) // reset fromBlock to 0
if w2.fromBlock != 0 {
t.Error("fromBlock isn't 0")
}
time.Sleep(2 * blkSleep) // so fetchLogEvents has run
if w2.fromBlock == 0 {
t.Error("fromBlock is still 0")
}
w2.Close()
time.Sleep(100 * time.Millisecond)
}

Expand Down

0 comments on commit 0955b1e

Please sign in to comment.