diff --git a/surveyor/collector_statz.go b/surveyor/collector_statz.go index c1168a8..28acfa9 100644 --- a/surveyor/collector_statz.go +++ b/surveyor/collector_statz.go @@ -34,6 +34,9 @@ import ( "golang.org/x/sync/singleflight" ) +// skip reporting account on a server after this many subsequent polls with 0 conns +const accStatZeroConnSkip = 3 + // statzDescs holds the metric descriptions type statzDescs struct { Info *prometheus.Desc @@ -98,8 +101,10 @@ type statzDescs struct { // Account scope metrics accCount *prometheus.Desc accConnCount *prometheus.Desc + accTotalConnCount *prometheus.Desc accLeafCount *prometheus.Desc accSubCount *prometheus.Desc + accSlowConsumerCount *prometheus.Desc accBytesSent *prometheus.Desc accBytesRecv *prometheus.Desc accMsgsSent *prometheus.Desc @@ -128,7 +133,7 @@ type StatzCollector struct { uptime time.Duration stats []*server.ServerStatsMsg statsChan chan *server.ServerStatsMsg - accStats []accountStats + accStats []*accountStats rtts map[string]time.Duration pollTimeout time.Duration reply string @@ -140,6 +145,7 @@ type StatzCollector struct { doneCh chan struct{} descs statzDescs collectAccounts bool + accStatZeroConn map[string]int natsUp *prometheus.Desc routeIDRemap map[string]map[uint64]int gatewayIDRemap map[string]map[uint64]int @@ -163,14 +169,7 @@ type StatzCollector struct { type accountStats struct { accountID string - connCount float64 - subCount float64 - leafCount float64 - - bytesSent float64 - bytesRecv float64 - msgsSent float64 - msgsRecv float64 + stats []*accStat jetstreamEnabled float64 jetstreamMemoryUsed float64 @@ -191,12 +190,12 @@ type streamAccountStats struct { replicaCount float64 } -func serverName(sm *server.ServerStatsMsg) string { - if sm.Server.Name == "" { - return sm.Server.ID +func serverName(sm *server.ServerInfo) string { + if sm.Name == "" { + return sm.ID } - return sm.Server.Name + return sm.Name } func jsDomainLabelValue(sm *server.ServerStatsMsg) string { @@ -216,15 +215,15 @@ func jetstreamInfoLabelValues(sm *server.ServerStatsMsg) []string { } } -func (sc *StatzCollector) serverLabelValues(sm *server.ServerStatsMsg) []string { - return []string{sm.Server.Cluster, serverName(sm), sm.Server.ID} +func (sc *StatzCollector) serverLabelValues(sm *server.ServerInfo) []string { + return []string{sm.Cluster, serverName(sm), sm.ID} } -func (sc *StatzCollector) serverInfoLabelValues(sm *server.ServerStatsMsg) []string { - return []string{sm.Server.Cluster, serverName(sm), sm.Server.ID, sm.Server.Version} +func (sc *StatzCollector) serverInfoLabelValues(sm *server.ServerInfo) []string { + return []string{sm.Cluster, serverName(sm), sm.ID, sm.Version} } -func (sc *StatzCollector) routeLabelValues(sm *server.ServerStatsMsg, rStat *server.RouteStat) []string { +func (sc *StatzCollector) routeLabelValues(sm *server.ServerInfo, rStat *server.RouteStat) []string { idxS := strconv.FormatUint(rStat.ID, 10) if byName, ok := sc.routeIDRemap[rStat.Name]; ok { if idx, ok := byName[rStat.ID]; ok { @@ -232,10 +231,10 @@ func (sc *StatzCollector) routeLabelValues(sm *server.ServerStatsMsg, rStat *ser } } - return []string{sm.Server.Cluster, serverName(sm), sm.Server.ID, rStat.Name, idxS} + return []string{sm.Cluster, serverName(sm), sm.ID, rStat.Name, idxS} } -func (sc *StatzCollector) gatewayLabelValues(sm *server.ServerStatsMsg, gStat *server.GatewayStat) []string { +func (sc *StatzCollector) gatewayLabelValues(sm *server.ServerInfo, gStat *server.GatewayStat) []string { idxS := strconv.FormatUint(gStat.ID, 10) if byName, ok := sc.gatewayIDRemap[gStat.Name]; ok { if idx, ok := byName[gStat.ID]; ok { @@ -243,7 +242,7 @@ func (sc *StatzCollector) gatewayLabelValues(sm *server.ServerStatsMsg, gStat *s } } - return []string{sm.Server.Cluster, serverName(sm), sm.Server.ID, gStat.Name, idxS} + return []string{sm.Cluster, serverName(sm), sm.ID, gStat.Name, idxS} } // Up/Down on servers - look at discovery mechanisms in Prometheus - aging out, how does it work? @@ -264,30 +263,30 @@ func (sc *StatzCollector) buildDescs() { sc.descs.Cores = newPromDesc("core_count", "Machine cores gauge", sc.serverLabels) sc.descs.CPU = newPromDesc("cpu_percentage", "Server cpu utilization gauge", sc.serverLabels) sc.descs.Connections = newPromDesc("connection_count", "Current number of client connections gauge", sc.serverLabels) - sc.descs.TotalConnections = newPromDesc("total_connection_count", "Total number of client connections serviced gauge", sc.serverLabels) + sc.descs.TotalConnections = newPromDesc("total_connection_count", "Total number of client connections serviced counter", sc.serverLabels) sc.descs.ActiveAccounts = newPromDesc("active_account_count", "Number of active accounts gauge", sc.serverLabels) sc.descs.NumSubs = newPromDesc("subs_count", "Current number of subscriptions gauge", sc.serverLabels) - sc.descs.SentMsgs = newPromDesc("sent_msgs_count", "Number of messages sent gauge", sc.serverLabels) - sc.descs.SentBytes = newPromDesc("sent_bytes", "Number of messages sent gauge", sc.serverLabels) - sc.descs.RecvMsgs = newPromDesc("recv_msgs_count", "Number of messages received gauge", sc.serverLabels) - sc.descs.RecvBytes = newPromDesc("recv_bytes", "Number of messages received gauge", sc.serverLabels) + sc.descs.SentMsgs = newPromDesc("sent_msgs_count", "Number of messages sent counter", sc.serverLabels) + sc.descs.SentBytes = newPromDesc("sent_bytes", "Number of bytes sent counter", sc.serverLabels) + sc.descs.RecvMsgs = newPromDesc("recv_msgs_count", "Number of messages received counter", sc.serverLabels) + sc.descs.RecvBytes = newPromDesc("recv_bytes", "Number of bytes received counter", sc.serverLabels) sc.descs.SlowConsumers = newPromDesc("slow_consumer_count", "Number of slow consumers gauge", sc.serverLabels) sc.descs.RTT = newPromDesc("rtt_nanoseconds", "RTT in nanoseconds gauge", sc.serverLabels) sc.descs.Routes = newPromDesc("route_count", "Number of active routes gauge", sc.serverLabels) sc.descs.Gateways = newPromDesc("gateway_count", "Number of active gateways gauge", sc.serverLabels) // Routes - sc.descs.RouteSentMsgs = newPromDesc("route_sent_msg_count", "Number of messages sent over the route gauge", sc.routeLabels) - sc.descs.RouteSentBytes = newPromDesc("route_sent_bytes", "Number of bytes sent over the route gauge", sc.routeLabels) - sc.descs.RouteRecvMsgs = newPromDesc("route_recv_msg_count", "Number of messages received over the route gauge", sc.routeLabels) - sc.descs.RouteRecvBytes = newPromDesc("route_recv_bytes", "Number of bytes received over the route gauge", sc.routeLabels) + sc.descs.RouteSentMsgs = newPromDesc("route_sent_msg_count", "Number of messages sent over the route counter", sc.routeLabels) + sc.descs.RouteSentBytes = newPromDesc("route_sent_bytes", "Number of bytes sent over the route counter", sc.routeLabels) + sc.descs.RouteRecvMsgs = newPromDesc("route_recv_msg_count", "Number of messages received over the route counter", sc.routeLabels) + sc.descs.RouteRecvBytes = newPromDesc("route_recv_bytes", "Number of bytes received over the route counter", sc.routeLabels) sc.descs.RoutePending = newPromDesc("route_pending_bytes", "Number of bytes pending in the route gauge", sc.routeLabels) // Gateways - sc.descs.GatewaySentMsgs = newPromDesc("gateway_sent_msgs_count", "Number of messages sent over the gateway gauge", sc.gatewayLabels) - sc.descs.GatewaySentBytes = newPromDesc("gateway_sent_bytes", "Number of messages sent over the gateway gauge", sc.gatewayLabels) - sc.descs.GatewayRecvMsgs = newPromDesc("gateway_recv_msg_count", "Number of messages sent over the gateway gauge", sc.gatewayLabels) - sc.descs.GatewayRecvBytes = newPromDesc("gateway_recv_bytes", "Number of messages sent over the gateway gauge", sc.gatewayLabels) + sc.descs.GatewaySentMsgs = newPromDesc("gateway_sent_msgs_count", "Number of messages sent over the gateway counter", sc.gatewayLabels) + sc.descs.GatewaySentBytes = newPromDesc("gateway_sent_bytes", "Number of messages sent over the gateway counter", sc.gatewayLabels) + sc.descs.GatewayRecvMsgs = newPromDesc("gateway_recv_msg_count", "Number of messages sent over the gateway counter", sc.gatewayLabels) + sc.descs.GatewayRecvBytes = newPromDesc("gateway_recv_bytes", "Number of messages sent over the gateway counter", sc.gatewayLabels) sc.descs.GatewayNumInbound = newPromDesc("gateway_inbound_msg_count", "Number inbound messages through the gateway gauge", sc.gatewayLabels) // Jetstream Info @@ -326,17 +325,21 @@ func (sc *StatzCollector) buildDescs() { // Account scope metrics if sc.collectAccounts { accLabel := []string{"account"} + serverAndAccLabel := append(sc.serverLabels, accLabel...) sc.descs.accCount = newPromDesc("account_count", "The number of accounts detected", nil) - sc.descs.accConnCount = newPromDesc("account_conn_count", "The number of client connections to this account", accLabel) - sc.descs.accLeafCount = newPromDesc("account_leaf_count", "The number of leafnode connections to this account", accLabel) - sc.descs.accSubCount = newPromDesc("account_sub_count", "The number of subscriptions on this account", accLabel) - - sc.descs.accBytesSent = newPromDesc("account_bytes_sent", "The number of bytes sent on this account", accLabel) - sc.descs.accBytesRecv = newPromDesc("account_bytes_recv", "The number of bytes received on this account", accLabel) - sc.descs.accMsgsSent = newPromDesc("account_msgs_sent", "The number of messages sent on this account", accLabel) - sc.descs.accMsgsRecv = newPromDesc("account_msgs_recv", "The number of messages received on this account", accLabel) - + // Metrics reported per-server + sc.descs.accConnCount = newPromDesc("account_conn_count", "The number of client connections to this account", serverAndAccLabel) + sc.descs.accTotalConnCount = newPromDesc("account_total_conn_count", "Total number of client connections serviced for this account", serverAndAccLabel) + sc.descs.accLeafCount = newPromDesc("account_leaf_count", "The number of leafnode connections to this account", serverAndAccLabel) + sc.descs.accSubCount = newPromDesc("account_sub_count", "The number of subscriptions on this account", serverAndAccLabel) + sc.descs.accSlowConsumerCount = newPromDesc("account_slow_consumer_count", "The number of slow consumers detected in this account", serverAndAccLabel) + sc.descs.accBytesSent = newPromDesc("account_bytes_sent", "The number of bytes sent on this account", serverAndAccLabel) + sc.descs.accBytesRecv = newPromDesc("account_bytes_recv", "The number of bytes received on this account", serverAndAccLabel) + sc.descs.accMsgsSent = newPromDesc("account_msgs_sent", "The number of messages sent on this account", serverAndAccLabel) + sc.descs.accMsgsRecv = newPromDesc("account_msgs_recv", "The number of messages received on this account", serverAndAccLabel) + + // Aggregated metrics sc.descs.accJetstreamEnabled = newPromDesc("account_jetstream_enabled", "Whether JetStream is enabled or not for this account", accLabel) sc.descs.accJetstreamMemoryUsed = newPromDesc("account_jetstream_memory_used", "The number of bytes used by JetStream memory", accLabel) sc.descs.accJetstreamStorageUsed = newPromDesc("account_jetstream_storage_used", "The number of bytes used by JetStream storage", accLabel) @@ -401,6 +404,7 @@ func NewStatzCollector(nc *nats.Conn, logger *logrus.Logger, numServers int, ser servers: make(map[string]bool), doneCh: make(chan struct{}, 1), collectAccounts: accounts, + accStatZeroConn: make(map[string]int), routeIDRemap: make(map[string]map[uint64]int), gatewayIDRemap: make(map[string]map[uint64]int), @@ -442,10 +446,10 @@ func (sc *StatzCollector) handleResponse(msg *nats.Msg) { } sc.rtts[m.Server.ID] = rtt } else if !isCurrent { - sc.logger.Infof("Late reply for server [%15s : %15s : %s]: %v", m.Server.Cluster, serverName(m), m.Server.ID, rtt) + sc.logger.Infof("Late reply for server [%15s : %15s : %s]: %v", m.Server.Cluster, serverName(&m.Server), m.Server.ID, rtt) sc.lateReplies.WithLabelValues(fmt.Sprintf("%.1f", sc.pollTimeout.Seconds())).Inc() } else { - sc.logger.Infof("Extra reply from server [%15s : %15s : %s]: %v", m.Server.Cluster, serverName(m), m.Server.ID, rtt) + sc.logger.Infof("Extra reply from server [%15s : %15s : %s]: %v", m.Server.Cluster, serverName(&m.Server), m.Server.ID, rtt) } } @@ -526,8 +530,8 @@ func (sc *StatzCollector) poll() error { // If we do not see expected number of servers complain. if sc.numServers != -1 && ns != sc.numServers { sort.Slice(stats, func(i, j int) bool { - a := fmt.Sprintf("%s-%s", stats[i].Server.Cluster, serverName(stats[i])) - b := fmt.Sprintf("%s-%s", stats[j].Server.Cluster, serverName(stats[j])) + a := fmt.Sprintf("%s-%s", stats[i].Server.Cluster, serverName(&stats[i].Server)) + b := fmt.Sprintf("%s-%s", stats[j].Server.Cluster, serverName(&stats[j].Server)) return a < b }) @@ -542,7 +546,7 @@ func (sc *StatzCollector) poll() error { key := fmt.Sprintf("%s:%s", stat.Server.Cluster, stat.Server.ID) // Mark this server has been seen sc.servers[key] = true - sc.logger.Debugf("Server [%15s : %15s : %15s : %s]: %v", stat.Server.Cluster, serverName(stat), stat.Server.Host, stat.Server.ID, rtts[stat.Server.ID]) + sc.logger.Debugf("Server [%15s : %15s : %15s : %s]: %v", stat.Server.Cluster, serverName(&stat.Server), stat.Server.Host, stat.Server.ID, rtts[stat.Server.ID]) } sc.logger.Debugln("Missing servers:") @@ -581,20 +585,16 @@ func (sc *StatzCollector) pollAccountInfo() error { return err } - accStats := make(map[string]accountStats, len(accs)) - for accID, acc := range accs { - sts := accountStats{accountID: accID} - - sts.leafCount = float64(acc.LeafNodes) - sts.subCount = float64(acc.NumSubs) - sts.connCount = float64(acc.Conns) - sts.bytesSent = float64(acc.Sent.Bytes) - sts.bytesRecv = float64(acc.Received.Bytes) - sts.msgsSent = float64(acc.Sent.Msgs) - sts.msgsRecv = float64(acc.Received.Msgs) + accStats := make(map[string]*accountStats, len(accs)) + for accID, stats := range accs { + sts := &accountStats{ + accountID: accID, + stats: stats, + } - accStats[acc.Account] = sts + accStats[accID] = sts } + jsInfos := sc.getJSInfos(nc) for accID, jsInfo := range jsInfos { sts, ok := accStats[accID] @@ -647,7 +647,7 @@ func (sc *StatzCollector) pollAccountInfo() error { } sc.Lock() - sc.accStats = make([]accountStats, 0, len(accStats)) + sc.accStats = make([]*accountStats, 0, len(accStats)) for _, acc := range accStats { sc.accStats = append(sc.accStats, acc) } @@ -712,7 +712,17 @@ func (sc *StatzCollector) getJSInfos(nc *nats.Conn) map[string]*server.AccountDe return jsAccInfos } -func (sc *StatzCollector) getAccStatz(nc *nats.Conn) (map[string]*server.AccountStat, error) { +type accStatz struct { + server.ServerAPIResponse + Data server.AccountStatz `json:"data,omitempty"` +} + +type accStat struct { + Server *server.ServerInfo + Data *server.AccountStat +} + +func (sc *StatzCollector) getAccStatz(nc *nats.Conn) (map[string][]*accStat, error) { req := &server.AccountStatzOptions{ IncludeUnused: true, } @@ -720,7 +730,7 @@ func (sc *StatzCollector) getAccStatz(nc *nats.Conn) (map[string]*server.Account if err != nil { return nil, err } - res := make([]*server.AccountStatz, 0) + res := make([]*accStatz, 0) const subj = "$SYS.REQ.ACCOUNT.PING.STATZ" msgs, err := requestMany(nc, sc, subj, reqJSON, true) @@ -729,40 +739,55 @@ func (sc *StatzCollector) getAccStatz(nc *nats.Conn) (map[string]*server.Account } for _, msg := range msgs { - var r server.ServerAPIResponse - var d server.AccountStatz - - r.Data = &d - if err := unmarshalMsg(msg, &r); err != nil { + var a accStatz + if err = unmarshalMsg(msg, &a); err != nil { sc.logger.Warnf("Error deserializing account stats: %s", err.Error()) continue } - if r.Error != nil { - sc.logger.Warnf("Error in Account stats response: %s", r.Error.Error()) + if a.Error != nil { + sc.logger.Warnf("Error in Account stats response: %s", a.Error.Error()) continue } - res = append(res, &d) + res = append(res, &a) if sc.numServers != -1 && len(res) == sc.numServers { break } } - accStatz := make(map[string]*server.AccountStat) + accStats := make(map[string][]*accStat) for _, statz := range res { - for _, acc := range statz.Accounts { - accInfo, ok := accStatz[acc.Account] - if !ok { - accStatz[acc.Account] = acc + for _, acc := range statz.Data.Accounts { + // always skip if account has never connected + if acc.TotalConns == 0 { continue } - mergeAccountStats(accInfo, acc) - accStatz[acc.Account] = acc + + // optimization to stop reporting a server/account pair + // when a server is continuously reporting 0 conns for that account + zeroConnKey := statz.Server.ID + ":" + acc.Account + if acc.Conns == 0 { + count := sc.accStatZeroConn[zeroConnKey] + if count >= accStatZeroConnSkip { + // at limit for continuous polls with 0 conns + continue + } + + sc.accStatZeroConn[zeroConnKey] = count + 1 + } else { + sc.accStatZeroConn[zeroConnKey] = 0 + } + + accInfo := accStats[acc.Account] + accStats[acc.Account] = append(accInfo, &accStat{ + Server: statz.Server, + Data: acc, + }) } } - return accStatz, nil + return accStats, nil } func mergeStreamDetails(from, to *server.AccountDetail) { @@ -791,18 +816,6 @@ Outer: } } -func mergeAccountStats(from, to *server.AccountStat) { - to.Conns += from.Conns - to.LeafNodes += from.LeafNodes - to.TotalConns += from.TotalConns - to.NumSubs += from.NumSubs - to.Sent.Msgs += from.Sent.Msgs - to.Sent.Bytes += from.Sent.Bytes - to.Received.Msgs += from.Received.Msgs - to.Received.Bytes += from.Received.Bytes - to.SlowConsumers += from.SlowConsumers -} - // Describe is the Prometheus interface to describe metrics for // the prometheus system func (sc *StatzCollector) Describe(ch chan<- *prometheus.Desc) { @@ -870,8 +883,10 @@ func (sc *StatzCollector) Describe(ch chan<- *prometheus.Desc) { if sc.collectAccounts { ch <- sc.descs.accCount ch <- sc.descs.accConnCount + ch <- sc.descs.accTotalConnCount ch <- sc.descs.accLeafCount ch <- sc.descs.accSubCount + ch <- sc.descs.accSlowConsumerCount ch <- sc.descs.accBytesSent ch <- sc.descs.accBytesRecv ch <- sc.descs.accMsgsSent @@ -933,7 +948,7 @@ func (sc *StatzCollector) Collect(ch chan<- prometheus.Metric) { if err := sc.poll(); err != nil { sc.logger.Warnf("Error polling NATS server: %v", err) sc.pollErrCnt.WithLabelValues().Inc() - metrics.newCounterMetric(sc.natsUp, 0, nil) + metrics.newGaugeMetric(sc.natsUp, 0, nil) return metrics.metrics, nil } @@ -941,28 +956,28 @@ func (sc *StatzCollector) Collect(ch chan<- prometheus.Metric) { sc.Lock() defer sc.Unlock() - metrics.newCounterMetric(sc.natsUp, 1, nil) + metrics.newGaugeMetric(sc.natsUp, 1, nil) sc.surveyedCnt.WithLabelValues().Set(0) for _, sm := range sc.stats { sc.surveyedCnt.WithLabelValues().Inc() - metrics.newGaugeMetric(sc.descs.Info, 1, sc.serverInfoLabelValues(sm)) + metrics.newGaugeMetric(sc.descs.Info, 1, sc.serverInfoLabelValues(&sm.Server)) - labels := sc.serverLabelValues(sm) + labels := sc.serverLabelValues(&sm.Server) metrics.newGaugeMetric(sc.descs.Start, float64(sm.Stats.Start.UnixNano()), labels) metrics.newGaugeMetric(sc.descs.Uptime, time.Since(sm.Stats.Start).Seconds(), labels) metrics.newGaugeMetric(sc.descs.Mem, float64(sm.Stats.Mem), labels) metrics.newGaugeMetric(sc.descs.Cores, float64(sm.Stats.Cores), labels) metrics.newGaugeMetric(sc.descs.CPU, sm.Stats.CPU, labels) metrics.newGaugeMetric(sc.descs.Connections, float64(sm.Stats.Connections), labels) - metrics.newGaugeMetric(sc.descs.TotalConnections, float64(sm.Stats.TotalConnections), labels) + metrics.newCounterMetric(sc.descs.TotalConnections, float64(sm.Stats.TotalConnections), labels) metrics.newGaugeMetric(sc.descs.ActiveAccounts, float64(sm.Stats.ActiveAccounts), labels) metrics.newGaugeMetric(sc.descs.NumSubs, float64(sm.Stats.NumSubs), labels) - metrics.newGaugeMetric(sc.descs.SentMsgs, float64(sm.Stats.Sent.Msgs), labels) - metrics.newGaugeMetric(sc.descs.SentBytes, float64(sm.Stats.Sent.Bytes), labels) - metrics.newGaugeMetric(sc.descs.RecvMsgs, float64(sm.Stats.Received.Msgs), labels) - metrics.newGaugeMetric(sc.descs.RecvBytes, float64(sm.Stats.Received.Bytes), labels) + metrics.newCounterMetric(sc.descs.SentMsgs, float64(sm.Stats.Sent.Msgs), labels) + metrics.newCounterMetric(sc.descs.SentBytes, float64(sm.Stats.Sent.Bytes), labels) + metrics.newCounterMetric(sc.descs.RecvMsgs, float64(sm.Stats.Received.Msgs), labels) + metrics.newCounterMetric(sc.descs.RecvBytes, float64(sm.Stats.Received.Bytes), labels) metrics.newGaugeMetric(sc.descs.SlowConsumers, float64(sm.Stats.SlowConsumers), labels) metrics.newGaugeMetric(sc.descs.RTT, float64(sc.rtts[sm.Server.ID]), labels) metrics.newGaugeMetric(sc.descs.Routes, float64(len(sm.Stats.Routes)), labels) @@ -989,19 +1004,18 @@ func (sc *StatzCollector) Collect(ch chan<- prometheus.Metric) { metrics.newGaugeMetric(sc.descs.JetstreamMemstoreReservedBytes, float64(sm.Stats.JetStream.Stats.ReservedMemory), lblServerID) metrics.newGaugeMetric(sc.descs.JetstreamAccounts, float64(sm.Stats.JetStream.Stats.Accounts), lblServerID) metrics.newGaugeMetric(sc.descs.JetstreamHAAssets, float64(sm.Stats.JetStream.Stats.HAAssets), lblServerID) - // NIT: Technically these should be Counters, not Gauges. // At present, Total does not include Errors. Keeping them separate - metrics.newGaugeMetric(sc.descs.JetstreamAPIRequests, float64(sm.Stats.JetStream.Stats.API.Total), lblServerID) - metrics.newGaugeMetric(sc.descs.JetstreamAPIErrors, float64(sm.Stats.JetStream.Stats.API.Errors), lblServerID) + metrics.newCounterMetric(sc.descs.JetstreamAPIRequests, float64(sm.Stats.JetStream.Stats.API.Total), lblServerID) + metrics.newCounterMetric(sc.descs.JetstreamAPIErrors, float64(sm.Stats.JetStream.Stats.API.Errors), lblServerID) } if sm.Stats.JetStream.Meta == nil { - metrics.newGaugeMetric(sc.descs.JetstreamClusterRaftGroupInfo, float64(0), []string{"", "", sm.Server.ID, serverName(sm), "", ""}) + metrics.newGaugeMetric(sc.descs.JetstreamClusterRaftGroupInfo, float64(0), []string{"", "", sm.Server.ID, serverName(&sm.Server), "", ""}) } else { - jsRaftGroupInfoLabelValues := []string{jsDomainLabelValue(sm), "_meta_", sm.Server.ID, serverName(sm), sm.Stats.JetStream.Meta.Name, sm.Stats.JetStream.Meta.Leader} + jsRaftGroupInfoLabelValues := []string{jsDomainLabelValue(sm), "_meta_", sm.Server.ID, serverName(&sm.Server), sm.Stats.JetStream.Meta.Name, sm.Stats.JetStream.Meta.Leader} metrics.newGaugeMetric(sc.descs.JetstreamClusterRaftGroupInfo, float64(1), jsRaftGroupInfoLabelValues) - jsRaftGroupLabelValues := []string{sm.Server.ID, serverName(sm), sm.Server.Cluster} + jsRaftGroupLabelValues := []string{sm.Server.ID, serverName(&sm.Server), sm.Server.Cluster} // FIXME: add labels needed or remove... metrics.newGaugeMetric(sc.descs.JetstreamClusterRaftGroupSize, float64(sm.Stats.JetStream.Meta.Size), jsRaftGroupLabelValues) @@ -1018,7 +1032,7 @@ func (sc *StatzCollector) Collect(ch chan<- prometheus.Metric) { if jsr == nil { continue } - jsClusterReplicaLabelValues := []string{sm.Server.ID, serverName(sm), jsr.Name, sm.Server.Cluster} + jsClusterReplicaLabelValues := []string{sm.Server.ID, serverName(&sm.Server), jsr.Name, sm.Server.Cluster} metrics.newGaugeMetric(sc.descs.JetstreamClusterRaftGroupReplicaActive, float64(jsr.Active), jsClusterReplicaLabelValues) if jsr.Current { metrics.newGaugeMetric(sc.descs.JetstreamClusterRaftGroupReplicaCurrent, float64(1), jsClusterReplicaLabelValues) @@ -1042,11 +1056,11 @@ func (sc *StatzCollector) Collect(ch chan<- prometheus.Metric) { sc.routeIDRemap = remapIDToIdx(pairs, sc.routeIDRemap) for _, rs := range sm.Stats.Routes { - labels = sc.routeLabelValues(sm, rs) - metrics.newGaugeMetric(sc.descs.RouteSentMsgs, float64(rs.Sent.Msgs), labels) - metrics.newGaugeMetric(sc.descs.RouteSentBytes, float64(rs.Sent.Bytes), labels) - metrics.newGaugeMetric(sc.descs.RouteRecvMsgs, float64(rs.Received.Msgs), labels) - metrics.newGaugeMetric(sc.descs.RouteRecvBytes, float64(rs.Received.Bytes), labels) + labels = sc.routeLabelValues(&sm.Server, rs) + metrics.newCounterMetric(sc.descs.RouteSentMsgs, float64(rs.Sent.Msgs), labels) + metrics.newCounterMetric(sc.descs.RouteSentBytes, float64(rs.Sent.Bytes), labels) + metrics.newCounterMetric(sc.descs.RouteRecvMsgs, float64(rs.Received.Msgs), labels) + metrics.newCounterMetric(sc.descs.RouteRecvBytes, float64(rs.Received.Bytes), labels) metrics.newGaugeMetric(sc.descs.RoutePending, float64(rs.Pending), labels) } @@ -1058,11 +1072,11 @@ func (sc *StatzCollector) Collect(ch chan<- prometheus.Metric) { sc.gatewayIDRemap = remapIDToIdx(pairs, sc.gatewayIDRemap) for _, gw := range sm.Stats.Gateways { - labels = sc.gatewayLabelValues(sm, gw) - metrics.newGaugeMetric(sc.descs.GatewaySentMsgs, float64(gw.Sent.Msgs), labels) - metrics.newGaugeMetric(sc.descs.GatewaySentBytes, float64(gw.Sent.Bytes), labels) - metrics.newGaugeMetric(sc.descs.GatewayRecvMsgs, float64(gw.Received.Msgs), labels) - metrics.newGaugeMetric(sc.descs.GatewayRecvBytes, float64(gw.Received.Bytes), labels) + labels = sc.gatewayLabelValues(&sm.Server, gw) + metrics.newCounterMetric(sc.descs.GatewaySentMsgs, float64(gw.Sent.Msgs), labels) + metrics.newCounterMetric(sc.descs.GatewaySentBytes, float64(gw.Sent.Bytes), labels) + metrics.newCounterMetric(sc.descs.GatewayRecvMsgs, float64(gw.Received.Msgs), labels) + metrics.newCounterMetric(sc.descs.GatewayRecvBytes, float64(gw.Received.Bytes), labels) metrics.newGaugeMetric(sc.descs.GatewayNumInbound, float64(gw.NumInbound), labels) } } @@ -1071,39 +1085,42 @@ func (sc *StatzCollector) Collect(ch chan<- prometheus.Metric) { if sc.collectAccounts { metrics.newGaugeMetric(sc.descs.accCount, float64(len(sc.accStats)), nil) for _, stat := range sc.accStats { - id := []string{stat.accountID} - - metrics.newGaugeMetric(sc.descs.accConnCount, stat.connCount, id) - metrics.newGaugeMetric(sc.descs.accLeafCount, stat.leafCount, id) - metrics.newGaugeMetric(sc.descs.accSubCount, stat.subCount, id) - - metrics.newCounterMetric(sc.descs.accBytesSent, stat.bytesSent, id) - metrics.newCounterMetric(sc.descs.accBytesRecv, stat.bytesRecv, id) - metrics.newCounterMetric(sc.descs.accMsgsSent, stat.msgsSent, id) - metrics.newCounterMetric(sc.descs.accMsgsRecv, stat.msgsRecv, id) - - metrics.newGaugeMetric(sc.descs.accJetstreamEnabled, stat.jetstreamEnabled, id) - metrics.newGaugeMetric(sc.descs.accJetstreamMemoryUsed, stat.jetstreamMemoryUsed, id) - metrics.newGaugeMetric(sc.descs.accJetstreamStorageUsed, stat.jetstreamStorageUsed, id) - metrics.newGaugeMetric(sc.descs.accJetstreamMemoryReserved, stat.jetstreamMemoryReserved, id) - metrics.newGaugeMetric(sc.descs.accJetstreamStorageReserved, stat.jetstreamStorageReserved, id) + accLabels := []string{stat.accountID} + for _, as := range stat.stats { + serverAndAccLabels := append(sc.serverLabelValues(as.Server), accLabels...) + metrics.newGaugeMetric(sc.descs.accConnCount, float64(as.Data.Conns), serverAndAccLabels) + metrics.newCounterMetric(sc.descs.accTotalConnCount, float64(as.Data.TotalConns), serverAndAccLabels) + metrics.newGaugeMetric(sc.descs.accLeafCount, float64(as.Data.LeafNodes), serverAndAccLabels) + metrics.newGaugeMetric(sc.descs.accSubCount, float64(as.Data.NumSubs), serverAndAccLabels) + metrics.newGaugeMetric(sc.descs.accSlowConsumerCount, float64(as.Data.SlowConsumers), serverAndAccLabels) + metrics.newCounterMetric(sc.descs.accBytesSent, float64(as.Data.Sent.Bytes), serverAndAccLabels) + metrics.newCounterMetric(sc.descs.accBytesRecv, float64(as.Data.Received.Bytes), serverAndAccLabels) + metrics.newCounterMetric(sc.descs.accMsgsSent, float64(as.Data.Sent.Msgs), serverAndAccLabels) + metrics.newCounterMetric(sc.descs.accMsgsRecv, float64(as.Data.Sent.Bytes), serverAndAccLabels) + } + + metrics.newGaugeMetric(sc.descs.accJetstreamEnabled, stat.jetstreamEnabled, accLabels) + metrics.newGaugeMetric(sc.descs.accJetstreamMemoryUsed, stat.jetstreamMemoryUsed, accLabels) + metrics.newGaugeMetric(sc.descs.accJetstreamStorageUsed, stat.jetstreamStorageUsed, accLabels) + metrics.newGaugeMetric(sc.descs.accJetstreamMemoryReserved, stat.jetstreamMemoryReserved, accLabels) + metrics.newGaugeMetric(sc.descs.accJetstreamStorageReserved, stat.jetstreamStorageReserved, accLabels) for tier, size := range stat.jetstreamTieredMemoryUsed { - metrics.newGaugeMetric(sc.descs.accJetstreamTieredMemoryUsed, size, append(id, fmt.Sprintf("R%d", tier))) + metrics.newGaugeMetric(sc.descs.accJetstreamTieredMemoryUsed, size, append(accLabels, fmt.Sprintf("R%d", tier))) } for tier, size := range stat.jetstreamTieredStorageUsed { - metrics.newGaugeMetric(sc.descs.accJetstreamTieredStorageUsed, size, append(id, fmt.Sprintf("R%d", tier))) + metrics.newGaugeMetric(sc.descs.accJetstreamTieredStorageUsed, size, append(accLabels, fmt.Sprintf("R%d", tier))) } for tier, size := range stat.jetstreamTieredMemoryReserved { - metrics.newGaugeMetric(sc.descs.accJetstreamTieredMemoryReserved, size, append(id, fmt.Sprintf("R%d", tier))) + metrics.newGaugeMetric(sc.descs.accJetstreamTieredMemoryReserved, size, append(accLabels, fmt.Sprintf("R%d", tier))) } for tier, size := range stat.jetstreamTieredStorageReserved { - metrics.newGaugeMetric(sc.descs.accJetstreamTieredStorageReserved, size, append(id, fmt.Sprintf("R%d", tier))) + metrics.newGaugeMetric(sc.descs.accJetstreamTieredStorageReserved, size, append(accLabels, fmt.Sprintf("R%d", tier))) } - metrics.newGaugeMetric(sc.descs.accJetstreamStreamCount, stat.jetstreamStreamCount, id) + metrics.newGaugeMetric(sc.descs.accJetstreamStreamCount, stat.jetstreamStreamCount, accLabels) for _, streamStat := range stat.jetstreamStreams { - metrics.newGaugeMetric(sc.descs.accJetstreamConsumerCount, streamStat.consumerCount, append(id, streamStat.streamName)) - metrics.newGaugeMetric(sc.descs.accJetstreamReplicaCount, streamStat.replicaCount, append(id, streamStat.streamName)) + metrics.newGaugeMetric(sc.descs.accJetstreamConsumerCount, streamStat.consumerCount, append(accLabels, streamStat.streamName)) + metrics.newGaugeMetric(sc.descs.accJetstreamReplicaCount, streamStat.replicaCount, append(accLabels, streamStat.streamName)) } } } diff --git a/surveyor/surveyor_test.go b/surveyor/surveyor_test.go index 5cfdcba..3235172 100644 --- a/surveyor/surveyor_test.go +++ b/surveyor/surveyor_test.go @@ -245,16 +245,18 @@ func TestSurveyor_Account(t *testing.T) { } want := []string{ - "nats_core_account_bytes_recv", - "nats_core_account_bytes_sent", - "nats_core_account_conn_count", "nats_core_account_count", - "nats_core_account_jetstream_enabled", - "nats_core_account_jetstream_stream_count", + "nats_core_account_conn_count", + "nats_core_account_total_conn_count", "nats_core_account_leaf_count", - "nats_core_account_msgs_recv", - "nats_core_account_msgs_sent", "nats_core_account_sub_count", + "nats_core_account_slow_consumer_count", + "nats_core_account_bytes_sent", + "nats_core_account_bytes_recv", + "nats_core_account_msgs_sent", + "nats_core_account_msgs_recv", + "nats_core_account_jetstream_enabled", + "nats_core_account_jetstream_stream_count", } for _, m := range want { if !strings.Contains(output, m) {