Skip to content

Commit

Permalink
materialize-databricks: remove fencing, switch to idempotent apply
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee committed Nov 21, 2023
1 parent 5541487 commit 88334f5
Show file tree
Hide file tree
Showing 12 changed files with 98 additions and 155 deletions.
1 change: 1 addition & 0 deletions materialize-bigquery/transactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func newTransactor(
ep *sql.Endpoint,
fence sql.Fence,
bindings []sql.Table,
open pm.Request_Open,
) (_ pm.Transactor, err error) {
cfg := ep.Config.(*config)

Expand Down
179 changes: 73 additions & 106 deletions materialize-databricks/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"os"
"context"
stdsql "database/sql"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
Expand All @@ -24,6 +23,7 @@ import (
pf "github.com/estuary/flow/go/protocols/flow"
pm "github.com/estuary/flow/go/protocols/materialize"
log "github.com/sirupsen/logrus"
"github.com/google/uuid"
"go.gazette.dev/core/consumer/protocol"
)

Expand Down Expand Up @@ -92,13 +92,13 @@ func newSqlServerDriver() *sql.Driver {
}).Info("connecting to databricks")

var metaBase sql.TablePath
var metaSpecs, metaCheckpoints = sql.MetaTables(metaBase)
var metaSpecs, _ = sql.MetaTables(metaBase)

