From 46ba52d0319258390f6bf1127d4e229296035cae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=ADctor=20Rold=C3=A1n=20Betancort?= Date: Tue, 19 Dec 2023 14:21:41 +0000 Subject: [PATCH] fixes CRDB datastore not reporting proper GRPC codes - duplicate relationship now reports gRPC code AlreadyExists - retryable errors now return grpc code Unavailable --- internal/datastore/crdb/crdb.go | 11 ++++++++- internal/datastore/crdb/pool/slqerrors.go | 27 +++++++++++++++++++++++ internal/datastore/crdb/readwrite.go | 6 ----- internal/datastore/memdb/readwrite.go | 2 +- internal/datastore/mysql/datastore.go | 15 +++++++++++-- internal/datastore/mysql/readwrite.go | 4 ---- internal/datastore/postgres/postgres.go | 6 +++++ internal/datastore/postgres/readwrite.go | 6 ----- pkg/datastore/test/tuples.go | 8 +++++++ 9 files changed, 65 insertions(+), 20 deletions(-) diff --git a/internal/datastore/crdb/crdb.go b/internal/datastore/crdb/crdb.go index 34a60a4929..240acfb702 100644 --- a/internal/datastore/crdb/crdb.go +++ b/internal/datastore/crdb/crdb.go @@ -336,12 +336,21 @@ func (cds *crdbDatastore) ReadWriteTx( return nil }) if err != nil { - return datastore.NoRevision, err + return datastore.NoRevision, wrapError(err) } return commitTimestamp, nil } +func wrapError(err error) error { + // If a unique constraint violation is returned, then its likely that the cause + // was an existing relationship. + if cerr := pgxcommon.ConvertToWriteConstraintError(livingTupleConstraint, err); cerr != nil { + return cerr + } + return err +} + func (cds *crdbDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, error) { headMigration, err := migrations.CRDBMigrations.HeadRevision() if err != nil { diff --git a/internal/datastore/crdb/pool/slqerrors.go b/internal/datastore/crdb/pool/slqerrors.go index a5ed007ab5..1edec7ca21 100644 --- a/internal/datastore/crdb/pool/slqerrors.go +++ b/internal/datastore/crdb/pool/slqerrors.go @@ -5,6 +5,10 @@ import ( "fmt" "github.com/jackc/pgx/v5/pgconn" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/authzed/spicedb/pkg/spiceerrors" ) const ( @@ -35,6 +39,15 @@ func (e *MaxRetryError) Error() string { func (e *MaxRetryError) Unwrap() error { return e.LastErr } +func (e *MaxRetryError) GRPCStatus() *status.Status { + s, ok := status.FromError(e.Unwrap()) + if !ok { + return nil + } + + return s +} + // ResettableError is an error that we think may succeed if retried against a new connection. type ResettableError struct { Err error @@ -43,6 +56,13 @@ type ResettableError struct { func (e *ResettableError) Error() string { return "resettable error" + ": " + e.Err.Error() } func (e *ResettableError) Unwrap() error { return e.Err } +func (e *ResettableError) GRPCStatus() *status.Status { + return spiceerrors.WithCodeAndDetails( + e.Unwrap(), + codes.Unavailable, // return unavailable so clients are know it's ok to retry + ) +} + // RetryableError is an error that can be retried against the existing connection. type RetryableError struct { Err error @@ -51,6 +71,13 @@ type RetryableError struct { func (e *RetryableError) Error() string { return "retryable error" + ": " + e.Err.Error() } func (e *RetryableError) Unwrap() error { return e.Err } +func (e *RetryableError) GRPCStatus() *status.Status { + return spiceerrors.WithCodeAndDetails( + e.Unwrap(), + codes.Unavailable, // return unavailable so clients are know it's ok to retry + ) +} + // sqlErrorCode attempts to extract the crdb error code from the error state. func sqlErrorCode(err error) string { var pgerr *pgconn.PgError diff --git a/internal/datastore/crdb/readwrite.go b/internal/datastore/crdb/readwrite.go index fc21b41d7f..c47c47b857 100644 --- a/internal/datastore/crdb/readwrite.go +++ b/internal/datastore/crdb/readwrite.go @@ -168,12 +168,6 @@ func (rwt *crdbReadWriteTXN) WriteRelationships(ctx context.Context, mutations [ } if _, err := rwt.tx.Exec(ctx, sql, args...); err != nil { - // If a unique constraint violation is returned, then its likely that the cause - // was an existing relationship given as a CREATE. - if cerr := pgxcommon.ConvertToWriteConstraintError(livingTupleConstraint, err); cerr != nil { - return cerr - } - return fmt.Errorf(errUnableToWriteRelationships, err) } } diff --git a/internal/datastore/memdb/readwrite.go b/internal/datastore/memdb/readwrite.go index 2045d1aead..0904e6d859 100644 --- a/internal/datastore/memdb/readwrite.go +++ b/internal/datastore/memdb/readwrite.go @@ -213,7 +213,7 @@ func (rwt *memdbReadWriteTx) DeleteNamespaces(_ context.Context, nsNames ...stri func (rwt *memdbReadWriteTx) BulkLoad(ctx context.Context, iter datastore.BulkWriteRelationshipSource) (uint64, error) { updates := []*core.RelationTupleUpdate{{ - Operation: core.RelationTupleUpdate_TOUCH, + Operation: core.RelationTupleUpdate_CREATE, }} var numCopied uint64 diff --git a/internal/datastore/mysql/datastore.go b/internal/datastore/mysql/datastore.go index 0f0cc2bffa..72f91bc9cd 100644 --- a/internal/datastore/mysql/datastore.go +++ b/internal/datastore/mysql/datastore.go @@ -321,7 +321,7 @@ func (mds *Datastore) ReadWriteTx( continue } - return datastore.NoRevision, err + return datastore.NoRevision, wrapError(err) } return revisionFromTransaction(newTxnID), nil @@ -329,7 +329,18 @@ func (mds *Datastore) ReadWriteTx( if !config.DisableRetries { err = fmt.Errorf("max retries exceeded: %w", err) } - return datastore.NoRevision, err + + return datastore.NoRevision, wrapError(err) +} + +// wrapError maps any mysql internal error into a SpiceDB typed error or an error +// that implements GRPCStatus(). +func wrapError(err error) error { + if cerr := convertToWriteConstraintError(err); cerr != nil { + return cerr + } + + return err } func isErrorRetryable(err error) bool { diff --git a/internal/datastore/mysql/readwrite.go b/internal/datastore/mysql/readwrite.go index dd8478db1a..500ef25d1f 100644 --- a/internal/datastore/mysql/readwrite.go +++ b/internal/datastore/mysql/readwrite.go @@ -210,10 +210,6 @@ func (rwt *mysqlReadWriteTXN) WriteRelationships(ctx context.Context, mutations _, err = rwt.tx.ExecContext(ctx, query, args...) if err != nil { - if cerr := convertToWriteConstraintError(err); cerr != nil { - return cerr - } - return fmt.Errorf(errUnableToWriteRelationships, err) } } diff --git a/internal/datastore/postgres/postgres.go b/internal/datastore/postgres/postgres.go index c86d0e8000..49c9cfabc5 100644 --- a/internal/datastore/postgres/postgres.go +++ b/internal/datastore/postgres/postgres.go @@ -473,6 +473,12 @@ func (pgd *pgDatastore) RepairOperations() []datastore.RepairOperation { } func wrapError(err error) error { + // If a unique constraint violation is returned, then its likely that the cause + // was an existing relationship given as a CREATE. + if cerr := pgxcommon.ConvertToWriteConstraintError(livingTupleConstraint, err); cerr != nil { + return cerr + } + if pgxcommon.IsSerializationError(err) { return common.NewSerializationError(err) } diff --git a/internal/datastore/postgres/readwrite.go b/internal/datastore/postgres/readwrite.go index 0362439339..f473b2984d 100644 --- a/internal/datastore/postgres/readwrite.go +++ b/internal/datastore/postgres/readwrite.go @@ -117,12 +117,6 @@ func (rwt *pgReadWriteTXN) WriteRelationships(ctx context.Context, mutations []* _, err = rwt.tx.Exec(ctx, sql, args...) if err != nil { - // If a unique constraint violation is returned, then its likely that the cause - // was an existing relationship given as a CREATE. - if cerr := pgxcommon.ConvertToWriteConstraintError(livingTupleConstraint, err); cerr != nil { - return cerr - } - return fmt.Errorf(errUnableToWriteRelationships, err) } } diff --git a/pkg/datastore/test/tuples.go b/pkg/datastore/test/tuples.go index 72dbea9397..a59364a245 100644 --- a/pkg/datastore/test/tuples.go +++ b/pkg/datastore/test/tuples.go @@ -566,6 +566,14 @@ func CreateAlreadyExistingTest(t *testing.T, tester DatastoreTester) { require.ErrorAs(err, &common.CreateRelationshipExistsError{}) require.Contains(err.Error(), "could not CREATE relationship ") grpcutil.RequireStatus(t, codes.AlreadyExists, err) + + f := func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + _, err := rwt.BulkLoad(ctx, testfixtures.NewBulkTupleGenerator(testResourceNamespace, testReaderRelation, testUserNamespace, 1, t)) + return err + } + _, _ = ds.ReadWriteTx(ctx, f) + _, err = ds.ReadWriteTx(ctx, f) + grpcutil.RequireStatus(t, codes.AlreadyExists, err) } // TouchAlreadyExistingTest tests touching a relationship twice.