From 88334f52ee5d3347d69a415c223acfcaac52c632 Mon Sep 17 00:00:00 2001 From: Mahdi Dibaiee Date: Thu, 2 Nov 2023 12:02:45 +0000 Subject: [PATCH] materialize-databricks: remove fencing, switch to idempotent apply --- materialize-bigquery/transactor.go | 1 + materialize-databricks/driver.go | 179 +++++++++++--------------- materialize-databricks/driver_test.go | 40 +----- materialize-databricks/sqlgen.go | 23 ++-- materialize-mysql/driver.go | 1 + materialize-postgres/driver.go | 1 + materialize-redshift/driver.go | 1 + materialize-snowflake/snowflake.go | 1 + materialize-sql/driver.go | 2 +- materialize-sql/endpoint.go | 2 +- materialize-sqlite/sqlite.go | 1 + materialize-sqlserver/driver.go | 1 + 12 files changed, 98 insertions(+), 155 deletions(-) diff --git a/materialize-bigquery/transactor.go b/materialize-bigquery/transactor.go index 0b447f6d55..5435681af6 100644 --- a/materialize-bigquery/transactor.go +++ b/materialize-bigquery/transactor.go @@ -34,6 +34,7 @@ func newTransactor( ep *sql.Endpoint, fence sql.Fence, bindings []sql.Table, + open pm.Request_Open, ) (_ pm.Transactor, err error) { cfg := ep.Config.(*config) diff --git a/materialize-databricks/driver.go b/materialize-databricks/driver.go index 976021e1ce..1257a68dbe 100644 --- a/materialize-databricks/driver.go +++ b/materialize-databricks/driver.go @@ -4,7 +4,6 @@ import ( "os" "context" stdsql "database/sql" - "encoding/base64" "encoding/json" "errors" "fmt" @@ -24,6 +23,7 @@ import ( pf "github.com/estuary/flow/go/protocols/flow" pm "github.com/estuary/flow/go/protocols/materialize" log "github.com/sirupsen/logrus" + "github.com/google/uuid" "go.gazette.dev/core/consumer/protocol" ) @@ -92,13 +92,13 @@ func newSqlServerDriver() *sql.Driver { }).Info("connecting to databricks") var metaBase sql.TablePath - var metaSpecs, metaCheckpoints = sql.MetaTables(metaBase) + var metaSpecs, _ = sql.MetaTables(metaBase) return &sql.Endpoint{ Config: cfg, Dialect: databricksDialect, MetaSpecs: &metaSpecs, - MetaCheckpoints: &metaCheckpoints, + MetaCheckpoints: nil, Client: client{uri: cfg.ToURI()}, CreateTableTemplate: tplCreateTargetTable, NewResource: newTableConfig, @@ -239,12 +239,7 @@ func (c client) ExecStatements(ctx context.Context, statements []string) error { } func (c client) InstallFence(ctx context.Context, checkpoints sql.Table, fence sql.Fence) (sql.Fence, error) { - var err = c.withDB(func(db *stdsql.DB) error { - var err error - fence, err = installFence(ctx, databricksDialect, db, checkpoints, fence, base64.StdEncoding.DecodeString) - return err - }) - return fence, err + return sql.Fence{}, nil } func (c client) withDB(fn func(*stdsql.DB) error) error { @@ -271,7 +266,6 @@ type transactor struct { // Variables exclusively used by Store. store struct { conn *stdsql.Conn - fence sql.Fence } bindings []*binding } @@ -285,6 +279,7 @@ func newTransactor( ep *sql.Endpoint, fence sql.Fence, bindings []sql.Table, + open pm.Request_Open, ) (_ pm.Transactor, err error) { var cfg = ep.Config.(*config) @@ -298,7 +293,6 @@ func newTransactor( } var d = &transactor{cfg: cfg, wsClient: wsClient} - d.store.fence = fence // Establish connections. if db, err := stdsql.Open("databricks", cfg.ToURI()); err != nil { @@ -318,6 +312,17 @@ func newTransactor( } } + var cp checkpoint + if open.StateJson != nil { + if err := json.Unmarshal(open.StateJson, &cp); err != nil { + return nil, fmt.Errorf("parsing driver config: %w", err) + } + } + + if err = d.applyCheckpoint(ctx, cp); err != nil { + return nil, fmt.Errorf("applying recovered checkpoint: %w", err) + } + // Build a query which unions the results of each load subquery. var subqueries []string for _, b := range d.bindings { @@ -344,8 +349,6 @@ type binding struct { // path to where we store staging files rootStagingPath string - loadStagingPath string - storeStagingPath string // a binding needs to be merged if there are updates to existing documents // otherwise we just do a direct copy by moving all data from temporary table @@ -372,10 +375,7 @@ type binding struct { func (t *transactor) addBinding(ctx context.Context, target sql.Table) error { var b = &binding{target: target} - var idx = len(t.bindings) b.rootStagingPath = fmt.Sprintf("/Volumes/%s/%s/%s", t.cfg.CatalogName, target.Path[0], volumeName) - b.loadStagingPath = fmt.Sprintf("%s/%d_load.json", b.rootStagingPath, idx) - b.storeStagingPath = fmt.Sprintf("%s/%d_store.json", b.rootStagingPath, idx) for _, m := range []struct { sql *string @@ -437,13 +437,6 @@ func (t *transactor) addBinding(ctx context.Context, target sql.Table) error { func (d *transactor) Load(it *pm.LoadIterator, loaded func(int, json.RawMessage) error) error { var ctx = d.Context(it.Context()) - // Delete existing staging files - for _, b := range d.bindings { - if err := d.wsClient.Files.DeleteByFilePath(ctx, b.loadStagingPath); err != nil && !strings.Contains(fmt.Sprintf("%s", err), "Not Found") { - return fmt.Errorf("deleting staging file %q: %w", b.loadStagingPath, err) - } - } - var localFiles = make(map[int]*os.File) for it.Next() { var b = d.bindings[it.Binding] @@ -460,7 +453,7 @@ func (d *transactor) Load(it *pm.LoadIterator, loaded func(int, json.RawMessage) var jsonMap = make(map[string]interface{}, len(b.target.Keys)) for i, col := range b.target.Keys { - // TODO: what happens here in case of nested keys? + // FIXME: what happens here in case of nested keys? jsonMap[col.Field] = converted[i] } @@ -481,8 +474,14 @@ func (d *transactor) Load(it *pm.LoadIterator, loaded func(int, json.RawMessage) for binding, _ := range localFiles { var b = d.bindings[binding] + var randomKey, err = uuid.NewRandom() + if err != nil { + return fmt.Errorf("generating random key for file: %w", err) + } + var source = fmt.Sprintf("%s/%d_load.json", d.localStagingPath, binding) - var destination = b.loadStagingPath + var destination = fmt.Sprintf("%s/%d_%s_load.json", b.rootStagingPath, binding, randomKey) + if bs, err := os.ReadFile(source); err != nil { return err } else { @@ -545,16 +544,13 @@ func (d *transactor) Load(it *pm.LoadIterator, loaded func(int, json.RawMessage) return nil } +type checkpoint struct { + Queries []string +} + func (d *transactor) Store(it *pm.StoreIterator) (_ pm.StartCommitFunc, err error) { ctx := it.Context() - // Delete existing staging files - for _, b := range d.bindings { - if err := d.wsClient.Files.DeleteByFilePath(ctx, b.storeStagingPath); err != nil && !strings.Contains(fmt.Sprintf("%s", err), "Not Found") { - return nil, fmt.Errorf("deleting staging file %q: %w", b.storeStagingPath, err) - } - } - var localFiles = make(map[int]*os.File) for it.Next() { var b = d.bindings[it.Binding] @@ -602,7 +598,12 @@ func (d *transactor) Store(it *pm.StoreIterator) (_ pm.StartCommitFunc, err erro f.Close() } - // Upload the staged files + // Upload the staged files and build a list of merge and truncate queries that need to be run + // to effectively commit the files into destination tables. These queries are stored + // in the checkpoint so that if the connector is restarted in middle of a commit + // it can run the same queries on the next startup. This is the pattern for + // recovery log being authoritative and the connector idempotently applies a commit + var queries []string for binding, _ := range localFiles { var b = d.bindings[binding] @@ -611,8 +612,14 @@ func (d *transactor) Store(it *pm.StoreIterator) (_ pm.StartCommitFunc, err erro continue } + var randomKey, err = uuid.NewRandom() + if err != nil { + return nil, fmt.Errorf("generating random key for file: %w", err) + } + var source = fmt.Sprintf("%s/%d_store.json", d.localStagingPath, binding) - var destination = b.storeStagingPath + var destination = fmt.Sprintf("%s/%d_%s_store.json", b.rootStagingPath, binding, randomKey) + if bs, err := os.ReadFile(source); err != nil { return nil, err } else { @@ -630,90 +637,50 @@ func (d *transactor) Store(it *pm.StoreIterator) (_ pm.StartCommitFunc, err erro } // COPY INTO temporary load table from staged files - if _, err := d.store.conn.ExecContext(ctx, b.copyIntoStore); err != nil { + if _, err := d.store.conn.ExecContext(ctx, renderWithFiles(b.copyIntoStore, destination)); err != nil { return nil, fmt.Errorf("store: writing to temporary table: %w", err) } + + queries = append(queries, renderWithFiles(b.mergeInto, destination), b.truncateStoreSQL) } return func(ctx context.Context, runtimeCheckpoint *protocol.Checkpoint, runtimeAckCh <-chan struct{}) (*pf.ConnectorState, pf.OpFuture) { - return nil, pf.RunAsyncOperation(func() error { - // TODO: Databricks does not support transactions across tables, how can we - // handle the fence update in this case? - var err error - if d.store.fence.Checkpoint, err = runtimeCheckpoint.Marshal(); err != nil { - return fmt.Errorf("marshalling checkpoint: %w", err) - } + var cp = checkpoint{Queries: queries} - var fenceUpdate strings.Builder - if err := tplUpdateFence.Execute(&fenceUpdate, d.store.fence); err != nil { - return fmt.Errorf("evaluating fence template: %w", err) - } + var checkpointJSON, err = json.Marshal(cp) + if err != nil { + return nil, pf.FinishedOperation(fmt.Errorf("creating checkpoint json: %w", err)) + } - if results, err := d.store.conn.ExecContext(ctx, fenceUpdate.String()); err != nil { - return fmt.Errorf("updating flow checkpoint: %w", err) - } else if rowsAffected, err := results.RowsAffected(); err != nil { - return fmt.Errorf("updating flow checkpoint (rows affected): %w", err) - } else if rowsAffected < 1 { - return fmt.Errorf("This instance was fenced off by another") + var commitOp = pf.RunAsyncOperation(func() error { + select { + case <-runtimeAckCh: + return d.applyCheckpoint(ctx, cp) + case <-ctx.Done(): + return ctx.Err() } + }) - for idx, b := range d.bindings { - if b.needsMerge { - log.WithFields(log.Fields{ - "mergeInto": b.mergeInto, - }).Warn("running merge") - - if rows, err := d.store.conn.QueryContext(ctx, fmt.Sprintf("SELECT * FROM flow_temp_store_table_%d;", idx)); err != nil { - return fmt.Errorf("reading from load table: %w", err) - } else { - defer rows.Close() - for rows.Next() { - var cols, _ = rows.Columns() - var columns = make([]interface{}, len(cols)) - var columnPtrs = make([]interface{}, len(cols)) - - for i, _ := range cols { - columnPtrs[i] = &columns[i] - } - - if err = rows.Scan(columnPtrs...); err != nil { - return fmt.Errorf("scanning document: %w", err) - } else { - var m = make(map[string]interface{}) - for i, col := range cols { - m[col] = columns[i] - } - log.WithFields(log.Fields{ - "m": m, - }).Warn("scanned document") - } - } - - if err = rows.Err(); err != nil { - return fmt.Errorf("querying Loads: %w", err) - } - } + return &pf.ConnectorState{UpdatedJson: checkpointJSON}, commitOp + }, nil +} - if _, err := d.store.conn.ExecContext(ctx, b.mergeInto); err != nil { - return fmt.Errorf("store merge on %q: %w", b.target.Identifier, err) - } - } else { - log.WithFields(log.Fields{ - "mergeInto": b.copyIntoDirect, - }).Warn("running direct copy") - if _, err := d.store.conn.ExecContext(ctx, b.copyIntoDirect); err != nil { - return fmt.Errorf("store direct copy on %q: %w", b.target.Identifier, err) - } - } +func renderWithFiles(tpl string, files ...string) string { + var s = strings.Join(files, "','") + s = "'" + s + "'" - if _, err = d.store.conn.ExecContext(ctx, b.truncateStoreSQL); err != nil { - return fmt.Errorf("truncating store table: %w", err) - } - } + return fmt.Sprintf(tpl, s) +} - return nil - }) - }, nil +// applyCheckpoint merges data from temporary table to main table +func (d *transactor) applyCheckpoint(ctx context.Context, cp checkpoint) error { + for _, q := range cp.Queries { + if _, err := d.store.conn.ExecContext(ctx, q); err != nil { + return fmt.Errorf("query %q failed: %w", q, err) + } + } + + return nil } func (d *transactor) Destroy() { diff --git a/materialize-databricks/driver_test.go b/materialize-databricks/driver_test.go index be4605e9a5..d21843cff6 100644 --- a/materialize-databricks/driver_test.go +++ b/materialize-databricks/driver_test.go @@ -13,7 +13,7 @@ import ( sql "github.com/estuary/connectors/materialize-sql" ) -func mustGetCfg(t *testing.T) config { +/*func mustGetCfg(t *testing.T) config { if os.Getenv("TESTDB") != "yes" { t.Skipf("skipping %q: ${TESTDB} != \"yes\"", t.Name()) return config{} @@ -39,43 +39,7 @@ func mustGetCfg(t *testing.T) config { } return out -} - -func TestFencingCases(t *testing.T) { - // Because of the number of round-trips required for this test to run it is not run normally. - // Enable it via the TESTDB environment variable. It will take several minutes for this test to - // complete (you should run it with a sufficient -timeout value). - cfg := mustGetCfg(t) - - client := client{uri: cfg.ToURI()} - - ctx := context.Background() - - sql.RunFenceTestCases(t, - client, - []string{"temp_test_fencing_checkpoints"}, - databricksDialect, - tplCreateTargetTable, - func(table sql.Table, fence sql.Fence) error { - var err = client.withDB(func(db *stdsql.DB) error { - var fenceUpdate strings.Builder - if err := tplUpdateFence.Execute(&fenceUpdate, fence); err != nil { - return fmt.Errorf("evaluating fence template: %w", err) - } - var _, err = db.Exec(fenceUpdate.String()) - return err - }) - return err - }, - func(table sql.Table) (out string, err error) { - err = client.withDB(func(db *stdsql.DB) error { - out, err = sql.StdDumpTable(ctx, db, table) - return err - }) - return - }, - ) -} +}*/ func TestValidate(t *testing.T) { sql.RunValidateTestCases(t, databricksDialect) diff --git a/materialize-databricks/sqlgen.go b/materialize-databricks/sqlgen.go index ebc63781db..e430481c38 100644 --- a/materialize-databricks/sqlgen.go +++ b/materialize-databricks/sqlgen.go @@ -97,7 +97,13 @@ DROP TABLE IF EXISTS {{ template "temp_name_load" . }} -- Idempotent creation of the store table for staging new records. {{ define "createStoreTable" }} -CREATE TABLE IF NOT EXISTS {{ template "temp_name_store" . }} LIKE {{$.Identifier}}; +CREATE TABLE IF NOT EXISTS {{ template "temp_name_store" . }} ( + _metadata_file_name STRING, + {{- range $ind, $col := $.Columns }} + , + {{$col.Identifier}} {{$col.DDL}} + {{- end }} +); {{ end }} -- Templated truncation of the temporary store table: @@ -147,9 +153,8 @@ SELECT -1, NULL FROM {{ Literal $.StagingPath }} ) FILEFORMAT = JSON - FILES = ('{{ $.Table.Binding }}_store.json') + FILES = (%s) FORMAT_OPTIONS ( 'mode' = 'FAILFAST', 'inferTimestamp' = 'true' ) - COPY_OPTIONS ( 'force' = 'true' ) ; {{ end }} @@ -157,16 +162,16 @@ SELECT -1, NULL {{ define "copyIntoStore" }} COPY INTO {{ template "temp_name_store" $.Table }} FROM ( SELECT + _metadata.file_name as _metadata_file_name, {{ range $ind, $key := $.Table.Columns }} - {{- if $ind }}, {{ end -}} + , {{$key.Identifier -}} {{- end }} FROM {{ Literal $.StagingPath }} ) FILEFORMAT = JSON - FILES = ('{{ $.Table.Binding }}_store.json') + FILES = (%s) FORMAT_OPTIONS ( 'mode' = 'FAILFAST', 'inferTimestamp' = 'true' ) - COPY_OPTIONS ( 'force' = 'true' ) ; {{ end }} @@ -181,9 +186,8 @@ SELECT -1, NULL FROM {{ Literal $.StagingPath }} ) FILEFORMAT = JSON - FILES = ('{{ $.Table.Binding }}_load.json') + FILES = (%s) FORMAT_OPTIONS ( 'mode' = 'FAILFAST', 'inferTimestamp' = 'true' ) - COPY_OPTIONS ( 'force' = 'true' ) ; {{ end }} @@ -192,7 +196,8 @@ SELECT -1, NULL USING {{ template "temp_name_store" . }} AS r ON {{ range $ind, $key := $.Keys }} {{- if $ind }} AND {{ end -}} - l.{{ $key.Identifier }} = r.{{ $key.Identifier }} + l.{{ $key.Identifier }} = r.{{ $key.Identifier }} AND + r._metadata_file_name IN (%s) {{- end }} {{- if $.Document }} WHEN MATCHED AND r.{{ $.Document.Identifier }} <=> NULL THEN diff --git a/materialize-mysql/driver.go b/materialize-mysql/driver.go index 5a1a329bb7..800842159b 100644 --- a/materialize-mysql/driver.go +++ b/materialize-mysql/driver.go @@ -550,6 +550,7 @@ func prepareNewTransactor( ep *sql.Endpoint, fence sql.Fence, bindings []sql.Table, + open pm.Request_Open, ) (_ pm.Transactor, err error) { var d = &transactor{dialect: dialect, templates: templates} d.store.fence = fence diff --git a/materialize-postgres/driver.go b/materialize-postgres/driver.go index 7e153c5702..2860e5fa29 100644 --- a/materialize-postgres/driver.go +++ b/materialize-postgres/driver.go @@ -350,6 +350,7 @@ func newTransactor( ep *sql.Endpoint, fence sql.Fence, bindings []sql.Table, + open pm.Request_Open, ) (_ pm.Transactor, err error) { var d = &transactor{} d.store.fence = fence diff --git a/materialize-redshift/driver.go b/materialize-redshift/driver.go index 14ba895ed6..9e98beec45 100644 --- a/materialize-redshift/driver.go +++ b/materialize-redshift/driver.go @@ -461,6 +461,7 @@ func newTransactor( ep *sql.Endpoint, fence sql.Fence, bindings []sql.Table, + open pm.Request_Open, ) (_ pm.Transactor, err error) { var cfg = ep.Config.(*config) diff --git a/materialize-snowflake/snowflake.go b/materialize-snowflake/snowflake.go index 16a5410926..1184ff3583 100644 --- a/materialize-snowflake/snowflake.go +++ b/materialize-snowflake/snowflake.go @@ -429,6 +429,7 @@ func newTransactor( ep *sql.Endpoint, fence sql.Fence, bindings []sql.Table, + open pm.Request_Open, ) (_ pm.Transactor, err error) { var cfg = ep.Config.(*config) diff --git a/materialize-sql/driver.go b/materialize-sql/driver.go index 5d1d6b6789..686a4d7b70 100644 --- a/materialize-sql/driver.go +++ b/materialize-sql/driver.go @@ -347,7 +347,7 @@ func (d *Driver) NewTransactor(ctx context.Context, open pm.Request_Open) (pm.Tr } } - transactor, err := endpoint.NewTransactor(ctx, endpoint, fence, tables) + transactor, err := endpoint.NewTransactor(ctx, endpoint, fence, tables, open) if err != nil { return nil, nil, fmt.Errorf("building transactor: %w", err) } diff --git a/materialize-sql/endpoint.go b/materialize-sql/endpoint.go index d2480f9a47..befe50faef 100644 --- a/materialize-sql/endpoint.go +++ b/materialize-sql/endpoint.go @@ -94,7 +94,7 @@ type Endpoint struct { // which will be parsed into and validated from a resource configuration. NewResource func(*Endpoint) Resource // NewTransactor returns a Transactor ready for pm.RunTransactions. - NewTransactor func(ctx context.Context, _ *Endpoint, _ Fence, bindings []Table) (pm.Transactor, error) + NewTransactor func(ctx context.Context, _ *Endpoint, _ Fence, bindings []Table, open pm.Request_Open) (pm.Transactor, error) // CheckPrerequisites validates that the proposed configuration is able to connect to the // endpoint and perform the required actions. It assumes that any required SSH tunneling is // setup prior to its call. diff --git a/materialize-sqlite/sqlite.go b/materialize-sqlite/sqlite.go index d895981df7..d4100439e1 100644 --- a/materialize-sqlite/sqlite.go +++ b/materialize-sqlite/sqlite.go @@ -140,6 +140,7 @@ func newTransactor( ep *sql.Endpoint, fence sql.Fence, bindings []sql.Table, + open pm.Request_Open, ) (_ pm.Transactor, err error) { var d = &transactor{ dialect: &sqliteDialect, diff --git a/materialize-sqlserver/driver.go b/materialize-sqlserver/driver.go index 715ac5ecb9..655e64abb6 100644 --- a/materialize-sqlserver/driver.go +++ b/materialize-sqlserver/driver.go @@ -354,6 +354,7 @@ func prepareNewTransactor( ep *sql.Endpoint, fence sql.Fence, bindings []sql.Table, + open pm.Request_Open, ) (_ pm.Transactor, err error) { var d = &transactor{templates: templates} d.store.fence = fence