Skip to content

Commit

Permalink
codec: support open protocol to decode chunk data (#266)
Browse files Browse the repository at this point in the history
  • Loading branch information
hongyunyan authored Sep 6, 2024
1 parent 1591484 commit d53ad6e
Show file tree
Hide file tree
Showing 20 changed files with 689 additions and 334 deletions.
6 changes: 0 additions & 6 deletions downstreamadapter/sink/helper/eventrouter/event_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,6 @@ func (s *EventRouter) GetTopicForDDL(ddl *model.DDLEvent) string {
return topicGenerator.Substitute(schema, table)
}

// GetPartitionForRowChange looks up the partition dispatcher returned by
// GetPartitionDispatcher for the row's schema and table, and asks it for the
// partition index and key of row given partitionNum partitions.
func (s *EventRouter) GetPartitionForRowChange(row *common.RowChangedEvent, partitionNum int32) (int32, string, error) {
	schema := row.TableInfo.GetSchemaName()
	table := row.TableInfo.GetTableName()
	dispatcher := s.GetPartitionDispatcher(schema, table)
	return dispatcher.GeneratePartitionIndexAndKey(row, partitionNum)
}

// GetPartitionGeneratorForRowChange returns the partition generator for the table the row changes belong to.
func (s *EventRouter) GetPartitionGeneratorForRowChange(tableInfo *common.TableInfo) partition.PartitionGenerator {
return s.GetPartitionDispatcher(tableInfo.GetSchemaName(), tableInfo.GetTableName())
Expand Down
27 changes: 16 additions & 11 deletions downstreamadapter/sink/helper/eventrouter/partition/columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,33 +42,38 @@ func newColumnsPartitionGenerator(columns []string) *ColumnsPartitionGenerator {
}
}

func (r *ColumnsPartitionGenerator) GeneratePartitionIndexAndKey(row *common.RowChangedEvent, partitionNum int32) (int32, string, error) {
func (r *ColumnsPartitionGenerator) GeneratePartitionIndexAndKey(row *common.RowDelta, partitionNum int32, tableInfo *common.TableInfo, commitTs uint64) (int32, string, error) {
r.lock.Lock()
defer r.lock.Unlock()
r.hasher.Reset()

r.hasher.Write([]byte(row.TableInfo.GetSchemaName()), []byte(row.TableInfo.GetTableName()))
r.hasher.Write([]byte(tableInfo.GetSchemaName()), []byte(tableInfo.GetTableName()))

dispatchCols := row.Columns
if len(dispatchCols) == 0 {
dispatchCols = row.PreColumns
rowData := row.Row
if rowData.IsEmpty() {
rowData = row.PreRow
}

offsets, ok := row.TableInfo.OffsetsByNames(r.Columns)
offsets, ok := tableInfo.OffsetsByNames(r.Columns)
if !ok {
log.Error("columns not found when dispatch event",
zap.Any("tableName", row.TableInfo.GetTableName()),
zap.Any("tableName", tableInfo.GetTableName()),
zap.Strings("columns", r.Columns))
return 0, "", errors.ErrDispatcherFailed.GenWithStack(
"columns not found when dispatch event, table: %v, columns: %v", row.TableInfo.GetTableName(), r.Columns)
"columns not found when dispatch event, table: %v, columns: %v", tableInfo.GetTableName(), r.Columns)
}

for idx := 0; idx < len(r.Columns); idx++ {
col := dispatchCols[offsets[idx]]
if col == nil {
colInfo := tableInfo.Columns[offsets[idx]]
value, err := common.FormatColVal(&rowData, colInfo, idx)
if err != nil {
// FIXME:
log.Panic("FormatColVal failed", zap.Error(err))
}
if value == nil {
continue
}
r.hasher.Write([]byte(r.Columns[idx]), []byte(model.ColumnValueString(col.Value)))
r.hasher.Write([]byte(r.Columns[idx]), []byte(model.ColumnValueString(value)))
}

sum32 := r.hasher.Sum32()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
type PartitionGenerator interface {
// GeneratePartitionIndexAndKey returns an index of partitions or a partition key for event.
// Concurrency Note: This method is thread-safe.
GeneratePartitionIndexAndKey(row *common.RowChangedEvent, partitionNum int32) (int32, string, error)
GeneratePartitionIndexAndKey(row *common.RowDelta, partitionNum int32, tableInfo *common.TableInfo, commitTs uint64) (int32, string, error)
}

func GetPartitionGenerator(rule string, scheme string, indexName string, columns []string) PartitionGenerator {
Expand Down
42 changes: 28 additions & 14 deletions downstreamadapter/sink/helper/eventrouter/partition/index_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,42 +39,56 @@ func newIndexValuePartitionGenerator(indexName string) *IndexValuePartitionGener
}
}

func (r *IndexValuePartitionGenerator) GeneratePartitionIndexAndKey(row *common.RowChangedEvent, partitionNum int32) (int32, string, error) {
func (r *IndexValuePartitionGenerator) GeneratePartitionIndexAndKey(row *common.RowDelta,
partitionNum int32,
tableInfo *common.TableInfo,
commitTs uint64,
) (int32, string, error) {
r.lock.Lock()
defer r.lock.Unlock()
r.hasher.Reset()
r.hasher.Write([]byte(row.TableInfo.GetSchemaName()), []byte(row.TableInfo.GetTableName()))
r.hasher.Write([]byte(tableInfo.GetSchemaName()), []byte(tableInfo.GetTableName()))

dispatchCols := row.Columns
if len(row.Columns) == 0 {
dispatchCols = row.PreColumns
rowData := row.Row
if rowData.IsEmpty() {
rowData = row.PreRow
}

// the most normal case, index-name is not set, use the handle key columns.
if r.IndexName == "" {
for _, col := range dispatchCols {
for idx, col := range tableInfo.Columns {
if col == nil {
continue
}
if col.Flag.IsHandleKey() {
r.hasher.Write([]byte(col.Name), []byte(model.ColumnValueString(col.Value)))
if tableInfo.ColumnsFlag[col.ID].IsHandleKey() {
value, err := common.FormatColVal(&rowData, col, idx)
if err != nil {
// FIXME:
log.Panic("FormatColVal failed", zap.Error(err))
}
r.hasher.Write([]byte(col.Name.O), []byte(model.ColumnValueString(value)))
}
}
} else {
names, offsets, ok := row.TableInfo.IndexByName(r.IndexName)
names, offsets, ok := tableInfo.IndexByName(r.IndexName)
if !ok {
log.Error("index not found when dispatch event",
zap.Any("tableName", row.TableInfo.GetTableName()),
zap.Any("tableName", tableInfo.GetTableName()),
zap.String("indexName", r.IndexName))
return 0, "", errors.ErrDispatcherFailed.GenWithStack(
"index not found when dispatch event, table: %v, index: %s", row.TableInfo.GetTableName(), r.IndexName)
"index not found when dispatch event, table: %v, index: %s", tableInfo.GetTableName(), r.IndexName)
}
for idx := 0; idx < len(names); idx++ {
col := dispatchCols[offsets[idx]]
if col == nil {
colInfo := tableInfo.Columns[offsets[idx]]
value, err := common.FormatColVal(&rowData, colInfo, idx)
if err != nil {
// FIXME:
log.Panic("FormatColVal failed", zap.Error(err))
}
if value == nil {
continue
}
r.hasher.Write([]byte(names[idx]), []byte(model.ColumnValueString(col.Value)))
r.hasher.Write([]byte(names[idx]), []byte(model.ColumnValueString(value)))
}
}

Expand Down
6 changes: 5 additions & 1 deletion downstreamadapter/sink/helper/eventrouter/partition/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ func newKeyPartitionGenerator(partitionKey string) *KeyPartitionGenerator {
}
}

func (t *KeyPartitionGenerator) GeneratePartitionIndexAndKey(*common.RowChangedEvent, int32) (int32, string, error) {
// GeneratePartitionIndexAndKey always reports partition index 0 together with
// the generator's partitionKey field; none of the arguments are consulted.
func (t *KeyPartitionGenerator) GeneratePartitionIndexAndKey(
	row *common.RowDelta,
	partitionNum int32,
	tableInfo *common.TableInfo,
	commitTs uint64,
) (int32, string, error) {
	const index int32 = 0
	return index, t.partitionKey, nil
}
11 changes: 8 additions & 3 deletions downstreamadapter/sink/helper/eventrouter/partition/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,16 @@ func newTablePartitionGenerator() *TablePartitionGenerator {
}

// GeneratePartitionIndexAndKey returns the target partition to which a row changed event should be dispatched.
func (t *TablePartitionGenerator) GeneratePartitionIndexAndKey(row *common.RowChangedEvent, partitionNum int32) (int32, string, error) {
func (t *TablePartitionGenerator) GeneratePartitionIndexAndKey(
row *common.RowDelta,
partitionNum int32,
tableInfo *common.TableInfo,
commitTs uint64,
) (int32, string, error) {
t.lock.Lock()
defer t.lock.Unlock()
t.hasher.Reset()
// distribute partition by table
t.hasher.Write([]byte(row.TableInfo.GetSchemaName()), []byte(row.TableInfo.GetTableName()))
return int32(t.hasher.Sum32() % uint32(partitionNum)), row.TableInfo.TableName.String(), nil
t.hasher.Write([]byte(tableInfo.GetSchemaName()), []byte(tableInfo.GetTableName()))
return int32(t.hasher.Sum32() % uint32(partitionNum)), tableInfo.TableName.String(), nil
}
9 changes: 7 additions & 2 deletions downstreamadapter/sink/helper/eventrouter/partition/ts.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ func newTsPartitionGenerator() *TsPartitionGenerator {
return &TsPartitionGenerator{}
}

func (t *TsPartitionGenerator) GeneratePartitionIndexAndKey(row *common.RowChangedEvent, partitionNum int32) (int32, string, error) {
return int32(row.CommitTs % uint64(partitionNum)), strconv.FormatUint(row.CommitTs, 10), nil
// GeneratePartitionIndexAndKey derives the partition from the commit timestamp:
// the index is commitTs modulo partitionNum and the key is commitTs rendered in
// base 10. row and tableInfo are not used.
func (t *TsPartitionGenerator) GeneratePartitionIndexAndKey(
	row *common.RowDelta,
	partitionNum int32,
	tableInfo *common.TableInfo,
	commitTs uint64,
) (int32, string, error) {
	index := int32(commitTs % uint64(partitionNum))
	key := strconv.FormatUint(commitTs, 10)
	return index, key, nil
}
26 changes: 10 additions & 16 deletions downstreamadapter/sink/kafka_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"context"
"net/url"

"github.com/flowbehappy/tigate/downstreamadapter/sink/helper/columnselector"
"github.com/flowbehappy/tigate/downstreamadapter/sink/helper/eventrouter"
"github.com/flowbehappy/tigate/downstreamadapter/sink/helper/topicmanager"
"github.com/flowbehappy/tigate/downstreamadapter/sink/types"
Expand Down Expand Up @@ -45,7 +44,7 @@ type KafkaSink struct {

protocol config.Protocol

columnSelector *columnselector.ColumnSelector
columnSelector *common.ColumnSelector
// eventRouter used to route events to the right topic and partition.
eventRouter *eventrouter.EventRouter
// topicManager used to manage topics.
Expand Down Expand Up @@ -98,7 +97,7 @@ func NewKafkaSink(changefeedID model.ChangeFeedID, sinkURI *url.URL, replicaConf
return nil, errors.Trace(err)
}

columnSelector, err := columnselector.New(replicaConfig)
columnSelector, err := common.New(replicaConfig)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -159,7 +158,7 @@ func (s *KafkaSink) AddDMLEvent(event *common.DMLEvent, tableProgress *types.Tab
return
}
partitonGenerator := s.eventRouter.GetPartitionGeneratorForRowChange(event.TableInfo)

selector := s.columnSelector.GetSelector(event.TableInfo.TableName.Schema, event.TableInfo.TableName.Table)
toRowCallback := func(postTxnFlushed []func(), totalCount uint64) func() {
var calledCount atomic.Uint64
// The callback of the last row will trigger the callback of the txn.
Expand All @@ -176,20 +175,12 @@ func (s *KafkaSink) AddDMLEvent(event *common.DMLEvent, tableProgress *types.Tab
rowCallback := toRowCallback(event.PostTxnFlushed, rowsCount)

for {
_, ok := event.GetNextRow()
row, ok := event.GetNextRow()
if !ok {
break
}
// FIXME: pass a real row
row := &common.RowChangedEvent{}
err = s.columnSelector.Apply(row)
if err != nil {
s.cancel(err)
log.Error("failed to do column selector for row", zap.Error(err))
return
}

index, key, err := partitonGenerator.GeneratePartitionIndexAndKey(row, partitionNum)
index, key, err := partitonGenerator.GeneratePartitionIndexAndKey(&row, partitionNum, event.TableInfo, event.CommitTs)
if err != nil {
s.cancel(err)
log.Error("failed to generate partition index and key for row", zap.Error(err))
Expand All @@ -204,8 +195,11 @@ func (s *KafkaSink) AddDMLEvent(event *common.DMLEvent, tableProgress *types.Tab
TotalPartition: partitionNum,
},
RowEvent: common.RowEvent{
Event: row,
Callback: rowCallback,
TableInfo: event.TableInfo,
CommitTs: event.CommitTs,
Event: row,
Callback: rowCallback,
ColumnSelector: selector,
},
}
}
Expand Down
87 changes: 3 additions & 84 deletions downstreamadapter/writer/sql_builder.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
package writer

import (
"fmt"
"math"
"strings"

"github.com/flowbehappy/tigate/pkg/common"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/parser/charset"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tiflow/pkg/quotes"
"go.uber.org/zap"
Expand Down Expand Up @@ -137,7 +131,7 @@ func getArgs(row *chunk.Row, tableInfo *common.TableInfo) ([]interface{}, error)
if col == nil || tableInfo.ColumnsFlag[col.ID].IsGeneratedColumn() {
continue
}
v, err := formatColVal(row, col, i)
v, err := common.FormatColVal(row, col, i)
if err != nil {
return nil, err
}
Expand All @@ -160,81 +154,6 @@ func getColumnList(tableInfo *common.TableInfo) string {
return b.String()
}

var emptyBytes = make([]byte, 0)

// formatColVal returns the value stored at position idx of row, converted to a
// Go value chosen by the MySQL type of col.
//
// Conversions performed:
//   - date/datetime/timestamp, duration and JSON values are returned as strings;
//   - decimals are returned as strings, or as nil when the stored decimal is nil;
//   - enum and set values are returned as the Value field of the enum;
//   - bit values are returned as integers;
//   - string and blob values are returned as []byte (never nil), or as string
//     when the column has a non-binary charset;
//   - float and double values that are NaN or +/-Inf are logged and replaced by 0;
//   - every other type is returned as the datum's GetValue() result.
//
// A non-nil error is returned only when a bit value cannot be converted to an
// integer; in that case the returned value is nil.
func formatColVal(row *chunk.Row, col *model.ColumnInfo, idx int) (
	value interface{}, err error,
) {
	var v interface{}
	switch col.GetType() {
	case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeNewDate, mysql.TypeTimestamp:
		v = row.GetTime(idx).String()
	case mysql.TypeDuration:
		v = row.GetDuration(idx, 0).String()
	case mysql.TypeJSON:
		v = row.GetJSON(idx).String()
	case mysql.TypeNewDecimal:
		d := row.GetMyDecimal(idx)
		if d == nil {
			// nil takes 0 byte.
			return nil, nil
		}
		v = d.String()
	case mysql.TypeEnum, mysql.TypeSet:
		v = row.GetEnum(idx).Value
	case mysql.TypeBit:
		d := row.GetDatum(idx, &col.FieldType)
		dp := &d
		// Encode bits as integers to avoid pingcap/tidb#10988 (which also affects MySQL itself)
		v, err = dp.GetBinaryLiteral().ToInt(types.DefaultStmtNoWarningContext)
		if err != nil {
			return nil, err
		}
	case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar,
		mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob:
		b := row.GetBytes(idx)
		if b == nil {
			// Normalize a nil slice to an empty one so callers never see nil bytes.
			b = emptyBytes
		}

		v = b
	case mysql.TypeFloat:
		b := row.GetFloat32(idx)
		if math.IsNaN(float64(b)) || math.IsInf(float64(b), 0) {
			// Report the offending value itself: v has not been assigned yet at
			// this point, so formatting it would only print a nil placeholder.
			warn := fmt.Sprintf("the value is invalid in column: %f", b)
			log.Warn(warn)
			b = 0
		}
		v = b
	case mysql.TypeDouble:
		b := row.GetFloat64(idx)
		if math.IsNaN(b) || math.IsInf(b, 0) {
			// Same as the float32 case: log b, not the still-unset v.
			warn := fmt.Sprintf("the value is invalid in column: %f", b)
			log.Warn(warn)
			b = 0
		}
		v = b
	default:
		d := row.GetDatum(idx, &col.FieldType)
		// NOTICE: GetValue() may return types that Go's database/sql does not support,
		// which would make the sink DML fail. Add an explicit conversion above if needed.
		// Supported types: https://github.com/golang/go/blob/go1.17.4/src/database/sql/driver/types.go#L236
		v = d.GetValue()
	}

	// If the column value type is []byte and charset is not binary, we get its string
	// representation. Because if we use the byte array representation, the go-sql-driver
	// will automatically set `_binary` charset for that column, which is not expected.
	// See https://github.com/go-sql-driver/mysql/blob/ce134bfc/connection.go#L267
	if col.GetCharset() != "" && col.GetCharset() != charset.CharsetBin {
		if b, ok := v.([]byte); ok {
			v = string(b)
		}
	}
	return v, nil
}

// whereSlice returns the column names and values for the WHERE clause
func whereSlice(row *chunk.Row, tableInfo *common.TableInfo) ([]string, []interface{}) {
args := make([]interface{}, 0, len(tableInfo.Columns))
Expand All @@ -245,7 +164,7 @@ func whereSlice(row *chunk.Row, tableInfo *common.TableInfo) ([]string, []interf
continue
}
colNames = append(colNames, col.Name.O)
v, err := formatColVal(row, col, i)
v, err := common.FormatColVal(row, col, i)
if err != nil {
// FIXME: handle error
log.Panic("formatColVal failed", zap.Error(err))
Expand All @@ -257,7 +176,7 @@ func whereSlice(row *chunk.Row, tableInfo *common.TableInfo) ([]string, []interf
if len(colNames) == 0 {
for i, col := range tableInfo.Columns {
colNames = append(colNames, col.Name.O)
v, err := formatColVal(row, col, i)
v, err := common.FormatColVal(row, col, i)
if err != nil {
// FIXME: handle error
log.Panic("formatColVal failed", zap.Error(err))
Expand Down
Loading

0 comments on commit d53ad6e

Please sign in to comment.