Skip to content

Commit

Permalink
Add raft group for JetStream metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk committed Nov 7, 2024
1 parent c847275 commit acb4e96
Showing 1 changed file with 11 additions and 8 deletions.
19 changes: 11 additions & 8 deletions surveyor/collector_statz.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ type accountStats struct {

type streamAccountStats struct {
streamName string
raftGroup string
consumerCount float64
replicaCount float64
}
Expand Down Expand Up @@ -334,8 +335,8 @@ func (sc *StatzCollector) buildDescs() {
sc.descs.accJetstreamTieredMemoryReserved = newPromDesc("account_jetstream_tiered_memory_reserved", "The number of bytes reserved by JetStream memory tier", append(accLabel, "tier"))
sc.descs.accJetstreamTieredStorageReserved = newPromDesc("account_jetstream_tiered_storage_reserved", "The number of bytes reserved by JetStream storage tier", append(accLabel, "tier"))
sc.descs.accJetstreamStreamCount = newPromDesc("account_jetstream_stream_count", "The number of streams in this account", accLabel)
sc.descs.accJetstreamConsumerCount = newPromDesc("account_jetstream_consumer_count", "The number of consumers per stream for this account", append(accLabel, "stream"))
sc.descs.accJetstreamReplicaCount = newPromDesc("account_jetstream_replica_count", "The number of replicas per stream for this account", append(accLabel, "stream"))
sc.descs.accJetstreamConsumerCount = newPromDesc("account_jetstream_consumer_count", "The number of consumers per stream for this account", append(accLabel, "stream", "raft_group"))
sc.descs.accJetstreamReplicaCount = newPromDesc("account_jetstream_replica_count", "The number of replicas per stream for this account", append(accLabel, "stream", "raft_group"))
}

// Surveyor
Expand Down Expand Up @@ -600,6 +601,7 @@ func (sc *StatzCollector) pollAccountInfo() error {
for _, stream := range jsInfo.Streams {
sts.jetstreamStreams = append(sts.jetstreamStreams, streamAccountStats{
streamName: stream.Name,
raftGroup: stream.RaftGroup,
consumerCount: float64(len(stream.Consumer)),
replicaCount: float64(stream.Config.Replicas),
})
Expand Down Expand Up @@ -643,10 +645,11 @@ func (sc *StatzCollector) pollAccountInfo() error {

func (sc *StatzCollector) getJSInfos(nc *nats.Conn) map[string]*server.AccountDetail {
opts := server.JSzOptions{
Accounts: true,
Streams: true,
Consumer: true,
Config: true,
Accounts: true,
Streams: true,
Consumer: true,
Config: true,
RaftGroups: true,
}
res := make([]*server.JSInfo, 0)
req, err := json.Marshal(opts)
Expand Down Expand Up @@ -1087,8 +1090,8 @@ func (sc *StatzCollector) Collect(ch chan<- prometheus.Metric) {

metrics.newGaugeMetric(sc.descs.accJetstreamStreamCount, stat.jetstreamStreamCount, accLabels)
for _, streamStat := range stat.jetstreamStreams {
metrics.newGaugeMetric(sc.descs.accJetstreamConsumerCount, streamStat.consumerCount, append(accLabels, streamStat.streamName))
metrics.newGaugeMetric(sc.descs.accJetstreamReplicaCount, streamStat.replicaCount, append(accLabels, streamStat.streamName))
metrics.newGaugeMetric(sc.descs.accJetstreamConsumerCount, streamStat.consumerCount, append(accLabels, streamStat.streamName, streamStat.raftGroup))
metrics.newGaugeMetric(sc.descs.accJetstreamReplicaCount, streamStat.replicaCount, append(accLabels, streamStat.streamName, streamStat.raftGroup))
}
}
}
Expand Down

0 comments on commit acb4e96

Please sign in to comment.