Skip to content

Commit

Permalink
Support remote write v2 by converting request
Browse files Browse the repository at this point in the history
Signed-off-by: SungJin1212 <[email protected]>
  • Loading branch information
SungJin1212 committed Nov 11, 2024
1 parent 1d09628 commit 83d0ba6
Show file tree
Hide file tree
Showing 15 changed files with 1,218 additions and 108 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test-build-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ jobs:
- integration_querier
- integration_ruler
- integration_query_fuzz
- integration_remote_write_v2
steps:
- name: Upgrade golang
uses: actions/setup-go@41dfa10bad2bb2ae585af6ee5bb4d7d973ad74ed # v5.1.0
Expand Down
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,4 @@ run:
- integration_querier
- integration_ruler
- integration_query_fuzz
- integration_remote_write_v2
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
* [CHANGE] Change all max async concurrency default values `50` to `3` #6268
* [CHANGE] Change default value of `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` from `50` to `3` #6265
* [CHANGE] Enable Compactor and Alertmanager in target all. #6204
* [FEATURE] Support Prometheus remote write 2.0. #6330
* [FEATURE] Ruler: Pagination support for List Rules API. #6299
* [FEATURE] Query Frontend/Querier: Add protobuf codec `-api.querier-default-codec` and the option to choose response compression type `-querier.response-compression`. #5527
* [FEATURE] Ruler: Experimental: Add `ruler.frontend-address` to allow query to query frontends instead of ingesters. #6151
Expand Down
76 changes: 76 additions & 0 deletions integration/e2e/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
Expand Down Expand Up @@ -334,3 +335,78 @@ func CreateBlock(

return id, nil
}

func GenerateHistogramSeriesV2(name string, ts time.Time, i uint32, floatHistogram bool, additionalLabels ...prompb.Label) (symbols []string, series []writev2.TimeSeries) {
tsMillis := TimeToMilliseconds(ts)

st := writev2.NewSymbolTable()

lbs := labels.Labels{labels.Label{Name: "__name__", Value: name}}
for _, lbl := range additionalLabels {
lbs = append(lbs, labels.Label{Name: lbl.Name, Value: lbl.Value})
}

var (
h *histogram.Histogram
fh *histogram.FloatHistogram
ph writev2.Histogram
)
if floatHistogram {
fh = tsdbutil.GenerateTestFloatHistogram(int(i))
ph = writev2.FromFloatHistogram(tsMillis, fh)
} else {
h = tsdbutil.GenerateTestHistogram(int(i))
ph = writev2.FromIntHistogram(tsMillis, h)
}

// Generate the series
series = append(series, writev2.TimeSeries{
LabelsRefs: st.SymbolizeLabels(lbs, nil),
Histograms: []writev2.Histogram{ph},
})

symbols = st.Symbols()

return
}

