Skip to content

Commit

Permalink
*: fix sql builder to translate update and delete sql correctly (#257)
Browse files Browse the repository at this point in the history
* remove dml_helper

Signed-off-by: dongmen <[email protected]>

* remove dml_helper

Signed-off-by: dongmen <[email protected]>

* fix update and delete dml conversion

Signed-off-by: dongmen <[email protected]>

---------

Signed-off-by: dongmen <[email protected]>
  • Loading branch information
asddongmen authored Sep 5, 2024
1 parent 2c3ab0e commit 079a972
Show file tree
Hide file tree
Showing 8 changed files with 23 additions and 253 deletions.
201 changes: 0 additions & 201 deletions downstreamadapter/writer/dml_helper.go

This file was deleted.

3 changes: 1 addition & 2 deletions downstreamadapter/writer/mysql_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (w *MysqlWriter) execDDLWithMaxRetries(event *common.DDLEvent) error {

func (w *MysqlWriter) Flush(events []*common.TEvent, workerNum int) error {
dmls := w.prepareDMLs(events)
log.Info("fizz 2 prepare DMLs", zap.Any("dmlsCount", dmls.rowCount), zap.String("dmls", fmt.Sprintf("%v", dmls.sqls)), zap.Any("values", dmls.values), zap.Any("startTs", dmls.startTs), zap.Any("workerNum", workerNum))
log.Info("prepare DMLs", zap.Any("dmlsCount", dmls.rowCount), zap.Any("dmls", fmt.Sprintf("%v", dmls.sqls)), zap.Any("values", dmls.values), zap.Any("startTs", dmls.startTs), zap.Any("workerNum", workerNum))
if dmls.rowCount == 0 {
return nil
}
Expand Down Expand Up @@ -206,7 +206,6 @@ func (w *MysqlWriter) prepareDMLs(events []*common.TEvent) *preparedDMLs {
approximateSize := int64(0)

for _, event := range events {
log.Info("fizz prepareDMLs", zap.Any("event", event.Rows.ToString(event.TableInfo.GetFileSlice())))
if event.Len() == 0 {
continue
}
Expand Down
27 changes: 20 additions & 7 deletions downstreamadapter/writer/sql_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func buildDelete(tableInfo *common.TableInfo, row *common.Row) (string, []interf
quoteTable := tableInfo.TableName.QuoteString()
builder.WriteString("DELETE FROM " + quoteTable + " WHERE ")

colNames, wargs := whereClause(&row.PreRow, tableInfo)
colNames, wargs := whereSlice(&row.PreRow, tableInfo)
if len(wargs) == 0 {
return "", nil
}
Expand All @@ -83,7 +83,7 @@ func buildUpdate(tableInfo *common.TableInfo, row *common.Row) (string, []interf
builder.WriteString("UPDATE " + tableInfo.TableName.QuoteString() + " SET ")

columnNames := make([]string, 0, len(tableInfo.Columns))
allArgs := make([]interface{}, 0, len(tableInfo.Columns)*2)
allArgs := make([]interface{}, 0, len(tableInfo.Columns))
for _, col := range tableInfo.Columns {
if col == nil || tableInfo.ColumnsFlag[col.ID].IsGeneratedColumn() {
continue
Expand Down Expand Up @@ -111,7 +111,7 @@ func buildUpdate(tableInfo *common.TableInfo, row *common.Row) (string, []interf
}

builder.WriteString(" WHERE ")
colNames, wargs := whereClause(&row.PreRow, tableInfo)
colNames, wargs := whereSlice(&row.PreRow, tableInfo)
if len(wargs) == 0 {
return "", nil
}
Expand Down Expand Up @@ -235,14 +235,13 @@ func formatColVal(row *chunk.Row, col *model.ColumnInfo, idx int) (
return v, nil
}

// whereClause builds a parametric WHERE clause as following
// sql: `WHERE {} = ? AND {} > ?`
func whereClause(row *chunk.Row, tableInfo *common.TableInfo) ([]string, []interface{}) {
// whereSlice returns the column names and values for the WHERE clause
func whereSlice(row *chunk.Row, tableInfo *common.TableInfo) ([]string, []interface{}) {
args := make([]interface{}, 0, len(tableInfo.Columns))
colNames := make([]string, 0, len(tableInfo.Columns))
// Try to use unique key values when available
for i, col := range tableInfo.Columns {
if col == nil || tableInfo.ColumnsFlag[col.ID].IsHandleKey() {
if col == nil || !tableInfo.ColumnsFlag[col.ID].IsHandleKey() {
continue
}
colNames = append(colNames, col.Name.O)
Expand All @@ -253,9 +252,23 @@ func whereClause(row *chunk.Row, tableInfo *common.TableInfo) ([]string, []inter
}
args = append(args, v)
}

// if no explicit row id but force replicate, use all key-values in where condition
if len(colNames) == 0 {
for i, col := range tableInfo.Columns {
colNames = append(colNames, col.Name.O)
v, err := formatColVal(row, col, i)
if err != nil {
// FIXME: handle error
log.Panic("formatColVal failed", zap.Error(err))
}
args = append(args, v)
}
}
return colNames, args
}

// placeHolder returns a string with n placeholders separated by commas
// n must be greater or equal than 1, or the function will panic
func placeHolder(n int) string {
var builder strings.Builder
Expand Down
File renamed without changes.
2 changes: 0 additions & 2 deletions pkg/common/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/util/chunk"
"go.uber.org/zap"
)

type Event interface {
Expand Down Expand Up @@ -215,7 +214,6 @@ func (t *TEvent) AppendRow(raw *RawKVEntry,
if err != nil {
return err
}
log.Info("fizz TEvent.AppendRow", zap.Int("count", count), zap.Any("rowType", RowTypeToString(RowType)))
if count == 1 {
t.RowTypes = append(t.RowTypes, RowType)
} else if count == 2 {
Expand Down
1 change: 0 additions & 1 deletion pkg/eventservice/event_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,6 @@ func (c *eventBroker) doScan(task *scanTask) {
}
var txnEvent *common.TEvent
for {
log.Info("fizz scan event", zap.Uint64("startTs", task.dataRange.StartTs), zap.Uint64("endTs", task.dataRange.EndTs))
//Node: The first event of the txn must return isNewTxn as true.
e, isNewTxn, err := iter.Next()
if err != nil {
Expand Down
34 changes: 1 addition & 33 deletions pkg/mounter/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,10 @@ import (
"go.uber.org/zap"
)

type chunkDecoder struct {
tableID int64
version uint64
decoder *rowcodec.ChunkDecoder
}

func (c *chunkDecoder) decode(value []byte, handle kv.Handle, chk *chunk.Chunk) error {
return c.decoder.DecodeToChunk(value, handle, chk)
}

func (m *mounter) rawKVToChunkV2(value []byte, tableInfo *common.TableInfo, chk *chunk.Chunk, handle kv.Handle) error {
if len(value) == 0 {
return nil
}
v, ok := m.chunkDecoders.Load(tableInfo.ID)
var d *chunkDecoder
if ok {
d = v.(*chunkDecoder)
if d.version != tableInfo.UpdateTS {
m.chunkDecoders.Delete(tableInfo.ID)
d = nil
} else {
err := d.decode(value, handle, chk)
if err != nil {
return errors.Trace(err)
}

}
}
handleColIDs, _, reqCols := tableInfo.GetRowColInfos()
// This function is used to set the default value for the column that
// is not in the raw data.
Expand Down Expand Up @@ -73,18 +48,11 @@ func (m *mounter) rawKVToChunkV2(value []byte, tableInfo *common.TableInfo, chk
return nil
}
decoder := rowcodec.NewChunkDecoder(reqCols, handleColIDs, defVal, m.tz)
d = &chunkDecoder{
tableID: tableInfo.ID,
version: tableInfo.UpdateTS,
decoder: decoder,
}
// cache it for later use
m.chunkDecoders.Store(tableInfo.ID, d)
err := d.decode(value, handle, chk)
err := decoder.DecodeToChunk(value, handle, chk)
if err != nil {
return errors.Trace(err)
}

return nil
}

Expand Down
Loading

0 comments on commit 079a972

Please sign in to comment.