Skip to content

Commit

Permalink
Merge pull request #1603 from josephschorr/experimental-secondary-dis…
Browse files Browse the repository at this point in the history
…patch

Add support for experimental secondary dispatching
  • Loading branch information
josephschorr authored Oct 23, 2023
2 parents 16ee407 + 7b7f2de commit ddbecd6
Show file tree
Hide file tree
Showing 8 changed files with 606 additions and 25 deletions.
62 changes: 52 additions & 10 deletions internal/dispatch/combined/combined.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package combined

import (
"fmt"
"time"

"github.com/authzed/grpcutil"
Expand All @@ -23,15 +24,17 @@ import (
type Option func(*optionState)

type optionState struct {
metricsEnabled bool
prometheusSubsystem string
upstreamAddr string
upstreamCAPath string
grpcPresharedKey string
grpcDialOpts []grpc.DialOption
cache cache.Cache
concurrencyLimits graph.ConcurrencyLimits
remoteDispatchTimeout time.Duration
metricsEnabled bool
prometheusSubsystem string
upstreamAddr string
upstreamCAPath string
grpcPresharedKey string
grpcDialOpts []grpc.DialOption
cache cache.Cache
concurrencyLimits graph.ConcurrencyLimits
remoteDispatchTimeout time.Duration
secondaryUpstreamAddrs map[string]string
secondaryUpstreamExprs map[string]string
}

// MetricsEnabled enables issuing prometheus metrics
Expand Down Expand Up @@ -63,6 +66,23 @@ func UpstreamCAPath(path string) Option {
}
}

// SecondaryUpstreamAddrs configures the named set of secondary upstream
// addresses (dispatcher name -> address) available for secondary dispatching.
func SecondaryUpstreamAddrs(addrs map[string]string) Option {
	return func(opts *optionState) {
		opts.secondaryUpstreamAddrs = addrs
	}
}

// SecondaryUpstreamExprs sets a named map from dispatch type (e.g. "check") to
// the associated CEL expression to run to determine which secondary dispatch
// addresses (if any) to use for that incoming request.
func SecondaryUpstreamExprs(exprs map[string]string) Option {
	// NOTE: parameter renamed from `addrs` to `exprs`; the values here are CEL
	// expression strings, not addresses (the old name was a copy-paste from
	// SecondaryUpstreamAddrs). Go callers are unaffected by parameter names.
	return func(state *optionState) {
		state.secondaryUpstreamExprs = exprs
	}
}

// GrpcPresharedKey sets the preshared key used to authenticate for optional
// cluster dispatching.
func GrpcPresharedKey(key string) Option {
Expand Down Expand Up @@ -141,10 +161,32 @@ func NewDispatcher(options ...Option) (dispatch.Dispatcher, error) {
if err != nil {
return nil, err
}

secondaryClients := make(map[string]remote.SecondaryDispatch, len(opts.secondaryUpstreamAddrs))
for name, addr := range opts.secondaryUpstreamAddrs {
secondaryConn, err := grpc.Dial(addr, opts.grpcDialOpts...)
if err != nil {
return nil, err
}
secondaryClients[name] = remote.SecondaryDispatch{
Name: name,
Client: v1.NewDispatchServiceClient(secondaryConn),
}
}

secondaryExprs := make(map[string]*remote.DispatchExpr, len(opts.secondaryUpstreamExprs))
for name, exprString := range opts.secondaryUpstreamExprs {
parsed, err := remote.ParseDispatchExpression(name, exprString)
if err != nil {
return nil, fmt.Errorf("error parsing secondary dispatch expr `%s` for method `%s`: %w", exprString, name, err)
}
secondaryExprs[name] = parsed
}

redispatch = remote.NewClusterDispatcher(v1.NewDispatchServiceClient(conn), conn, remote.ClusterDispatcherConfig{
KeyHandler: &keys.CanonicalKeyHandler{},
DispatchOverallTimeout: opts.remoteDispatchTimeout,
})
}, secondaryClients, secondaryExprs)
}

cachingRedispatch.SetDelegate(redispatch)
Expand Down
142 changes: 135 additions & 7 deletions internal/dispatch/remote/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@ import (
"errors"
"fmt"
"io"
"strings"
"time"

"github.com/authzed/consistent"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/protobuf/proto"

"github.com/authzed/spicedb/internal/dispatch"
"github.com/authzed/spicedb/internal/dispatch/keys"
Expand All @@ -18,7 +22,18 @@ import (
"github.com/authzed/spicedb/pkg/spiceerrors"
)

type clusterClient interface {
// dispatchCounter counts, per request kind (e.g. "check"), which dispatcher
// ultimately served each remote dispatch: the label "handler_name" is either
// "(primary)" or the name of a configured secondary dispatcher.
var dispatchCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
	Namespace: "spicedb",
	Subsystem: "dispatch",
	Name:      "remote_dispatch_handler_total",
	Help:      "which dispatcher handled a request",
}, []string{"request_kind", "handler_name"})