func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Label) (symbols []string, series []writev2.TimeSeries, vector model.Vector) {
tsMillis := TimeToMilliseconds(ts)
value := rand.Float64()

st := writev2.NewSymbolTable()
lbs := labels.Labels{{Name: labels.MetricName, Value: name}}

for _, label := range additionalLabels {
lbs = append(lbs, labels.Label{
Name: label.Name,
Value: label.Value,
})
}
series = append(series, writev2.TimeSeries{
// Generate the series
LabelsRefs: st.SymbolizeLabels(lbs, nil),
Samples: []writev2.Sample{
{Value: value, Timestamp: tsMillis},
},
Metadata: writev2.Metadata{
Type: writev2.Metadata_METRIC_TYPE_GAUGE,
},
})
symbols = st.Symbols()

// Generate the expected vector when querying it
metric := model.Metric{}
metric[labels.MetricName] = model.LabelValue(name)
for _, lbl := range additionalLabels {
metric[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
}

vector = append(vector, &model.Sample{
Metric: metric,
Value: model.SampleValue(value),
Timestamp: model.Time(tsMillis),
})

return
}
40 changes: 40 additions & 0 deletions integration/e2ecortex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/rulefmt"
"github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
yaml "gopkg.in/yaml.v3"
Expand Down Expand Up @@ -147,6 +148,39 @@ func (c *Client) Push(timeseries []prompb.TimeSeries) (*http.Response, error) {
return res, nil
}

// PushV2 the input timeseries to the remote endpoint
func (c *Client) PushV2(symbols []string, timeseries []writev2.TimeSeries) (*http.Response, error) {
// Create write request
data, err := proto.Marshal(&writev2.Request{Symbols: symbols, Timeseries: timeseries})
if err != nil {
return nil, err
}

// Create HTTP request
compressed := snappy.Encode(nil, data)
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/prom/push", c.distributorAddress), bytes.NewReader(compressed))
if err != nil {
return nil, err
}

req.Header.Add("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request")
req.Header.Set("X-Prometheus-Remote-Write-Version", "2.0.0")
req.Header.Set("X-Scope-OrgID", c.orgID)

ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()

// Execute HTTP request
res, err := c.httpClient.Do(req.WithContext(ctx))
if err != nil {
return nil, err
}

defer res.Body.Close()
return res, nil
}

func getNameAndAttributes(ts prompb.TimeSeries) (string, map[string]any) {
var metricName string
attributes := make(map[string]any)
Expand Down Expand Up @@ -356,6 +390,12 @@ func (c *Client) Query(query string, ts time.Time) (model.Value, error) {
return value, err
}

// Metadata runs a metadata query
func (c *Client) Metadata(name, limit string) (map[string][]promv1.Metadata, error) {
metadata, err := c.querierClient.Metadata(context.Background(), name, limit)
return metadata, err
}

// QueryExemplars runs an exemplars query
func (c *Client) QueryExemplars(query string, start, end time.Time) ([]promv1.ExemplarQueryResult, error) {
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
Expand Down
211 changes: 211 additions & 0 deletions integration/remote_write_v2_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
//go:build integration_remote_write_v2
// +build integration_remote_write_v2

package integration

import (
"math/rand"
"net/http"
"path"
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
"github.com/cortexproject/cortex/pkg/storage/tsdb"
)

func TestIngest(t *testing.T) {
const blockRangePeriod = 5 * time.Second

s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
consul := e2edb.NewConsulWithName("consul")
require.NoError(t, s.StartAndWaitReady(consul))

flags := mergeFlags(
AlertmanagerLocalFlags(),
map[string]string{
"-store.engine": blocksStorageEngine,
"-blocks-storage.backend": "filesystem",
"-blocks-storage.tsdb.head-compaction-interval": "4m",
"-blocks-storage.bucket-store.sync-interval": "15m",
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
"-querier.query-store-for-labels-enabled": "true",
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
"-blocks-storage.tsdb.ship-interval": "1s",
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
"-blocks-storage.tsdb.enable-native-histograms": "true",
// Ingester.
"-ring.store": "consul",
"-consul.hostname": consul.NetworkHTTPEndpoint(),
// Distributor.
"-distributor.replication-factor": "1",
// Store-gateway.
"-store-gateway.sharding-enabled": "false",
// alert manager
"-alertmanager.web.external-url": "http://localhost/alertmanager",
},
)

// make alert manager config dir
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

path := path.Join(s.SharedDir(), "cortex-1")

flags = mergeFlags(flags, map[string]string{"-blocks-storage.filesystem.dir": path})
// Start Cortex replicas.
cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
require.NoError(t, s.StartAndWaitReady(cortex))

// Wait until Cortex replicas have updated the ring state.
require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))

c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

now := time.Now()

// series push
symbols1, series, expectedVector := e2e.GenerateSeriesV2("test_series", now, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "foo", Value: "bar"})
res, err := c.PushV2(symbols1, series)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
testPushHeader(t, res.Header, "1", "1", "0")

// sample
result, err := c.Query("test_series", now)
require.NoError(t, err)
assert.Equal(t, expectedVector, result.(model.Vector))

// metadata
metadata, err := c.Metadata("test_series", "")
require.NoError(t, err)
require.Equal(t, 1, len(metadata["test_series"]))

// histogram
histogramIdx := rand.Uint32()
symbols2, histogramSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"})
res, err = c.PushV2(symbols2, histogramSeries)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
testPushHeader(t, res.Header, "1", "1", "0")

symbols3, histogramFloatSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"})
res, err = c.PushV2(symbols3, histogramFloatSeries)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
testPushHeader(t, res.Header, "1", "1", "0")

testHistogramTimestamp := now.Add(blockRangePeriod * 2)
expectedHistogram := tsdbutil.GenerateTestHistogram(int(histogramIdx))
result, err = c.Query(`test_histogram`, testHistogramTimestamp)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
v := result.(model.Vector)
require.Equal(t, 2, v.Len())
for _, s := range v {
require.NotNil(t, s.Histogram)
require.Equal(t, float64(expectedHistogram.Count), float64(s.Histogram.Count))
require.Equal(t, float64(expectedHistogram.Sum), float64(s.Histogram.Sum))
}
}

