From 99585ef3a61a9ab21ea327ba9fe18397a893b120 Mon Sep 17 00:00:00 2001
From: Ziya Suzen
Date: Sat, 14 Dec 2024 17:42:38 +0000
Subject: [PATCH] Add JetStream server stats

Export server scope JetStream gauges, one series per responding server,
labelled with server_id, server_name and cluster_name:

  jetstream_server_jetstream_disabled
  jetstream_server_total_streams
  jetstream_server_total_consumers
  jetstream_server_total_messages
  jetstream_server_total_message_bytes
  jetstream_server_max_memory
  jetstream_server_max_storage

getJSInfos now returns a jsStat (server info plus the decoded JSInfo) for
each usable response, next to the per-account details it already returned.
pollAccountInfo stores the list on the collector and Collect turns it into
gauges.

Responses that carry no server info are left out of the jsStat list,
because Collect dereferences jsStat.Server to build the label values.
---
Review notes (not part of the commit message):
 * This copy was rebuilt from a version whose line breaks and indentation
   had been lost. Indentation follows gofmt nesting; column alignment and
   the position of blank lines inside context were inferred. TODO confirm
   against the target tree, or apply with `git apply --ignore-space-change`.
 * The commit id on the first line and the post-image blob ids on the
   index lines are those of the original patch; they were not recomputed
   for the review changes.

 surveyor/collector_statz.go | 74 +++++++++++++++++++++++++++++++------
 surveyor/surveyor_test.go   | 41 ++++++++++++++++++++
 2 files changed, 104 insertions(+), 11 deletions(-)