func init() {
	// Register with the default prometheus registry; MustRegister panics on a
	// duplicate registration, surfacing misconfiguration at startup.
	prometheus.MustRegister(dispatchCounter)
}

type ClusterClient interface {
DispatchCheck(ctx context.Context, req *v1.DispatchCheckRequest, opts ...grpc.CallOption) (*v1.DispatchCheckResponse, error)
DispatchExpand(ctx context.Context, req *v1.DispatchExpandRequest, opts ...grpc.CallOption) (*v1.DispatchExpandResponse, error)
DispatchReachableResources(ctx context.Context, in *v1.DispatchReachableResourcesRequest, opts ...grpc.CallOption) (v1.DispatchService_DispatchReachableResourcesClient, error)
Expand All @@ -35,9 +50,16 @@ type ClusterDispatcherConfig struct {
DispatchOverallTimeout time.Duration
}

// SecondaryDispatch defines a struct holding a client and its name for secondary
// dispatching.
type SecondaryDispatch struct {
	// Name is the unique name of this secondary dispatcher, as referenced by
	// dispatch expressions and reported in dispatch metrics.
	Name string
	// Client is the cluster client used to invoke the secondary dispatch.
	Client ClusterClient
}

// NewClusterDispatcher creates a dispatcher implementation that uses the provided client
// to dispatch requests to peer nodes in the cluster.
func NewClusterDispatcher(client clusterClient, conn *grpc.ClientConn, config ClusterDispatcherConfig) dispatch.Dispatcher {
func NewClusterDispatcher(client ClusterClient, conn *grpc.ClientConn, config ClusterDispatcherConfig, secondaryDispatch map[string]SecondaryDispatch, secondaryDispatchExprs map[string]*DispatchExpr) dispatch.Dispatcher {
keyHandler := config.KeyHandler
if keyHandler == nil {
keyHandler = &keys.DirectKeyHandler{}
Expand All @@ -53,14 +75,18 @@ func NewClusterDispatcher(client clusterClient, conn *grpc.ClientConn, config Cl
conn: conn,
keyHandler: keyHandler,
dispatchOverallTimeout: dispatchOverallTimeout,
secondaryDispatch: secondaryDispatch,
secondaryDispatchExprs: secondaryDispatchExprs,
}
}

type clusterDispatcher struct {
clusterClient clusterClient
clusterClient ClusterClient
conn *grpc.ClientConn
keyHandler keys.Handler
dispatchOverallTimeout time.Duration
secondaryDispatch map[string]SecondaryDispatch
secondaryDispatchExprs map[string]*DispatchExpr
}

func (cr *clusterDispatcher) DispatchCheck(ctx context.Context, req *v1.DispatchCheckRequest) (*v1.DispatchCheckResponse, error) {
Expand All @@ -75,18 +101,120 @@ func (cr *clusterDispatcher) DispatchCheck(ctx context.Context, req *v1.Dispatch

ctx = context.WithValue(ctx, consistent.CtxKey, requestKey)

withTimeout, cancelFn := context.WithTimeout(ctx, cr.dispatchOverallTimeout)
defer cancelFn()
resp, err := dispatchRequest(ctx, cr, "check", req, func(ctx context.Context, client ClusterClient) (*v1.DispatchCheckResponse, error) {
resp, err := client.DispatchCheck(ctx, req)
if err != nil {
return resp, err
}

resp, err := cr.clusterClient.DispatchCheck(withTimeout, req)
err = adjustMetadataForDispatch(resp.Metadata)
return resp, err
})
if err != nil {
return &v1.DispatchCheckResponse{Metadata: requestFailureMetadata}, err
}

err = adjustMetadataForDispatch(resp.Metadata)
return resp, err
}

// requestMessage is the subset of behavior shared by all dispatch request
// messages: they are zerolog-loggable and expose resolver metadata.
type requestMessage interface {
	zerolog.LogObjectMarshaler

	GetMetadata() *v1.ResolverMeta
}

// responseMessage is the subset of behavior shared by all dispatch response
// messages: they are proto messages exposing response metadata.
type responseMessage interface {
	proto.Message

	GetMetadata() *v1.ResponseMeta
}

// respTuple pairs a primary dispatch response with its error, for delivery
// over a channel from the dispatching goroutine.
type respTuple[S responseMessage] struct {
	resp S
	err  error
}

// secondaryRespTuple pairs a successful secondary dispatch response with the
// name of the secondary dispatcher that produced it. No error field is needed:
// secondary dispatch errors are logged and dropped, never sent on the channel.
type secondaryRespTuple[S responseMessage] struct {
	handlerName string
	resp        S
}

// dispatchRequest invokes handler against the primary cluster client and, when
// a secondary-dispatch CEL expression is registered under reqKey, concurrently
// against any matching secondary dispatchers, returning the first successful
// response. Secondary errors are ignored. If the primary errors, the in-flight
// secondaries are awaited (bounded by the overall dispatch timeout) so that
// one of them may still rescue the request before the error is surfaced.
func dispatchRequest[Q requestMessage, S responseMessage](ctx context.Context, cr *clusterDispatcher, reqKey string, req Q, handler func(context.Context, ClusterClient) (S, error)) (S, error) {
	withTimeout, cancelFn := context.WithTimeout(ctx, cr.dispatchOverallTimeout)
	defer cancelFn()

	// If no secondary dispatchers are defined at all, just invoke directly.
	if len(cr.secondaryDispatchExprs) == 0 || len(cr.secondaryDispatch) == 0 {
		return handler(withTimeout, cr.clusterClient)
	}

	// If no expression is registered for this request kind, just invoke directly.
	expr, ok := cr.secondaryDispatchExprs[reqKey]
	if !ok {
		return handler(withTimeout, cr.clusterClient)
	}

	// Otherwise invoke in parallel with any secondary matches.
	primaryResultChan := make(chan respTuple[S], 1)
	secondaryResultChan := make(chan secondaryRespTuple[S], len(cr.secondaryDispatch))

	// secondaryDoneChan receives one signal per *failed* secondary, letting the
	// error path below detect when no secondary can still succeed.
	secondaryDoneChan := make(chan struct{}, len(cr.secondaryDispatch))

	// Run the main dispatch.
	go func() {
		resp, err := handler(withTimeout, cr.clusterClient)
		primaryResultChan <- respTuple[S]{resp, err}
	}()

	result, err := RunDispatchExpr(expr, req)
	if err != nil {
		log.Warn().Err(err).Msg("error when trying to evaluate the dispatch expression")
	}

	log.Trace().Str("secondary-dispatchers", strings.Join(result, ",")).Object("request", req).Msg("running secondary dispatchers")

	launched := 0
	for _, secondaryDispatchName := range result {
		secondary, ok := cr.secondaryDispatch[secondaryDispatchName]
		if !ok {
			log.Warn().Str("secondary-dispatcher-name", secondaryDispatchName).Msg("received unknown secondary dispatcher")
			continue
		}

		log.Trace().Str("secondary-dispatcher", secondary.Name).Object("request", req).Msg("running secondary dispatcher")
		launched++
		go func() {
			resp, err := handler(withTimeout, secondary.Client)
			if err != nil {
				// For secondary dispatches, ignore any errors, as only the primary will be handled in
				// that scenario.
				log.Trace().Str("secondary", secondary.Name).Err(err).Msg("got ignored secondary dispatch error")
				secondaryDoneChan <- struct{}{}
				return
			}

			secondaryResultChan <- secondaryRespTuple[S]{resp: resp, handlerName: secondary.Name}
		}()
	}

	var foundError error
	select {
	case <-withTimeout.Done():
		// BUGFIX: previously hard-coded "check"; use the actual request kind.
		return *new(S), fmt.Errorf("%s dispatch has timed out", reqKey)

	case r := <-primaryResultChan:
		if r.err == nil {
			dispatchCounter.WithLabelValues(reqKey, "(primary)").Inc()
			return r.resp, nil
		}

		// Otherwise, if an error was found, hold it and wait below for the
		// secondaries, allowing an otherwise error-state to be handled by one
		// of them.
		foundError = r.err

	case r := <-secondaryResultChan:
		dispatchCounter.WithLabelValues(reqKey, r.handlerName).Inc()
		return r.resp, nil
	}

	// The primary errored: wait until a secondary succeeds, every launched
	// secondary has failed, or the overall timeout elapses — only then surface
	// the primary's error. (Previously the error was returned immediately,
	// contradicting the documented rescue behavior.)
	for failed := 0; failed < launched; {
		select {
		case <-withTimeout.Done():
			dispatchCounter.WithLabelValues(reqKey, "(primary)").Inc()
			return *new(S), foundError

		case r := <-secondaryResultChan:
			dispatchCounter.WithLabelValues(reqKey, r.handlerName).Inc()
			return r.resp, nil

		case <-secondaryDoneChan:
			failed++
		}
	}

	dispatchCounter.WithLabelValues(reqKey, "(primary)").Inc()
	return *new(S), foundError
}

func adjustMetadataForDispatch(metadata *v1.ResponseMeta) error {
if metadata == nil {
return spiceerrors.MustBugf("received a nil metadata")
Expand Down
Loading

0 comments on commit ddbecd6

Please sign in to comment.