Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial commit; ignore generated columns #1

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion internal/db/postgres/context/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,9 @@ func getColumnsConfig(ctx context.Context, tx pgx.Tx, oid toolkit.Oid, version i
if ok {
column.CanonicalTypeName = canonicalType.Name
}
res = append(res, &column)
if !column.IsGenerated {
res = append(res, &column)
}
idx++
}

Expand Down
3 changes: 2 additions & 1 deletion internal/db/postgres/dumpers/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (td *TableDumper) dumper(ctx context.Context, eg *errgroup.Group, w io.Writ
return fmt.Errorf("cannot initialize validation pipeline: %w", err)
}
} else {
log.Debug().Msg("table has transformers")
pipeline, err = NewTransformationPipeline(ctx, eg, td.table, w)
if err != nil {
return fmt.Errorf("cannot initialize transformation pipeline: %w", err)
Expand Down Expand Up @@ -127,7 +128,7 @@ func (td *TableDumper) process(ctx context.Context, tx pgx.Tx, w io.WriteCloser,

frontend := tx.Conn().PgConn().Frontend()
query, err := td.table.GetCopyFromStatement()
log.Debug().
log.Warn().
Str("query", query).
Msgf("dumping table %s.%s using pgcopy query", td.table.Schema, td.table.Name)
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions internal/db/postgres/dumpers/transformation_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func (tp *TransformationPipeline) TransformSync(ctx context.Context, r *toolkit.

func (tp *TransformationPipeline) TransformAsync(ctx context.Context, r *toolkit.Record) (*toolkit.Record, error) {
var err error
log.Debug().Msg("transforming async")
for _, w := range tp.transformationWindows {
_, err = w.Transform(ctx, r)
if err != nil {
Expand Down Expand Up @@ -172,6 +173,15 @@ func (tp *TransformationPipeline) Dump(ctx context.Context, data []byte) (err er
return NewDumpError(tp.table.Schema, tp.table.Name, tp.line, fmt.Errorf("error encoding RowDriver to []byte: %w", err))
}

// Print out the table column names
// log.Debug().Msg(fmt.Sprintf("Table columns: %s", tp.table.Columns))
// log.Debug().Msg(fmt.Sprintf("Table schema: %s", tp.table.Schema))
// Print out the table schema from the record
// log.Debug().Msg(fmt.Sprintf("Record schema: %s", tp.record.Driver.Table.Schema))
// log.Debug().Msg(tp.row.Decode(data[:len(data)-1]).Error())
// Print out the row
// log.Debug().Msg(fmt.Sprintf("Row: %s", tp.row))

_, err = tp.w.Write(res)
if err != nil {
return NewDumpError(tp.table.Schema, tp.table.Name, tp.line, fmt.Errorf("error writing dumped data: %w", err))
Expand Down
14 changes: 13 additions & 1 deletion internal/db/postgres/entries/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/greenmaskio/greenmask/internal/db/postgres/transformers/custom"
"github.com/greenmaskio/greenmask/internal/db/postgres/transformers/utils"
"github.com/greenmaskio/greenmask/pkg/toolkit"
"github.com/rs/zerolog/log"
)

// Table - godoc
Expand Down Expand Up @@ -83,6 +84,8 @@ func (t *Table) Entry() (*toc.Entry, error) {
columns = append(columns, fmt.Sprintf(`"%s"`, column.Name))
}
}
log.Warn().Msg("columns here")
log.Warn().Msg(strings.Join(columns, ""))

var query = `COPY "%s"."%s" (%s) FROM stdin`
var schemaName, tableName string
Expand Down Expand Up @@ -121,6 +124,7 @@ func (t *Table) Entry() (*toc.Entry, error) {
Owner: &owner,
Desc: &toc.TableDataDesc,
CopyStmt: &copyStmt,
Columns: columns,
Dependencies: dependencies,
NDeps: int32(len(dependencies)),
FileName: &fileName,
Expand All @@ -130,7 +134,15 @@ func (t *Table) Entry() (*toc.Entry, error) {
}

func (t *Table) GetCopyFromStatement() (string, error) {
query := fmt.Sprintf("COPY \"%s\".\"%s\" TO STDOUT", t.Schema, t.Name)

columns := make([]string, 0, len(t.Columns))
for _, column := range t.Columns {
if !column.IsGenerated {
columns = append(columns, fmt.Sprintf(`"%s"`, column.Name))
}
}

query := fmt.Sprintf("COPY \"%s\".\"%s\" (%s) TO STDOUT", t.Schema, t.Name, strings.Join(columns, ", "))
if t.Query != "" {
query = fmt.Sprintf("COPY (%s) TO STDOUT", t.Query)
}
Expand Down
26 changes: 24 additions & 2 deletions internal/db/postgres/restorers/table_insert_format.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,18 +168,24 @@ func (td *TableRestorerInsertFormat) streamInsertData(ctx context.Context, conn

func (td *TableRestorerInsertFormat) generateInsertStmt(row *pgcopy.Row, onConflictDoNothing bool) string {
var placeholders []string
for i := 0; i < row.Length(); i++ {
for i := 0; i < len(td.Entry.Columns); i++ {
placeholders = append(placeholders, fmt.Sprintf("$%d", i+1))
}
var onConflict string
if onConflictDoNothing {
onConflict = " ON CONFLICT DO NOTHING"
}

// var columns []string = make([]string, row.Length())
// for i := 0; i < row.Length(); i++ {
// columns[i] = fmt.Sprintf(`"%s"`, row.GetColumnName(i))
// }

res := fmt.Sprintf(
`INSERT INTO %s.%s VALUES (%s)%s`,
`INSERT INTO %s.%s (%s) VALUES (%s)%s`,
*td.Entry.Namespace,
*td.Entry.Tag,
strings.Join(td.Entry.Columns, ", "),
strings.Join(placeholders, ", "),
onConflict,
)
Expand All @@ -192,6 +198,22 @@ func (td *TableRestorerInsertFormat) insertDataOnConflictDoNothing(
if td.query == "" {
td.query = td.generateInsertStmt(row, td.doNothing)
}
// log.Warn().Msg(td.query)
// log.Warn().Msg(fmt.Sprintf("%v", getAllArguments(row)))
// log.Warn().Msg(fmt.Sprintf("%v", td.Entry.Columns))

// log.Warn().Msg(fmt.Sprintf("%v", row.Length()))
// log.Warn().Msg(fmt.Sprintf("%v", len(td.Entry.Columns)))
// log.Warn().Msg(fmt.Sprintf("%v", len(getAllArguments(row))))

// array to string with comma separated values

var displayString string
for i := 0; i < len(getAllArguments(row)); i++ {
displayString += fmt.Sprintf("%v, ", getAllArguments(row)[i])
}

// log.Warn().Msg(displayString)

// TODO: The implementation based on pgx.Conn.Exec is not efficient for bulk inserts.
// Consider rewrite to string literal that contains generated statement instead of using prepared statement
Expand Down
5 changes: 5 additions & 0 deletions internal/db/postgres/toc/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Entry struct {
Defn *string
DropStmt *string
CopyStmt *string
Columns []string
Dependencies []int32 /* dumpIds of objects this one depends on */
NDeps int32 /* number of Dependencies */
FileName *string
Expand Down Expand Up @@ -94,6 +95,10 @@ func (e *Entry) Copy() *Entry {
if e.FileName != nil {
res.FileName = NewObj(*e.FileName)
}
if e.Columns != nil {
res.Columns = make([]string, len(e.Columns))
copy(res.Columns, e.Columns)
}
return res
}

Expand Down
15 changes: 15 additions & 0 deletions internal/db/postgres/toc/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"io"
"slices"
"strconv"
"strings"

"github.com/rs/zerolog/log"
)
Expand Down Expand Up @@ -402,6 +403,20 @@ func (r *Reader) readEntries() ([]*Entry, error) {
return nil, fmt.Errorf("cannot read Defn: %w", err)
}
entry.CopyStmt = copyStmt

entry.Columns = make([]string, 0)
// pull column names from copyStmt
// example: COPY "public"."user_community_notification_preference" ("id", "person_id", "community_id", "notification_preference", "created_at", "updated_at", "church_slug") FROM stdin
if copyStmt != nil {
columns := strings.Split(*copyStmt, "(")
if len(columns) > 1 {
columns = strings.Split(columns[1], ")")
if len(columns) > 0 {
entry.Columns = strings.Split(columns[0], ",")
}
}
}

}

if r.version >= BackupVersions["1.6"] {
Expand Down