diff --git a/surveyor/collector_statz.go b/surveyor/collector_statz.go
index 75e138d..c96a01c 100644
--- a/surveyor/collector_statz.go
+++ b/surveyor/collector_statz.go
@@ -96,6 +96,14 @@ type statzDescs struct {
 	JetstreamClusterRaftGroupReplicaActive *prometheus.Desc
 	JetstreamClusterRaftGroupReplicaCurrent *prometheus.Desc
 	JetstreamClusterRaftGroupReplicaOffline *prometheus.Desc
+	// JetStream server scope metrics (labelled per responding server)
+	JetstreamServerDisabled   *prometheus.Desc
+	JetstreamServerStreams    *prometheus.Desc
+	JetstreamServerConsumers  *prometheus.Desc
+	JetstreamServerMessages   *prometheus.Desc
+	JetstreamServerBytes      *prometheus.Desc
+	JetstreamServerMaxMemory  *prometheus.Desc
+	JetstreamServerMaxStorage *prometheus.Desc
 
 	// Account scope metrics
 	accCount *prometheus.Desc
@@ -132,6 +140,7 @@ type StatzCollector struct {
 	uptime time.Duration
 	stats []*server.ServerStatsMsg
 	statsChan chan *server.ServerStatsMsg
+	jsStats []*jsStat // per-server JetStream reports stored by the last pollAccountInfo
 	accStats []*accountStats
 	rtts map[string]time.Duration
 	pollTimeout time.Duration
@@ -307,6 +316,16 @@ func (sc *StatzCollector) buildDescs() {
 	sc.descs.JetstreamClusterRaftGroupReplicaCurrent = newPromDesc("jetstream_cluster_raft_group_replica_peer_current", "Jetstream RAFT Group Peer is current: 1 or not: 0", jsClusterReplicaLabelKeys)
 	sc.descs.JetstreamClusterRaftGroupReplicaOffline = newPromDesc("jetstream_cluster_raft_group_replica_peer_offline", "Jetstream RAFT Group Peer is offline: 1 or online: 0", jsClusterReplicaLabelKeys)
 
+	// JetStream server scope metrics
+	jsServerLabelKeys := []string{"server_id", "server_name", "cluster_name"}
+	sc.descs.JetstreamServerDisabled = newPromDesc("jetstream_server_jetstream_disabled", "JetStream disabled or not", jsServerLabelKeys)
+	sc.descs.JetstreamServerStreams = newPromDesc("jetstream_server_total_streams", "Total number of streams in JetStream", jsServerLabelKeys)
+	sc.descs.JetstreamServerConsumers = newPromDesc("jetstream_server_total_consumers", "Total number of consumers in JetStream", jsServerLabelKeys)
+	sc.descs.JetstreamServerMessages = newPromDesc("jetstream_server_total_messages", "Total number of stored messages in JetStream", jsServerLabelKeys)
+	sc.descs.JetstreamServerBytes = newPromDesc("jetstream_server_total_message_bytes", "Total number of bytes stored in JetStream", jsServerLabelKeys)
+	sc.descs.JetstreamServerMaxMemory = newPromDesc("jetstream_server_max_memory", "JetStream Max Memory", jsServerLabelKeys)
+	sc.descs.JetstreamServerMaxStorage = newPromDesc("jetstream_server_max_storage", "JetStream Max Storage", jsServerLabelKeys)
+
 	// Account scope metrics
 	if sc.collectAccounts {
 		accLabel := []string{"account"}
@@ -578,8 +597,8 @@ func (sc *StatzCollector) pollAccountInfo() error {
 		accStats[accID] = sts
 	}
 
-	jsInfos := sc.getJSInfos(nc)
-	for accID, jsInfo := range jsInfos {
+	accDetails, jsStats := sc.getJSInfos(nc)
+	for accID, accDetail := range accDetails {
 		sts, ok := accStats[accID]
 		// If no account stats returned, still report JS metrics
 		if !ok {
@@ -588,17 +607,17 @@ func (sc *StatzCollector) pollAccountInfo() error {
 			}
 		}
 		sts.jetstreamEnabled = 1.0
-		sts.jetstreamMemoryUsed = float64(jsInfo.Memory)
-		sts.jetstreamStorageUsed = float64(jsInfo.Store)
-		sts.jetstreamMemoryReserved = float64(jsInfo.ReservedMemory)
-		sts.jetstreamStorageReserved = float64(jsInfo.ReservedStore)
+		sts.jetstreamMemoryUsed = float64(accDetail.Memory)
+		sts.jetstreamStorageUsed = float64(accDetail.Store)
+		sts.jetstreamMemoryReserved = float64(accDetail.ReservedMemory)
+		sts.jetstreamStorageReserved = float64(accDetail.ReservedStore)
 		sts.jetstreamTieredMemoryUsed = make(map[int]float64)
 		sts.jetstreamTieredStorageUsed = make(map[int]float64)
 		sts.jetstreamTieredMemoryReserved = make(map[int]float64)
 		sts.jetstreamTieredStorageReserved = make(map[int]float64)
 
-		sts.jetstreamStreamCount = float64(len(jsInfo.Streams))
-		for _, stream := range jsInfo.Streams {
+		sts.jetstreamStreamCount = float64(len(accDetail.Streams))
+		for _, stream := range accDetail.Streams {
 			sts.jetstreamStreams = append(sts.jetstreamStreams, streamAccountStats{
 				streamName: stream.Name,
 				raftGroup: stream.RaftGroup,
@@ -634,6 +653,7 @@ func (sc *StatzCollector) pollAccountInfo() error {
 	}
 
 	sc.Lock()
+	sc.jsStats = jsStats
 	sc.accStats = make([]*accountStats, 0, len(accStats))
 	for _, acc := range accStats {
 		sc.accStats = append(sc.accStats, acc)
@@ -643,7 +663,10 @@ func (sc *StatzCollector) pollAccountInfo() error {
 	return nil
 }
 
-func (sc *StatzCollector) getJSInfos(nc *nats.Conn) map[string]*server.AccountDetail {
+// getJSInfos returns the JetStream account details keyed by account together
+// with one jsStat per usable server response.
+// Both results are nil as soon as a response reports "jetstream not enabled".
+func (sc *StatzCollector) getJSInfos(nc *nats.Conn) (map[string]*server.AccountDetail, []*jsStat) {
 	opts := server.JSzOptions{
 		Accounts: true,
 		Streams: true,
@@ -651,6 +674,7 @@ func (sc *StatzCollector) getJSInfos(nc *nats.Conn) map[string]*server.AccountDe
 		Config: true,
 		RaftGroups: true,
 	}
+	jss := make([]*jsStat, 0)
 	res := make([]*server.JSInfo, 0)
 	req, err := json.Marshal(opts)
 	if err != nil {
@@ -674,10 +698,15 @@ func (sc *StatzCollector) getJSInfos(nc *nats.Conn) map[string]*server.AccountDe
 		if r.Error != nil {
 			if strings.Contains(r.Error.Description, "jetstream not enabled") {
 				// jetstream is not enabled on server
-				return nil
+				return nil, nil
 			}
 			continue
 		}
+		// Collect reads Server.ID/Name/Cluster for the label values, so a
+		// response without server info cannot be reported.
+		if r.Server != nil {
+			jss = append(jss, &jsStat{Server: r.Server, Data: &d})
+		}
 		res = append(res, &d)
 		if sc.numServers != -1 && len(res) == sc.numServers {
 			break
@@ -697,7 +726,7 @@ func (sc *StatzCollector) getJSInfos(nc *nats.Conn) map[string]*server.AccountDe
 		}
 	}
 
-	return jsAccInfos
+	return jsAccInfos, jss
 }
 
 type accStatz struct {
@@ -705,6 +734,13 @@ type accStatz struct {
 	Data server.AccountStatz `json:"data,omitempty"`
 }
 
+// jsStat pairs the JetStream report decoded from one server's response
+// with the info of the server that sent it.
+type jsStat struct {
+	Server *server.ServerInfo // server that produced the response; supplies the label values
+	Data   *server.JSInfo     // JetStream report decoded from that response
+}
+
 type accStat struct {
 	Server *server.ServerInfo
 	Data *server.AccountStat
@@ -1094,6 +1130,22 @@ func (sc *StatzCollector) Collect(ch chan<- prometheus.Metric) {
 				metrics.newGaugeMetric(sc.descs.accJetstreamReplicaCount, streamStat.replicaCount, append(accLabels, streamStat.streamName, streamStat.raftGroup))
 			}
 		}
+
+		// JetStream server scope metrics: one set of gauges per stored jsStat.
+		for _, jss := range sc.jsStats {
+			jsServerLabelValues := []string{jss.Server.ID, jss.Server.Name, jss.Server.Cluster}
+			var isJetStreamDisabled float64
+			if jss.Data.Disabled {
+				isJetStreamDisabled = 1
+			}
+			metrics.newGaugeMetric(sc.descs.JetstreamServerDisabled, isJetStreamDisabled, jsServerLabelValues)
+			metrics.newGaugeMetric(sc.descs.JetstreamServerStreams, float64(jss.Data.Streams), jsServerLabelValues)
+			metrics.newGaugeMetric(sc.descs.JetstreamServerConsumers, float64(jss.Data.Consumers), jsServerLabelValues)
+			metrics.newGaugeMetric(sc.descs.JetstreamServerMessages, float64(jss.Data.Messages), jsServerLabelValues)
+			metrics.newGaugeMetric(sc.descs.JetstreamServerBytes, float64(jss.Data.Bytes), jsServerLabelValues)
+			metrics.newGaugeMetric(sc.descs.JetstreamServerMaxMemory, float64(jss.Data.Config.MaxMemory), jsServerLabelValues)
+			metrics.newGaugeMetric(sc.descs.JetstreamServerMaxStorage, float64(jss.Data.Config.MaxStore), jsServerLabelValues)
+		}
 	}
 
 	collectCh := make(chan prometheus.Metric)
diff --git a/surveyor/surveyor_test.go b/surveyor/surveyor_test.go
index 9beb538..50b8913 100644
--- a/surveyor/surveyor_test.go
+++ b/surveyor/surveyor_test.go
@@ -350,6 +350,47 @@ func TestSurveyor_AccountJetStreamAssets(t *testing.T) {
 	}
 }
 
+// TestSurveyor_JetStream_Server verifies that all JetStream server scope
+// metrics show up in the scrape output when account collection is enabled.
+func TestSurveyor_JetStream_Server(t *testing.T) {
+	sc := st.NewSuperCluster(t)
+	defer sc.Shutdown()
+
+	opt := getTestOptions()
+	opt.Accounts = true
+	opt.ExpectedServers = 3
+	s, err := NewSurveyor(opt)
+	if err != nil {
+		t.Fatalf("couldn't create surveyor: %v", err)
+	}
+	if err = s.Start(); err != nil {
+		t.Fatalf("start error: %v", err)
+	}
+
+	defer s.Stop()
+
+	output, err := PollSurveyorEndpoint(t, "http://127.0.0.1:7777/metrics", false, http.StatusOK)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	want := []string{
+		"nats_core_jetstream_server_jetstream_disabled",
+		"nats_core_jetstream_server_total_streams",
+		"nats_core_jetstream_server_total_consumers",
+		"nats_core_jetstream_server_total_messages",
+		"nats_core_jetstream_server_total_message_bytes",
+		"nats_core_jetstream_server_max_memory",
+		"nats_core_jetstream_server_max_storage",
+	}
+	for _, m := range want {
+		if !strings.Contains(output, m) {
+			t.Logf("output: %s", output)
+			t.Fatalf("missing: %s", m)
+		}
+	}
+}
+
 func TestSurveyor_Reconnect(t *testing.T) {
 	ns := st.NewSingleServer(t)
 	defer ns.Shutdown()