Skip to content

Commit

Permalink
Merge pull request #833 from forta-network/caner/send-metrics-in-slow…
Browse files Browse the repository at this point in the history
…-mode

Send pending metrics in slow mode
  • Loading branch information
canercidam authored Nov 21, 2023
2 parents 458e119 + 0ec2bab commit 83ec187
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 5 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ replace github.com/docker/docker => github.com/moby/moby v20.10.25+incompatible
require (
github.com/docker/docker v1.6.2
github.com/docker/go-connections v0.4.0
github.com/forta-network/forta-core-go v0.0.0-20231106113111-7ec637713f66
github.com/forta-network/forta-core-go v0.0.0-20231120162934-84a1ca7b3529
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/client_model v0.3.0
github.com/prometheus/common v0.39.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,8 @@ github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:1i71OnUq3iUe
github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ=
github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/forta-network/forta-core-go v0.0.0-20231106113111-7ec637713f66 h1:RuwgO+n+cDJTELaSkA2fZNUCeS1GQO+DbM6UsAb37Kw=
github.com/forta-network/forta-core-go v0.0.0-20231106113111-7ec637713f66/go.mod h1:F4O7Yxhs0WSB9iFLiCVSYMPI1wOKuLMZIk8dbVg+J/Y=
github.com/forta-network/forta-core-go v0.0.0-20231120162934-84a1ca7b3529 h1:K8sVPZZVoOOHfyKY7meMhVNzCZr2iwgfp3gPpm4m5KA=
github.com/forta-network/forta-core-go v0.0.0-20231120162934-84a1ca7b3529/go.mod h1:F4O7Yxhs0WSB9iFLiCVSYMPI1wOKuLMZIk8dbVg+J/Y=
github.com/forta-network/go-multicall v0.0.0-20230609185354-1436386c6707 h1:f6I7K43i2m6AwHSsDxh0Mf3qFzYt8BKnabSl/zGFmh0=
github.com/forta-network/go-multicall v0.0.0-20230609185354-1436386c6707/go.mod h1:nqTUF1REklpWLZ/M5HfzqhSHNz4dPVKzJvbLziqTZpw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
Expand Down
10 changes: 10 additions & 0 deletions services/publisher/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ func NewMetricsAggregator(bucketInterval time.Duration) *AgentMetricsAggregator
}
}

// HasPendingBotMetrics tells if there are bot metrics waiting to be flushed.
func (ama *AgentMetricsAggregator) HasPendingBotMetrics() bool {
for _, bucket := range ama.buckets {
if bucket.AgentId != "system" { // rest of the metrics are bot metrics
return true
}
}
return false
}

func (ama *AgentMetricsAggregator) findBucket(agentID string, t time.Time) *metricsBucket {
bucketTime := ama.FindClosestBucketTime(t)
for _, bucket := range ama.buckets {
Expand Down
13 changes: 11 additions & 2 deletions services/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,8 @@ func (pub *Publisher) shouldSkipPublishing(batch *protocol.AlertBatch) (string,
runsBots := pub.hasBots()

switch {
// exceptional case timing:

case localModeConfig.Enable && localModeConfig.IncludeMetrics:
if len(batch.Metrics) > 0 {
return "", false
Expand All @@ -426,15 +428,22 @@ func (pub *Publisher) shouldSkipPublishing(batch *protocol.AlertBatch) (string,
case pub.skipEmpty:
return becauseThereAreNoAlerts + " and skipEmpty is enabled", true

case runsBots && len(batch.Metrics) > 0:
return "", false // do not sacrifice metrics
// public node timing:

case len(batch.Metrics) > 0:
// do not sacrifice metrics if they are already flushed
return "", false

case runsBots && len(batch.Metrics) == 0:
if time.Since(lastBatchSendAttempt) >= fastReportInterval {
return "", false
}
return becauseThereAreNoAlerts + " and metrics and fast report deadline has not exceeded yet", true

case !runsBots && pub.metricsAggregator.HasPendingBotMetrics():
// if the node is not running bots, send pending bot metrics immediately
return "", false

case !runsBots:
if time.Since(lastBatchSendAttempt) >= slowReportInterval {
return "", false
Expand Down
19 changes: 19 additions & 0 deletions services/publisher/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,23 @@ func TestShouldSkipPublishing(t *testing.T) {
},
expectedSkipValue: false,
},
{
name: "has pending metrics and not running bots",
publisher: &Publisher{
lastBatchSendAttempt: veryRecently,
metricsAggregator: &AgentMetricsAggregator{
buckets: []*metricsBucket{
{
AgentMetrics: protocol.AgentMetrics{
AgentId: "0x123",
},
},
},
},
},
batch: &protocol.AlertBatch{},
expectedSkipValue: false,
},
{
name: "no metrics, running bots, too early",
publisher: &Publisher{
Expand All @@ -160,6 +177,7 @@ func TestShouldSkipPublishing(t *testing.T) {
name: "no bots, too early",
publisher: &Publisher{
lastBatchSendAttempt: veryRecently,
metricsAggregator: &AgentMetricsAggregator{},
},
batch: &protocol.AlertBatch{},
expectedSkipValue: true,
Expand All @@ -169,6 +187,7 @@ func TestShouldSkipPublishing(t *testing.T) {
name: "no bots, not early",
publisher: &Publisher{
lastBatchSendAttempt: time.Now().Add(-slowReportInterval), // not recently
metricsAggregator: &AgentMetricsAggregator{},
},
batch: &protocol.AlertBatch{},
expectedSkipValue: false,
Expand Down

0 comments on commit 83ec187

Please sign in to comment.