return &sql.Endpoint{
Config: cfg,
Dialect: databricksDialect,
MetaSpecs: &metaSpecs,
MetaCheckpoints: &metaCheckpoints,
MetaCheckpoints: nil,
Client: client{uri: cfg.ToURI()},
CreateTableTemplate: tplCreateTargetTable,
NewResource: newTableConfig,
Expand Down Expand Up @@ -239,12 +239,7 @@ func (c client) ExecStatements(ctx context.Context, statements []string) error {
}

// InstallFence is a no-op for Databricks: fencing was removed in favor of an
// idempotent, recovery-log-authoritative commit pattern (the commit queries
// are stored in the driver checkpoint and re-applied by applyCheckpoint), so
// no fence row is installed and a zero-valued Fence is returned.
func (c client) InstallFence(ctx context.Context, checkpoints sql.Table, fence sql.Fence) (sql.Fence, error) {
	return sql.Fence{}, nil
}

func (c client) withDB(fn func(*stdsql.DB) error) error {
Expand All @@ -271,7 +266,6 @@ type transactor struct {
// Variables exclusively used by Store.
store struct {
conn *stdsql.Conn
fence sql.Fence
}
bindings []*binding
}
Expand All @@ -285,6 +279,7 @@ func newTransactor(
ep *sql.Endpoint,
fence sql.Fence,
bindings []sql.Table,
open pm.Request_Open,
) (_ pm.Transactor, err error) {
var cfg = ep.Config.(*config)

Expand All @@ -298,7 +293,6 @@ func newTransactor(
}

var d = &transactor{cfg: cfg, wsClient: wsClient}
d.store.fence = fence

// Establish connections.
if db, err := stdsql.Open("databricks", cfg.ToURI()); err != nil {
Expand All @@ -318,6 +312,17 @@ func newTransactor(
}
}

var cp checkpoint
if open.StateJson != nil {
if err := json.Unmarshal(open.StateJson, &cp); err != nil {
return nil, fmt.Errorf("parsing driver config: %w", err)
}
}

if err = d.applyCheckpoint(ctx, cp); err != nil {
return nil, fmt.Errorf("applying recovered checkpoint: %w", err)
}

// Build a query which unions the results of each load subquery.
var subqueries []string
for _, b := range d.bindings {
Expand All @@ -344,8 +349,6 @@ type binding struct {

// path to where we store staging files
rootStagingPath string
loadStagingPath string
storeStagingPath string

// a binding needs to be merged if there are updates to existing documents
// otherwise we just do a direct copy by moving all data from temporary table
Expand All @@ -372,10 +375,7 @@ type binding struct {
func (t *transactor) addBinding(ctx context.Context, target sql.Table) error {
var b = &binding{target: target}

var idx = len(t.bindings)
b.rootStagingPath = fmt.Sprintf("/Volumes/%s/%s/%s", t.cfg.CatalogName, target.Path[0], volumeName)
b.loadStagingPath = fmt.Sprintf("%s/%d_load.json", b.rootStagingPath, idx)
b.storeStagingPath = fmt.Sprintf("%s/%d_store.json", b.rootStagingPath, idx)

for _, m := range []struct {
sql *string
Expand Down Expand Up @@ -437,13 +437,6 @@ func (t *transactor) addBinding(ctx context.Context, target sql.Table) error {
func (d *transactor) Load(it *pm.LoadIterator, loaded func(int, json.RawMessage) error) error {
var ctx = d.Context(it.Context())

// Delete existing staging files
for _, b := range d.bindings {
if err := d.wsClient.Files.DeleteByFilePath(ctx, b.loadStagingPath); err != nil && !strings.Contains(fmt.Sprintf("%s", err), "Not Found") {
return fmt.Errorf("deleting staging file %q: %w", b.loadStagingPath, err)
}
}

var localFiles = make(map[int]*os.File)
for it.Next() {
var b = d.bindings[it.Binding]
Expand All @@ -460,7 +453,7 @@ func (d *transactor) Load(it *pm.LoadIterator, loaded func(int, json.RawMessage)

var jsonMap = make(map[string]interface{}, len(b.target.Keys))
for i, col := range b.target.Keys {
// TODO: what happens here in case of nested keys?
// FIXME: what happens here in case of nested keys?
jsonMap[col.Field] = converted[i]
}

Expand All @@ -481,8 +474,14 @@ func (d *transactor) Load(it *pm.LoadIterator, loaded func(int, json.RawMessage)
for binding, _ := range localFiles {
var b = d.bindings[binding]

var randomKey, err = uuid.NewRandom()
if err != nil {
return fmt.Errorf("generating random key for file: %w", err)
}

var source = fmt.Sprintf("%s/%d_load.json", d.localStagingPath, binding)
var destination = b.loadStagingPath
var destination = fmt.Sprintf("%s/%d_%s_load.json", b.rootStagingPath, binding, randomKey)

if bs, err := os.ReadFile(source); err != nil {
return err
} else {
Expand Down Expand Up @@ -545,16 +544,13 @@ func (d *transactor) Load(it *pm.LoadIterator, loaded func(int, json.RawMessage)
return nil
}

// checkpoint is the driver checkpoint committed to the Flow recovery log.
// It records the queries (per-binding merge into the target table plus
// truncation of the staging table) that complete a commit. Because the
// recovery log is authoritative, these queries are re-run idempotently on
// connector restart via applyCheckpoint to finish any in-flight commit.
type checkpoint struct {
	Queries []string
}

func (d *transactor) Store(it *pm.StoreIterator) (_ pm.StartCommitFunc, err error) {
ctx := it.Context()

// Delete existing staging files
for _, b := range d.bindings {
if err := d.wsClient.Files.DeleteByFilePath(ctx, b.storeStagingPath); err != nil && !strings.Contains(fmt.Sprintf("%s", err), "Not Found") {
return nil, fmt.Errorf("deleting staging file %q: %w", b.storeStagingPath, err)
}
}

var localFiles = make(map[int]*os.File)
for it.Next() {
var b = d.bindings[it.Binding]
Expand Down Expand Up @@ -602,7 +598,12 @@ func (d *transactor) Store(it *pm.StoreIterator) (_ pm.StartCommitFunc, err erro
f.Close()
}

// Upload the staged files
// Upload the staged files and build a list of merge and truncate queries that need to be run
// to effectively commit the files into destination tables. These queries are stored
// in the checkpoint so that if the connector is restarted in middle of a commit
// it can run the same queries on the next startup. This is the pattern for
// recovery log being authoritative and the connector idempotently applies a commit
var queries []string
for binding, _ := range localFiles {
var b = d.bindings[binding]

Expand All @@ -611,8 +612,14 @@ func (d *transactor) Store(it *pm.StoreIterator) (_ pm.StartCommitFunc, err erro
continue
}

var randomKey, err = uuid.NewRandom()
if err != nil {
return nil, fmt.Errorf("generating random key for file: %w", err)
}

var source = fmt.Sprintf("%s/%d_store.json", d.localStagingPath, binding)
var destination = b.storeStagingPath
var destination = fmt.Sprintf("%s/%d_%s_store.json", b.rootStagingPath, binding, randomKey)

if bs, err := os.ReadFile(source); err != nil {
return nil, err
} else {
Expand All @@ -630,90 +637,50 @@ func (d *transactor) Store(it *pm.StoreIterator) (_ pm.StartCommitFunc, err erro
}

// COPY INTO temporary load table from staged files
if _, err := d.store.conn.ExecContext(ctx, b.copyIntoStore); err != nil {
if _, err := d.store.conn.ExecContext(ctx, renderWithFiles(b.copyIntoStore, destination)); err != nil {
return nil, fmt.Errorf("store: writing to temporary table: %w", err)
}

queries = append(queries, renderWithFiles(b.mergeInto, destination), b.truncateStoreSQL)
}

return func(ctx context.Context, runtimeCheckpoint *protocol.Checkpoint, runtimeAckCh <-chan struct{}) (*pf.ConnectorState, pf.OpFuture) {
return nil, pf.RunAsyncOperation(func() error {
// TODO: Databricks does not support transactions across tables, how can we
// handle the fence update in this case?
var err error
if d.store.fence.Checkpoint, err = runtimeCheckpoint.Marshal(); err != nil {
return fmt.Errorf("marshalling checkpoint: %w", err)
}
var cp = checkpoint{Queries: queries}

var fenceUpdate strings.Builder
if err := tplUpdateFence.Execute(&fenceUpdate, d.store.fence); err != nil {
return fmt.Errorf("evaluating fence template: %w", err)
}
var checkpointJSON, err = json.Marshal(cp)
if err != nil {
return nil, pf.FinishedOperation(fmt.Errorf("creating checkpoint json: %w", err))
}

if results, err := d.store.conn.ExecContext(ctx, fenceUpdate.String()); err != nil {
return fmt.Errorf("updating flow checkpoint: %w", err)
} else if rowsAffected, err := results.RowsAffected(); err != nil {
return fmt.Errorf("updating flow checkpoint (rows affected): %w", err)
} else if rowsAffected < 1 {
return fmt.Errorf("This instance was fenced off by another")
var commitOp = pf.RunAsyncOperation(func() error {
select {
case <-runtimeAckCh:
return d.applyCheckpoint(ctx, cp)
case <-ctx.Done():
return ctx.Err()
}
})

for idx, b := range d.bindings {
if b.needsMerge {
log.WithFields(log.Fields{
"mergeInto": b.mergeInto,
}).Warn("running merge")

if rows, err := d.store.conn.QueryContext(ctx, fmt.Sprintf("SELECT * FROM flow_temp_store_table_%d;", idx)); err != nil {
return fmt.Errorf("reading from load table: %w", err)
} else {
defer rows.Close()
for rows.Next() {
var cols, _ = rows.Columns()
var columns = make([]interface{}, len(cols))
var columnPtrs = make([]interface{}, len(cols))

for i, _ := range cols {
columnPtrs[i] = &columns[i]
}

if err = rows.Scan(columnPtrs...); err != nil {
return fmt.Errorf("scanning document: %w", err)
} else {
var m = make(map[string]interface{})
for i, col := range cols {
m[col] = columns[i]
}
log.WithFields(log.Fields{
"m": m,
}).Warn("scanned document")
}
}

if err = rows.Err(); err != nil {
return fmt.Errorf("querying Loads: %w", err)
}
}
return &pf.ConnectorState{UpdatedJson: checkpointJSON}, commitOp
}, nil
}

if _, err := d.store.conn.ExecContext(ctx, b.mergeInto); err != nil {
return fmt.Errorf("store merge on %q: %w", b.target.Identifier, err)
}
} else {
log.WithFields(log.Fields{
"mergeInto": b.copyIntoDirect,
}).Warn("running direct copy")
if _, err := d.store.conn.ExecContext(ctx, b.copyIntoDirect); err != nil {
return fmt.Errorf("store direct copy on %q: %w", b.target.Identifier, err)
}
}
// renderWithFiles interpolates a SQL template containing a single %s verb
// with a single-quoted, comma-separated list of staged file paths, e.g.
// renderWithFiles("FILES (%s)", "a", "b") yields "FILES ('a','b')".
// NOTE(review): paths are not escaped — assumed to never contain a single
// quote (they are connector-generated /Volumes/... paths).
func renderWithFiles(tpl string, files ...string) string {
	// Join the paths with "','" and wrap the whole list in outer quotes.
	var quoted = "'" + strings.Join(files, "','") + "'"
	return fmt.Sprintf(tpl, quoted)
}

return nil
})
}, nil
// applyCheckpoint executes, in order, every query recorded in the driver
// checkpoint (the per-binding merge and staging-table truncation statements),
// stopping at the first failure. Re-running it after a restart is how the
// connector idempotently completes a commit whose checkpoint was already
// persisted in the recovery log.
func (d *transactor) applyCheckpoint(ctx context.Context, cp checkpoint) error {
	for _, query := range cp.Queries {
		_, execErr := d.store.conn.ExecContext(ctx, query)
		if execErr != nil {
			return fmt.Errorf("query %q failed: %w", query, execErr)
		}
	}
	return nil
}

func (d *transactor) Destroy() {
Expand Down
40 changes: 2 additions & 38 deletions materialize-databricks/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
sql "github.com/estuary/connectors/materialize-sql"
)

func mustGetCfg(t *testing.T) config {
/*func mustGetCfg(t *testing.T) config {
if os.Getenv("TESTDB") != "yes" {
t.Skipf("skipping %q: ${TESTDB} != \"yes\"", t.Name())
return config{}
Expand All @@ -39,43 +39,7 @@ func mustGetCfg(t *testing.T) config {
}
return out
}

func TestFencingCases(t *testing.T) {
// Because of the number of round-trips required for this test to run it is not run normally.
// Enable it via the TESTDB environment variable. It will take several minutes for this test to
// complete (you should run it with a sufficient -timeout value).
cfg := mustGetCfg(t)

client := client{uri: cfg.ToURI()}

ctx := context.Background()

sql.RunFenceTestCases(t,
client,
[]string{"temp_test_fencing_checkpoints"},
databricksDialect,
tplCreateTargetTable,
func(table sql.Table, fence sql.Fence) error {
var err = client.withDB(func(db *stdsql.DB) error {
var fenceUpdate strings.Builder
if err := tplUpdateFence.Execute(&fenceUpdate, fence); err != nil {
return fmt.Errorf("evaluating fence template: %w", err)
}
var _, err = db.Exec(fenceUpdate.String())
return err
})
return err
},
func(table sql.Table) (out string, err error) {
err = client.withDB(func(db *stdsql.DB) error {
out, err = sql.StdDumpTable(ctx, db, table)
return err
})
return
},
)
}
}*/

func TestValidate(t *testing.T) {
sql.RunValidateTestCases(t, databricksDialect)
Expand Down
Loading

0 comments on commit 88334f5

Please sign in to comment.