Skip to content

Commit

Permalink
Merge pull request #1685 from josephschorr/schema-watch-integration-test
Browse files Browse the repository at this point in the history
Add an integration test for schema watch
  • Loading branch information
josephschorr authored Dec 14, 2023
2 parents 8e677b2 + 9c744ce commit 280f3e1
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 6 deletions.
127 changes: 127 additions & 0 deletions cmd/spicedb/schemawatch_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
//go:build docker && image
// +build docker,image

package main

import (
"context"
"fmt"
"testing"
"time"

"github.com/google/uuid"
"github.com/ory/dockertest/v3"
"github.com/ory/dockertest/v3/docker"
"github.com/stretchr/testify/require"

testdatastore "github.com/authzed/spicedb/internal/testserver/datastore"
"github.com/authzed/spicedb/pkg/datastore"
)

func TestSchemaWatch(t *testing.T) {
engines := map[string]bool{
"postgres": false,
"mysql": false,
"cockroachdb": true,
"spanner": false,
}
require.Equal(t, len(engines), len(datastore.Engines))

for driverName, shouldRun := range engines {
if !shouldRun {
continue
}

t.Run(driverName, func(t *testing.T) {
bridgeNetworkName := fmt.Sprintf("bridge-%s", uuid.New().String())

pool, err := dockertest.NewPool("")
require.NoError(t, err)

// Create a bridge network for testing.
network, err := pool.Client.CreateNetwork(docker.CreateNetworkOptions{
Name: bridgeNetworkName,
})
require.NoError(t, err)
t.Cleanup(func() {
pool.Client.RemoveNetwork(network.ID)
})

engine := testdatastore.RunDatastoreEngineWithBridge(t, driverName, bridgeNetworkName)

envVars := []string{}
if wev, ok := engine.(testdatastore.RunningEngineForTestWithEnvVars); ok {
envVars = wev.ExternalEnvVars()
}

// Run the migrate command and wait for it to complete.
db := engine.NewDatabase(t)
migrateResource, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: "authzed/spicedb",
Tag: "ci",
Cmd: []string{"migrate", "head", "--datastore-engine", driverName, "--datastore-conn-uri", db},
NetworkID: bridgeNetworkName,
Env: envVars,
}, func(config *docker.HostConfig) {
config.RestartPolicy = docker.RestartPolicy{
Name: "no",
}
})
require.NoError(t, err)

waitCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

// Ensure the command completed successfully.
status, err := pool.Client.WaitContainerWithContext(migrateResource.Container.ID, waitCtx)
require.NoError(t, err)
require.Equal(t, 0, status)

// Run a serve and immediately close, ensuring it shuts down gracefully.
serveResource, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: "authzed/spicedb",
Tag: "ci",
Cmd: []string{"serve", "--grpc-preshared-key", "firstkey", "--datastore-engine", driverName, "--datastore-conn-uri", db, "--datastore-gc-interval", "1s", "--telemetry-endpoint", "", "--log-level", "trace", "--enable-experimental-watchable-schema-cache"},
NetworkID: bridgeNetworkName,
Env: envVars,
}, func(config *docker.HostConfig) {
config.RestartPolicy = docker.RestartPolicy{
Name: "no",
}
})
require.NoError(t, err)
t.Cleanup(func() {
_ = pool.Purge(serveResource)
})

ww := &watchingWriter{make(chan bool, 1), "starting watching cache"}

// Grab logs and ensure GC has run before starting a graceful shutdown.
opts := docker.LogsOptions{
Context: context.Background(),
Stderr: true,
Stdout: true,
Follow: true,
Timestamps: true,
RawTerminal: true,
Container: serveResource.Container.ID,
OutputStream: ww,
}

go (func() {
err = pool.Client.Logs(opts)
require.NoError(t, err)
})()

select {
case <-ww.c:
break

case <-time.After(10 * time.Second):
require.Fail(t, "timed out waiting for schema watch to run")
}

require.True(t, gracefulShutdown(pool, serveResource))
})
}
}
8 changes: 4 additions & 4 deletions cmd/spicedb/serve_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,12 @@ func TestGracefulShutdownInMemory(t *testing.T) {
}

type watchingWriter struct {
c chan bool
c chan bool
expectedString string
}

func (ww *watchingWriter) Write(p []byte) (n int, err error) {
// Ensure GC ran.
if strings.Contains(string(p), "running garbage collection worker") {
if strings.Contains(string(p), ww.expectedString) {
ww.c <- true
}

Expand Down Expand Up @@ -218,7 +218,7 @@ func TestGracefulShutdown(t *testing.T) {
})

if awaitGC {
ww := &watchingWriter{make(chan bool, 1)}
ww := &watchingWriter{make(chan bool, 1), "running garbage collection worker"}

// Grab logs and ensure GC has run before starting a graceful shutdown.
opts := docker.LogsOptions{
Expand Down
4 changes: 2 additions & 2 deletions internal/datastore/proxy/schemacaching/caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ func NewCachingDatastoreProxy(delegate datastore.Datastore, c cache.Cache, gcWin
// Try to instantiate a schema cache that reads updates from the datastore's schema watch stream. If not possible,
// fallback to the just-in-time caching proxy.
if watchable := datastore.UnwrapAs[datastore.SchemaWatchableDatastore](delegate); watchable != nil {
log.Info().Type("datastore-type", watchable).Msg("enabled schema caching")
log.Info().Type("datastore-type", watchable).Msg("schema watch caching enabled")
return createWatchingCacheProxy(watchable, c, gcWindow)
}

log.Info().Type("datastore-type", delegate).Msg("schema watch was enabled but datastore does not support it; falling back to just-in-time caching")
log.Info().Type("datastore-type", delegate).Msg("schema watch caching was requested but datastore does not support it; falling back to just-in-time caching")
return &definitionCachingProxy{
Datastore: delegate,
c: c,
Expand Down

0 comments on commit 280f3e1

Please sign in to comment.