Skip to content

Commit

Permalink
balancer: rewrite the consistent hashring balancer to avoid recomputation
Browse files Browse the repository at this point in the history
The previous implementation was recomputing the hashring any time a
subconnection moved from ready->idle or back, which happened frequently.

The new implementation includes idle and connecting subconns in the ring,
and triggers a connection if one of them is selected. It also adds members to
and removes them from a long-lived ring instead of recomputing a ring from scratch each time.

ReplicationFactor and Spread can now be passed in as part of the
service config instead of being registered globally with the balancer.
  • Loading branch information
ecordell committed May 11, 2023
1 parent 09313c9 commit 1ca1002
Show file tree
Hide file tree
Showing 6 changed files with 267 additions and 100 deletions.
3 changes: 1 addition & 2 deletions cmd/spicedb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
_ "google.golang.org/grpc/xds"

log "github.com/authzed/spicedb/internal/logging"
consistentbalancer "github.com/authzed/spicedb/pkg/balancer"
"github.com/authzed/spicedb/pkg/cmd"
cmdutil "github.com/authzed/spicedb/pkg/cmd/server"
"github.com/authzed/spicedb/pkg/cmd/testserver"
Expand All @@ -24,7 +23,7 @@ func main() {
kuberesolver.RegisterInCluster()

// Enable consistent hashring gRPC load balancer
balancer.Register(consistentbalancer.NewConsistentHashringBuilder(cmdutil.ConsistentHashringPicker))
balancer.Register(cmdutil.ConsistentHashringBuilder)

log.SetGlobalLogger(zerolog.New(os.Stdout))

Expand Down
2 changes: 1 addition & 1 deletion internal/middleware/streamtimeout/streamtimeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (s *sendWrapper) Context() context.Context {
return s.ctx
}

func (s *sendWrapper) SetTrailer(trailer metadata.MD) {
func (s *sendWrapper) SetTrailer(_ metadata.MD) {
s.timer.Stop()
}

Expand Down
9 changes: 5 additions & 4 deletions internal/testserver/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,7 @@ var testResolverBuilder = &SafeManualResolverBuilder{}

func init() {
// register hashring balancer
balancer.Register(hashbalancer.NewConsistentHashringBuilder(
hashbalancer.NewConsistentHashringPickerBuilder(xxhash.Sum64, 1500, 1)),
)
balancer.Register(hashbalancer.NewConsistentHashringBuilder(xxhash.Sum64))

// Register a manual resolver.Builder that we can feed addresses for tests
// Registration is not thread safe, so we register a single resolver.Builder
Expand Down Expand Up @@ -168,7 +166,10 @@ func TestClusterWithDispatchAndCacheConfig(t testing.TB, size uint, ds datastore
combineddispatch.UpstreamAddr("test://" + prefix),
combineddispatch.PrometheusSubsystem(fmt.Sprintf("%s_%d_client_dispatch", prefix, i)),
combineddispatch.GrpcDialOpts(
grpc.WithDefaultServiceConfig(hashbalancer.BalancerServiceConfig),
grpc.WithDefaultServiceConfig((&hashbalancer.ConsistentHashringBalancerConfig{
ReplicationFactor: 1500,
Spread: 1,
}).MustToServiceConfigJSON()),
grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) {
// it's possible grpc tries to dial before we have set the
// buffconn dialers, we have to return a "TempError" so that
Expand Down
Loading

0 comments on commit 1ca1002

Please sign in to comment.