diff --git a/cmd/spicedb/schemawatch_integration_test.go b/cmd/spicedb/schemawatch_integration_test.go
new file mode 100644
index 0000000000..75922d8534
--- /dev/null
+++ b/cmd/spicedb/schemawatch_integration_test.go
@@ -0,0 +1,127 @@
+//go:build docker && image
+// +build docker,image
+
+package main
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/ory/dockertest/v3"
+	"github.com/ory/dockertest/v3/docker"
+	"github.com/stretchr/testify/require"
+
+	testdatastore "github.com/authzed/spicedb/internal/testserver/datastore"
+	"github.com/authzed/spicedb/pkg/datastore"
+)
+
+func TestSchemaWatch(t *testing.T) {
+	engines := map[string]bool{
+		"postgres":    false,
+		"mysql":       false,
+		"cockroachdb": true,
+		"spanner":     false,
+	}
+	require.Equal(t, len(engines), len(datastore.Engines))
+
+	for driverName, shouldRun := range engines {
+		if !shouldRun {
+			continue
+		}
+
+		t.Run(driverName, func(t *testing.T) {
+			bridgeNetworkName := fmt.Sprintf("bridge-%s", uuid.New().String())
+
+			pool, err := dockertest.NewPool("")
+			require.NoError(t, err)
+
+			// Create a bridge network for testing.
+			network, err := pool.Client.CreateNetwork(docker.CreateNetworkOptions{
+				Name: bridgeNetworkName,
+			})
+			require.NoError(t, err)
+			t.Cleanup(func() {
+				pool.Client.RemoveNetwork(network.ID)
+			})
+
+			engine := testdatastore.RunDatastoreEngineWithBridge(t, driverName, bridgeNetworkName)
+
+			envVars := []string{}
+			if wev, ok := engine.(testdatastore.RunningEngineForTestWithEnvVars); ok {
+				envVars = wev.ExternalEnvVars()
+			}
+
+			// Run the migrate command and wait for it to complete.
+			db := engine.NewDatabase(t)
+			migrateResource, err := pool.RunWithOptions(&dockertest.RunOptions{
+				Repository: "authzed/spicedb",
+				Tag:        "ci",
+				Cmd:        []string{"migrate", "head", "--datastore-engine", driverName, "--datastore-conn-uri", db},
+				NetworkID:  bridgeNetworkName,
+				Env:        envVars,
+			}, func(config *docker.HostConfig) {
+				config.RestartPolicy = docker.RestartPolicy{
+					Name: "no",
+				}
+			})
+			require.NoError(t, err)
+
+			waitCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+			defer cancel()
+
+			// Ensure the command completed successfully.
+			status, err := pool.Client.WaitContainerWithContext(migrateResource.Container.ID, waitCtx)
+			require.NoError(t, err)
+			require.Equal(t, 0, status)
+
+			// Run a serve, wait for the watching schema cache to start, and then ensure it shuts down gracefully.
+			serveResource, err := pool.RunWithOptions(&dockertest.RunOptions{
+				Repository: "authzed/spicedb",
+				Tag:        "ci",
+				Cmd:        []string{"serve", "--grpc-preshared-key", "firstkey", "--datastore-engine", driverName, "--datastore-conn-uri", db, "--datastore-gc-interval", "1s", "--telemetry-endpoint", "", "--log-level", "trace", "--enable-experimental-watchable-schema-cache"},
+				NetworkID:  bridgeNetworkName,
+				Env:        envVars,
+			}, func(config *docker.HostConfig) {
+				config.RestartPolicy = docker.RestartPolicy{
+					Name: "no",
+				}
+			})
+			require.NoError(t, err)
+			t.Cleanup(func() {
+				_ = pool.Purge(serveResource)
+			})
+
+			ww := &watchingWriter{make(chan bool, 1), "starting watching cache"}
+
+			// Grab logs and ensure the watching schema cache has started before starting a graceful shutdown.
+			opts := docker.LogsOptions{
+				Context:      context.Background(),
+				Stderr:       true,
+				Stdout:       true,
+				Follow:       true,
+				Timestamps:   true,
+				RawTerminal:  true,
+				Container:    serveResource.Container.ID,
+				OutputStream: ww,
+			}
+
+			go (func() {
+				err = pool.Client.Logs(opts)
+				require.NoError(t, err)
+			})()
+
+			select {
+			case <-ww.c:
+				break
+
+			case <-time.After(10 * time.Second):
+				require.Fail(t, "timed out waiting for schema watch to run")
+			}
+
+			require.True(t, gracefulShutdown(pool, serveResource))
+		})
+	}
+}
diff --git a/cmd/spicedb/serve_integration_test.go b/cmd/spicedb/serve_integration_test.go
index e3be1c17c1..f8e9840e63 100644
--- a/cmd/spicedb/serve_integration_test.go
+++ b/cmd/spicedb/serve_integration_test.go
@@ -133,12 +133,12 @@ func TestGracefulShutdownInMemory(t *testing.T) {
 }
 
 type watchingWriter struct {
-	c chan bool
+	c              chan bool
+	expectedString string
 }
 
 func (ww *watchingWriter) Write(p []byte) (n int, err error) {
-	// Ensure GC ran.
-	if strings.Contains(string(p), "running garbage collection worker") {
+	if strings.Contains(string(p), ww.expectedString) {
 		ww.c <- true
 	}
 
@@ -218,7 +218,7 @@ func TestGracefulShutdown(t *testing.T) {
 	})
 
 	if awaitGC {
-		ww := &watchingWriter{make(chan bool, 1)}
+		ww := &watchingWriter{make(chan bool, 1), "running garbage collection worker"}
 
 		// Grab logs and ensure GC has run before starting a graceful shutdown.
 		opts := docker.LogsOptions{
diff --git a/internal/datastore/proxy/schemacaching/caching.go b/internal/datastore/proxy/schemacaching/caching.go
index 9259e19cfe..43110844a7 100644
--- a/internal/datastore/proxy/schemacaching/caching.go
+++ b/internal/datastore/proxy/schemacaching/caching.go
@@ -52,11 +52,11 @@ func NewCachingDatastoreProxy(delegate datastore.Datastore, c cache.Cache, gcWindow
 	// Try to instantiate a schema cache that reads updates from the datastore's schema watch stream. If not possible,
 	// fallback to the just-in-time caching proxy.
 	if watchable := datastore.UnwrapAs[datastore.SchemaWatchableDatastore](delegate); watchable != nil {
-		log.Info().Type("datastore-type", watchable).Msg("enabled schema caching")
+		log.Info().Type("datastore-type", watchable).Msg("schema watch caching enabled")
 		return createWatchingCacheProxy(watchable, c, gcWindow)
 	}
 
-	log.Info().Type("datastore-type", delegate).Msg("schema watch was enabled but datastore does not support it; falling back to just-in-time caching")
+	log.Info().Type("datastore-type", delegate).Msg("schema watch caching was requested but datastore does not support it; falling back to just-in-time caching")
 	return &definitionCachingProxy{
 		Datastore: delegate,
 		c:         c,
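For context, here is a minimal, self-contained sketch of the log-watching pattern the
diff parameterizes: an io.Writer that signals a channel the first time an expected
substring appears in streamed output. The simulated log lines and the main harness
are illustrative only; the real tests stream container logs into the writer via
pool.Client.Logs.

package main

import (
	"fmt"
	"strings"
	"time"
)

// watchingWriter signals on c the first time expectedString is seen in the
// bytes written to it; all writes are otherwise passed through unchanged.
type watchingWriter struct {
	c              chan bool
	expectedString string
}

func (ww *watchingWriter) Write(p []byte) (n int, err error) {
	if strings.Contains(string(p), ww.expectedString) {
		// The channel is constructed with a buffer of one, so a single
		// match never blocks the log pump.
		ww.c <- true
	}
	return len(p), nil
}

func main() {
	ww := &watchingWriter{make(chan bool, 1), "starting watching cache"}

	// Simulate a container streaming log lines into the writer.
	go func() {
		fmt.Fprintln(ww, "some unrelated log line")
		fmt.Fprintln(ww, `msg="starting watching cache"`)
	}()

	select {
	case <-ww.c:
		fmt.Println("saw expected log line; safe to begin graceful shutdown")
	case <-time.After(10 * time.Second):
		fmt.Println("timed out waiting for expected log line")
	}
}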
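And a generic sketch of the capability-detection fallback exercised by the caching.go
hunk: attempt to obtain an optional interface from the delegate, use the richer
implementation when it is present, and otherwise fall back. The interface and type
names below are illustrative stand-ins, not the real SpiceDB types, and a plain type
assertion is used where the real datastore.UnwrapAs also sees through layered proxies.

package main

import "fmt"

// Datastore is the base capability.
type Datastore interface {
	Name() string
}

// SchemaWatchable marks datastores that can stream schema changes.
type SchemaWatchable interface {
	Datastore
	WatchSchema() <-chan string
}

type plainDS struct{ name string }

func (p plainDS) Name() string { return p.name }

type watchableDS struct{ plainDS }

func (watchableDS) WatchSchema() <-chan string { return make(chan string) }

// newCachingProxy prefers the watch-driven cache when the delegate supports
// it, mirroring the fallback decision in NewCachingDatastoreProxy.
func newCachingProxy(delegate Datastore) string {
	if watchable, ok := delegate.(SchemaWatchable); ok {
		return fmt.Sprintf("watching cache for %s", watchable.Name())
	}
	return fmt.Sprintf("just-in-time cache for %s", delegate.Name())
}

func main() {
	fmt.Println(newCachingProxy(plainDS{name: "memory"}))            // just-in-time cache for memory
	fmt.Println(newCachingProxy(watchableDS{plainDS{name: "crdb"}})) // watching cache for crdb
}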