Skip to content

Commit

Permalink
[ADDED] Allow aggregating JS metrics and service observations from multiple accounts in one config (#147)
Browse files Browse the repository at this point in the history

* [ADDED] Ability to aggregate jetstream advisories from multiple accounts in a single account

Signed-off-by: Piotr Piotrowski <[email protected]>

* [ADDED] Ability to aggregate service observations from multiple accounts in a single account

Signed-off-by: Piotr Piotrowski <[email protected]>

* Add mapping of advisories subjects, improve validation

Signed-off-by: Piotr Piotrowski <[email protected]>

* Revert ability to set aggregate account from config files

Signed-off-by: Piotr Piotrowski <[email protected]>

---------

Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio authored Aug 22, 2023
1 parent cd3629a commit 1fa100f
Show file tree
Hide file tree
Showing 7 changed files with 768 additions and 50 deletions.
118 changes: 96 additions & 22 deletions surveyor/jetstream_advisories.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,10 @@ type JSAdvisoryConfig struct {
// account name
AccountName string `json:"name"`

// optional configuration for importing JS metrics and advisories from other accounts
// it can only be set via JSAdvisoryConfig directly (not from config file)
ExternalAccountConfig *JSAdvisoriesExternalAccountConfig `json:"-"`

// connection options
JWT string `json:"jwt"`
Seed string `json:"seed"`
Expand All @@ -252,6 +256,24 @@ type JSAdvisoryConfig struct {
TLSConfig *tls.Config `json:"-"`
}

// JSAdvisoriesExternalAccountConfig is used to configure external accounts from which
// JS metrics and advisories will be imported. When it is set on a JSAdvisoryConfig,
// Validate requires both subjects and both token positions to be provided.
type JSAdvisoriesExternalAccountConfig struct {
// MetricsSubject is the subject on which JS metrics from external accounts
// will be received. Validate rejects an empty value, and requires the token at
// MetricsAccountTokenPosition to be the "*" wildcard.
MetricsSubject string `json:"metrics_subject"`

// MetricsAccountTokenPosition is the 1-based position of the account token
// in MetricsSubject (tokens are separated by "."). Values <= 0 are rejected
// by Validate.
MetricsAccountTokenPosition int `json:"metrics_account_token_position"`

// AdvisorySubject is the subject on which JS advisories from external accounts
// will be received. Validate rejects an empty value, and requires the token at
// AdvisoryAccountTokenPosition to be the "*" wildcard.
AdvisorySubject string `json:"advisory_subject"`

// AdvisoryAccountTokenPosition is the 1-based position of the account token
// in AdvisorySubject (tokens are separated by "."). Values <= 0 are rejected
// by Validate.
AdvisoryAccountTokenPosition int `json:"advisory_account_token_position"`
}

// Validate is used to validate a JSAdvisoryConfig
func (o *JSAdvisoryConfig) Validate() error {
if o == nil {
Expand All @@ -267,6 +289,33 @@ func (o *JSAdvisoryConfig) Validate() error {
errs = append(errs, "name is required")
}

// The external account configuration is optional, but once provided both the
// metrics and the advisory subject must be set, and each account token
// position (1-based, "."-separated tokens) must point at a "*" wildcard token
// of its own subject.
if o.ExternalAccountConfig != nil {
	if o.ExternalAccountConfig.MetricsSubject == "" {
		errs = append(errs, "external_account_config.metrics_subject is required when importing metrics from external accounts")
	}
	metricsTokens := strings.Split(o.ExternalAccountConfig.MetricsSubject, ".")
	switch {
	case o.ExternalAccountConfig.MetricsAccountTokenPosition <= 0:
		errs = append(errs, "external_account_config.metrics_account_token_position is required when importing metrics from external accounts")
	case o.ExternalAccountConfig.MetricsAccountTokenPosition > len(metricsTokens):
		errs = append(errs, "external_account_config.metrics_account_token_position is greater than the number of tokens in external_account_config.metrics_subject")
	// Index with the metrics position: it is the one bounds-checked by the two
	// cases above. Using the advisory position here could index out of range.
	case metricsTokens[o.ExternalAccountConfig.MetricsAccountTokenPosition-1] != "*":
		errs = append(errs, "external_account_config.metrics_subject must have a wildcard token at the position specified by external_account_config.metrics_account_token_position")
	}

	if o.ExternalAccountConfig.AdvisorySubject == "" {
		errs = append(errs, "external_account_config.advisory_subject is required when importing advisories from external accounts")
	}
	advisoryTokens := strings.Split(o.ExternalAccountConfig.AdvisorySubject, ".")
	switch {
	case o.ExternalAccountConfig.AdvisoryAccountTokenPosition <= 0:
		errs = append(errs, "external_account_config.advisory_account_token_position is required when importing advisories from external accounts")
	case o.ExternalAccountConfig.AdvisoryAccountTokenPosition > len(advisoryTokens):
		errs = append(errs, "external_account_config.advisory_account_token_position is greater than the number of tokens in external_account_config.advisory_subject")
	case advisoryTokens[o.ExternalAccountConfig.AdvisoryAccountTokenPosition-1] != "*":
		errs = append(errs, "external_account_config.advisory_subject must have a wildcard token at the position specified by external_account_config.advisory_account_token_position")
	}
}
if len(errs) == 0 {
return nil
}
Expand Down Expand Up @@ -362,14 +411,20 @@ func (o *jsAdvisoryListener) Start() error {
if err != nil {
return fmt.Errorf("nats connection failed for id: %s, account name: %s, error: %v", o.config.ID, o.config.AccountName, err)
}
// By default listen on this account's own JetStream metric and advisory
// subjects; when an external account configuration is present, listen on the
// configured import subjects instead.
metricsSubject := api.JSMetricPrefix + ".>"
advisorySubject := api.JSAdvisoryPrefix + ".>"
if o.config.ExternalAccountConfig != nil {
	metricsSubject = o.config.ExternalAccountConfig.MetricsSubject
	advisorySubject = o.config.ExternalAccountConfig.AdvisorySubject
}

// The advisory subscription must use the advisory subject (and the metric
// subscription the metric subject) so that the subscription variables and the
// error messages describe what was actually subscribed.
subAdvisory, err := pc.nc.Subscribe(advisorySubject, o.advisoryHandler)
if err != nil {
	pc.ReturnToPool()
	return fmt.Errorf("could not subscribe to JetStream advisory for id: %s, account name: %s, topic: %s, error: %v", o.config.ID, o.config.AccountName, advisorySubject, err)
}

subMetric, err := pc.nc.Subscribe(metricsSubject, o.advisoryHandler)
if err != nil {
_ = subAdvisory.Unsubscribe()
pc.ReturnToPool()
Expand Down Expand Up @@ -421,65 +476,84 @@ func limitJSSubject(subj string) string {
}

// advisoryHandler handles one message delivered on either the metrics or the
// advisory subscription and updates the corresponding collectors in o.metrics.
//
// The account label defaults to o.config.AccountName. When
// ExternalAccountConfig is set, the account is instead extracted from the
// message subject, using the token position configured for whichever
// subscription subject (metrics or advisory) delivered the message. Messages
// whose account cannot be determined or whose payload cannot be parsed are
// logged with a warning and dropped.
func (o *jsAdvisoryListener) advisoryHandler(m *nats.Msg) {
	accountName := o.config.AccountName
	if ext := o.config.ExternalAccountConfig; ext != nil {
		// Pick the token position that belongs to the subscription this
		// message arrived on; zero means neither configured subject matched.
		var tokenPosition int
		switch m.Sub.Subject {
		case ext.MetricsSubject:
			tokenPosition = ext.MetricsAccountTokenPosition
		case ext.AdvisorySubject:
			tokenPosition = ext.AdvisoryAccountTokenPosition
		}
		if tokenPosition == 0 {
			o.logger.Warnf("Could not parse JetStream API Advisory: no configured subject matches subscription subject")
			return
		}

		name, err := getTokenFromSubject(m.Subject, tokenPosition)
		if err != nil {
			o.logger.Warnf("Could not parse JetStream API Advisory: %s", err)
			return
		}
		accountName = name
	}

	schema, event, err := jsm.ParseEvent(m.Data)
	if err != nil {
		o.metrics.jsAdvisoryParseErrorCtr.WithLabelValues(accountName).Inc()
		o.logger.Warnf("Could not parse JetStream API Audit Advisory: %s", err)
		return
	}

	o.metrics.jsTotalAdvisoryCtr.WithLabelValues(accountName).Inc()

	// Each event is recorded exactly once, labelled with the resolved account.
	switch event := event.(type) {
	case *advisory.JetStreamAPIAuditV1:
		o.metrics.jsAPIAuditCtr.WithLabelValues(limitJSSubject(event.Subject), accountName).Inc()

	case *advisory.ConsumerDeliveryExceededAdvisoryV1:
		o.metrics.jsDeliveryExceededCtr.WithLabelValues(accountName, event.Stream, event.Consumer).Add(float64(event.Deliveries))

	case *metric.ConsumerAckMetricV1:
		o.metrics.jsAckMetricDelay.WithLabelValues(accountName, event.Stream, event.Consumer).Observe(time.Duration(event.Delay).Seconds())
		o.metrics.jsAckMetricDeliveries.WithLabelValues(accountName, event.Stream, event.Consumer).Add(float64(event.Deliveries))

	case *advisory.JSConsumerActionAdvisoryV1:
		o.metrics.jsConsumerActionCtr.WithLabelValues(accountName, event.Stream, event.Action.String()).Inc()

	case *advisory.JSStreamActionAdvisoryV1:
		o.metrics.jsStreamActionCtr.WithLabelValues(accountName, event.Stream, event.Action.String()).Inc()

	case *advisory.JSConsumerDeliveryTerminatedAdvisoryV1:
		o.metrics.jsDeliveryTerminatedCtr.WithLabelValues(accountName, event.Stream, event.Consumer).Inc()

	case *advisory.JSRestoreCreateAdvisoryV1:
		o.metrics.jsRestoreCreatedCtr.WithLabelValues(accountName, event.Stream).Inc()

	case *advisory.JSRestoreCompleteAdvisoryV1:
		o.metrics.jsRestoreSizeCtr.WithLabelValues(accountName, event.Stream).Add(float64(event.Bytes))
		o.metrics.jsRestoreDuration.WithLabelValues(accountName, event.Stream).Observe(event.End.Sub(event.Start).Seconds())

	case *advisory.JSSnapshotCreateAdvisoryV1:
		o.metrics.jsSnapshotSizeCtr.WithLabelValues(accountName, event.Stream).Add(float64(event.BlkSize * event.NumBlks))

	case *advisory.JSSnapshotCompleteAdvisoryV1:
		o.metrics.jsSnapthotDuration.WithLabelValues(accountName, event.Stream).Observe(event.End.Sub(event.Start).Seconds())

	case *advisory.JSConsumerLeaderElectedV1:
		o.metrics.jsConsumerLeaderElected.WithLabelValues(accountName, event.Stream).Inc()

	case *advisory.JSConsumerQuorumLostV1:
		o.metrics.jsConsumerQuorumLost.WithLabelValues(accountName, event.Stream).Inc()

	case *advisory.JSStreamLeaderElectedV1:
		o.metrics.jsStreamLeaderElected.WithLabelValues(accountName, event.Stream).Inc()

	case *advisory.JSStreamQuorumLostV1:
		o.metrics.jsStreamQuorumLost.WithLabelValues(accountName, event.Stream).Inc()

	case *advisory.JSConsumerDeliveryNakAdvisoryV1:
		o.metrics.jsConsumerDeliveryNAK.WithLabelValues(accountName, event.Stream, event.Consumer).Inc()

	default:
		o.metrics.jsUnknownAdvisoryCtr.WithLabelValues(schema, accountName).Inc()
		o.logger.Warnf("Could not handle event as an JetStream Advisory with schema %s", schema)
	}
}
Expand Down
Loading

0 comments on commit 1fa100f

Please sign in to comment.