// JSAdvisoriesExternalAccountConfig is used to configure external accounts from which
// JS metrics and advisories will be imported.
//
// When this configuration is attached to a JSAdvisoryConfig, the listener subscribes
// to MetricsSubject and AdvisorySubject instead of the default JetStream metric and
// advisory subjects, and the account label of every recorded metric is taken from the
// subject of the received message rather than from JSAdvisoryConfig.AccountName.
//
// Token positions are 1-based indexes into the dot-separated subject. Validation
// rejects positions that are <= 0, positions past the last token, and positions that
// do not point at a "*" wildcard token.
type JSAdvisoriesExternalAccountConfig struct {
	// MetricsSubject is the subject on which JS metrics from external accounts
	// will be received. It must not be empty when this configuration is used.
	MetricsSubject string `json:"metrics_subject"`

	// MetricsAccountTokenPosition is the 1-based position of the account token
	// in MetricsSubject.
	MetricsAccountTokenPosition int `json:"metrics_account_token_position"`

	// AdvisorySubject is the subject on which JS advisories from external accounts
	// will be received. It must not be empty when this configuration is used.
	AdvisorySubject string `json:"advisory_subject"`

	// AdvisoryAccountTokenPosition is the 1-based position of the account token
	// in AdvisorySubject.
	AdvisoryAccountTokenPosition int `json:"advisory_account_token_position"`
}
errs = append(errs, "external_account_config.advisory_account_token_position is greater than the number of tokens in external_account_config.advisory_subject") + case advisoryTokens[o.ExternalAccountConfig.AdvisoryAccountTokenPosition-1] != "*": + errs = append(errs, "external_account_config.advisory_subject must have a wildcard token at the position specified by external_account_config.advisory_account_token_position") + } + } if len(errs) == 0 { return nil } @@ -362,14 +411,20 @@ func (o *jsAdvisoryListener) Start() error { if err != nil { return fmt.Errorf("nats connection failed for id: %s, account name: %s, error: %v", o.config.ID, o.config.AccountName, err) } + metricsSubject := api.JSMetricPrefix + ".>" + advisorySubject := api.JSAdvisoryPrefix + ".>" + if o.config.ExternalAccountConfig != nil { + metricsSubject = o.config.ExternalAccountConfig.MetricsSubject + advisorySubject = o.config.ExternalAccountConfig.AdvisorySubject + } - subAdvisory, err := pc.nc.Subscribe(api.JSAdvisoryPrefix+".>", o.advisoryHandler) + subAdvisory, err := pc.nc.Subscribe(metricsSubject, o.advisoryHandler) if err != nil { pc.ReturnToPool() return fmt.Errorf("could not subscribe to JetStream advisory for id: %s, account name: %s, topic: %s, error: %v", o.config.ID, o.config.AccountName, api.JSAdvisoryPrefix, err) } - subMetric, err := pc.nc.Subscribe(api.JSMetricPrefix+".>", o.advisoryHandler) + subMetric, err := pc.nc.Subscribe(advisorySubject, o.advisoryHandler) if err != nil { _ = subAdvisory.Unsubscribe() pc.ReturnToPool() @@ -421,65 +476,84 @@ func limitJSSubject(subj string) string { } func (o *jsAdvisoryListener) advisoryHandler(m *nats.Msg) { + accountName := o.config.AccountName + var err error + if o.config.ExternalAccountConfig != nil { + var tokenPosition int + if m.Sub.Subject == o.config.ExternalAccountConfig.MetricsSubject { + tokenPosition = o.config.ExternalAccountConfig.MetricsAccountTokenPosition + } else if m.Sub.Subject == 
o.config.ExternalAccountConfig.AdvisorySubject { + tokenPosition = o.config.ExternalAccountConfig.AdvisoryAccountTokenPosition + } + if tokenPosition == 0 { + o.logger.Warnf("Could not parse JetStream API Advisory: no configured subject matches subscription subject") + return + } + accountName, err = getTokenFromSubject(m.Subject, tokenPosition) + if err != nil { + o.logger.Warnf("Could not parse JetStream API Advisory: %s", err) + return + } + } schema, event, err := jsm.ParseEvent(m.Data) if err != nil { - o.metrics.jsAdvisoryParseErrorCtr.WithLabelValues(o.config.AccountName).Inc() + o.metrics.jsAdvisoryParseErrorCtr.WithLabelValues(accountName).Inc() o.logger.Warnf("Could not parse JetStream API Audit Advisory: %s", err) return } - o.metrics.jsTotalAdvisoryCtr.WithLabelValues(o.config.AccountName).Inc() + o.metrics.jsTotalAdvisoryCtr.WithLabelValues(accountName).Inc() switch event := event.(type) { case *advisory.JetStreamAPIAuditV1: - o.metrics.jsAPIAuditCtr.WithLabelValues(limitJSSubject(event.Subject), o.config.AccountName).Inc() + o.metrics.jsAPIAuditCtr.WithLabelValues(limitJSSubject(event.Subject), accountName).Inc() case *advisory.ConsumerDeliveryExceededAdvisoryV1: - o.metrics.jsDeliveryExceededCtr.WithLabelValues(o.config.AccountName, event.Stream, event.Consumer).Add(float64(event.Deliveries)) + o.metrics.jsDeliveryExceededCtr.WithLabelValues(accountName, event.Stream, event.Consumer).Add(float64(event.Deliveries)) case *metric.ConsumerAckMetricV1: - o.metrics.jsAckMetricDelay.WithLabelValues(o.config.AccountName, event.Stream, event.Consumer).Observe(time.Duration(event.Delay).Seconds()) - o.metrics.jsAckMetricDeliveries.WithLabelValues(o.config.AccountName, event.Stream, event.Consumer).Add(float64(event.Deliveries)) + o.metrics.jsAckMetricDelay.WithLabelValues(accountName, event.Stream, event.Consumer).Observe(time.Duration(event.Delay).Seconds()) + o.metrics.jsAckMetricDeliveries.WithLabelValues(accountName, event.Stream, 
event.Consumer).Add(float64(event.Deliveries)) case *advisory.JSConsumerActionAdvisoryV1: - o.metrics.jsConsumerActionCtr.WithLabelValues(o.config.AccountName, event.Stream, event.Action.String()).Inc() + o.metrics.jsConsumerActionCtr.WithLabelValues(accountName, event.Stream, event.Action.String()).Inc() case *advisory.JSStreamActionAdvisoryV1: - o.metrics.jsStreamActionCtr.WithLabelValues(o.config.AccountName, event.Stream, event.Action.String()).Inc() + o.metrics.jsStreamActionCtr.WithLabelValues(accountName, event.Stream, event.Action.String()).Inc() case *advisory.JSConsumerDeliveryTerminatedAdvisoryV1: - o.metrics.jsDeliveryTerminatedCtr.WithLabelValues(o.config.AccountName, event.Stream, event.Consumer).Inc() + o.metrics.jsDeliveryTerminatedCtr.WithLabelValues(accountName, event.Stream, event.Consumer).Inc() case *advisory.JSRestoreCreateAdvisoryV1: - o.metrics.jsRestoreCreatedCtr.WithLabelValues(o.config.AccountName, event.Stream).Inc() + o.metrics.jsRestoreCreatedCtr.WithLabelValues(accountName, event.Stream).Inc() case *advisory.JSRestoreCompleteAdvisoryV1: - o.metrics.jsRestoreSizeCtr.WithLabelValues(o.config.AccountName, event.Stream).Add(float64(event.Bytes)) - o.metrics.jsRestoreDuration.WithLabelValues(o.config.AccountName, event.Stream).Observe(event.End.Sub(event.Start).Seconds()) + o.metrics.jsRestoreSizeCtr.WithLabelValues(accountName, event.Stream).Add(float64(event.Bytes)) + o.metrics.jsRestoreDuration.WithLabelValues(accountName, event.Stream).Observe(event.End.Sub(event.Start).Seconds()) case *advisory.JSSnapshotCreateAdvisoryV1: - o.metrics.jsSnapshotSizeCtr.WithLabelValues(o.config.AccountName, event.Stream).Add(float64(event.BlkSize * event.NumBlks)) + o.metrics.jsSnapshotSizeCtr.WithLabelValues(accountName, event.Stream).Add(float64(event.BlkSize * event.NumBlks)) case *advisory.JSSnapshotCompleteAdvisoryV1: - o.metrics.jsSnapthotDuration.WithLabelValues(o.config.AccountName, event.Stream).Observe(event.End.Sub(event.Start).Seconds()) + 
o.metrics.jsSnapthotDuration.WithLabelValues(accountName, event.Stream).Observe(event.End.Sub(event.Start).Seconds()) case *advisory.JSConsumerLeaderElectedV1: - o.metrics.jsConsumerLeaderElected.WithLabelValues(o.config.AccountName, event.Stream).Inc() + o.metrics.jsConsumerLeaderElected.WithLabelValues(accountName, event.Stream).Inc() case *advisory.JSConsumerQuorumLostV1: - o.metrics.jsConsumerQuorumLost.WithLabelValues(o.config.AccountName, event.Stream).Inc() + o.metrics.jsConsumerQuorumLost.WithLabelValues(accountName, event.Stream).Inc() case *advisory.JSStreamLeaderElectedV1: - o.metrics.jsStreamLeaderElected.WithLabelValues(o.config.AccountName, event.Stream).Inc() + o.metrics.jsStreamLeaderElected.WithLabelValues(accountName, event.Stream).Inc() case *advisory.JSStreamQuorumLostV1: - o.metrics.jsStreamQuorumLost.WithLabelValues(o.config.AccountName, event.Stream).Inc() + o.metrics.jsStreamQuorumLost.WithLabelValues(accountName, event.Stream).Inc() case *advisory.JSConsumerDeliveryNakAdvisoryV1: - o.metrics.jsConsumerDeliveryNAK.WithLabelValues(o.config.AccountName, event.Stream, event.Consumer).Inc() + o.metrics.jsConsumerDeliveryNAK.WithLabelValues(accountName, event.Stream, event.Consumer).Inc() default: - o.metrics.jsUnknownAdvisoryCtr.WithLabelValues(schema, o.config.AccountName).Inc() + o.metrics.jsUnknownAdvisoryCtr.WithLabelValues(schema, accountName).Inc() o.logger.Warnf("Could not handle event as an JetStream Advisory with schema %s", schema) } } diff --git a/surveyor/jetstream_advisories_test.go b/surveyor/jetstream_advisories_test.go index 4db2ed4..a73bdbb 100644 --- a/surveyor/jetstream_advisories_test.go +++ b/surveyor/jetstream_advisories_test.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "reflect" + "strings" "testing" "time" @@ -74,6 +75,8 @@ func TestJetStream_limitJSSubject(t *testing.T) { {"$JS.API.STREAM.MSG.GET.ORDERS", "$JS.API.STREAM.MSG.GET"}, {"$JS.API.STREAM.LIST", "$JS.API.STREAM.LIST"}, {"$JS.API.CONSUMER.CREATE.ORDERS", 
"$JS.API.CONSUMER.CREATE"}, + {"$JS.API.CONSUMER.CREATE.ORDERS.NEW", "$JS.API.CONSUMER.CREATE"}, + {"$JS.API.CONSUMER.CREATE.ORDERS.NEW.filter", "$JS.API.CONSUMER.CREATE"}, {"$JS.API.CONSUMER.DURABLE.CREATE.ORDERS.NEW", "$JS.API.CONSUMER.DURABLE.CREATE"}, } @@ -221,6 +224,295 @@ nats_jetstream_acknowledgement_deliveries{account="global",consumer="OUT",stream } } +func TestJetStream_AggMetrics(t *testing.T) { + tests := []struct { + name string + advisoryConfig *JSAdvisoryConfig + configErrors []string + }{ + { + name: "aggregate service export from config", + advisoryConfig: &JSAdvisoryConfig{ + ID: "test_advisory", + AccountName: "aggregate_service", + Username: "agg_service", + Password: "agg_service", + ExternalAccountConfig: &JSAdvisoriesExternalAccountConfig{ + MetricsSubject: "$JS.EVENT.METRIC.ACC.*.>", + MetricsAccountTokenPosition: 5, + AdvisorySubject: "$JS.EVENT.ADVISORY.ACC.*.>", + AdvisoryAccountTokenPosition: 5, + }, + }, + }, + { + name: "aggregate stream export from config", + advisoryConfig: &JSAdvisoryConfig{ + ID: "test_advisory", + AccountName: "aggregate_service", + Username: "agg_stream", + Password: "agg_stream", + ExternalAccountConfig: &JSAdvisoriesExternalAccountConfig{ + MetricsSubject: "$JS.EVENT.METRIC.ACC.*.>", + MetricsAccountTokenPosition: 5, + AdvisorySubject: "$JS.EVENT.ADVISORY.ACC.*.>", + AdvisoryAccountTokenPosition: 5, + }, + }, + }, + { + name: "invalid config, empty subject", + advisoryConfig: &JSAdvisoryConfig{ + ID: "test_advisory", + AccountName: "aggregate_service", + Username: "agg_service", + Password: "agg_service", + ExternalAccountConfig: &JSAdvisoriesExternalAccountConfig{ + MetricsSubject: "", + MetricsAccountTokenPosition: 5, + AdvisorySubject: "", + AdvisoryAccountTokenPosition: 5, + }, + }, + configErrors: []string{ + "external_account_config.metrics_subject is required when importing metrics from external accounts", + "external_account_config.advisory_subject is required when importing advisories from external 
accounts", + }, + }, + { + name: "invalid config, empty token position", + advisoryConfig: &JSAdvisoryConfig{ + ID: "test_advisory", + AccountName: "aggregate_service", + Username: "agg_service", + Password: "agg_service", + ExternalAccountConfig: &JSAdvisoriesExternalAccountConfig{ + MetricsSubject: "JS.EVENT.METRICS.ACC.*.>", + MetricsAccountTokenPosition: 0, + AdvisorySubject: "$JS.EVENT.ADVISORY.ACC.*.>", + AdvisoryAccountTokenPosition: 0, + }, + }, + configErrors: []string{ + "external_account_config.metrics_account_token_position is required when importing metrics from external accounts", + "external_account_config.advisory_account_token_position is required when importing advisories from external accounts", + }, + }, + { + name: "invalid config, account token position out of range", + advisoryConfig: &JSAdvisoryConfig{ + ID: "test_advisory", + AccountName: "aggregate_service", + Username: "agg_service", + Password: "agg_service", + ExternalAccountConfig: &JSAdvisoriesExternalAccountConfig{ + MetricsSubject: "JS.EVENT.METRICS.ACC.*.>", + MetricsAccountTokenPosition: 7, + AdvisorySubject: "$JS.EVENT.ADVISORY.ACC.*.>", + AdvisoryAccountTokenPosition: 7, + }, + }, + configErrors: []string{ + "external_account_config.metrics_account_token_position is greater than the number of tokens in external_account_config.metrics_subject", + "external_account_config.advisory_account_token_position is greater than the number of tokens in external_account_config.advisory_subject", + }, + }, + { + name: "invalid config, token position is not a wildcard", + advisoryConfig: &JSAdvisoryConfig{ + ID: "test_advisory", + AccountName: "aggregate_service", + Username: "agg_service", + Password: "agg_service", + ExternalAccountConfig: &JSAdvisoriesExternalAccountConfig{ + MetricsSubject: "$JS.EVENT.METRICS.ACC.*.>", + MetricsAccountTokenPosition: 2, + AdvisorySubject: "$JS.EVENT.ADVISORY.ACC.*.>", + AdvisoryAccountTokenPosition: 2, + }, + }, + configErrors: []string{ + 
"external_account_config.metrics_subject must have a wildcard token at the position specified by external_account_config.metrics_account_token_position", + "external_account_config.advisory_subject must have a wildcard token at the position specified by external_account_config.advisory_account_token_position", + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + js := st.NewJetStreamServer(t) + defer js.Shutdown() + + opt := GetDefaultOptions() + opt.URLs = js.ClientURL() + metrics := NewJetStreamAdvisoryMetrics(prometheus.NewRegistry(), nil) + + s, err := NewSurveyor(opt) + if err != nil { + t.Fatalf("couldn't create surveyor: %v", err) + } + if err = s.Start(); err != nil { + t.Fatalf("start error: %v", err) + } + defer s.Stop() + config := test.advisoryConfig + advManager := s.JetStreamAdvisoryManager() + advManager.metrics = metrics + + err = advManager.Set(config) + if len(test.configErrors) > 0 { + errorsMatch(t, err, test.configErrors) + return + } + if err != nil { + t.Fatalf("Error setting advisory config: %s", err) + } + waitForAdvUpdate(t, advManager, map[string]*JSAdvisoryConfig{config.ID: config}) + + urlA := "nats://a:a@" + strings.TrimPrefix(js.ClientURL(), "nats://") + urlB := "nats://b:b@" + strings.TrimPrefix(js.ClientURL(), "nats://") + ncA, err := nats.Connect(urlA, nats.UseOldRequestStyle()) + if err != nil { + t.Fatalf("could not connect nats client: %s", err) + } + defer ncA.Close() + ncB, err := nats.Connect(urlB) + if err != nil { + t.Fatalf("could not connect nats client: %s", err) + } + defer ncB.Close() + + mgrA, err := jsm.New(ncA, jsm.WithTimeout(1100*time.Millisecond)) + if err != nil { + t.Fatalf("could not get manager: %s", err) + } + mgrB, err := jsm.New(ncB, jsm.WithTimeout(1100*time.Millisecond)) + if err != nil { + t.Fatalf("could not get manager: %s", err) + } + + if known, _ := mgrA.IsKnownStream("StreamA"); known { + t.Fatalf("SURVEYOR stream already exist") + } + if known, _ := 
mgrB.IsKnownStream("StreamB"); known { + t.Fatalf("SURVEYOR stream already exist") + } + + strA, err := mgrA.NewStream("StreamA", jsm.Subjects("js.in.streamA"), jsm.MemoryStorage()) + if err != nil { + t.Fatalf("could not create stream: %s", err) + } + _, err = mgrB.NewStream("StreamB", jsm.Subjects("js.in.streamB"), jsm.MemoryStorage()) + if err != nil { + t.Fatalf("could not create stream: %s", err) + } + + msg, err := ncA.Request("js.in.streamA", []byte("1"), time.Second) + if err != nil { + t.Fatalf("publish failed: %s", err) + } + if jsm.IsErrorResponse(msg) { + t.Fatalf("publish failed: %s", string(msg.Data)) + } + + consumer, err := strA.NewConsumer(jsm.AckWait(500*time.Millisecond), jsm.DurableName("OUT"), jsm.MaxDeliveryAttempts(1), jsm.SamplePercent(100)) + if err != nil { + t.Fatalf("could not create consumer: %s", err) + } + + consumer.NextMsg() + consumer.NextMsg() + + msg, err = ncA.Request("js.in.streamA", []byte("2"), time.Second) + if err != nil { + t.Fatalf("publish failed: %s", err) + } + if jsm.IsErrorResponse(msg) { + t.Fatalf("publish failed: %s", string(msg.Data)) + } + + msg, err = consumer.NextMsg() + if err != nil { + t.Fatalf("next failed: %s", err) + } + msg.Respond(nil) + + msg, err = ncA.Request("js.in.streamA", []byte("3"), time.Second) + if err != nil { + t.Fatalf("publish failed: %s", err) + } + if jsm.IsErrorResponse(msg) { + t.Fatalf("publish failed: %s", string(msg.Data)) + } + + msg, err = consumer.NextMsg() + if err != nil { + t.Fatalf("next failed: %s", err) + } + msg.Nak() + + // time for advisories to be sent and handled + time.Sleep(5 * time.Millisecond) + + expected := ` +# HELP nats_jetstream_delivery_exceeded_count Advisories about JetStream Consumer Delivery Exceeded events +# TYPE nats_jetstream_delivery_exceeded_count counter +nats_jetstream_delivery_exceeded_count{account="a",consumer="OUT",stream="StreamA"} 1 +` + err = ptu.CollectAndCompare(metrics.jsDeliveryExceededCtr, bytes.NewReader([]byte(expected))) + if err 
// errorsMatch fails the test immediately unless err is non-nil and its message
// contains every string in expectedErrors. With no expected errors it accepts
// any err, including nil.
func errorsMatch(t *testing.T, err error, expectedErrors []string) {
	t.Helper()

	if len(expectedErrors) == 0 {
		return
	}
	if err == nil {
		t.Fatalf("Expected error; got nil")
	}

	got := err.Error()
	for _, want := range expectedErrors {
		if strings.Contains(got, want) {
			continue
		}
		t.Fatalf("Expected error: %s; got: %s", want, got)
	}
}
a/surveyor/observation.go b/surveyor/observation.go index 61887d7..513fce4 100644 --- a/surveyor/observation.go +++ b/surveyor/observation.go @@ -58,49 +58,49 @@ func NewServiceObservationMetrics(registry *prometheus.Registry, constLabels pro Name: prometheus.BuildFQName("nats", "latency", "observations_received_count"), Help: "Number of observations received by this surveyor across all services", ConstLabels: constLabels, - }, []string{"service", "app"}), + }, []string{"service", "app", "account"}), serviceRequestStatus: prometheus.NewCounterVec(prometheus.CounterOpts{ Name: prometheus.BuildFQName("nats", "latency", "observation_status_count"), Help: "The status result codes for requests to a service", ConstLabels: constLabels, - }, []string{"service", "status"}), + }, []string{"service", "status", "account"}), invalidObservationsReceived: prometheus.NewCounterVec(prometheus.CounterOpts{ Name: prometheus.BuildFQName("nats", "latency", "observation_error_count"), Help: "Number of observations received by this surveyor across all services that could not be handled", ConstLabels: constLabels, - }, []string{"service"}), + }, []string{"service", "account"}), serviceLatency: prometheus.NewHistogramVec(prometheus.HistogramOpts{ Name: prometheus.BuildFQName("nats", "latency", "service_duration"), Help: "Time spent serving the request in the service", ConstLabels: constLabels, - }, []string{"service", "app"}), + }, []string{"service", "app", "account"}), totalLatency: prometheus.NewHistogramVec(prometheus.HistogramOpts{ Name: prometheus.BuildFQName("nats", "latency", "total_duration"), Help: "Total time spent serving a service including network overheads", ConstLabels: constLabels, - }, []string{"service", "app"}), + }, []string{"service", "app", "account"}), requestorRTT: prometheus.NewHistogramVec(prometheus.HistogramOpts{ Name: prometheus.BuildFQName("nats", "latency", "requestor_rtt"), Help: "The RTT to the client making a request", ConstLabels: constLabels, - }, 
// ServiceObservationExternalAccountConfig is used to configure how the account
// name and the service name are extracted from the subject of a service
// observation when observations are imported from other accounts.
//
// Positions are 1-based indexes into the dot-separated topic. Validation expects
// a configured position to stay within the topic and to point at a "*" wildcard
// token.
type ServiceObservationExternalAccountConfig struct {
	// AccountTokenPosition is the account name position in the subject, used when
	// aggregating services across multiple accounts. It is required.
	AccountTokenPosition int `json:"account_token_position"`

	// ServiceNamePosition is the optional service name position in the subject,
	// useful when aggregating services across multiple accounts.
	// If not set, the account name read from AccountTokenPosition is used as the
	// service name.
	ServiceNamePosition int `json:"service_name_position"`
}
strings.Split(o.Topic, ".") + switch { + case o.ExternalAccountConfig.AccountTokenPosition <= 0: + errs = append(errs, "external_account_config.account_token_position is required") + case o.ExternalAccountConfig.AccountTokenPosition > len(topicTokens): + errs = append(errs, "external_account_config.account_token_position is greater than the number of tokens in the topic") + case topicTokens[o.ExternalAccountConfig.AccountTokenPosition-1] != "*": + errs = append(errs, "external_account_config.account_token_position must point to a wildcard token in the topic") + } + switch { + case o.ExternalAccountConfig.ServiceNamePosition < 0: + errs = append(errs, "external_account_config.service_name_position must be greater than 0") + case o.ExternalAccountConfig.ServiceNamePosition > len(topicTokens): + errs = append(errs, "external_account_config.service_name_position is greater than the number of tokens in the topic") + case topicTokens[o.ExternalAccountConfig.ServiceNamePosition-1] != "*": + errs = append(errs, "external_account_config.service_name_position must point to a wildcard token in the topic") + } + } if len(errs) == 0 { return nil @@ -275,35 +306,66 @@ func (o *serviceObsListener) Start() error { } func (o *serviceObsListener) observationHandler(m *nats.Msg) { + serviceName := o.config.ServiceName + var accountName string + var err error + if o.config.ExternalAccountConfig != nil { + if o.config.ExternalAccountConfig.AccountTokenPosition != 0 { + accountName, err = getTokenFromSubject(m.Subject, o.config.ExternalAccountConfig.AccountTokenPosition) + if err != nil { + o.metrics.invalidObservationsReceived.WithLabelValues(o.config.ServiceName, accountName).Inc() + o.logger.Warnf("invalid service observation subject received for id: %s, service name: %s, error: %v, subject: %s", o.config.ID, o.config.ServiceName, err, m.Subject) + return + } + } + if o.config.ExternalAccountConfig.ServiceNamePosition != 0 { + serviceName, err = getTokenFromSubject(m.Subject, 
o.config.ExternalAccountConfig.ServiceNamePosition) + if err != nil { + o.metrics.invalidObservationsReceived.WithLabelValues(o.config.ServiceName, accountName).Inc() + o.logger.Warnf("invalid service observation subject received for id: %s, service name: %s, error: %v, subject: %s", o.config.ID, o.config.ServiceName, err, m.Subject) + return + } + } else { + serviceName = accountName + } + } kind, obs, err := jsm.ParseEvent(m.Data) if err != nil { - o.metrics.invalidObservationsReceived.WithLabelValues(o.config.ServiceName).Inc() - o.logger.Warnf("unparsable service observation received for id: %s, service name: %s, error: %v, data: %q", o.config.ID, o.config.ServiceName, err, m.Data) + o.metrics.invalidObservationsReceived.WithLabelValues(serviceName, accountName).Inc() + o.logger.Warnf("unparsable service observation received for id: %s, service name: %s, error: %v, data: %q", o.config.ID, serviceName, err, m.Data) return } switch obs := obs.(type) { case *metric.ServiceLatencyV1: - o.metrics.observationsReceived.WithLabelValues(o.config.ServiceName, obs.Responder.Name).Inc() - o.metrics.serviceLatency.WithLabelValues(o.config.ServiceName, obs.Responder.Name).Observe(obs.ServiceLatency.Seconds()) - o.metrics.totalLatency.WithLabelValues(o.config.ServiceName, obs.Responder.Name).Observe(obs.TotalLatency.Seconds()) - o.metrics.requestorRTT.WithLabelValues(o.config.ServiceName, obs.Responder.Name).Observe(obs.Requestor.RTT.Seconds()) - o.metrics.responderRTT.WithLabelValues(o.config.ServiceName, obs.Responder.Name).Observe(obs.Responder.RTT.Seconds()) - o.metrics.systemRTT.WithLabelValues(o.config.ServiceName, obs.Responder.Name).Observe(obs.SystemLatency.Seconds()) + o.metrics.observationsReceived.WithLabelValues(serviceName, obs.Responder.Name, accountName).Inc() + o.metrics.serviceLatency.WithLabelValues(serviceName, obs.Responder.Name, accountName).Observe(obs.ServiceLatency.Seconds()) + o.metrics.totalLatency.WithLabelValues(serviceName, obs.Responder.Name, 
// getTokenFromSubject returns the token at the given 1-based position of a
// dot-separated subject.
//
// It returns an error, and an empty token, when the position is not within
// 1..number of tokens in subject.
func getTokenFromSubject(subject string, token int) (string, error) {
	parts := strings.Split(subject, ".")
	// Positions are 1-based, so valid values are 1..len(parts). Anything else
	// would index outside parts and panic instead of reporting a bad subject.
	if token < 1 || token > len(parts) {
		return "", fmt.Errorf("invalid subject: %q: expected service name on token position %d", subject, token)
	}
	return parts[token-1], nil
}
"github.com/nats-io/nats-surveyor/test" + "github.com/nats-io/nats.go" "github.com/nats-io/nuid" "github.com/prometheus/client_golang/prometheus" ptu "github.com/prometheus/client_golang/prometheus/testutil" @@ -122,7 +124,7 @@ nats_latency_observations_count 1 t.Fatalf("subscribe failed: %s", err) } - statusses := []int{0, 200, 400, 500} + statuses := []int{0, 200, 400, 500} // send a bunch of observations for i := 0; i < 10; i++ { @@ -145,7 +147,7 @@ nats_latency_observations_count 1 RequestStart: time.Now(), ServiceLatency: 333 * time.Microsecond, SystemLatency: 333 * time.Microsecond, - Status: statusses[i%4], + Status: statuses[i%4], } oj, err := json.Marshal(observation) if err != nil { @@ -176,7 +178,7 @@ nats_latency_observations_count 1 expected = ` # HELP nats_latency_observations_received_count Number of observations received by this surveyor across all services # TYPE nats_latency_observations_received_count counter -nats_latency_observations_received_count{app="testing_service",service="testing"} 10 +nats_latency_observations_received_count{account="",app="testing_service",service="testing"} 10 ` err = ptu.CollectAndCompare(metrics.observationsReceived, bytes.NewReader([]byte(expected))) if err != nil { @@ -208,7 +210,7 @@ nats_latency_observations_received_count{app="testing_service",service="testing" expected = ` # HELP nats_latency_observation_error_count Number of observations received by this surveyor across all services that could not be handled # TYPE nats_latency_observation_error_count counter -nats_latency_observation_error_count{service="testing"} 10 +nats_latency_observation_error_count{account="",service="testing"} 10 ` err = ptu.CollectAndCompare(metrics.invalidObservationsReceived, bytes.NewReader([]byte(expected))) if err != nil { @@ -218,7 +220,7 @@ nats_latency_observation_error_count{service="testing"} 10 expected = ` # HELP nats_latency_observations_received_count Number of observations received by this surveyor across all services 
# TYPE nats_latency_observations_received_count counter -nats_latency_observations_received_count{app="testing_service",service="testing"} 10 +nats_latency_observations_received_count{account="",app="testing_service",service="testing"} 10 ` err = ptu.CollectAndCompare(metrics.observationsReceived, bytes.NewReader([]byte(expected))) if err != nil { @@ -228,9 +230,9 @@ nats_latency_observations_received_count{app="testing_service",service="testing" expected = ` # HELP nats_latency_observation_status_count The status result codes for requests to a service # TYPE nats_latency_observation_status_count counter -nats_latency_observation_status_count{service="testing",status="200"} 3 -nats_latency_observation_status_count{service="testing",status="400"} 2 -nats_latency_observation_status_count{service="testing",status="500"} 5 +nats_latency_observation_status_count{account="",service="testing",status="200"} 3 +nats_latency_observation_status_count{account="",service="testing",status="400"} 2 +nats_latency_observation_status_count{account="",service="testing",status="500"} 5 ` err = ptu.CollectAndCompare(metrics.serviceRequestStatus, bytes.NewReader([]byte(expected))) if err != nil { @@ -238,6 +240,196 @@ nats_latency_observation_status_count{service="testing",status="500"} 5 } } +func TestServiceObservation_Aggregate(t *testing.T) { + tests := []struct { + name string + obsConfig *ServiceObsConfig + configErrors []string + }{ + { + name: "aggregate stream export from config", + obsConfig: &ServiceObsConfig{ + ID: "test", + ServiceName: "aggregate", + Topic: "test.service.latency.ACC.*.*", + Username: "agg_stream", + Password: "agg_stream", + ExternalAccountConfig: &ServiceObservationExternalAccountConfig{ + AccountTokenPosition: 5, + ServiceNamePosition: 6, + }, + }, + }, + { + name: "aggregate service export from config", + obsConfig: &ServiceObsConfig{ + ID: "test", + ServiceName: "aggregate", + Topic: "test.service.latency.ACC.*.*", + Username: "agg_service", + 
Password: "agg_service", + ExternalAccountConfig: &ServiceObservationExternalAccountConfig{ + AccountTokenPosition: 5, + ServiceNamePosition: 6, + }, + }, + }, + { + name: "invalid config, empty account token position", + obsConfig: &ServiceObsConfig{ + ID: "test", + ServiceName: "aggregate", + Topic: "test.service.latency.ACC.*.*", + Username: "agg_service", + Password: "agg_service", + ExternalAccountConfig: &ServiceObservationExternalAccountConfig{ + AccountTokenPosition: 0, + ServiceNamePosition: -1, + }, + }, + configErrors: []string{ + "external_account_config.account_token_position is required", + "external_account_config.service_name_position must be greater than 0", + }, + }, + { + name: "invalid config, token positions out of range", + obsConfig: &ServiceObsConfig{ + ID: "test", + ServiceName: "aggregate", + Topic: "test.service.latency.ACC.*.*", + Username: "agg_service", + Password: "agg_service", + ExternalAccountConfig: &ServiceObservationExternalAccountConfig{ + AccountTokenPosition: 7, + ServiceNamePosition: 7, + }, + }, + configErrors: []string{ + "external_account_config.account_token_position is greater than the number of tokens in the topic", + "external_account_config.service_name_position is greater than the number of tokens in the topic", + }, + }, + { + name: "token positions do not match to wildcard positions in topic", + obsConfig: &ServiceObsConfig{ + ID: "test", + ServiceName: "aggregate", + Topic: "test.service.latency.ACC.*.*", + Username: "agg_service", + Password: "agg_service", + ExternalAccountConfig: &ServiceObservationExternalAccountConfig{ + AccountTokenPosition: 1, + ServiceNamePosition: 2, + }, + }, + configErrors: []string{ + "external_account_config.account_token_position must point to a wildcard token in the topic", + "external_account_config.service_name_position must point to a wildcard token in the topic", + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + srv := 
st.NewServerFromConfig(t, "../test/services.conf") + defer srv.Shutdown() + + opt := GetDefaultOptions() + opt.URLs = srv.ClientURL() + metrics := NewServiceObservationMetrics(prometheus.NewRegistry(), nil) + s, err := NewSurveyor(opt) + if err != nil { + t.Fatalf("couldn't create surveyor: %v", err) + } + if err = s.Start(); err != nil { + t.Fatalf("start error: %v", err) + } + defer s.Stop() + config := test.obsConfig + obsManager := s.ServiceObservationManager() + obsManager.metrics = metrics + err = obsManager.Set(config) + if len(test.configErrors) > 0 { + errorsMatch(t, err, test.configErrors) + return + } + if err != nil { + t.Fatalf("Error setting advisory config: %s", err) + } + waitForObsUpdate(t, obsManager, map[string]*ServiceObsConfig{config.ID: config}) + + urlAgg := fmt.Sprintf("nats://%s:%s@", config.Username, config.Password) + strings.TrimPrefix(srv.ClientURL(), "nats://") + urlA := "nats://a:a@" + strings.TrimPrefix(srv.ClientURL(), "nats://") + urlB := "nats://b:b@" + strings.TrimPrefix(srv.ClientURL(), "nats://") + ncAgg, err := nats.Connect(urlAgg) + if err != nil { + t.Fatalf("could not connect nats client: %s", err) + } + defer ncAgg.Close() + ncA, err := nats.Connect(urlA, nats.UseOldRequestStyle(), nats.Name("testing_service")) + if err != nil { + t.Fatalf("could not connect nats client: %s", err) + } + defer ncA.Close() + ncB, err := nats.Connect(urlB) + if err != nil { + t.Fatalf("could not connect nats client: %s", err) + } + defer ncB.Close() + + expected := ` +# HELP nats_latency_observations_count Number of Service Latency listeners that are running +# TYPE nats_latency_observations_count gauge +nats_latency_observations_count 1 +` + err = ptu.CollectAndCompare(metrics.observationsGauge, bytes.NewReader([]byte(expected))) + if err != nil { + t.Fatalf("Invalid observations counter: %s", err) + } + + sub, err := ncAgg.SubscribeSync("test.service.latency.>") + if err != nil { + t.Fatalf("subscribe failed: %s", err) + } + + replySub, err 
:= ncA.Subscribe("test.service", func(m *nats.Msg) { + m.Respond([]byte("hello")) + }) + if err != nil { + t.Fatalf("subscribe failed: %s", err) + } + defer replySub.Unsubscribe() + // send a bunch of observations + for i := 0; i < 10; i++ { + _, err := ncB.Request("test.service", []byte("hello"), time.Second) + if err != nil { + t.Fatalf("request failed: %s", err) + } + } + + // wait for all observations to be received in the test subscription + for i := 0; i < 10; i++ { + _, err = sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("test subscriber didn't receive all published messages") + } + } + + // sleep a bit just in case of slower delivery to the observer + time.Sleep(time.Second) + expected = ` + # HELP nats_latency_observations_received_count Number of observations received by this surveyor across all services + # TYPE nats_latency_observations_received_count counter + nats_latency_observations_received_count{account="a",app="testing_service",service="myservice"} 10 + ` + err = ptu.CollectAndCompare(metrics.observationsReceived, bytes.NewReader([]byte(expected))) + if err != nil { + t.Fatalf("Invalid observations counter: %s", err) + } + }) + } +} + func TestSurveyor_ObservationsFromFile(t *testing.T) { sc := st.NewSuperCluster(t) defer sc.Shutdown() diff --git a/test/jetstream.conf b/test/jetstream.conf index 0930287..da7635d 100644 --- a/test/jetstream.conf +++ b/test/jetstream.conf @@ -13,12 +13,28 @@ accounts { users: [ {user: a, password: a} ] + # for testing aggregate account with stream imports + exports [ + { stream: '$JS.EVENT.*.>' } + ] + # for testing aggregate account with service imports + imports [ + { service: { account: aggregate_service, subject: '$JS.EVENT.*.ACC.a.>' }, to: '$JS.EVENT.*.>' } + ] } b: { jetstream: enabled, users: [ { user: b, password: b } ] + # for testing aggregate account with stream imports + exports [ + { stream: '$JS.EVENT.*.>' } + ] + # for testing aggregate account with service imports + imports [ + { service: { 
account: aggregate_service, subject: '$JS.EVENT.*.ACC.b.>' }, to: '$JS.EVENT.*.>' } + ] } c: { jetstream: enabled, @@ -38,5 +54,26 @@ accounts { {user: global, password: global} ] } + ## this account aggregates js advisory events from accounts a and b via stream imports + aggregate_stream: { + jetstream: enabled, + users: [ + {user: agg_stream, password: agg_stream} + ] + imports [ + { stream: { account: a, subject: "$JS.EVENT.*.>" }, to: "$JS.EVENT.*.ACC.a.>" } + { stream: { account: b, subject: "$JS.EVENT.*.>" }, to: "$JS.EVENT.*.ACC.b.>" } + ] + } + # this account aggregates js advisory events from accounts a and b via service export + aggregate_service: { + jetstream: enabled, + users: [ + {user: agg_service, password: agg_service} + ] + exports [ + { service: '$JS.EVENT.*.ACC.*.>', account_token_position: 5 } + ] + } sys: {} } diff --git a/test/services.conf b/test/services.conf new file mode 100644 index 0000000..328dca8 --- /dev/null +++ b/test/services.conf @@ -0,0 +1,54 @@ +server_name: service_observations + +listen: 0.0.0.0:-1 + +no_auth_user: a + +accounts { + a: { + users: [ + {user: a, password: a} + ] + # for testing aggregate account with stream imports + exports [ + { + service: 'test.service', + latency: { + subject: test.service.latency + sampling: "100%" + } + } + { stream: 'test.service.latency' } + ] + imports [ + { service: { account: aggregate_service, subject: 'test.service.latency.ACC.a.myservice' }, to: 'test.service.latency' } + ] + } + b: { + users: [ + { user: b, password: b } + ] + # for testing aggregate account with service imports + imports [ + { service: { account: a, subject: 'test.service' }, share: true } + ] + } + ## this account aggregates service observation events from accounts a and b via stream imports + aggregate_stream: { + users: [ + {user: agg_stream, password: agg_stream} + ] + imports [ + { stream: { account: a, subject: "test.service.latency" }, to: "test.service.latency.ACC.a.myservice" } + ] + } + # this account 
aggregates service observation events from accounts a and b via service export + aggregate_service: { + users: [ + {user: agg_service, password: agg_service} + ] + exports [ + { service: 'test.service.latency.ACC.*.*', account_token_position: 5 } + ] + } +} diff --git a/test/test.go b/test/test.go index bf799e4..ea3e4fc 100644 --- a/test/test.go +++ b/test/test.go @@ -163,6 +163,13 @@ func NewJetStreamServer(t *testing.T) *server.Server { return s } +// NewServerFromConfig creates a single NATS server using provided config file +func NewServerFromConfig(t *testing.T, configFile string) *server.Server { + s := StartServer(t, configFile) + ConnectAndVerify(t, s.ClientURL()) + return s +} + func NewJetStreamCluster(t *testing.T) *SuperCluster { t.Helper() cluster := SuperCluster{