From 2b8b72e01f014fb1b34fb832ef7013cdb8e2182b Mon Sep 17 00:00:00 2001 From: Caleb Lloyd Date: Wed, 6 Mar 2024 17:38:25 -0500 Subject: [PATCH] use counters for msgs/bytes sent/recv Signed-off-by: Caleb Lloyd --- surveyor/collector_statz.go | 140 ++++++++++++++++++++++-------------- 1 file changed, 86 insertions(+), 54 deletions(-) diff --git a/surveyor/collector_statz.go b/surveyor/collector_statz.go index 3c14b92..0b987d1 100644 --- a/surveyor/collector_statz.go +++ b/surveyor/collector_statz.go @@ -96,6 +96,7 @@ type statzDescs struct { // Account scope metrics accCount *prometheus.Desc accConnCount *prometheus.Desc + accTotalConnCount *prometheus.Desc accLeafCount *prometheus.Desc accSubCount *prometheus.Desc accBytesSent *prometheus.Desc @@ -156,6 +157,14 @@ type StatzCollector struct { noReplies *prometheus.CounterVec } +type accountStatByServer struct { + totalConnCount float64 + bytesSent float64 + bytesRecv float64 + msgsSent float64 + msgsRecv float64 +} + type accountStats struct { accountID string @@ -163,10 +172,7 @@ type accountStats struct { subCount float64 leafCount float64 - bytesSent float64 - bytesRecv float64 - msgsSent float64 - msgsRecv float64 + byServer map[string]*accountStatByServer jetstreamEnabled float64 jetstreamMemoryUsed float64 @@ -237,7 +243,7 @@ func (sc *StatzCollector) buildDescs() { // A unlabelled description for the up/down sc.natsUp = prometheus.NewDesc(prometheus.BuildFQName("nats", "core", "nats_up"), - "1 if connected to NATS, 0 otherwise. A gauge.", nil, sc.constLabels) + "1 if connected to NATS, 0 otherwise. 
A gauge.", nil, sc.constLabels) sc.descs.Info = newPromDesc("info", "General Server information Summary gauge", sc.serverInfoLabels) sc.descs.Start = newPromDesc("start_time", "Server start time gauge", sc.serverLabels) @@ -246,30 +252,30 @@ func (sc *StatzCollector) buildDescs() { sc.descs.Cores = newPromDesc("core_count", "Machine cores gauge", sc.serverLabels) sc.descs.CPU = newPromDesc("cpu_percentage", "Server cpu utilization gauge", sc.serverLabels) sc.descs.Connections = newPromDesc("connection_count", "Current number of client connections gauge", sc.serverLabels) - sc.descs.TotalConnections = newPromDesc("total_connection_count", "Total number of client connections serviced gauge", sc.serverLabels) + sc.descs.TotalConnections = newPromDesc("total_connection_count", "Total number of client connections serviced counter", sc.serverLabels) sc.descs.ActiveAccounts = newPromDesc("active_account_count", "Number of active accounts gauge", sc.serverLabels) sc.descs.NumSubs = newPromDesc("subs_count", "Current number of subscriptions gauge", sc.serverLabels) - sc.descs.SentMsgs = newPromDesc("sent_msgs_count", "Number of messages sent gauge", sc.serverLabels) - sc.descs.SentBytes = newPromDesc("sent_bytes", "Number of messages sent gauge", sc.serverLabels) - sc.descs.RecvMsgs = newPromDesc("recv_msgs_count", "Number of messages received gauge", sc.serverLabels) - sc.descs.RecvBytes = newPromDesc("recv_bytes", "Number of messages received gauge", sc.serverLabels) + sc.descs.SentMsgs = newPromDesc("sent_msgs_count", "Number of messages sent counter", sc.serverLabels) + sc.descs.SentBytes = newPromDesc("sent_bytes", "Number of bytes sent counter", sc.serverLabels) + sc.descs.RecvMsgs = newPromDesc("recv_msgs_count", "Number of messages received counter", sc.serverLabels) + sc.descs.RecvBytes = newPromDesc("recv_bytes", "Number of bytes received counter", sc.serverLabels) sc.descs.SlowConsumers = newPromDesc("slow_consumer_count", "Number of slow consumers gauge", 
sc.serverLabels) sc.descs.RTT = newPromDesc("rtt_nanoseconds", "RTT in nanoseconds gauge", sc.serverLabels) sc.descs.Routes = newPromDesc("route_count", "Number of active routes gauge", sc.serverLabels) sc.descs.Gateways = newPromDesc("gateway_count", "Number of active gateways gauge", sc.serverLabels) // Routes - sc.descs.RouteSentMsgs = newPromDesc("route_sent_msg_count", "Number of messages sent over the route gauge", sc.routeLabels) - sc.descs.RouteSentBytes = newPromDesc("route_sent_bytes", "Number of bytes sent over the route gauge", sc.routeLabels) - sc.descs.RouteRecvMsgs = newPromDesc("route_recv_msg_count", "Number of messages received over the route gauge", sc.routeLabels) - sc.descs.RouteRecvBytes = newPromDesc("route_recv_bytes", "Number of bytes received over the route gauge", sc.routeLabels) + sc.descs.RouteSentMsgs = newPromDesc("route_sent_msg_count", "Number of messages sent over the route counter", sc.routeLabels) + sc.descs.RouteSentBytes = newPromDesc("route_sent_bytes", "Number of bytes sent over the route counter", sc.routeLabels) + sc.descs.RouteRecvMsgs = newPromDesc("route_recv_msg_count", "Number of messages received over the route counter", sc.routeLabels) + sc.descs.RouteRecvBytes = newPromDesc("route_recv_bytes", "Number of bytes received over the route counter", sc.routeLabels) sc.descs.RoutePending = newPromDesc("route_pending_bytes", "Number of bytes pending in the route gauge", sc.routeLabels) // Gateways - sc.descs.GatewaySentMsgs = newPromDesc("gateway_sent_msgs_count", "Number of messages sent over the gateway gauge", sc.gatewayLabels) - sc.descs.GatewaySentBytes = newPromDesc("gateway_sent_bytes", "Number of messages sent over the gateway gauge", sc.gatewayLabels) - sc.descs.GatewayRecvMsgs = newPromDesc("gateway_recv_msg_count", "Number of messages sent over the gateway gauge", sc.gatewayLabels) - sc.descs.GatewayRecvBytes = newPromDesc("gateway_recv_bytes", "Number of messages sent over the gateway gauge", sc.gatewayLabels) + 
sc.descs.GatewaySentMsgs = newPromDesc("gateway_sent_msgs_count", "Number of messages sent over the gateway counter", sc.gatewayLabels) + sc.descs.GatewaySentBytes = newPromDesc("gateway_sent_bytes", "Number of bytes sent over the gateway counter", sc.gatewayLabels) + sc.descs.GatewayRecvMsgs = newPromDesc("gateway_recv_msg_count", "Number of messages received over the gateway counter", sc.gatewayLabels) + sc.descs.GatewayRecvBytes = newPromDesc("gateway_recv_bytes", "Number of bytes received over the gateway counter", sc.gatewayLabels) sc.descs.GatewayNumInbound = newPromDesc("gateway_inbound_msg_count", "Number inbound messages through the gateway gauge", sc.gatewayLabels) // Jetstream Info @@ -311,13 +317,14 @@ func (sc *StatzCollector) buildDescs() { sc.descs.accCount = newPromDesc("account_count", "The number of accounts detected", nil) sc.descs.accConnCount = newPromDesc("account_conn_count", "The number of client connections to this account", accLabel) + sc.descs.accTotalConnCount = newPromDesc("account_total_conn_count", "Total number of client connections serviced for this account", accLabel) sc.descs.accLeafCount = newPromDesc("account_leaf_count", "The number of leafnode connections to this account", accLabel) sc.descs.accSubCount = newPromDesc("account_sub_count", "The number of subscriptions on this account", accLabel) - sc.descs.accBytesSent = newPromDesc("account_bytes_sent", "The number of bytes sent on this account", accLabel) - sc.descs.accBytesRecv = newPromDesc("account_bytes_recv", "The number of bytes received on this account", accLabel) - sc.descs.accMsgsSent = newPromDesc("account_msgs_sent", "The number of messages sent on this account", accLabel) - sc.descs.accMsgsRecv = newPromDesc("account_msgs_recv", "The number of messages received on this account", accLabel) + sc.descs.accBytesSent = newPromDesc("account_bytes_sent", "The number of bytes sent on this account", append(accLabel, "server")) + sc.descs.accBytesRecv = 
newPromDesc("account_bytes_recv", "The number of bytes received on this account", append(accLabel, "server")) + sc.descs.accMsgsSent = newPromDesc("account_msgs_sent", "The number of messages sent on this account", append(accLabel, "server")) + sc.descs.accMsgsRecv = newPromDesc("account_msgs_recv", "The number of messages received on this account", append(accLabel, "server")) sc.descs.accJetstreamEnabled = newPromDesc("account_jetstream_enabled", "Whether JetStream is enabled or not for this account", accLabel) sc.descs.accJetstreamMemoryUsed = newPromDesc("account_jetstream_memory_used", "The number of bytes used by JetStream memory", accLabel) @@ -565,15 +572,24 @@ func (sc *StatzCollector) pollAccountInfo() error { for accID, acc := range accs { sts := accountStats{accountID: accID} - sts.leafCount = float64(acc.LeafNodes) - sts.subCount = float64(acc.NumSubs) - sts.connCount = float64(acc.Conns) - sts.bytesSent = float64(acc.Sent.Bytes) - sts.bytesRecv = float64(acc.Received.Bytes) - sts.msgsSent = float64(acc.Sent.Msgs) - sts.msgsRecv = float64(acc.Received.Msgs) + // gauge metrics get aggregated counts + sts.leafCount = float64(acc.Aggregated.LeafNodes) + sts.subCount = float64(acc.Aggregated.NumSubs) + sts.connCount = float64(acc.Aggregated.Conns) + + // counters get mapped by server + sts.byServer = make(map[string]*accountStatByServer, len(acc.ByServer)) + for serverID, serverAccStat := range acc.ByServer { + sts.byServer[serverID] = &accountStatByServer{ + totalConnCount: float64(serverAccStat.TotalConns), + bytesSent: float64(serverAccStat.Sent.Bytes), + bytesRecv: float64(serverAccStat.Received.Bytes), + msgsSent: float64(serverAccStat.Sent.Msgs), + msgsRecv: float64(serverAccStat.Received.Msgs), + } + } - accStats[acc.Account] = sts + accStats[accID] = sts } jsInfos := sc.getJSInfos(nc) for accID, jsInfo := range jsInfos { @@ -692,7 +708,12 @@ func (sc *StatzCollector) getJSInfos(nc *nats.Conn) map[string]*server.AccountDe return jsAccInfos } -func 
(sc *StatzCollector) getAccStatz(nc *nats.Conn) (map[string]*server.AccountStat, error) { +type accStat struct { + Aggregated *server.AccountStat + ByServer map[string]*server.AccountStat +} + +func (sc *StatzCollector) getAccStatz(nc *nats.Conn) (map[string]*accStat, error) { req := &server.AccountStatzOptions{ IncludeUnused: true, } @@ -729,20 +750,27 @@ func (sc *StatzCollector) getAccStatz(nc *nats.Conn) (map[string]*server.Account } } - accStatz := make(map[string]*server.AccountStat) + accStats := make(map[string]*accStat) for _, statz := range res { for _, acc := range statz.Accounts { - accInfo, ok := accStatz[acc.Account] + accInfo, ok := accStats[acc.Account] if !ok { - accStatz[acc.Account] = acc + accCopy := *acc + accStats[acc.Account] = &accStat{ + Aggregated: &accCopy, + ByServer: map[string]*server.AccountStat{ + statz.ID: acc, + }, + } continue } - mergeAccountStats(accInfo, acc) - accStatz[acc.Account] = acc + + mergeAccountStats(acc, accInfo.Aggregated) + accInfo.ByServer[statz.ID] = acc } } - return accStatz, nil + return accStats, nil } func mergeStreamDetails(from, to *server.AccountDetail) { @@ -936,13 +964,13 @@ func (sc *StatzCollector) Collect(ch chan<- prometheus.Metric) { metrics.newGaugeMetric(sc.descs.Cores, float64(sm.Stats.Cores), labels) metrics.newGaugeMetric(sc.descs.CPU, sm.Stats.CPU, labels) metrics.newGaugeMetric(sc.descs.Connections, float64(sm.Stats.Connections), labels) - metrics.newGaugeMetric(sc.descs.TotalConnections, float64(sm.Stats.TotalConnections), labels) + metrics.newCounterMetric(sc.descs.TotalConnections, float64(sm.Stats.TotalConnections), labels) metrics.newGaugeMetric(sc.descs.ActiveAccounts, float64(sm.Stats.ActiveAccounts), labels) metrics.newGaugeMetric(sc.descs.NumSubs, float64(sm.Stats.NumSubs), labels) - metrics.newGaugeMetric(sc.descs.SentMsgs, float64(sm.Stats.Sent.Msgs), labels) - metrics.newGaugeMetric(sc.descs.SentBytes, float64(sm.Stats.Sent.Bytes), labels) - 
metrics.newGaugeMetric(sc.descs.RecvMsgs, float64(sm.Stats.Received.Msgs), labels) - metrics.newGaugeMetric(sc.descs.RecvBytes, float64(sm.Stats.Received.Bytes), labels) + metrics.newCounterMetric(sc.descs.SentMsgs, float64(sm.Stats.Sent.Msgs), labels) + metrics.newCounterMetric(sc.descs.SentBytes, float64(sm.Stats.Sent.Bytes), labels) + metrics.newCounterMetric(sc.descs.RecvMsgs, float64(sm.Stats.Received.Msgs), labels) + metrics.newCounterMetric(sc.descs.RecvBytes, float64(sm.Stats.Received.Bytes), labels) metrics.newGaugeMetric(sc.descs.SlowConsumers, float64(sm.Stats.SlowConsumers), labels) metrics.newGaugeMetric(sc.descs.RTT, float64(sc.rtts[sm.Server.ID]), labels) metrics.newGaugeMetric(sc.descs.Routes, float64(len(sm.Stats.Routes)), labels) @@ -1015,19 +1043,19 @@ func (sc *StatzCollector) Collect(ch chan<- prometheus.Metric) { } for _, rs := range sm.Stats.Routes { labels = sc.routeLabelValues(sm, rs) - metrics.newGaugeMetric(sc.descs.RouteSentMsgs, float64(rs.Sent.Msgs), labels) - metrics.newGaugeMetric(sc.descs.RouteSentBytes, float64(rs.Sent.Bytes), labels) - metrics.newGaugeMetric(sc.descs.RouteRecvMsgs, float64(rs.Received.Msgs), labels) - metrics.newGaugeMetric(sc.descs.RouteRecvBytes, float64(rs.Received.Bytes), labels) + metrics.newCounterMetric(sc.descs.RouteSentMsgs, float64(rs.Sent.Msgs), labels) + metrics.newCounterMetric(sc.descs.RouteSentBytes, float64(rs.Sent.Bytes), labels) + metrics.newCounterMetric(sc.descs.RouteRecvMsgs, float64(rs.Received.Msgs), labels) + metrics.newCounterMetric(sc.descs.RouteRecvBytes, float64(rs.Received.Bytes), labels) metrics.newGaugeMetric(sc.descs.RoutePending, float64(rs.Pending), labels) } for _, gw := range sm.Stats.Gateways { labels = sc.gatewayLabelValues(sm, gw) - metrics.newGaugeMetric(sc.descs.GatewaySentMsgs, float64(gw.Sent.Msgs), labels) - metrics.newGaugeMetric(sc.descs.GatewaySentBytes, float64(gw.Sent.Bytes), labels) - metrics.newGaugeMetric(sc.descs.GatewayRecvMsgs, float64(gw.Received.Msgs), 
labels) - metrics.newGaugeMetric(sc.descs.GatewayRecvBytes, float64(gw.Received.Bytes), labels) + metrics.newCounterMetric(sc.descs.GatewaySentMsgs, float64(gw.Sent.Msgs), labels) + metrics.newCounterMetric(sc.descs.GatewaySentBytes, float64(gw.Sent.Bytes), labels) + metrics.newCounterMetric(sc.descs.GatewayRecvMsgs, float64(gw.Received.Msgs), labels) + metrics.newCounterMetric(sc.descs.GatewayRecvBytes, float64(gw.Received.Bytes), labels) metrics.newGaugeMetric(sc.descs.GatewayNumInbound, float64(gw.NumInbound), labels) } } @@ -1042,10 +1070,14 @@ func (sc *StatzCollector) Collect(ch chan<- prometheus.Metric) { metrics.newGaugeMetric(sc.descs.accLeafCount, stat.leafCount, id) metrics.newGaugeMetric(sc.descs.accSubCount, stat.subCount, id) - metrics.newCounterMetric(sc.descs.accBytesSent, stat.bytesSent, id) - metrics.newCounterMetric(sc.descs.accBytesRecv, stat.bytesRecv, id) - metrics.newCounterMetric(sc.descs.accMsgsSent, stat.msgsSent, id) - metrics.newCounterMetric(sc.descs.accMsgsRecv, stat.msgsRecv, id) + for serverID, serverAccStats := range stat.byServer { + byServerLabels := append(id, serverID) + metrics.newCounterMetric(sc.descs.accTotalConnCount, serverAccStats.totalConnCount, byServerLabels) + metrics.newCounterMetric(sc.descs.accBytesSent, serverAccStats.bytesSent, byServerLabels) + metrics.newCounterMetric(sc.descs.accBytesRecv, serverAccStats.bytesRecv, byServerLabels) + metrics.newCounterMetric(sc.descs.accMsgsSent, serverAccStats.msgsSent, byServerLabels) + metrics.newCounterMetric(sc.descs.accMsgsRecv, serverAccStats.msgsRecv, byServerLabels) + } metrics.newGaugeMetric(sc.descs.accJetstreamEnabled, stat.jetstreamEnabled, id) metrics.newGaugeMetric(sc.descs.accJetstreamMemoryUsed, stat.jetstreamMemoryUsed, id)