diff --git a/internal/datastore/common/helpers.go b/internal/datastore/common/helpers.go index 68251ae769..0d8c87001f 100644 --- a/internal/datastore/common/helpers.go +++ b/internal/datastore/common/helpers.go @@ -25,7 +25,7 @@ func WriteTuples(ctx context.Context, ds datastore.Datastore, op core.RelationTu // UpdateTuplesInDatastore is a convenience method to perform multiple relation update operations on a Datastore func UpdateTuplesInDatastore(ctx context.Context, ds datastore.Datastore, updates ...*core.RelationTupleUpdate) (datastore.Revision, error) { - return ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + return ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { return rwt.WriteRelationships(ctx, updates) }) } diff --git a/internal/datastore/common/revisions/optimized.go b/internal/datastore/common/revisions/optimized.go index c251a5f934..f73275bd3b 100644 --- a/internal/datastore/common/revisions/optimized.go +++ b/internal/datastore/common/revisions/optimized.go @@ -9,6 +9,7 @@ import ( "github.com/benbjohnson/clock" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" "golang.org/x/sync/singleflight" log "github.com/authzed/spicedb/internal/logging" @@ -37,9 +38,7 @@ func (cor *CachedOptimizedRevisions) SetOptimizedRevisionFunc(revisionFunc Optim } func (cor *CachedOptimizedRevisions) OptimizedRevision(ctx context.Context) (datastore.Revision, error) { - ctx, span := tracer.Start(ctx, "OptimizedRevision") - defer span.End() - + span := trace.SpanFromContext(ctx) localNow := cor.clockFn.Now() // Subtract a random amount of time from now, to let barely expired candidates get selected diff --git a/internal/datastore/common/sql.go b/internal/datastore/common/sql.go index 7490db9c63..8ff329dce0 100644 --- a/internal/datastore/common/sql.go +++ b/internal/datastore/common/sql.go @@ -496,8 +496,6 @@ func (tqs QueryExecutor) ExecuteQuery( query SchemaQueryFilterer, opts ...options.QueryOptionsOption, ) (datastore.RelationshipIterator, error) { - ctx, span := tracer.Start(ctx, "ExecuteQuery") - defer span.End() queryOpts := options.NewQueryOptionsWithOptions(opts...) query = query.TupleOrder(queryOpts.Sort) diff --git a/internal/datastore/crdb/crdb.go b/internal/datastore/crdb/crdb.go index 6036054f75..954c5f5026 100644 --- a/internal/datastore/crdb/crdb.go +++ b/internal/datastore/crdb/crdb.go @@ -306,7 +306,7 @@ func (cds *crdbDatastore) ReadWriteTx( 0, } - if err := f(rwt); err != nil { + if err := f(ctx, rwt); err != nil { return err } diff --git a/internal/datastore/crdb/pool_test.go b/internal/datastore/crdb/pool_test.go index bd79676982..4041c34dbe 100644 --- a/internal/datastore/crdb/pool_test.go +++ b/internal/datastore/crdb/pool_test.go @@ -121,7 +121,7 @@ func TestTxReset(t *testing.T) { // WriteNamespace utilizes execute so we'll use it i := 0 - rev, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + rev, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { if i < len(tt.errors) { defer func() { i++ }() return tt.errors[i] diff --git a/internal/datastore/memdb/memdb.go b/internal/datastore/memdb/memdb.go index 162c827c54..a54fdcc480 100644 --- a/internal/datastore/memdb/memdb.go +++ b/internal/datastore/memdb/memdb.go @@ -131,7 +131,7 @@ func (mdb *memdbDatastore) SnapshotReader(revisionRaw datastore.Revision) datast } func (mdb *memdbDatastore) ReadWriteTx( - _ context.Context, + ctx context.Context, f datastore.TxUserFunc, opts ...options.RWTOptionsOption, ) (datastore.Revision, error) { @@ -170,7 +170,7 @@ func (mdb *memdbDatastore) ReadWriteTx( newRevision := mdb.newRevisionID() rwt := &memdbReadWriteTx{memdbReader{&sync.Mutex{}, txSrc, nil}, newRevision} - if err := f(rwt); err != nil { + if err := f(ctx, rwt); err != nil { mdb.Lock() if tx != nil { tx.Abort() diff --git a/internal/datastore/memdb/memdb_test.go b/internal/datastore/memdb/memdb_test.go index 20d9ea4176..238ecee9e6 100644 --- a/internal/datastore/memdb/memdb_test.go +++ b/internal/datastore/memdb/memdb_test.go @@ -46,7 +46,7 @@ func TestConcurrentWritePanic(t *testing.T) { numPanics := uint64(0) require.Eventually(func() bool { - _, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { g := errgroup.Group{} g.Go(func() (err error) { defer func() { @@ -94,7 +94,7 @@ func TestConcurrentWriteRelsError(t *testing.T) { for i := 0; i < 50; i++ { i := i g.Go(func() error { - _, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { updates := []*corev1.RelationTupleUpdate{} for j := 0; j < 500; j++ { updates = append(updates, &corev1.RelationTupleUpdate{ diff --git a/internal/datastore/mysql/datastore.go b/internal/datastore/mysql/datastore.go index 5c9473e8cb..8bfb94d475 100644 --- a/internal/datastore/mysql/datastore.go +++ b/internal/datastore/mysql/datastore.go @@ -314,7 +314,7 @@ func (mds *Datastore) ReadWriteTx( newTxnID, } - return fn(rwt) + return fn(ctx, rwt) }); err != nil { if !config.DisableRetries && isErrorRetryable(err) { continue diff --git a/internal/datastore/mysql/datastore_test.go b/internal/datastore/mysql/datastore_test.go index c92740fa72..5a93158be6 100644 --- a/internal/datastore/mysql/datastore_test.go +++ b/internal/datastore/mysql/datastore_test.go @@ -181,7 +181,7 @@ func GarbageCollectionTest(t *testing.T, ds datastore.Datastore) { req.True(r.IsReady) // Write basic namespaces. - writtenAt, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + writtenAt, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { return rwt.WriteNamespaces( ctx, namespace.Namespace( @@ -202,7 +202,7 @@ func GarbageCollectionTest(t *testing.T, ds datastore.Datastore) { req.Zero(removed.Namespaces) // Replace the namespace with a new one. - writtenAt, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + writtenAt, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { return rwt.WriteNamespaces( ctx, namespace.Namespace( @@ -322,7 +322,7 @@ func GarbageCollectionByTimeTest(t *testing.T, ds datastore.Datastore) { req.True(r.IsReady) // Write basic namespaces. - _, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { return rwt.WriteNamespaces( ctx, namespace.Namespace( @@ -419,7 +419,7 @@ func NoRelationshipsGarbageCollectionTest(t *testing.T, ds datastore.Datastore) req.True(r.IsReady) // Write basic namespaces. - _, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { return rwt.WriteNamespaces( ctx, namespace.Namespace( @@ -456,7 +456,7 @@ func ChunkedGarbageCollectionTest(t *testing.T, ds datastore.Datastore) { req.True(r.IsReady) // Write basic namespaces. - _, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { return rwt.WriteNamespaces( ctx, namespace.Namespace( diff --git a/internal/datastore/postgres/postgres.go b/internal/datastore/postgres/postgres.go index 5ed8917db9..1a1b5a063c 100644 --- a/internal/datastore/postgres/postgres.go +++ b/internal/datastore/postgres/postgres.go @@ -347,7 +347,7 @@ func (pgd *pgDatastore) ReadWriteTx( newXID, } - return fn(rwt) + return fn(ctx, rwt) })) if err != nil { diff --git a/internal/datastore/postgres/postgres_test.go b/internal/datastore/postgres/postgres_test.go index 891073d026..a80f9f3072 100644 --- a/internal/datastore/postgres/postgres_test.go +++ b/internal/datastore/postgres/postgres_test.go @@ -239,7 +239,7 @@ func GarbageCollectionTest(t *testing.T, ds datastore.Datastore) { r, err := ds.ReadyState(ctx) require.NoError(err) require.True(r.IsReady) - firstWrite, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + firstWrite, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { // Write basic namespaces. return rwt.WriteNamespaces(ctx, namespace.Namespace( "resource", @@ -258,7 +258,7 @@ func GarbageCollectionTest(t *testing.T, ds datastore.Datastore) { require.Zero(removed.Namespaces) // Replace the namespace with a new one. - updateTwoNamespaces, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + updateTwoNamespaces, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { return rwt.WriteNamespaces( ctx, namespace.Namespace( @@ -367,7 +367,7 @@ func GarbageCollectionTest(t *testing.T, ds datastore.Datastore) { tRequire.TupleExists(ctx, tpl, relLastWriteAt) // Inject a transaction to clean up the last write - lastRev, err := pds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + lastRev, err := pds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { return nil }) require.NoError(err) @@ -426,7 +426,7 @@ func GarbageCollectionByTimeTest(t *testing.T, ds datastore.Datastore) { require.NoError(err) require.True(r.IsReady) // Write basic namespaces. - _, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { return rwt.WriteNamespaces(ctx, namespace.Namespace( "resource", namespace.MustRelation("reader", nil), @@ -469,7 +469,7 @@ func GarbageCollectionByTimeTest(t *testing.T, ds datastore.Datastore) { require.NoError(err) // Inject a revision to sweep up the last revision - _, err = pds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err = pds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { return nil }) require.NoError(err) @@ -501,7 +501,7 @@ func ChunkedGarbageCollectionTest(t *testing.T, ds datastore.Datastore) { require.NoError(err) require.True(r.IsReady) // Write basic namespaces. - _, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { return rwt.WriteNamespaces(ctx, namespace.Namespace( "resource", namespace.MustRelation("reader", nil), @@ -549,7 +549,7 @@ func ChunkedGarbageCollectionTest(t *testing.T, ds datastore.Datastore) { require.NoError(err) // Inject a revision to sweep up the last revision - _, err = pds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err = pds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { return nil }) require.NoError(err) @@ -703,7 +703,7 @@ func ConcurrentRevisionHeadTest(t *testing.T, ds datastore.Datastore) { require.NoError(err) require.True(r.IsReady) // Write basic namespaces. - _, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { return rwt.WriteNamespaces(ctx, namespace.Namespace( "resource", namespace.MustRelation("reader", nil), @@ -719,7 +719,7 @@ func ConcurrentRevisionHeadTest(t *testing.T, ds datastore.Datastore) { var commitLastRev, commitFirstRev datastore.Revision g.Go(func() error { var err error - commitLastRev, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + commitLastRev, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { rtu := tuple.Touch(&core.RelationTuple{ ResourceAndRelation: &core.ObjectAndRelation{ Namespace: "resource", @@ -746,7 +746,7 @@ func ConcurrentRevisionHeadTest(t *testing.T, ds datastore.Datastore) { <-waitToStart - commitFirstRev, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + commitFirstRev, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { rtu := tuple.Touch(&core.RelationTuple{ ResourceAndRelation: &core.ObjectAndRelation{ Namespace: "resource", @@ -805,7 +805,7 @@ func ConcurrentRevisionWatchTest(t *testing.T, ds datastore.Datastore) { require.True(r.IsReady) // Write basic namespaces. - rev, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + rev, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { return rwt.WriteNamespaces(ctx, namespace.Namespace( "resource", namespace.MustRelation("reader", nil), @@ -852,7 +852,7 @@ func ConcurrentRevisionWatchTest(t *testing.T, ds datastore.Datastore) { var commitLastRev, commitFirstRev datastore.Revision g.Go(func() error { var err error - commitLastRev, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + commitLastRev, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { err = rwt.WriteRelationships(ctx, []*core.RelationTupleUpdate{ tuple.Touch(tuple.MustParse("something:001#viewer@user:123")), tuple.Touch(tuple.MustParse("something:002#viewer@user:123")), @@ -871,7 +871,7 @@ func ConcurrentRevisionWatchTest(t *testing.T, ds datastore.Datastore) { <-waitToStart - commitFirstRev, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + commitFirstRev, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { return rwt.WriteRelationships(ctx, []*core.RelationTupleUpdate{ tuple.Touch(tuple.MustParse("resource:1001#reader@user:456")), tuple.Touch(tuple.MustParse("resource:1002#reader@user:456")), @@ -889,7 +889,7 @@ func ConcurrentRevisionWatchTest(t *testing.T, ds datastore.Datastore) { require.False(commitFirstRev.Equal(commitLastRev)) // Write another revision. - afterRev, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + afterRev, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { rtu := tuple.Touch(&core.RelationTuple{ ResourceAndRelation: &core.ObjectAndRelation{ Namespace: "resource", @@ -1031,7 +1031,7 @@ func RevisionInversionTest(t *testing.T, ds datastore.Datastore) { require.NoError(err) require.True(r.IsReady) // Write basic namespaces. - _, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { return rwt.WriteNamespaces(ctx, namespace.Namespace( "resource", namespace.MustRelation("reader", nil), @@ -1047,7 +1047,7 @@ func RevisionInversionTest(t *testing.T, ds datastore.Datastore) { var commitLastRev, commitFirstRev datastore.Revision g.Go(func() error { var err error - commitLastRev, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + commitLastRev, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { rtu := tuple.Touch(&core.RelationTuple{ ResourceAndRelation: &core.ObjectAndRelation{ Namespace: "resource", @@ -1074,7 +1074,7 @@ func RevisionInversionTest(t *testing.T, ds datastore.Datastore) { <-waitToStart - commitFirstRev, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + commitFirstRev, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { rtu := tuple.Touch(&core.RelationTuple{ ResourceAndRelation: &core.ObjectAndRelation{ Namespace: "resource", @@ -1112,7 +1112,7 @@ func OTelTracingTest(t *testing.T, ds datastore.Datastore) { testTraceProvider.RegisterSpanProcessor(spanrecorder) // Perform basic operation - _, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { return rwt.WriteNamespaces(ctx, namespace.Namespace("resource")) }) require.NoError(err) @@ -1207,7 +1207,7 @@ func datastoreWithInterceptorAndTestData(t *testing.T, interceptor pgcommon.Quer // Write namespaces and a few thousand relationships. ctx := context.Background() for i := 0; i < 1000; i++ { - _, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { err := rwt.WriteNamespaces(ctx, namespace.Namespace( fmt.Sprintf("resource%d", i), namespace.MustRelation("reader", nil))) @@ -1262,7 +1262,7 @@ func datastoreWithInterceptorAndTestData(t *testing.T, interceptor pgcommon.Quer // Delete some relationships. for i := 990; i < 1000; i++ { - _, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { rtu := tuple.Delete(&core.RelationTuple{ ResourceAndRelation: &core.ObjectAndRelation{ Namespace: testfixtures.DocumentNS.Name, @@ -1295,7 +1295,7 @@ func datastoreWithInterceptorAndTestData(t *testing.T, interceptor pgcommon.Quer // Write some more relationships. for i := 1000; i < 1100; i++ { - _, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { // Write some relationships. rtu := tuple.Touch(&core.RelationTuple{ ResourceAndRelation: &core.ObjectAndRelation{ diff --git a/internal/datastore/proxy/observable.go b/internal/datastore/proxy/observable.go index 631f770fa3..5c16d18c6c 100644 --- a/internal/datastore/proxy/observable.go +++ b/internal/datastore/proxy/observable.go @@ -77,8 +77,8 @@ func (p *observableProxy) ReadWriteTx( f datastore.TxUserFunc, opts ...options.RWTOptionsOption, ) (datastore.Revision, error) { - return p.delegate.ReadWriteTx(ctx, func(delegateRWT datastore.ReadWriteTransaction) error { - return f(&observableRWT{&observableReader{delegateRWT}, delegateRWT}) + return p.delegate.ReadWriteTx(ctx, func(ctx context.Context, delegateRWT datastore.ReadWriteTransaction) error { + return f(ctx, &observableRWT{&observableReader{delegateRWT}, delegateRWT}) }, opts...) } @@ -193,7 +193,11 @@ func (r *observableReader) ReadNamespaceByName(ctx context.Context, nsName strin } func (r *observableReader) QueryRelationships(ctx context.Context, filter datastore.RelationshipsFilter, options ...options.QueryOptionsOption) (datastore.RelationshipIterator, error) { - ctx, closer := observe(ctx, "QueryRelationships") + ctx, closer := observe(ctx, "QueryRelationships", trace.WithAttributes( + attribute.String("resourceType", filter.ResourceType), + attribute.String("resourceRelation", filter.OptionalResourceRelation), + attribute.String("caveatName", filter.OptionalCaveatName), + )) iterator, err := r.delegate.QueryRelationships(ctx, filter, options...) if err != nil { diff --git a/internal/datastore/proxy/proxy_test/mock.go b/internal/datastore/proxy/proxy_test/mock.go index 94d20466f9..58759388a3 100644 --- a/internal/datastore/proxy/proxy_test/mock.go +++ b/internal/datastore/proxy/proxy_test/mock.go @@ -21,14 +21,14 @@ func (dm *MockDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader } func (dm *MockDatastore) ReadWriteTx( - _ context.Context, + ctx context.Context, f datastore.TxUserFunc, opts ...options.RWTOptionsOption, ) (datastore.Revision, error) { args := dm.Called(opts) mockRWT := args.Get(0).(datastore.ReadWriteTransaction) - if err := f(mockRWT); err != nil { + if err := f(ctx, mockRWT); err != nil { return datastore.NoRevision, err } diff --git a/internal/datastore/proxy/readonly_test.go b/internal/datastore/proxy/readonly_test.go index 941b3de3c5..5ba055cc19 100644 --- a/internal/datastore/proxy/readonly_test.go +++ b/internal/datastore/proxy/readonly_test.go @@ -34,13 +34,13 @@ func TestRWOperationErrors(t *testing.T) { ds := NewReadonlyDatastore(delegate) ctx := context.Background() - rev, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + rev, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { return rwt.DeleteNamespaces(ctx, "fake") }) require.ErrorAs(err, &datastore.ErrReadOnly{}) require.Equal(datastore.NoRevision, rev) - rev, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + rev, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { return rwt.WriteNamespaces(ctx, &core.NamespaceDefinition{Name: "user"}) }) require.ErrorAs(err, &datastore.ErrReadOnly{}) diff --git a/internal/datastore/proxy/schemacaching/standardcache.go b/internal/datastore/proxy/schemacaching/standardcache.go index eb52ed9bb5..9273cb98db 100644 --- a/internal/datastore/proxy/schemacaching/standardcache.go +++ b/internal/datastore/proxy/schemacaching/standardcache.go @@ -40,9 +40,9 @@ func (p *definitionCachingProxy) ReadWriteTx( f datastore.TxUserFunc, opts ...options.RWTOptionsOption, ) (datastore.Revision, error) { - return p.Datastore.ReadWriteTx(ctx, func(delegateRWT datastore.ReadWriteTransaction) error { + return p.Datastore.ReadWriteTx(ctx, func(ctx context.Context, delegateRWT datastore.ReadWriteTransaction) error { rwt := &definitionCachingRWT{delegateRWT, &sync.Map{}} - return f(rwt) + return f(ctx, rwt) }, opts...) } diff --git a/internal/datastore/proxy/schemacaching/standardcaching_test.go b/internal/datastore/proxy/schemacaching/standardcaching_test.go index 4c0de5764d..b3a1e0bc6f 100644 --- a/internal/datastore/proxy/schemacaching/standardcaching_test.go +++ b/internal/datastore/proxy/schemacaching/standardcaching_test.go @@ -219,7 +219,7 @@ func TestRWTCaching(t *testing.T) { ds := NewCachingDatastoreProxy(dsMock, nil, 1*time.Hour, JustInTimeCaching) - rev, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + rev, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { _, updatedA, err := tester.readSingleFunc(ctx, rwt, nsA) require.NoError(err) require.True(zero.Equal(updatedA)) @@ -256,7 +256,7 @@ func TestRWTCacheWithWrites(t *testing.T) { ds := NewCachingDatastoreProxy(dsMock, nil, 1*time.Hour, JustInTimeCaching) - rev, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + rev, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { // Cache the 404 _, _, err := tester.readSingleFunc(ctx, rwt, nsA) require.Error(err, tester.notFoundErr) @@ -374,7 +374,7 @@ func TestSnapshotCachingRealDatastore(t *testing.T) { ds := NewCachingDatastoreProxy(rawDS, nil, 1*time.Hour, JustInTimeCaching) if tc.nsDef != nil { - _, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { err := rwt.WriteNamespaces(ctx, tc.nsDef) if err != nil { return err diff --git a/internal/datastore/spanner/reader.go b/internal/datastore/spanner/reader.go index 0c9a0198e6..1642e182a4 100644 --- a/internal/datastore/spanner/reader.go +++ b/internal/datastore/spanner/reader.go @@ -6,6 +6,8 @@ import ( "time" "cloud.google.com/go/spanner" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc/codes" "github.com/authzed/spicedb/internal/datastore/common" @@ -72,14 +74,14 @@ func (sr spannerReader) ReverseQueryRelationships( func queryExecutor(txSource txFactory) common.ExecuteQueryFunc { return func(ctx context.Context, sql string, args []any) ([]*core.RelationTuple, error) { - ctx, span := tracer.Start(ctx, "ExecuteQuery") - defer span.End() - + span := trace.SpanFromContext(ctx) + span.AddEvent("Query issued to database") iter := txSource().Query(ctx, statementFromSQL(sql, args)) defer iter.Stop() var tuples []*core.RelationTuple + span.AddEvent("start reading iterator") if err := iter.Do(func(row *spanner.Row) error { nextTuple := &core.RelationTuple{ ResourceAndRelation: &core.ObjectAndRelation{}, @@ -113,6 +115,8 @@ func queryExecutor(txSource txFactory) common.ExecuteQueryFunc { return nil, err } + span.AddEvent("finished reading iterator", trace.WithAttributes(attribute.Int("tupleCount", len(tuples)))) + span.SetAttributes(attribute.Int("count", len(tuples))) return tuples, nil } } @@ -155,7 +159,7 @@ func (sr spannerReader) ListAllNamespaces(ctx context.Context) ([]datastore.Revi ) defer iter.Stop() - allNamespaces, err := readAllNamespaces(iter) + allNamespaces, err := readAllNamespaces(iter, trace.SpanFromContext(ctx)) if err != nil { return nil, fmt.Errorf(errUnableToListNamespaces, err) } @@ -181,7 +185,7 @@ func (sr spannerReader) LookupNamespacesWithNames(ctx context.Context, nsNames [ ) defer iter.Stop() - foundNamespaces, err := readAllNamespaces(iter) + foundNamespaces, err := readAllNamespaces(iter, trace.SpanFromContext(ctx)) if err != nil { return nil, fmt.Errorf(errUnableToListNamespaces, err) } @@ -189,8 +193,9 @@ func (sr spannerReader) LookupNamespacesWithNames(ctx context.Context, nsNames [ return foundNamespaces, nil } -func readAllNamespaces(iter *spanner.RowIterator) ([]datastore.RevisionedNamespace, error) { +func readAllNamespaces(iter *spanner.RowIterator, span trace.Span) ([]datastore.RevisionedNamespace, error) { var allNamespaces []datastore.RevisionedNamespace + span.AddEvent("start reading iterator") if err := iter.Do(func(row *spanner.Row) error { var serialized []byte var updated time.Time @@ -212,7 +217,8 @@ func readAllNamespaces(iter *spanner.RowIterator) ([]datastore.RevisionedNamespa }); err != nil { return nil, err } - + span.AddEvent("finished reading iterator", trace.WithAttributes(attribute.Int("namespaceCount", len(allNamespaces)))) + span.SetAttributes(attribute.Int("count", len(allNamespaces))) return allNamespaces, nil } diff --git a/internal/datastore/spanner/revisions.go b/internal/datastore/spanner/revisions.go index d6f2978e22..0c7a3ae5d7 100644 --- a/internal/datastore/spanner/revisions.go +++ b/internal/datastore/spanner/revisions.go @@ -26,9 +26,6 @@ func (sd spannerDatastore) HeadRevision(ctx context.Context) (datastore.Revision } func (sd spannerDatastore) now(ctx context.Context) (time.Time, error) { - ctx, span := tracer.Start(ctx, "now") - defer span.End() - var timestamp time.Time if err := sd.client.Single().Query(ctx, spanner.NewStatement("SELECT CURRENT_TIMESTAMP()")).Do(func(r *spanner.Row) error { return r.Columns(×tamp) diff --git a/internal/datastore/spanner/spanner.go b/internal/datastore/spanner/spanner.go index a59b568da3..b2b95fccae 100644 --- a/internal/datastore/spanner/spanner.go +++ b/internal/datastore/spanner/spanner.go @@ -12,6 +12,8 @@ import ( spb "cloud.google.com/go/spanner/apiv1/spannerpb" sq "github.com/Masterminds/squirrel" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/api/option" "google.golang.org/grpc/codes" @@ -115,11 +117,42 @@ func NewSpannerDatastore(database string, opts ...Option) (datastore.Datastore, return ds, nil } +type traceableRTX struct { + delegate readTX +} + +func (t *traceableRTX) ReadRow(ctx context.Context, table string, key spanner.Key, columns []string) (*spanner.Row, error) { + trace.SpanFromContext(ctx).SetAttributes( + attribute.String("spannerAPI", "ReadOnlyTransaction.ReadRow"), + attribute.String("table", table), + attribute.String("key", key.String()), + attribute.StringSlice("columns", columns)) + + return t.delegate.ReadRow(ctx, table, key, columns) +} + +func (t *traceableRTX) Read(ctx context.Context, table string, keys spanner.KeySet, columns []string) *spanner.RowIterator { + trace.SpanFromContext(ctx).SetAttributes( + attribute.String("spannerAPI", "ReadOnlyTransaction.Read"), + attribute.String("table", table), + attribute.StringSlice("columns", columns)) + + return t.delegate.Read(ctx, table, keys, columns) +} + +func (t *traceableRTX) Query(ctx context.Context, statement spanner.Statement) *spanner.RowIterator { + trace.SpanFromContext(ctx).SetAttributes( + attribute.String("spannerAPI", "ReadOnlyTransaction.Query"), + attribute.String("statement", statement.SQL)) + + return t.delegate.Query(ctx, statement) +} + func (sd spannerDatastore) SnapshotReader(revisionRaw datastore.Revision) datastore.Reader { r := revisionRaw.(revision.Decimal) txSource := func() readTX { - return sd.client.Single().WithTimestampBound(spanner.ReadTimestamp(timestampFromRevision(r))) + return &traceableRTX{delegate: sd.client.Single().WithTimestampBound(spanner.ReadTimestamp(timestampFromRevision(r)))} } executor := common.QueryExecutor{Executor: queryExecutor(txSource)} return spannerReader{executor, txSource} @@ -128,10 +161,13 @@ func (sd spannerDatastore) SnapshotReader(revisionRaw datastore.Revision) datast func (sd spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserFunc, opts ...options.RWTOptionsOption) (datastore.Revision, error) { config := options.NewRWTOptionsWithOptions(opts...) + ctx, span := tracer.Start(ctx, "ReadWriteTx") + defer span.End() + ctx, cancel := context.WithCancel(ctx) resp, err := sd.client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, spannerRWT *spanner.ReadWriteTransaction) error { txSource := func() readTX { - return spannerRWT + return &traceableRTX{delegate: spannerRWT} } executor := common.QueryExecutor{Executor: queryExecutor(txSource)} @@ -140,7 +176,13 @@ func (sd spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserF spannerRWT, sd.config.disableStats, } - if err := fn(rwt); err != nil { + err := func() error { + innerCtx, innerSpan := tracer.Start(ctx, "TxUserFunc") + defer innerSpan.End() + + return fn(innerCtx, rwt) + }() + if err != nil { if config.DisableRetries { defer cancel() } diff --git a/internal/datastore/spanner/stats.go b/internal/datastore/spanner/stats.go index 262f50203f..07093f03ce 100644 --- a/internal/datastore/spanner/stats.go +++ b/internal/datastore/spanner/stats.go @@ -7,6 +7,7 @@ import ( "time" "cloud.google.com/go/spanner" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc/codes" log "github.com/authzed/spicedb/internal/logging" @@ -40,7 +41,7 @@ func (sd spannerDatastore) Statistics(ctx context.Context) (datastore.Stats, err ) defer iter.Stop() - allNamespaces, err := readAllNamespaces(iter) + allNamespaces, err := readAllNamespaces(iter, trace.SpanFromContext(ctx)) if err != nil { return datastore.Stats{}, fmt.Errorf("unable to read namespaces: %w", err) } diff --git a/internal/dispatch/graph/graph.go b/internal/dispatch/graph/graph.go index c10e68d3a8..0a46a47734 100644 --- a/internal/dispatch/graph/graph.go +++ b/internal/dispatch/graph/graph.go @@ -158,8 +158,10 @@ func (ld *localDispatcher) lookupRelation(_ context.Context, ns *core.NamespaceD // DispatchCheck implements dispatch.Check interface func (ld *localDispatcher) DispatchCheck(ctx context.Context, req *v1.DispatchCheckRequest) (*v1.DispatchCheckResponse, error) { - ctx, span := tracer.Start(ctx, "DispatchCheck", trace.WithAttributes( - attribute.String("resource-type", tuple.StringRR(req.ResourceRelation)), + resourceType := tuple.StringRR(req.ResourceRelation) + spanName := "DispatchCheck → " + resourceType + "@" + req.Subject.Namespace + "#" + req.Subject.Relation + ctx, span := tracer.Start(ctx, spanName, trace.WithAttributes( + attribute.String("resource-type", resourceType), attribute.StringSlice("resource-ids", req.ResourceIds), attribute.String("subject", tuple.StringONR(req.Subject)), )) @@ -273,9 +275,12 @@ func (ld *localDispatcher) DispatchReachableResources( req *v1.DispatchReachableResourcesRequest, stream dispatch.ReachableResourcesStream, ) error { - ctx, span := tracer.Start(stream.Context(), "DispatchReachableResources", trace.WithAttributes( - attribute.String("resource-type", tuple.StringRR(req.ResourceRelation)), - attribute.String("subject-type", tuple.StringRR(req.SubjectRelation)), + resourceType := tuple.StringRR(req.ResourceRelation) + subjectRelation := tuple.StringRR(req.SubjectRelation) + spanName := "DispatchReachableResources → " + resourceType + "@" + subjectRelation + ctx, span := tracer.Start(stream.Context(), spanName, trace.WithAttributes( + attribute.String("resource-type", resourceType), + attribute.String("subject-type", subjectRelation), attribute.StringSlice("subject-ids", req.SubjectIds), )) defer span.End() @@ -332,9 +337,13 @@ func (ld *localDispatcher) DispatchLookupSubjects( req *v1.DispatchLookupSubjectsRequest, stream dispatch.LookupSubjectsStream, ) error { - ctx, span := tracer.Start(stream.Context(), "DispatchLookupSubjects", trace.WithAttributes( - attribute.String("resource-type", tuple.StringRR(req.ResourceRelation)), - attribute.String("subject-type", tuple.StringRR(req.SubjectRelation)), + resourceType := tuple.StringRR(req.ResourceRelation) + subjectRelation := tuple.StringRR(req.SubjectRelation) + spanName := "DispatchLookupSubjects → " + resourceType + "@" + subjectRelation + + ctx, span := tracer.Start(stream.Context(), spanName, trace.WithAttributes( + attribute.String("resource-type", resourceType), + attribute.String("subject-type", subjectRelation), attribute.StringSlice("resource-ids", req.ResourceIds), )) defer span.End() diff --git a/internal/graph/computed/computecheck_test.go b/internal/graph/computed/computecheck_test.go index 2f606baddc..e07e554a02 100644 --- a/internal/graph/computed/computecheck_test.go +++ b/internal/graph/computed/computecheck_test.go @@ -943,7 +943,7 @@ func writeCaveatedTuples(ctx context.Context, _ *testing.T, ds datastore.Datasto return datastore.NoRevision, err } - return ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + return ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { if err := rwt.WriteNamespaces(ctx, compiled.ObjectDefinitions...); err != nil { return err } diff --git a/internal/services/shared/schema_test.go b/internal/services/shared/schema_test.go index 870816d21f..4ed2e2fed4 100644 --- a/internal/services/shared/schema_test.go +++ b/internal/services/shared/schema_test.go @@ -51,7 +51,7 @@ func TestApplySchemaChanges(t *testing.T) { validated, err := ValidateSchemaChanges(context.Background(), compiled, false) require.NoError(err) - _, err = ds.ReadWriteTx(context.Background(), func(rwt datastore.ReadWriteTransaction) error { + _, err = ds.ReadWriteTx(context.Background(), func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { applied, err := ApplySchemaChanges(context.Background(), rwt, validated) require.NoError(err) diff --git a/internal/services/v1/experimental.go b/internal/services/v1/experimental.go index dcd1d79c7b..9946770b70 100644 --- a/internal/services/v1/experimental.go +++ b/internal/services/v1/experimental.go @@ -211,7 +211,7 @@ func (es *experimentalServer) BulkImportRelationships(stream v1.ExperimentalServ ds := datastoremw.MustFromContext(stream.Context()) var numWritten uint64 - if _, err := ds.ReadWriteTx(stream.Context(), func(rwt datastore.ReadWriteTransaction) error { + if _, err := ds.ReadWriteTx(stream.Context(), func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { loadedNamespaces := make(map[string]*typesystem.TypeSystem) loadedCaveats := make(map[string]*core.CaveatDefinition) diff --git a/internal/services/v1/preconditions_test.go b/internal/services/v1/preconditions_test.go index c9fc6caf15..31690b00a3 100644 --- a/internal/services/v1/preconditions_test.go +++ b/internal/services/v1/preconditions_test.go @@ -31,7 +31,7 @@ func TestPreconditions(t *testing.T) { require.True(revision.GreaterThan(datastore.NoRevision)) ctx := context.Background() - _, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { require.NoError(checkPreconditions(ctx, rwt, []*v1.Precondition{ { Operation: v1.Precondition_OPERATION_MUST_MATCH, diff --git a/internal/services/v1/relationships.go b/internal/services/v1/relationships.go index 3d0a7c5528..f75f220f14 100644 --- a/internal/services/v1/relationships.go +++ b/internal/services/v1/relationships.go @@ -10,6 +10,7 @@ import ( "github.com/jzelinskie/stringz" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/proto" "github.com/authzed/spicedb/internal/dispatch" @@ -248,6 +249,8 @@ func (ps *permissionServer) ReadRelationships(req *v1.ReadRelationshipsRequest, func (ps *permissionServer) WriteRelationships(ctx context.Context, req *v1.WriteRelationshipsRequest) (*v1.WriteRelationshipsResponse, error) { ds := datastoremw.MustFromContext(ctx) + span := trace.SpanFromContext(ctx) + span.AddEvent("validating mutations") // Ensure that the updates and preconditions are not over the configured limits. if len(req.Updates) > int(ps.config.MaxUpdatesPerWrite) { return nil, ps.rewriteError( @@ -282,8 +285,10 @@ func (ps *permissionServer) WriteRelationships(ctx context.Context, req *v1.Writ } // Execute the write operation(s). + span.AddEvent("read write transaction") tupleUpdates := tuple.UpdateFromRelationshipUpdates(req.Updates) - revision, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + revision, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + span.AddEvent("preconditions") // Validate the preconditions. for _, precond := range req.OptionalPreconditions { if err := ps.checkFilterNamespaces(ctx, precond.Filter, rwt); err != nil { @@ -292,6 +297,7 @@ func (ps *permissionServer) WriteRelationships(ctx context.Context, req *v1.Writ } // Validate the updates. + span.AddEvent("validate updates") err := relationships.ValidateRelationshipUpdates(ctx, rwt, tupleUpdates) if err != nil { return ps.rewriteError(ctx, err) @@ -302,10 +308,12 @@ func (ps *permissionServer) WriteRelationships(ctx context.Context, req *v1.Writ DispatchCount: uint32(len(req.OptionalPreconditions)) + 1, }) + span.AddEvent("preconditions") if err := checkPreconditions(ctx, rwt, req.OptionalPreconditions); err != nil { return err } + span.AddEvent("write relationships") return rwt.WriteRelationships(ctx, tupleUpdates) }) if err != nil { @@ -338,7 +346,7 @@ func (ps *permissionServer) DeleteRelationships(ctx context.Context, req *v1.Del ds := datastoremw.MustFromContext(ctx) deletionProgress := v1.DeleteRelationshipsResponse_DELETION_PROGRESS_COMPLETE - revision, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + revision, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { if err := ps.checkFilterNamespaces(ctx, req.RelationshipFilter, rwt); err != nil { return err } diff --git a/internal/services/v1/schema.go b/internal/services/v1/schema.go index fb7590bd5a..6faf82f61e 100644 --- a/internal/services/v1/schema.go +++ b/internal/services/v1/schema.go @@ -120,7 +120,7 @@ func (ss *schemaServer) WriteSchema(ctx context.Context, in *v1.WriteSchemaReque } // Update the schema. - revision, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + revision, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { applied, err := shared.ApplySchemaChanges(ctx, rwt, validated) if err != nil { return err diff --git a/internal/testfixtures/datastore.go b/internal/testfixtures/datastore.go index bb3920d2f8..59f48a4b6c 100644 --- a/internal/testfixtures/datastore.go +++ b/internal/testfixtures/datastore.go @@ -169,7 +169,7 @@ func StandardDatastoreWithCaveatedData(ds datastore.Datastore, require *require. ds, _ = StandardDatastoreWithSchema(ds, require) ctx := context.Background() - _, err := ds.ReadWriteTx(ctx, func(tx datastore.ReadWriteTransaction) error { + _, err := ds.ReadWriteTx(ctx, func(ctx context.Context, tx datastore.ReadWriteTransaction) error { return tx.WriteCaveats(ctx, createTestCaveat(require)) }) require.NoError(err) @@ -225,7 +225,7 @@ func DatastoreFromSchemaAndTestRelationships(ds datastore.Datastore, schema stri _ = writeDefinitions(validating, require, compiled.ObjectDefinitions, compiled.CaveatDefinitions) - newRevision, err := validating.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + newRevision, err := validating.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { mutations := make([]*core.RelationTupleUpdate, 0, len(relationships)) for _, rel := range relationships { mutations = append(mutations, tuple.Create(rel.CloneVT())) @@ -242,7 +242,7 @@ func DatastoreFromSchemaAndTestRelationships(ds datastore.Datastore, schema stri func writeDefinitions(ds datastore.Datastore, require *require.Assertions, objectDefs []*core.NamespaceDefinition, caveatDefs []*core.CaveatDefinition) datastore.Revision { ctx := context.Background() - newRevision, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + newRevision, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { if len(caveatDefs) > 0 { err := rwt.WriteCaveats(ctx, caveatDefs) require.NoError(err) diff --git a/internal/testfixtures/validating.go b/internal/testfixtures/validating.go index d156f91ec8..b6b359f7ad 100644 --- a/internal/testfixtures/validating.go +++ b/internal/testfixtures/validating.go @@ -37,9 +37,9 @@ func (vd validatingDatastore) ReadWriteTx( return datastore.NoRevision, fmt.Errorf("nil delegate function") } - return vd.Datastore.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + return vd.Datastore.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { txDelegate := validatingReadWriteTransaction{validatingSnapshotReader{rwt}, rwt} - return f(txDelegate) + return f(ctx, txDelegate) }, opts...) } diff --git a/pkg/datastore/datastore.go b/pkg/datastore/datastore.go index d207bfd81d..c72f5ffa48 100644 --- a/pkg/datastore/datastore.go +++ b/pkg/datastore/datastore.go @@ -273,7 +273,7 @@ type ReadWriteTransaction interface { } // TxUserFunc is a type for the function that users supply when they invoke a read-write transaction. -type TxUserFunc func(ReadWriteTransaction) error +type TxUserFunc func(context.Context, ReadWriteTransaction) error // ReadyState represents the ready state of the datastore. type ReadyState struct { diff --git a/pkg/datastore/test/bulk.go b/pkg/datastore/test/bulk.go index 0909486169..0424805ace 100644 --- a/pkg/datastore/test/bulk.go +++ b/pkg/datastore/test/bulk.go @@ -33,7 +33,7 @@ func BulkUploadTest(t *testing.T, tester DatastoreTester) { t, ) - _, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { loaded, err := rwt.BulkLoad(ctx, bulkSource) require.NoError(err) require.Equal(uint64(tc), loaded) @@ -66,7 +66,7 @@ func BulkUploadErrorsTest(t *testing.T, tester DatastoreTester) { ds, _ := testfixtures.StandardDatastoreWithSchema(rawDS, require) - _, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { inserted, err := rwt.BulkLoad(ctx, &onlyErrorSource{}) // We can't check the specific error because pgx is not wrapping diff --git a/pkg/datastore/test/caveat.go b/pkg/datastore/test/caveat.go index 9602614b54..761db7b697 100644 --- a/pkg/datastore/test/caveat.go +++ b/pkg/datastore/test/caveat.go @@ -115,7 +115,7 @@ func WriteReadDeleteCaveatTest(t *testing.T, tester DatastoreTester) { req.Len(cvs, 0) // Delete Caveat - rev, err = ds.ReadWriteTx(ctx, func(tx datastore.ReadWriteTransaction) error { + rev, err = ds.ReadWriteTx(ctx, func(ctx context.Context, tx datastore.ReadWriteTransaction) error { return tx.DeleteCaveats(ctx, []string{coreCaveat.Name}) }) req.NoError(err) @@ -380,7 +380,7 @@ func assertTupleCorrectlyStored(req *require.Assertions, ds datastore.Datastore, func skipIfNotCaveatStorer(t *testing.T, ds datastore.Datastore) { ctx := context.Background() - _, _ = ds.ReadWriteTx(ctx, func(transaction datastore.ReadWriteTransaction) error { // nolint: errcheck + _, _ = ds.ReadWriteTx(ctx, func(ctx context.Context, transaction datastore.ReadWriteTransaction) error { // nolint: errcheck _, _, err := transaction.ReadCaveatByName(ctx, uuid.NewString()) if !errors.As(err, &datastore.ErrCaveatNameNotFound{}) { t.Skip("datastore does not implement CaveatStorer interface") @@ -402,7 +402,7 @@ func createTestCaveatedTuple(t *testing.T, tplString string, caveatName string) } func writeCaveats(ctx context.Context, ds datastore.Datastore, coreCaveat ...*core.CaveatDefinition) (datastore.Revision, error) { - rev, err := ds.ReadWriteTx(ctx, func(tx datastore.ReadWriteTransaction) error { + rev, err := ds.ReadWriteTx(ctx, func(ctx context.Context, tx datastore.ReadWriteTransaction) error { return tx.WriteCaveats(ctx, coreCaveat) }) if err != nil { diff --git a/pkg/datastore/test/datastore.go b/pkg/datastore/test/datastore.go index 97d7fcfd1b..f2fa521177 100644 --- a/pkg/datastore/test/datastore.go +++ b/pkg/datastore/test/datastore.go @@ -161,7 +161,7 @@ func makeTestTuple(resourceID, userID string) *core.RelationTuple { func setupDatastore(ds datastore.Datastore, require *require.Assertions) datastore.Revision { ctx := context.Background() - revision, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + revision, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { return rwt.WriteNamespaces(ctx, testGroupNS, testResourceNS, testUserNS) }) require.NoError(err) diff --git a/pkg/datastore/test/namespace.go b/pkg/datastore/test/namespace.go index e72b11d0f3..d9da1deddc 100644 --- a/pkg/datastore/test/namespace.go +++ b/pkg/datastore/test/namespace.go @@ -66,7 +66,7 @@ func NamespaceWriteTest(t *testing.T, tester DatastoreTester) { require.NoError(err) require.Equal(0, len(nsDefs)) - writtenRev, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + writtenRev, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { return rwt.WriteNamespaces(ctx, testUserNS) }) require.NoError(err) @@ -77,7 +77,7 @@ func NamespaceWriteTest(t *testing.T, tester DatastoreTester) { require.Equal(1, len(nsDefs)) require.Equal(testUserNS.Name, nsDefs[0].Definition.Name) - secondWritten, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + secondWritten, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { return rwt.WriteNamespaces(ctx, testNamespace) }) require.NoError(err) @@ -101,7 +101,7 @@ func NamespaceWriteTest(t *testing.T, tester DatastoreTester) { foundDiff := cmp.Diff(testNamespace, found, protocmp.Transform()) require.Empty(foundDiff) - updatedRevision, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + updatedRevision, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { return rwt.WriteNamespaces(ctx, updatedNamespace) }) require.NoError(err) @@ -160,7 +160,7 @@ func NamespaceDeleteTest(t *testing.T, tester DatastoreTester) { require.NotNil(folderTpl) tRequire.TupleExists(ctx, folderTpl, revision) - deletedRev, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + deletedRev, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { return rwt.DeleteNamespaces(ctx, testfixtures.DocumentNS.Name) }) require.NoError(err) @@ -207,7 +207,7 @@ func NamespaceMultiDeleteTest(t *testing.T, tester DatastoreTester) { nsNames = append(nsNames, ns.Definition.Name) } - deletedRev, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + deletedRev, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { return rwt.DeleteNamespaces(ctx, nsNames...) }) require.NoError(t, err) @@ -227,7 +227,7 @@ func EmptyNamespaceDeleteTest(t *testing.T, tester DatastoreTester) { ds, revision := testfixtures.StandardDatastoreWithData(rawDS, require) ctx := context.Background() - deletedRev, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + deletedRev, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { return rwt.DeleteNamespaces(ctx, testfixtures.UserNS.Name) }) require.NoError(err) @@ -271,7 +271,7 @@ definition document { require.NoError(err) ctx := context.Background() - updatedRevision, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + updatedRevision, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { err := rwt.WriteCaveats(ctx, compiled.CaveatDefinitions) if err != nil { return err diff --git a/pkg/datastore/test/pagination.go b/pkg/datastore/test/pagination.go index 53fff2ac94..5ddf4b4912 100644 --- a/pkg/datastore/test/pagination.go +++ b/pkg/datastore/test/pagination.go @@ -69,7 +69,7 @@ func OrderingTest(t *testing.T, tester DatastoreTester) { } // Check a reader from with a transaction - _, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { iter, err := rwt.QueryRelationships(ctx, datastore.RelationshipsFilter{ ResourceType: tc.resourceType, }, options.WithSort(tc.ordering)) @@ -384,7 +384,7 @@ func foreachTxType( reader := ds.SnapshotReader(snapshotRev) fn(reader) - _, _ = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, _ = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { fn(rwt) return nil }) diff --git a/pkg/datastore/test/revisions.go b/pkg/datastore/test/revisions.go index 6b3055918e..ef9cfb8b85 100644 --- a/pkg/datastore/test/revisions.go +++ b/pkg/datastore/test/revisions.go @@ -79,7 +79,7 @@ func RevisionSerializationTest(t *testing.T, tester DatastoreTester) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() - revToTest, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + revToTest, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { return rwt.WriteNamespaces(ctx, testNamespace) }) require.NoError(err) @@ -103,7 +103,7 @@ func RevisionGCTest(t *testing.T, tester DatastoreTester) { defer cancel() testCaveat := createCoreCaveat(t) - _, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { if err := rwt.WriteNamespaces(ctx, ns.Namespace("foo/createdtxgc")); err != nil { return err } @@ -113,7 +113,7 @@ func RevisionGCTest(t *testing.T, tester DatastoreTester) { }) require.NoError(err) - previousRev, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + previousRev, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { return rwt.WriteNamespaces(ctx, testNamespace) }) require.NoError(err) @@ -158,7 +158,7 @@ func RevisionGCTest(t *testing.T, tester DatastoreTester) { require.NoError(ds.CheckRevision(ctx, head), "expected freshly obtained head revision to be valid") // write happens, we get a new head revision - newerRev, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + newerRev, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { return rwt.WriteNamespaces(ctx, testNamespace) }) require.NoError(err) diff --git a/pkg/datastore/test/transactions.go b/pkg/datastore/test/transactions.go index 7fb33af37d..f758746b67 100644 --- a/pkg/datastore/test/transactions.go +++ b/pkg/datastore/test/transactions.go @@ -40,7 +40,7 @@ func RetryTest(t *testing.T, tester DatastoreTester) { defer cancel() var attempts int - _, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { attempts++ if tc.returnRetryableError { diff --git a/pkg/datastore/test/tuples.go b/pkg/datastore/test/tuples.go index e1c9aeae9b..72dbea9397 100644 --- a/pkg/datastore/test/tuples.go +++ b/pkg/datastore/test/tuples.go @@ -235,7 +235,7 @@ func SimpleTest(t *testing.T, tester DatastoreTester) { tRequire.TupleExists(ctx, testTuples[0], returnedAt) // Delete with DeleteRelationship - deletedAt, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + deletedAt, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { err := rwt.DeleteRelationships(ctx, &v1.RelationshipFilter{ ResourceType: testResourceNamespace, }) @@ -269,7 +269,7 @@ func ObjectIDsTest(t *testing.T, tester DatastoreTester) { require.NoError(tpl.Validate()) // Write the test tuple - _, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { return rwt.WriteRelationships(ctx, []*core.RelationTupleUpdate{ { Operation: core.RelationTupleUpdate_CREATE, @@ -386,7 +386,7 @@ func DeleteRelationshipsTest(t *testing.T, tester DatastoreTester) { // TODO temporarily store tuples in multiple calls to ReadWriteTransaction since no Datastore // handles correctly duplicate tuples - _, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { for _, tpl := range tt.inputTuples { update := tuple.Touch(tpl) err := rwt.WriteRelationships(ctx, []*core.RelationTupleUpdate{update}) @@ -398,7 +398,7 @@ func DeleteRelationshipsTest(t *testing.T, tester DatastoreTester) { }) require.NoError(err) - deletedAt, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + deletedAt, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { err := rwt.DeleteRelationships(ctx, tt.filter) require.NoError(err) return err @@ -474,7 +474,7 @@ func DeleteNotExistantTest(t *testing.T, tester DatastoreTester) { ds, _ := testfixtures.StandardDatastoreWithData(rawDS, require) ctx := context.Background() - _, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { err := rwt.WriteRelationships(ctx, []*core.RelationTupleUpdate{ tuple.Delete(tuple.MustParse("document:foo#viewer@user:tom#...")), }) @@ -495,7 +495,7 @@ func DeleteAlreadyDeletedTest(t *testing.T, tester DatastoreTester) { ds, _ := testfixtures.StandardDatastoreWithData(rawDS, require) ctx := context.Background() - _, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { // Write the relationship. return rwt.WriteRelationships(ctx, []*core.RelationTupleUpdate{ tuple.Create(tuple.MustParse("document:foo#viewer@user:tom#...")), @@ -503,7 +503,7 @@ func DeleteAlreadyDeletedTest(t *testing.T, tester DatastoreTester) { }) require.NoError(err) - _, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { // Delete the relationship. return rwt.WriteRelationships(ctx, []*core.RelationTupleUpdate{ tuple.Delete(tuple.MustParse("document:foo#viewer@user:tom#...")), @@ -511,7 +511,7 @@ func DeleteAlreadyDeletedTest(t *testing.T, tester DatastoreTester) { }) require.NoError(err) - _, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { // Delete the relationship again. return rwt.WriteRelationships(ctx, []*core.RelationTupleUpdate{ tuple.Delete(tuple.MustParse("document:foo#viewer@user:tom#...")), @@ -696,7 +696,7 @@ func MultipleReadsInRWTTest(t *testing.T, tester DatastoreTester) { ds, _ := testfixtures.StandardDatastoreWithData(rawDS, require) ctx := context.Background() - _, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { it, err := rwt.QueryRelationships(ctx, datastore.RelationshipsFilter{ ResourceType: "document", }) @@ -735,7 +735,7 @@ func ConcurrentWriteSerializationTest(t *testing.T, tester DatastoreTester) { startTime := time.Now() g.Go(func() error { - _, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { iter, err := rwt.QueryRelationships(ctx, datastore.RelationshipsFilter{ ResourceType: testResourceNamespace, }) @@ -760,7 +760,7 @@ func ConcurrentWriteSerializationTest(t *testing.T, tester DatastoreTester) { <-waitToStart - _, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { defer waitToFinishCloser.Do(func() { close(waitToFinish) }) diff --git a/pkg/datastore/test/watch.go b/pkg/datastore/test/watch.go index aaa0b3f4d9..cdb8bdbfbe 100644 --- a/pkg/datastore/test/watch.go +++ b/pkg/datastore/test/watch.go @@ -90,7 +90,7 @@ func WatchTest(t *testing.T, tester DatastoreTester) { testUpdates = append(testUpdates, batch, []*core.RelationTupleUpdate{deleteUpdate}) - _, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { err := rwt.DeleteRelationships(ctx, &v1.RelationshipFilter{ ResourceType: testResourceNamespace, OptionalRelation: testReaderRelation, diff --git a/pkg/development/devcontext.go b/pkg/development/devcontext.go index e4fd29005b..a6ee4f75d2 100644 --- a/pkg/development/devcontext.go +++ b/pkg/development/devcontext.go @@ -80,7 +80,7 @@ func newDevContextWithDatastore(ctx context.Context, requestContext *devinterfac } var inputErrors []*devinterface.DeveloperError - currentRevision, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + currentRevision, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { inputErrors, err = loadCompiled(ctx, compiled, rwt) if err != nil || len(inputErrors) > 0 { return err diff --git a/pkg/typesystem/typesystem_test.go b/pkg/typesystem/typesystem_test.go index d96622b879..2908f3382f 100644 --- a/pkg/typesystem/typesystem_test.go +++ b/pkg/typesystem/typesystem_test.go @@ -361,7 +361,7 @@ func TestTypeSystem(t *testing.T) { ctx := context.Background() - lastRevision, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + lastRevision, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { for _, otherNS := range tc.otherNamespaces { if err := rwt.WriteNamespaces(ctx, otherNS); err != nil { return err diff --git a/pkg/validationfile/loader.go b/pkg/validationfile/loader.go index 18ca813256..ee8d169987 100644 --- a/pkg/validationfile/loader.go +++ b/pkg/validationfile/loader.go @@ -109,7 +109,7 @@ func PopulateFromFilesContents(ctx context.Context, ds datastore.Datastore, file } // Load the definitions and relationships into the datastore. - revision, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + revision, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { // Write the caveat definitions. err := rwt.WriteCaveats(ctx, caveatDefs) if err != nil { @@ -154,7 +154,7 @@ func PopulateFromFilesContents(ctx context.Context, ds datastore.Datastore, file for _, update := range chunked { chunkedTuples = append(chunkedTuples, update.Tuple) } - revision, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + revision, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { err = relationships.ValidateRelationshipsForCreateOrTouch(ctx, rwt, chunkedTuples) if err != nil { return err