Skip to content

Commit

Permalink
Add JetStream server stats
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk committed Dec 14, 2024
1 parent 743894b commit 99585ef
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 11 deletions.
66 changes: 55 additions & 11 deletions surveyor/collector_statz.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ type statzDescs struct {
JetstreamClusterRaftGroupReplicaActive *prometheus.Desc
JetstreamClusterRaftGroupReplicaCurrent *prometheus.Desc
JetstreamClusterRaftGroupReplicaOffline *prometheus.Desc
// JetStream server stats
JetstreamServerDisabled *prometheus.Desc
JetstreamServerStreams *prometheus.Desc
JetstreamServerConsumers *prometheus.Desc
JetstreamServerMessages *prometheus.Desc
JetstreamServerBytes *prometheus.Desc
JetstreamServerMaxMemory *prometheus.Desc
JetstreamServerMaxStorage *prometheus.Desc

// Account scope metrics
accCount *prometheus.Desc
Expand Down Expand Up @@ -132,6 +140,7 @@ type StatzCollector struct {
uptime time.Duration
stats []*server.ServerStatsMsg
statsChan chan *server.ServerStatsMsg
jsStats []*jsStat
accStats []*accountStats
rtts map[string]time.Duration
pollTimeout time.Duration
Expand Down Expand Up @@ -307,6 +316,15 @@ func (sc *StatzCollector) buildDescs() {
sc.descs.JetstreamClusterRaftGroupReplicaCurrent = newPromDesc("jetstream_cluster_raft_group_replica_peer_current", "Jetstream RAFT Group Peer is current: 1 or not: 0", jsClusterReplicaLabelKeys)
sc.descs.JetstreamClusterRaftGroupReplicaOffline = newPromDesc("jetstream_cluster_raft_group_replica_peer_offline", "Jetstream RAFT Group Peer is offline: 1 or online: 0", jsClusterReplicaLabelKeys)

jsServerLabelKeys := []string{"server_id", "server_name", "cluster_name"}
sc.descs.JetstreamServerDisabled = newPromDesc("jetstream_server_jetstream_disabled", "JetStream disabled or not", jsServerLabelKeys)
sc.descs.JetstreamServerStreams = newPromDesc("jetstream_server_total_streams", "Total number of streams in JetStream", jsServerLabelKeys)
sc.descs.JetstreamServerConsumers = newPromDesc("jetstream_server_total_consumers", "Total number of consumers in JetStream", jsServerLabelKeys)
sc.descs.JetstreamServerMessages = newPromDesc("jetstream_server_total_messages", "Total number of stored messages in JetStream", jsServerLabelKeys)
sc.descs.JetstreamServerBytes = newPromDesc("jetstream_server_total_message_bytes", "Total number of bytes stored in JetStream", jsServerLabelKeys)
sc.descs.JetstreamServerMaxMemory = newPromDesc("jetstream_server_max_memory", "JetStream Max Memory", jsServerLabelKeys)
sc.descs.JetstreamServerMaxStorage = newPromDesc("jetstream_server_max_storage", "JetStream Max Storage", jsServerLabelKeys)

// Account scope metrics
if sc.collectAccounts {
accLabel := []string{"account"}
Expand Down Expand Up @@ -578,8 +596,8 @@ func (sc *StatzCollector) pollAccountInfo() error {
accStats[accID] = sts
}

jsInfos := sc.getJSInfos(nc)
for accID, jsInfo := range jsInfos {
accDetails, jsStats := sc.getJSInfos(nc)
for accID, accDetail := range accDetails {
sts, ok := accStats[accID]
// If no account stats returned, still report JS metrics
if !ok {
Expand All @@ -588,17 +606,17 @@ func (sc *StatzCollector) pollAccountInfo() error {
}
}
sts.jetstreamEnabled = 1.0
sts.jetstreamMemoryUsed = float64(jsInfo.Memory)
sts.jetstreamStorageUsed = float64(jsInfo.Store)
sts.jetstreamMemoryReserved = float64(jsInfo.ReservedMemory)
sts.jetstreamStorageReserved = float64(jsInfo.ReservedStore)
sts.jetstreamMemoryUsed = float64(accDetail.Memory)
sts.jetstreamStorageUsed = float64(accDetail.Store)
sts.jetstreamMemoryReserved = float64(accDetail.ReservedMemory)
sts.jetstreamStorageReserved = float64(accDetail.ReservedStore)
sts.jetstreamTieredMemoryUsed = make(map[int]float64)
sts.jetstreamTieredStorageUsed = make(map[int]float64)
sts.jetstreamTieredMemoryReserved = make(map[int]float64)
sts.jetstreamTieredStorageReserved = make(map[int]float64)

sts.jetstreamStreamCount = float64(len(jsInfo.Streams))
for _, stream := range jsInfo.Streams {
sts.jetstreamStreamCount = float64(len(accDetail.Streams))
for _, stream := range accDetail.Streams {
sts.jetstreamStreams = append(sts.jetstreamStreams, streamAccountStats{
streamName: stream.Name,
raftGroup: stream.RaftGroup,
Expand Down Expand Up @@ -634,6 +652,7 @@ func (sc *StatzCollector) pollAccountInfo() error {
}

sc.Lock()
sc.jsStats = jsStats
sc.accStats = make([]*accountStats, 0, len(accStats))
for _, acc := range accStats {
sc.accStats = append(sc.accStats, acc)
Expand All @@ -643,14 +662,15 @@ func (sc *StatzCollector) pollAccountInfo() error {
return nil
}

func (sc *StatzCollector) getJSInfos(nc *nats.Conn) map[string]*server.AccountDetail {
func (sc *StatzCollector) getJSInfos(nc *nats.Conn) (map[string]*server.AccountDetail, []*jsStat) {
opts := server.JSzOptions{
Accounts: true,
Streams: true,
Consumer: true,
Config: true,
RaftGroups: true,
}
jss := make([]*jsStat, 0)
res := make([]*server.JSInfo, 0)
req, err := json.Marshal(opts)
if err != nil {
Expand All @@ -674,10 +694,14 @@ func (sc *StatzCollector) getJSInfos(nc *nats.Conn) map[string]*server.AccountDe
if r.Error != nil {
if strings.Contains(r.Error.Description, "jetstream not enabled") {
// jetstream is not enabled on server
return nil
return nil, nil
}
continue
}
jss = append(jss, &jsStat{
Server: r.Server,
Data: &d,
})
res = append(res, &d)
if sc.numServers != -1 && len(res) == sc.numServers {
break
Expand All @@ -697,14 +721,19 @@ func (sc *StatzCollector) getJSInfos(nc *nats.Conn) map[string]*server.AccountDe
}
}

return jsAccInfos
return jsAccInfos, jss
}

type accStatz struct {
server.ServerAPIResponse
Data server.AccountStatz `json:"data,omitempty"`
}

type jsStat struct {
Server *server.ServerInfo
Data *server.JSInfo
}

type accStat struct {
Server *server.ServerInfo
Data *server.AccountStat
Expand Down Expand Up @@ -1094,6 +1123,21 @@ func (sc *StatzCollector) Collect(ch chan<- prometheus.Metric) {
metrics.newGaugeMetric(sc.descs.accJetstreamReplicaCount, streamStat.replicaCount, append(accLabels, streamStat.streamName, streamStat.raftGroup))
}
}

for _, jss := range sc.jsStats {
jsServerLabelValues := []string{jss.Server.ID, jss.Server.Name, jss.Server.Cluster}
var isJetStreamDisabled float64
if jss.Data.Disabled {
isJetStreamDisabled = 1
}
metrics.newGaugeMetric(sc.descs.JetstreamServerDisabled, isJetStreamDisabled, jsServerLabelValues)
metrics.newGaugeMetric(sc.descs.JetstreamServerStreams, float64(jss.Data.Streams), jsServerLabelValues)
metrics.newGaugeMetric(sc.descs.JetstreamServerConsumers, float64(jss.Data.Consumers), jsServerLabelValues)
metrics.newGaugeMetric(sc.descs.JetstreamServerMessages, float64(jss.Data.Messages), jsServerLabelValues)
metrics.newGaugeMetric(sc.descs.JetstreamServerBytes, float64(jss.Data.Bytes), jsServerLabelValues)
metrics.newGaugeMetric(sc.descs.JetstreamServerMaxMemory, float64(jss.Data.Config.MaxMemory), jsServerLabelValues)
metrics.newGaugeMetric(sc.descs.JetstreamServerMaxStorage, float64(jss.Data.Config.MaxStore), jsServerLabelValues)
}
}

collectCh := make(chan prometheus.Metric)
Expand Down
39 changes: 39 additions & 0 deletions surveyor/surveyor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,45 @@ func TestSurveyor_AccountJetStreamAssets(t *testing.T) {
}
}

func TestSurveyor_JetStream_Server(t *testing.T) {
sc := st.NewSuperCluster(t)
defer sc.Shutdown()

opt := getTestOptions()
opt.Accounts = true
opt.ExpectedServers = 3
s, err := NewSurveyor(opt)
if err != nil {
t.Fatalf("couldn't create surveyor: %v", err)
}
if err = s.Start(); err != nil {
t.Fatalf("start error: %v", err)
}

defer s.Stop()

output, err := PollSurveyorEndpoint(t, "http://127.0.0.1:7777/metrics", false, http.StatusOK)
if err != nil {
t.Fatal(err)
}

want := []string{
"nats_core_jetstream_server_jetstream_disabled",
"nats_core_jetstream_server_total_streams",
"nats_core_jetstream_server_total_consumers",
"nats_core_jetstream_server_total_messages",
"nats_core_jetstream_server_total_message_bytes",
"nats_core_jetstream_server_max_memory",
"nats_core_jetstream_server_max_storage",
}
for _, m := range want {
if !strings.Contains(output, m) {
t.Logf("output: %s", output)
t.Fatalf("missing: %s", m)
}
}
}

func TestSurveyor_Reconnect(t *testing.T) {
ns := st.NewSingleServer(t)
defer ns.Shutdown()
Expand Down

0 comments on commit 99585ef

Please sign in to comment.