func TestExemplar(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
consul := e2edb.NewConsulWithName("consul")
require.NoError(t, s.StartAndWaitReady(consul))

flags := mergeFlags(
AlertmanagerLocalFlags(),
map[string]string{
"-store.engine": blocksStorageEngine,
"-blocks-storage.backend": "filesystem",
"-blocks-storage.tsdb.head-compaction-interval": "4m",
"-blocks-storage.bucket-store.sync-interval": "15m",
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
"-querier.query-store-for-labels-enabled": "true",
"-blocks-storage.tsdb.ship-interval": "1s",
"-blocks-storage.tsdb.enable-native-histograms": "true",
// Ingester.
"-ring.store": "consul",
"-consul.hostname": consul.NetworkHTTPEndpoint(),
"-ingester.max-exemplars": "100",
// Distributor.
"-distributor.replication-factor": "1",
// Store-gateway.
"-store-gateway.sharding-enabled": "false",
// alert manager
"-alertmanager.web.external-url": "http://localhost/alertmanager",
},
)

// make alert manager config dir
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

path := path.Join(s.SharedDir(), "cortex-1")

flags = mergeFlags(flags, map[string]string{"-blocks-storage.filesystem.dir": path})
// Start Cortex replicas.
cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
require.NoError(t, s.StartAndWaitReady(cortex))

// Wait until Cortex replicas have updated the ring state.
require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))

c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

now := time.Now()
tsMillis := e2e.TimeToMilliseconds(now)

symbols := []string{"", "__name__", "test_metric", "b", "c", "baz", "qux", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"}
timeseries := []writev2.TimeSeries{
{
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Symbolized writeRequestFixture.Timeseries[0].Labels
Metadata: writev2.Metadata{
Type: writev2.Metadata_METRIC_TYPE_COUNTER, // writeV2RequestSeries1Metadata.Type.

HelpRef: 15, // Symbolized writeV2RequestSeries1Metadata.Help.
UnitRef: 16, // Symbolized writeV2RequestSeries1Metadata.Unit.
},
Samples: []writev2.Sample{{Value: 1, Timestamp: tsMillis}},
Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: tsMillis}},
},
}

res, err := c.PushV2(symbols, timeseries)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
testPushHeader(t, res.Header, "1", "1", "1")

start := time.Now().Add(-time.Minute)
end := now.Add(time.Minute)

exemplars, err := c.QueryExemplars("test_metric", start, end)
require.NoError(t, err)
require.Equal(t, 1, len(exemplars))
}

func testPushHeader(t *testing.T, header http.Header, expectedSamples, expectedHistogram, expectedExemplars string) {
require.Equal(t, expectedSamples, header.Get("X-Prometheus-Remote-Write-Samples-Written"))
require.Equal(t, expectedHistogram, header.Get("X-Prometheus-Remote-Write-Histograms-Written"))
require.Equal(t, expectedExemplars, header.Get("X-Prometheus-Remote-Write-Exemplars-Written"))
}
Loading

0 comments on commit 83d0ba6

Please sign in to comment.