Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce group_replica partial response strategy #33

Merged
merged 7 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ const (
promqlNegativeOffset = "promql-negative-offset"
promqlAtModifier = "promql-at-modifier"
queryPushdown = "query-pushdown"
dnsPrefix = "dnssrv+"
)

type queryMode string
Expand Down Expand Up @@ -188,6 +189,9 @@ func registerQuery(app *extkingpin.App) {
enableQueryPartialResponse := cmd.Flag("query.partial-response", "Enable partial response for queries if no partial_response param is specified. --no-query.partial-response for disabling.").
Default("true").Bool()

enableGroupReplicaPartialStrategy := cmd.Flag("query.group-replica-strategy", "Enable group-replica partial response strategy.").
Default("false").Bool()

enableRulePartialResponse := cmd.Flag("rule.partial-response", "Enable partial response for rules endpoint. --no-rule.partial-response for disabling.").
Hidden().Default("true").Bool()

Expand Down Expand Up @@ -369,6 +373,7 @@ func registerQuery(app *extkingpin.App) {
*tenantCertField,
*enforceTenancy,
*tenantLabel,
*enableGroupReplicaPartialStrategy,
)
})
}
Expand Down Expand Up @@ -451,6 +456,7 @@ func runQuery(
tenantCertField string,
enforceTenancy bool,
tenantLabel string,
groupReplicaPartialResponseStrategy bool,
) error {
if alertQueryURL == "" {
lastColon := strings.LastIndex(httpBindAddr, ":")
Expand Down Expand Up @@ -548,6 +554,8 @@ func runQuery(
dialOpts,
unhealthyStoreTimeout,
endpointInfoTimeout,
// Ignore store errors when the group_replica partial response strategy is enabled.
groupReplicaPartialResponseStrategy,
queryConnMetricLabels...,
)

Expand All @@ -556,14 +564,26 @@ func runQuery(
targetsProxy = targets.NewProxy(logger, endpoints.GetTargetsClients)
metadataProxy = metadata.NewProxy(logger, endpoints.GetMetricMetadataClients)
exemplarsProxy = exemplars.NewProxy(logger, endpoints.GetExemplarsStores, selectorLset)
queryableCreator query.QueryableCreator
)
if groupReplicaPartialResponseStrategy {
level.Info(logger).Log("msg", "Enabled group-replica partial response strategy")
queryableCreator = query.NewQueryableCreatorWithGroupReplicaPartialResponseStrategy(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_", reg),
proxy,
maxConcurrentSelects,
queryTimeout,
)
} else {
queryableCreator = query.NewQueryableCreator(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_", reg),
proxy,
maxConcurrentSelects,
queryTimeout,
)
)
}

// Run File Service Discovery and update the store set when the files are modified.
if fileSD != nil {
Expand Down Expand Up @@ -863,6 +883,7 @@ func prepareEndpointSet(
dialOpts []grpc.DialOption,
unhealthyStoreTimeout time.Duration,
endpointInfoTimeout time.Duration,
ignoreStoreErrors bool,
queryConnMetricLabels ...string,
) *query.EndpointSet {
endpointSet := query.NewEndpointSet(
Expand All @@ -881,9 +902,13 @@ func prepareEndpointSet(

for _, dnsProvider := range dnsProviders {
var tmpSpecs []*query.GRPCEndpointSpec

for _, addr := range dnsProvider.Addresses() {
tmpSpecs = append(tmpSpecs, query.NewGRPCEndpointSpec(addr, false))
for dnsName, addrs := range dnsProvider.AddressesWithDNS() {
// The DNS name looks like "dnssrv+pantheon-db-rep0:10901"; its replica key is "pantheon-db-rep0".
// TODO: have a more robust protocol to extract the replica key.
replicaKey := strings.Split(strings.TrimPrefix(dnsName, dnsPrefix), ":")[0]
for _, addr := range addrs {
tmpSpecs = append(tmpSpecs, query.NewGRPCEndpointSpecWithReplicaKey(replicaKey, addr, false, ignoreStoreErrors))
}
}
tmpSpecs = removeDuplicateEndpointSpecs(logger, duplicatedStores, tmpSpecs)
specs = append(specs, tmpSpecs...)
Expand Down
3 changes: 2 additions & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ func runRule(
dialOpts,
5*time.Minute,
5*time.Second,
false,
)

// Periodically update the GRPC addresses from query config by resolving them using DNS SD if necessary.
Expand Down Expand Up @@ -911,7 +912,7 @@ func queryFuncCreator(
var spanID string

switch partialResponseStrategy {
case storepb.PartialResponseStrategy_WARN:
case storepb.PartialResponseStrategy_WARN, storepb.PartialResponseStrategy_GROUP_REPLICA:
spanID = "/rule_instant_query HTTP[client]"
case storepb.PartialResponseStrategy_ABORT:
spanID = "/rule_instant_query_part_resp_abort HTTP[client]"
Expand Down
11 changes: 11 additions & 0 deletions pkg/discovery/dns/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,14 @@ func (p *Provider) Addresses() []string {
}
return result
}

// AddressesWithDNS returns a snapshot of p.resolved: the resolved addresses
// grouped under the same keys the provider stores them by (presumably the DNS
// name each group was resolved from — confirm against the resolver code).
//
// The read lock is held while the snapshot is taken. Both the returned map and
// every address slice in it are fresh copies, so callers may modify the result
// without touching the provider's lock-protected state.
func (p *Provider) AddressesWithDNS() map[string][]string {
	p.RLock()
	defer p.RUnlock()

	result := make(map[string][]string, len(p.resolved))
	for dns, addrs := range p.resolved {
		// Copy the slice: storing addrs directly would alias the backing array
		// owned by p.resolved and let callers mutate it outside the lock.
		result[dns] = append([]string(nil), addrs...)
	}
	return result
}
2 changes: 1 addition & 1 deletion pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ func (p *QueryOptions) AddTo(values url.Values) error {

var partialResponseValue string
switch p.PartialResponseStrategy {
case storepb.PartialResponseStrategy_WARN:
case storepb.PartialResponseStrategy_WARN, storepb.PartialResponseStrategy_GROUP_REPLICA:
partialResponseValue = strconv.FormatBool(true)
case storepb.PartialResponseStrategy_ABORT:
partialResponseValue = strconv.FormatBool(false)
Expand Down
Loading
Loading