From 673822e9c895498d8d27961501e4f80bba89f0d6 Mon Sep 17 00:00:00 2001 From: Emad Mohamadi Date: Fri, 17 Feb 2023 10:45:11 +0100 Subject: [PATCH] Add gRPC render metadata header In headers, we now send the number of metrics gRPC render will stream. This can be beneficial for the receiver to know how many metrics it will receive over the stream. --- carbonserver/render.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/carbonserver/render.go b/carbonserver/render.go index 969afb492..0b37f36a8 100644 --- a/carbonserver/render.go +++ b/carbonserver/render.go @@ -18,6 +18,7 @@ import ( "go.uber.org/zap" "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" "google.golang.org/grpc/status" @@ -646,6 +647,13 @@ func (listener *CarbonserverListener) streamMetrics(stream grpcv2.CarbonV2_Rende return } +const gRPCRenderMetricsCountHeaderKey = "metrics-count" + +func sendRenderMetadataHeader(stream grpcv2.CarbonV2_RenderServer, filesCount int) error { + header := metadata.Pairs(gRPCRenderMetricsCountHeaderKey, strconv.Itoa(filesCount)) + return stream.SendHeader(header) +} + // Render implements Render rpc of CarbonV2 gRPC service func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, stream grpcv2.CarbonV2_RenderServer) (rpcErr error) { t0 := time.Now() @@ -726,6 +734,10 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str metricGlobMap := getMetricGlobMapFromExpandedGlobs(expandedGlobs) tle.MetricGlobMapLength = len(metricGlobMap) filesCount := countFilesInExpandedGlobs(expandedGlobs) + err = sendRenderMetadataHeader(stream, filesCount) + if err != nil { + return nil, err + } prepareChan := make(chan response, getStreamingChannelSize(filesCount)) go func() { prepareT0 := time.Now() @@ -768,6 +780,7 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str case res != nil: atomic.AddUint64(&listener.metrics.QueryCacheHit, 1) cachedResponses := res.([]response) + err = sendRenderMetadataHeader(stream, len(cachedResponses)) responseChanToStream = make(chan response, getStreamingChannelSize(len(cachedResponses))) go func() { for _, r := range cachedResponses {