Skip to content

Commit

Permalink
Introduce group/replica key to store clients
Browse files Browse the repository at this point in the history
  • Loading branch information
hczhu-db committed Apr 28, 2024
1 parent da7ccc1 commit aa08156
Show file tree
Hide file tree
Showing 8 changed files with 226 additions and 51 deletions.
16 changes: 13 additions & 3 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ const (
promqlNegativeOffset = "promql-negative-offset"
promqlAtModifier = "promql-at-modifier"
queryPushdown = "query-pushdown"
dnsPrefix = "dnssrv+"
)

type queryMode string
Expand Down Expand Up @@ -881,9 +882,18 @@ func prepareEndpointSet(

for _, dnsProvider := range dnsProviders {
var tmpSpecs []*query.GRPCEndpointSpec

for _, addr := range dnsProvider.Addresses() {
tmpSpecs = append(tmpSpecs, query.NewGRPCEndpointSpec(addr, false))
// The dns name is like "dnssrv+range-store-api:10901"
for dnsName, addrs := range dnsProvider.AddressesWithDNS() {
// The dns name is like "dnssrv+range-store-api:10901" whose group key is "range-store-api".
// TODO: have a more robust protocol to extract the group key.
groupKey := dnsName
if strings.HasPrefix(groupKey, dnsPrefix) {
groupKey = groupKey[len(dnsPrefix):]
}
groupKey = strings.Split(groupKey, ":")[0]
for _, addr := range addrs {
tmpSpecs = append(tmpSpecs, query.NewGRPCEndpointSpecWithGroupKey(groupKey, addr, false))
}
}
tmpSpecs = removeDuplicateEndpointSpecs(logger, duplicatedStores, tmpSpecs)
specs = append(specs, tmpSpecs...)
Expand Down
11 changes: 11 additions & 0 deletions pkg/discovery/dns/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,14 @@ func (p *Provider) Addresses() []string {
}
return result
}

func (p *Provider) AddressesWithDNS() map[string][]string {
p.RLock()
defer p.RUnlock()

result := make(map[string][]string, len(p.resolved))
for dns, addrs := range p.resolved {
result[dns] = addrs
}
return result
}
109 changes: 99 additions & 10 deletions pkg/query/endpointset.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"encoding/json"
"fmt"
"math"
"regexp"
"sort"
"strings"
"sync"
"time"
"unicode/utf8"
Expand Down Expand Up @@ -44,12 +46,22 @@ type queryConnMetricLabel string
const (
ExternalLabels queryConnMetricLabel = "external_labels"
StoreType queryConnMetricLabel = "store_type"
GroupKey queryConnMetricLabel = "group_key"
ReplicaKey queryConnMetricLabel = "replica_key"
)

var gReplicaKeySuffixRegex *regexp.Regexp

func init() {
gReplicaKeySuffixRegex = regexp.MustCompile(`-rep[0-9]$`)
}

type GRPCEndpointSpec struct {
addr string
isStrictStatic bool
dialOpts []grpc.DialOption
groupKey string
replicaKey string
}

const externalLabelLimit = 1000
Expand All @@ -64,11 +76,26 @@ func NewGRPCEndpointSpec(addr string, isStrictStatic bool, dialOpts ...grpc.Dial
}
}

func NewGRPCEndpointSpecWithGroupKey(replicaKey, addr string, isStrictStatic bool, dialOpts ...grpc.DialOption) *GRPCEndpointSpec {
spec := NewGRPCEndpointSpec(addr, isStrictStatic, dialOpts...)
spec.replicaKey = replicaKey
spec.groupKey = gReplicaKeySuffixRegex.ReplaceAllString(replicaKey, "")
return spec
}

func (es *GRPCEndpointSpec) Addr() string {
// API address should not change between state changes.
return es.addr
}

func (es *GRPCEndpointSpec) GroupKey() string {
return es.groupKey
}

func (es *GRPCEndpointSpec) ReplicaKey() string {
return es.replicaKey
}

// Metadata method for gRPC endpoint tries to call InfoAPI exposed by Thanos components until context timeout. If we are unable to get metadata after
// that time, we assume that the host is unhealthy and return error.
func (es *endpointRef) Metadata(ctx context.Context, infoClient infopb.InfoClient, storeClient storepb.StoreClient) (*endpointMetadata, error) {
Expand Down Expand Up @@ -196,23 +223,38 @@ type endpointSetNodeCollector struct {
mtx sync.Mutex
storeNodes map[component.Component]map[string]int
storePerExtLset map[string]int
storeNodesAddr map[string]map[string]int
storeNodesKeys map[string]map[string]int

connectionsDesc *prometheus.Desc
labels []string
connectionsDesc *prometheus.Desc
labels []string
connectionsWithAddr *prometheus.Desc
connectionsWithKeys *prometheus.Desc
}

func newEndpointSetNodeCollector(labels ...string) *endpointSetNodeCollector {
if len(labels) == 0 {
labels = []string{string(ExternalLabels), string(StoreType)}
}
desc := "Number of gRPC connection to Store APIs. Opened connection means healthy store APIs available for Querier."
return &endpointSetNodeCollector{
storeNodes: map[component.Component]map[string]int{},
connectionsDesc: prometheus.NewDesc(
"thanos_store_nodes_grpc_connections",
"Number of gRPC connection to Store APIs. Opened connection means healthy store APIs available for Querier.",
desc,
labels, nil,
),
labels: labels,
connectionsWithAddr: prometheus.NewDesc(
"thanos_store_nodes_grpc_connections_addr",
desc,
[]string{string(ReplicaKey), "addr"}, nil,
),
connectionsWithKeys: prometheus.NewDesc(
"thanos_store_nodes_grpc_connections_keys",
desc,
[]string{string(GroupKey), string(ReplicaKey)}, nil,
),
}
}

Expand All @@ -229,7 +271,11 @@ func truncateExtLabels(s string, threshold int) string {
}
return s
}
func (c *endpointSetNodeCollector) Update(nodes map[component.Component]map[string]int) {
func (c *endpointSetNodeCollector) Update(
nodes map[component.Component]map[string]int,
nodesAddr map[string]map[string]int,
nodesKeys map[string]map[string]int,
) {
storeNodes := make(map[component.Component]map[string]int, len(nodes))
storePerExtLset := map[string]int{}

Expand All @@ -246,10 +292,14 @@ func (c *endpointSetNodeCollector) Update(nodes map[component.Component]map[stri
defer c.mtx.Unlock()
c.storeNodes = storeNodes
c.storePerExtLset = storePerExtLset
c.storeNodesAddr = nodesAddr
c.storeNodesKeys = nodesKeys
}

func (c *endpointSetNodeCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- c.connectionsDesc
ch <- c.connectionsWithAddr
ch <- c.connectionsWithKeys
}

func (c *endpointSetNodeCollector) Collect(ch chan<- prometheus.Metric) {
Expand All @@ -275,6 +325,22 @@ func (c *endpointSetNodeCollector) Collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(c.connectionsDesc, prometheus.GaugeValue, float64(occurrences), lbls...)
}
}
for replicaKey, occurrencesPerAddr := range c.storeNodesAddr {
for addr, occurrences := range occurrencesPerAddr {
ch <- prometheus.MustNewConstMetric(
c.connectionsWithAddr, prometheus.GaugeValue,
float64(occurrences),
replicaKey, addr)
}
}
for groupKey, occurrencesPerReplicaKey := range c.storeNodesKeys {
for replicaKeys, occurrences := range occurrencesPerReplicaKey {
ch <- prometheus.MustNewConstMetric(
c.connectionsWithKeys, prometheus.GaugeValue,
float64(occurrences),
groupKey, replicaKeys)
}
}
}

// EndpointSet maintains a set of active Thanos endpoints. It is backed up by Endpoint Specifications that are dynamically fetched on
Expand Down Expand Up @@ -434,6 +500,14 @@ func (e *EndpointSet) Update(ctx context.Context) {

// Update stats.
stats := newEndpointAPIStats()
statsAddr := make(map[string]map[string]int)
statsKeys := make(map[string]map[string]int)
bumpCounter := func(key1, key2 string, mp map[string]map[string]int) {
if _, ok := mp[key1]; !ok {
mp[key1] = make(map[string]int)
}
mp[key1][key2]++
}
for addr, er := range e.endpoints {
if !er.isQueryable() {
continue
Expand All @@ -449,9 +523,11 @@ func (e *EndpointSet) Update(ctx context.Context) {
"address", addr, "extLset", extLset, "duplicates", fmt.Sprintf("%v", stats[component.Sidecar][extLset]+stats[component.Rule][extLset]+1))
}
stats[er.ComponentType()][extLset]++
bumpCounter(er.replicaKey, strings.Split(addr, ":")[0], statsAddr)
bumpCounter(er.groupKey, er.replicaKey, statsKeys)
}

e.endpointsMetric.Update(stats)
e.endpointsMetric.Update(stats, statsAddr, statsKeys)
}

func (e *EndpointSet) updateEndpoint(ctx context.Context, spec *GRPCEndpointSpec, er *endpointRef) {
Expand Down Expand Up @@ -639,9 +715,20 @@ type endpointRef struct {
metadata *endpointMetadata
status *EndpointStatus

groupKey string
replicaKey string

logger log.Logger
}

func (er *endpointRef) GroupKey() string {
return er.groupKey
}

func (er *endpointRef) ReplicaKey() string {
return er.replicaKey
}

// newEndpointRef creates a new endpointRef with a gRPC channel to the given the IP address.
// The call to newEndpointRef will return an error if establishing the channel fails.
func (e *EndpointSet) newEndpointRef(ctx context.Context, spec *GRPCEndpointSpec) (*endpointRef, error) {
Expand All @@ -660,11 +747,13 @@ func (e *EndpointSet) newEndpointRef(ctx context.Context, spec *GRPCEndpointSpec
}

return &endpointRef{
logger: e.logger,
created: e.now(),
addr: spec.Addr(),
isStrict: spec.isStrictStatic,
cc: conn,
logger: e.logger,
created: e.now(),
addr: spec.Addr(),
isStrict: spec.isStrictStatic,
cc: conn,
groupKey: spec.groupKey,
replicaKey: spec.replicaKey,
}, nil
}

Expand Down
26 changes: 26 additions & 0 deletions pkg/query/endpointset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,14 @@ func TestEndpointSetUpdate(t *testing.T) {
# HELP thanos_store_nodes_grpc_connections Number of gRPC connection to Store APIs. Opened connection means healthy store APIs available for Querier.
# TYPE thanos_store_nodes_grpc_connections gauge
`
const metricsMetaAddr = `
# HELP thanos_store_nodes_grpc_connections_addr Number of gRPC connection to Store APIs. Opened connection means healthy store APIs available for Querier.
# TYPE thanos_store_nodes_grpc_connections_addr gauge
`
const metricsMetaKeys = `
# HELP thanos_store_nodes_grpc_connections_keys Number of gRPC connection to Store APIs. Opened connection means healthy store APIs available for Querier.
# TYPE thanos_store_nodes_grpc_connections_keys gauge
`
testCases := []struct {
name string
endpoints []testEndpointMeta
Expand Down Expand Up @@ -343,6 +351,12 @@ func TestEndpointSetUpdate(t *testing.T) {
expectedConnMetrics: metricsMeta +
`
thanos_store_nodes_grpc_connections{store_type="sidecar"} 1
` + metricsMetaAddr +
`
thanos_store_nodes_grpc_connections_addr{addr="127.0.0.1",replica_key=""} 1
` + metricsMetaKeys +
`
thanos_store_nodes_grpc_connections_keys{group_key="",replica_key=""} 1
`,
},
{
Expand Down Expand Up @@ -397,6 +411,12 @@ func TestEndpointSetUpdate(t *testing.T) {
expectedConnMetrics: metricsMeta +
`
thanos_store_nodes_grpc_connections{store_type="sidecar"} 1
` + metricsMetaAddr +
`
thanos_store_nodes_grpc_connections_addr{addr="127.0.0.1",replica_key=""} 1
` + metricsMetaKeys +
`
thanos_store_nodes_grpc_connections_keys{group_key="",replica_key=""} 1
`,
},
{
Expand All @@ -420,6 +440,12 @@ func TestEndpointSetUpdate(t *testing.T) {
expectedEndpoints: 1,
expectedConnMetrics: metricsMeta + `
thanos_store_nodes_grpc_connections{external_labels="{lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val}",store_type="sidecar"} 1
` + metricsMetaAddr +
`
thanos_store_nodes_grpc_connections_addr{addr="127.0.0.1",replica_key=""} 1
` + metricsMetaKeys +
`
thanos_store_nodes_grpc_connections_keys{group_key="",replica_key=""} 1
`,
},
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,15 @@ type Client interface {
// Addr returns address of the store client. If second parameter is true, the client
// represents a local client (server-as-client) and has no remote address.
Addr() (addr string, isLocalClient bool)

// A group key is like "db" or "long-range-store".
// Two groups don't overlap their data.
GroupKey() string
// ReplicaKey is a unique key for each replica of a group,
// E.g, "db-rep0" and "db-rep1" for group "db".
// The replicas of a group serve exactly the same partition of data.
// ReplicaKey can be the same as the group key. That means the group has only one replica.
ReplicaKey() string
}

// ProxyStore implements the store API that proxies request to all given underlying stores.
Expand Down Expand Up @@ -354,7 +363,18 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.

storeResponses := make([]respSet, 0, len(stores))

// groupReplicaStores[groupKey][replicaKey] = number of stores with the groupKey and replicaKey
groupReplicaStores := make(map[string]map[string]int)
// failedStores[groupKey][replicaKey] = number of store failures
failedStores := make(map[string]map[string]int)

Check failure on line 369 in pkg/store/proxy.go

View workflow job for this annotation

GitHub Actions / Go build for different platforms

failedStores declared and not used

Check failure on line 369 in pkg/store/proxy.go

View workflow job for this annotation

GitHub Actions / Thanos unit tests

failedStores declared and not used
bumpCounter := func(key1, key2 string, mp map[string]map[string]int) {
if _, ok := mp[key1]; !ok {
mp[key1] = make(map[string]int)
}
mp[key1][key2]++
}
for _, st := range stores {
bumpCounter(st.GroupKey(), st.ReplicaKey(), groupReplicaStores)
st := st
if s.debugLogging {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s queried", st))
Expand Down
5 changes: 5 additions & 0 deletions pkg/store/storepb/testutil/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ type TestClient struct {
WithoutReplicaLabelsEnabled bool
IsLocalStore bool
StoreTSDBInfos []infopb.TSDBInfo

GroupKeyStr string
ReplicaKeyStr string
}

func (c TestClient) LabelSets() []labels.Labels { return c.ExtLset }
Expand All @@ -30,3 +33,5 @@ func (c TestClient) SupportsSharding() bool { return c.Shardable }
func (c TestClient) SupportsWithoutReplicaLabels() bool { return c.WithoutReplicaLabelsEnabled }
func (c TestClient) String() string { return c.Name }
func (c TestClient) Addr() (string, bool) { return c.Name, c.IsLocalStore }
func (c TestClient) GroupKey() string { return c.GroupKeyStr }
func (c TestClient) ReplicaKey() string { return c.ReplicaKeyStr }
Loading

0 comments on commit aa08156

Please sign in to comment.