Skip to content

Commit

Permalink
Merge pull request #1590 from jzelinskie/spanner-optimism
Browse files Browse the repository at this point in the history
Optimistic locking on Spanner read/write transactions
  • Loading branch information
jzelinskie authored Oct 18, 2023
2 parents 1296f82 + 5459af7 commit a3eedf8
Show file tree
Hide file tree
Showing 7 changed files with 16 additions and 29 deletions.
2 changes: 1 addition & 1 deletion internal/datastore/spanner/caveat.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (rwt spannerReadWriteTXN) WriteCaveats(_ context.Context, caveats []*core.C
mutations = append(mutations, spanner.InsertOrUpdate(
tableCaveat,
[]string{colName, colCaveatDefinition, colCaveatTS},
[]interface{}{caveat.Name, serialized, spanner.CommitTimestamp},
[]any{caveat.Name, serialized, spanner.CommitTimestamp},
))
}

Expand Down
2 changes: 1 addition & 1 deletion internal/datastore/spanner/migrations/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (smd *SpannerMigrationDriver) RunTx(ctx context.Context, f migrate.TxMigrat
func (smd *SpannerMigrationDriver) WriteVersion(_ context.Context, rwt *spanner.ReadWriteTransaction, version, replaced string) error {
return rwt.BufferWrite([]*spanner.Mutation{
spanner.Delete(tableSchemaVersion, spanner.KeySetFromKeys(spanner.Key{replaced})),
spanner.Insert(tableSchemaVersion, []string{colVersionNum}, []interface{}{version}),
spanner.Insert(tableSchemaVersion, []string{colVersionNum}, []any{version}),
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func init() {
return updateOp.Wait(ctx)
}, func(ctx context.Context, rwt *spanner.ReadWriteTransaction) error {
return rwt.BufferWrite([]*spanner.Mutation{
spanner.Insert("metadata", []string{"unique_id"}, []interface{}{uuid.NewString()}),
spanner.Insert("metadata", []string{"unique_id"}, []any{uuid.NewString()}),
})
}); err != nil {
panic("failed to register migration: " + err.Error())
Expand Down
8 changes: 1 addition & 7 deletions internal/datastore/spanner/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ import (
// https://github.com/googleapis/google-cloud-go/blob/a33861fe46be42ae150d6015ad39dae6e35e04e8/spanner/transaction.go#L55
type readTX interface {
ReadRow(ctx context.Context, table string, key spanner.Key, columns []string) (*spanner.Row, error)

Read(ctx context.Context, table string, keys spanner.KeySet, columns []string) *spanner.RowIterator

Query(ctx context.Context, statement spanner.Statement) *spanner.RowIterator
}

Expand Down Expand Up @@ -73,11 +71,7 @@ func (sr spannerReader) ReverseQueryRelationships(
}

func queryExecutor(txSource txFactory) common.ExecuteQueryFunc {
return func(
ctx context.Context,
sql string,
args []interface{},
) ([]*core.RelationTuple, error) {
return func(ctx context.Context, sql string, args []any) ([]*core.RelationTuple, error) {
ctx, span := tracer.Start(ctx, "ExecuteQuery")
defer span.End()

Expand Down
4 changes: 2 additions & 2 deletions internal/datastore/spanner/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type selectAndDelete struct {
del sq.DeleteBuilder
}

func (snd selectAndDelete) Where(pred interface{}, args ...interface{}) selectAndDelete {
func (snd selectAndDelete) Where(pred any, args ...any) selectAndDelete {
snd.sel = snd.sel.Where(pred, args...)
snd.del = snd.del.Where(pred, args...)
return snd
Expand Down Expand Up @@ -171,7 +171,7 @@ func (rwt spannerReadWriteTXN) WriteNamespaces(_ context.Context, newConfigs ...
mutations = append(mutations, spanner.InsertOrUpdate(
tableNamespace,
[]string{colNamespaceName, colNamespaceConfig, colTimestamp},
[]interface{}{newConfig.Name, serialized, spanner.CommitTimestamp},
[]any{newConfig.Name, serialized, spanner.CommitTimestamp},
))
}

Expand Down
25 changes: 9 additions & 16 deletions internal/datastore/spanner/spanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"cloud.google.com/go/spanner"
spb "cloud.google.com/go/spanner/apiv1/spannerpb"
sq "github.com/Masterminds/squirrel"
"go.opentelemetry.io/otel"
"google.golang.org/api/option"
Expand Down Expand Up @@ -120,28 +121,20 @@ func (sd spannerDatastore) SnapshotReader(revisionRaw datastore.Revision) datast
txSource := func() readTX {
return sd.client.Single().WithTimestampBound(spanner.ReadTimestamp(timestampFromRevision(r)))
}
executor := common.QueryExecutor{
Executor: queryExecutor(txSource),
}
executor := common.QueryExecutor{Executor: queryExecutor(txSource)}
return spannerReader{executor, txSource}
}

func (sd spannerDatastore) ReadWriteTx(
ctx context.Context,
fn datastore.TxUserFunc,
opts ...options.RWTOptionsOption,
) (datastore.Revision, error) {
func (sd spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserFunc, opts ...options.RWTOptionsOption) (datastore.Revision, error) {
config := options.NewRWTOptionsWithOptions(opts...)

ctx, cancel := context.WithCancel(ctx)
ts, err := sd.client.ReadWriteTransaction(ctx, func(ctx context.Context, spannerRWT *spanner.ReadWriteTransaction) error {
resp, err := sd.client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, spannerRWT *spanner.ReadWriteTransaction) error {
txSource := func() readTX {
return spannerRWT
}

executor := common.QueryExecutor{
Executor: queryExecutor(txSource),
}
executor := common.QueryExecutor{Executor: queryExecutor(txSource)}
rwt := spannerReadWriteTXN{
spannerReader{executor, txSource},
spannerRWT,
Expand All @@ -155,15 +148,15 @@ func (sd spannerDatastore) ReadWriteTx(
}

return nil
})
}, spanner.TransactionOptions{ReadLockMode: spb.TransactionOptions_ReadWrite_OPTIMISTIC})
if err != nil {
if cerr := convertToWriteConstraintError(err); cerr != nil {
return datastore.NoRevision, cerr
}
return datastore.NoRevision, err
}

return revisionFromTimestamp(ts), nil
return revisionFromTimestamp(resp.CommitTs), nil
}

func (sd spannerDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, error) {
Expand Down Expand Up @@ -211,8 +204,8 @@ func (sd spannerDatastore) Close() error {
return nil
}

func statementFromSQL(sql string, args []interface{}) spanner.Statement {
params := make(map[string]interface{}, len(args))
func statementFromSQL(sql string, args []any) spanner.Statement {
params := make(map[string]any, len(args))
for index, arg := range args {
params["p"+strconv.Itoa(index+1)] = arg
}
Expand Down
2 changes: 1 addition & 1 deletion internal/datastore/spanner/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func updateCounter(ctx context.Context, rwt *spanner.ReadWriteTransaction, chang
Msg("updating counter")

if err := rwt.BufferWrite([]*spanner.Mutation{
spanner.InsertOrUpdate(tableCounters, []string{colID, colCount}, []interface{}{counterID, newValue}),
spanner.InsertOrUpdate(tableCounters, []string{colID, colCount}, []any{counterID, newValue}),
}); err != nil {
return fmt.Errorf("unable to buffer update to counter: %w", err)
}
Expand Down

0 comments on commit a3eedf8

Please sign in to comment.