codec: support open protocol to decode chunk data #266

Merged
merged 1 commit into from Sep 6, 2024
6 changes: 0 additions & 6 deletions downstreamadapter/sink/helper/eventrouter/event_router.go
@@ -101,12 +101,6 @@ func (s *EventRouter) GetTopicForDDL(ddl *model.DDLEvent) string {
return topicGenerator.Substitute(schema, table)
}

// GetPartitionForRowChange returns the target partition for row changes.
func (s *EventRouter) GetPartitionForRowChange(row *common.RowChangedEvent, partitionNum int32) (int32, string, error) {
return s.GetPartitionDispatcher(row.TableInfo.GetSchemaName(), row.TableInfo.GetTableName()).
GeneratePartitionIndexAndKey(row, partitionNum)
}

// GetPartitionGeneratorForRowChange returns the partition generator for row changes.
func (s *EventRouter) GetPartitionGeneratorForRowChange(tableInfo *common.TableInfo) partition.PartitionGenerator {
return s.GetPartitionDispatcher(tableInfo.GetSchemaName(), tableInfo.GetTableName())
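With the wrapper removed, callers fetch the generator for a table once and pass the row, table info, and commit ts explicitly. A minimal sketch of the new call pattern, assembled from names used elsewhere in this diff (the surrounding wiring is assumed, not part of this PR):

func dispatchRows(router *eventrouter.EventRouter, event *common.DMLEvent, partitionNum int32) error {
	// Resolve the generator once per table, then dispatch each row delta.
	generator := router.GetPartitionGeneratorForRowChange(event.TableInfo)
	for {
		row, ok := event.GetNextRow()
		if !ok {
			return nil
		}
		index, key, err := generator.GeneratePartitionIndexAndKey(&row, partitionNum, event.TableInfo, event.CommitTs)
		if err != nil {
			return err
		}
		_ = index // target partition in [0, partitionNum)
		_ = key   // partition key, used by key-based dispatchers
	}
}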
27 changes: 16 additions & 11 deletions downstreamadapter/sink/helper/eventrouter/partition/columns.go
@@ -42,33 +42,38 @@ func newColumnsPartitionGenerator(columns []string) *ColumnsPartitionGenerator {
}
}

func (r *ColumnsPartitionGenerator) GeneratePartitionIndexAndKey(row *common.RowChangedEvent, partitionNum int32) (int32, string, error) {
func (r *ColumnsPartitionGenerator) GeneratePartitionIndexAndKey(row *common.RowDelta, partitionNum int32, tableInfo *common.TableInfo, commitTs uint64) (int32, string, error) {
r.lock.Lock()
defer r.lock.Unlock()
r.hasher.Reset()

r.hasher.Write([]byte(row.TableInfo.GetSchemaName()), []byte(row.TableInfo.GetTableName()))
r.hasher.Write([]byte(tableInfo.GetSchemaName()), []byte(tableInfo.GetTableName()))

dispatchCols := row.Columns
if len(dispatchCols) == 0 {
dispatchCols = row.PreColumns
rowData := row.Row
if rowData.IsEmpty() {
rowData = row.PreRow
}

offsets, ok := row.TableInfo.OffsetsByNames(r.Columns)
offsets, ok := tableInfo.OffsetsByNames(r.Columns)
if !ok {
log.Error("columns not found when dispatch event",
zap.Any("tableName", row.TableInfo.GetTableName()),
zap.Any("tableName", tableInfo.GetTableName()),
zap.Strings("columns", r.Columns))
return 0, "", errors.ErrDispatcherFailed.GenWithStack(
"columns not found when dispatch event, table: %v, columns: %v", row.TableInfo.GetTableName(), r.Columns)
"columns not found when dispatch event, table: %v, columns: %v", tableInfo.GetTableName(), r.Columns)
}

for idx := 0; idx < len(r.Columns); idx++ {
col := dispatchCols[offsets[idx]]
if col == nil {
colInfo := tableInfo.Columns[offsets[idx]]
value, err := common.FormatColVal(&rowData, colInfo, idx)
if err != nil {
// FIXME:
log.Panic("FormatColVal failed", zap.Error(err))
}
if value == nil {
continue
}
r.hasher.Write([]byte(r.Columns[idx]), []byte(model.ColumnValueString(col.Value)))
r.hasher.Write([]byte(r.Columns[idx]), []byte(model.ColumnValueString(value)))
}

sum32 := r.hasher.Sum32()
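The columns dispatcher hashes the schema name, table name, and the configured columns' names and values, so rows that agree on those columns always land on the same partition. A rough sketch of that idea with hash/fnv standing in for the internal hasher (the real generator also falls back to PreRow for deletes and validates the column list):

import "hash/fnv"

// columnsPartition is a simplified stand-in for the generator above: rows with
// equal values in the dispatch columns map to the same partition index.
func columnsPartition(schema, table string, colNames, colValues []string, partitionNum int32) int32 {
	h := fnv.New32a()
	h.Write([]byte(schema))
	h.Write([]byte(table))
	for i := range colNames {
		h.Write([]byte(colNames[i]))
		h.Write([]byte(colValues[i]))
	}
	return int32(h.Sum32() % uint32(partitionNum))
}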
@@ -24,7 +24,7 @@ import (
type PartitionGenerator interface {
// GeneratePartitionIndexAndKey returns an index of partitions or a partition key for event.
// Concurrency Note: This method is thread-safe.
GeneratePartitionIndexAndKey(row *common.RowChangedEvent, partitionNum int32) (int32, string, error)
GeneratePartitionIndexAndKey(row *common.RowDelta, partitionNum int32, tableInfo *common.TableInfo, commitTs uint64) (int32, string, error)
}

func GetPartitionGenerator(rule string, scheme string, indexName string, columns []string) PartitionGenerator {
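Every dispatcher now implements this four-argument method; the row delta no longer carries its table info or commit ts, so both are threaded through the interface. A hypothetical implementation, shown only to illustrate the shape of the new contract (not part of this PR):

// evenOddTsGenerator routes rows by the parity of their commit ts.
type evenOddTsGenerator struct{}

func (g *evenOddTsGenerator) GeneratePartitionIndexAndKey(
	row *common.RowDelta,
	partitionNum int32,
	tableInfo *common.TableInfo,
	commitTs uint64,
) (int32, string, error) {
	if partitionNum < 2 {
		return 0, tableInfo.TableName.String(), nil
	}
	return int32(commitTs % 2), tableInfo.TableName.String(), nil
}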
42 changes: 28 additions & 14 deletions downstreamadapter/sink/helper/eventrouter/partition/index_value.go
@@ -39,42 +39,56 @@ func newIndexValuePartitionGenerator(indexName string) *IndexValuePartitionGener
}
}

func (r *IndexValuePartitionGenerator) GeneratePartitionIndexAndKey(row *common.RowChangedEvent, partitionNum int32) (int32, string, error) {
func (r *IndexValuePartitionGenerator) GeneratePartitionIndexAndKey(row *common.RowDelta,
partitionNum int32,
tableInfo *common.TableInfo,
commitTs uint64,
) (int32, string, error) {
r.lock.Lock()
defer r.lock.Unlock()
r.hasher.Reset()
r.hasher.Write([]byte(row.TableInfo.GetSchemaName()), []byte(row.TableInfo.GetTableName()))
r.hasher.Write([]byte(tableInfo.GetSchemaName()), []byte(tableInfo.GetTableName()))

dispatchCols := row.Columns
if len(row.Columns) == 0 {
dispatchCols = row.PreColumns
rowData := row.Row
if rowData.IsEmpty() {
rowData = row.PreRow
}

// the most normal case, index-name is not set, use the handle key columns.
if r.IndexName == "" {
for _, col := range dispatchCols {
for idx, col := range tableInfo.Columns {
if col == nil {
continue
}
if col.Flag.IsHandleKey() {
r.hasher.Write([]byte(col.Name), []byte(model.ColumnValueString(col.Value)))
if tableInfo.ColumnsFlag[col.ID].IsHandleKey() {
value, err := common.FormatColVal(&rowData, col, idx)
if err != nil {
// FIXME:
log.Panic("FormatColVal failed", zap.Error(err))
}
r.hasher.Write([]byte(col.Name.O), []byte(model.ColumnValueString(value)))
}
}
} else {
names, offsets, ok := row.TableInfo.IndexByName(r.IndexName)
names, offsets, ok := tableInfo.IndexByName(r.IndexName)
if !ok {
log.Error("index not found when dispatch event",
zap.Any("tableName", row.TableInfo.GetTableName()),
zap.Any("tableName", tableInfo.GetTableName()),
zap.String("indexName", r.IndexName))
return 0, "", errors.ErrDispatcherFailed.GenWithStack(
"index not found when dispatch event, table: %v, index: %s", row.TableInfo.GetTableName(), r.IndexName)
"index not found when dispatch event, table: %v, index: %s", tableInfo.GetTableName(), r.IndexName)
}
for idx := 0; idx < len(names); idx++ {
col := dispatchCols[offsets[idx]]
if col == nil {
colInfo := tableInfo.Columns[offsets[idx]]
value, err := common.FormatColVal(&rowData, colInfo, idx)
if err != nil {
// FIXME:
log.Panic("FormatColVal failed", zap.Error(err))
}
if value == nil {
continue
}
r.hasher.Write([]byte(names[idx]), []byte(model.ColumnValueString(col.Value)))
r.hasher.Write([]byte(names[idx]), []byte(model.ColumnValueString(value)))
}
}

6 changes: 5 additions & 1 deletion downstreamadapter/sink/helper/eventrouter/partition/key.go
@@ -25,6 +25,10 @@ func newKeyPartitionGenerator(partitionKey string) *KeyPartitionGenerator {
}
}

func (t *KeyPartitionGenerator) GeneratePartitionIndexAndKey(*common.RowChangedEvent, int32) (int32, string, error) {
func (t *KeyPartitionGenerator) GeneratePartitionIndexAndKey(row *common.RowDelta,
partitionNum int32,
tableInfo *common.TableInfo,
commitTs uint64,
) (int32, string, error) {
return 0, t.partitionKey, nil
}
11 changes: 8 additions & 3 deletions downstreamadapter/sink/helper/eventrouter/partition/table.go
@@ -34,11 +34,16 @@ func newTablePartitionGenerator() *TablePartitionGenerator {
}

// GeneratePartitionIndexAndKey returns the target partition to which a row changed event should be dispatched.
func (t *TablePartitionGenerator) GeneratePartitionIndexAndKey(row *common.RowChangedEvent, partitionNum int32) (int32, string, error) {
func (t *TablePartitionGenerator) GeneratePartitionIndexAndKey(
row *common.RowDelta,
partitionNum int32,
tableInfo *common.TableInfo,
commitTs uint64,
) (int32, string, error) {
t.lock.Lock()
defer t.lock.Unlock()
t.hasher.Reset()
// distribute partition by table
t.hasher.Write([]byte(row.TableInfo.GetSchemaName()), []byte(row.TableInfo.GetTableName()))
return int32(t.hasher.Sum32() % uint32(partitionNum)), row.TableInfo.TableName.String(), nil
t.hasher.Write([]byte(tableInfo.GetSchemaName()), []byte(tableInfo.GetTableName()))
return int32(t.hasher.Sum32() % uint32(partitionNum)), tableInfo.TableName.String(), nil
}
9 changes: 7 additions & 2 deletions downstreamadapter/sink/helper/eventrouter/partition/ts.go
@@ -27,6 +27,11 @@ func newTsPartitionGenerator() *TsPartitionGenerator {
return &TsPartitionGenerator{}
}

func (t *TsPartitionGenerator) GeneratePartitionIndexAndKey(row *common.RowChangedEvent, partitionNum int32) (int32, string, error) {
return int32(row.CommitTs % uint64(partitionNum)), strconv.FormatUint(row.CommitTs, 10), nil
func (t *TsPartitionGenerator) GeneratePartitionIndexAndKey(
row *common.RowDelta,
partitionNum int32,
tableInfo *common.TableInfo,
commitTs uint64,
) (int32, string, error) {
return int32(commitTs % uint64(partitionNum)), strconv.FormatUint(commitTs, 10), nil
}
26 changes: 10 additions & 16 deletions downstreamadapter/sink/kafka_sink.go
@@ -17,7 +17,6 @@ import (
"context"
"net/url"

"github.com/flowbehappy/tigate/downstreamadapter/sink/helper/columnselector"
"github.com/flowbehappy/tigate/downstreamadapter/sink/helper/eventrouter"
"github.com/flowbehappy/tigate/downstreamadapter/sink/helper/topicmanager"
"github.com/flowbehappy/tigate/downstreamadapter/sink/types"
@@ -45,7 +44,7 @@ type KafkaSink struct {

protocol config.Protocol

columnSelector *columnselector.ColumnSelector
columnSelector *common.ColumnSelector
// eventRouter used to route events to the right topic and partition.
eventRouter *eventrouter.EventRouter
// topicManager used to manage topics.
@@ -98,7 +97,7 @@ func NewKafkaSink(changefeedID model.ChangeFeedID, sinkURI *url.URL, replicaConf
return nil, errors.Trace(err)
}

columnSelector, err := columnselector.New(replicaConfig)
columnSelector, err := common.New(replicaConfig)
if err != nil {
return nil, errors.Trace(err)
}
@@ -159,7 +158,7 @@ func (s *KafkaSink) AddDMLEvent(event *common.DMLEvent, tableProgress *types.Tab
return
}
partitonGenerator := s.eventRouter.GetPartitionGeneratorForRowChange(event.TableInfo)

selector := s.columnSelector.GetSelector(event.TableInfo.TableName.Schema, event.TableInfo.TableName.Table)
toRowCallback := func(postTxnFlushed []func(), totalCount uint64) func() {
var calledCount atomic.Uint64
// The callback of the last row will trigger the callback of the txn.
@@ -176,20 +175,12 @@ func (s *KafkaSink) AddDMLEvent(event *common.DMLEvent, tableProgress *types.Tab
rowCallback := toRowCallback(event.PostTxnFlushed, rowsCount)

for {
_, ok := event.GetNextRow()
row, ok := event.GetNextRow()
if !ok {
break
}
// FIXME: pass a real row
row := &common.RowChangedEvent{}
err = s.columnSelector.Apply(row)
if err != nil {
s.cancel(err)
log.Error("failed to do column selector for row", zap.Error(err))
return
}

index, key, err := partitonGenerator.GeneratePartitionIndexAndKey(row, partitionNum)
index, key, err := partitonGenerator.GeneratePartitionIndexAndKey(&row, partitionNum, event.TableInfo, event.CommitTs)
if err != nil {
s.cancel(err)
log.Error("failed to generate partition index and key for row", zap.Error(err))
@@ -204,8 +195,11 @@ func (s *KafkaSink) AddDMLEvent(event *common.DMLEvent, tableProgress *types.Tab
TotalPartition: partitionNum,
},
RowEvent: common.RowEvent{
Event: row,
Callback: rowCallback,
TableInfo: event.TableInfo,
CommitTs: event.CommitTs,
Event: row,
Callback: rowCallback,
ColumnSelector: selector,
},
}
}
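The body of toRowCallback is collapsed in this diff; judging from the visible lines it counts flushed rows with an atomic counter and fires the transaction's PostTxnFlushed callbacks only after the last row is out. A sketch of such a wrapper under that assumption:

// toRowCallback wraps the txn-level callbacks so they run exactly once,
// after the callback of the last row in the transaction has been invoked.
toRowCallback := func(postTxnFlushed []func(), totalCount uint64) func() {
	var calledCount atomic.Uint64
	return func() {
		if calledCount.Add(1) == totalCount {
			for _, callback := range postTxnFlushed {
				callback()
			}
		}
	}
}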
87 changes: 3 additions & 84 deletions downstreamadapter/writer/sql_builder.go
@@ -1,16 +1,10 @@
package writer

import (
"fmt"
"math"
"strings"

"github.com/flowbehappy/tigate/pkg/common"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/parser/charset"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tiflow/pkg/quotes"
"go.uber.org/zap"
@@ -137,7 +131,7 @@ func getArgs(row *chunk.Row, tableInfo *common.TableInfo) ([]interface{}, error)
if col == nil || tableInfo.ColumnsFlag[col.ID].IsGeneratedColumn() {
continue
}
v, err := formatColVal(row, col, i)
v, err := common.FormatColVal(row, col, i)
if err != nil {
return nil, err
}
@@ -160,81 +154,6 @@ func getColumnList(tableInfo *common.TableInfo) string {
return b.String()
}

var emptyBytes = make([]byte, 0)

// getColumnValue returns the column value in the row
func formatColVal(row *chunk.Row, col *model.ColumnInfo, idx int) (
value interface{}, err error,
) {
var v interface{}
switch col.GetType() {
case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeNewDate, mysql.TypeTimestamp:
v = row.GetTime(idx).String()
case mysql.TypeDuration:
v = row.GetDuration(idx, 0).String()
case mysql.TypeJSON:
v = row.GetJSON(idx).String()
case mysql.TypeNewDecimal:
d := row.GetMyDecimal(idx)
if d == nil {
// nil takes 0 byte.
return nil, nil
}
v = d.String()
case mysql.TypeEnum, mysql.TypeSet:
v = row.GetEnum(idx).Value
case mysql.TypeBit:
d := row.GetDatum(idx, &col.FieldType)
dp := &d
// Encode bits as integers to avoid pingcap/tidb#10988 (which also affects MySQL itself)
v, err = dp.GetBinaryLiteral().ToInt(types.DefaultStmtNoWarningContext)
if err != nil {
return nil, err
}
case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar,
mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob:
b := row.GetBytes(idx)
if b == nil {
b = emptyBytes
}

v = b
case mysql.TypeFloat:
b := row.GetFloat32(idx)
if math.IsNaN(float64(b)) || math.IsInf(float64(b), 1) || math.IsInf(float64(b), -1) {
warn := fmt.Sprintf("the value is invalid in column: %f", v)
log.Warn(warn)
b = 0
}
v = b
case mysql.TypeDouble:
b := row.GetFloat64(idx)
if math.IsNaN(b) || math.IsInf(b, 1) || math.IsInf(b, -1) {
warn := fmt.Sprintf("the value is invalid in column: %f", v)
log.Warn(warn)
b = 0
}
v = b
default:
d := row.GetDatum(idx, &col.FieldType)
// NOTICE: GetValue() may return some types that go sql not support, which will cause sink DML fail
// Make specified convert upper if you need
// Go sql support type ref to: https://github.com/golang/go/blob/go1.17.4/src/database/sql/driver/types.go#L236
v = d.GetValue()
}

// If the column value type is []byte and charset is not binary, we get its string
// representation. Because if we use the byte array respresentation, the go-sql-driver
// will automatically set `_binary` charset for that column, which is not expected.
// See https://github.com/go-sql-driver/mysql/blob/ce134bfc/connection.go#L267
if col.GetCharset() != "" && col.GetCharset() != charset.CharsetBin {
if b, ok := v.([]byte); ok {
v = string(b)
}
}
return v, nil
}

// whereSlice returns the column names and values for the WHERE clause
func whereSlice(row *chunk.Row, tableInfo *common.TableInfo) ([]string, []interface{}) {
args := make([]interface{}, 0, len(tableInfo.Columns))
Expand All @@ -245,7 +164,7 @@ func whereSlice(row *chunk.Row, tableInfo *common.TableInfo) ([]string, []interf
continue
}
colNames = append(colNames, col.Name.O)
v, err := formatColVal(row, col, i)
v, err := common.FormatColVal(row, col, i)
if err != nil {
// FIXME: handle error
log.Panic("formatColVal failed", zap.Error(err))
@@ -257,7 +176,7 @@ func whereSlice(row *chunk.Row, tableInfo *common.TableInfo) ([]string, []interf
if len(colNames) == 0 {
for i, col := range tableInfo.Columns {
colNames = append(colNames, col.Name.O)
v, err := formatColVal(row, col, i)
v, err := common.FormatColVal(row, col, i)
if err != nil {
// FIXME: handle error
log.Panic("formatColVal failed", zap.Error(err))
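The removed formatColVal lives on as common.FormatColVal, shared by the SQL writer and the new partition generators: times, decimals, and JSON become strings, bits become integers, and non-binary byte slices become strings. A short usage sketch mirroring getArgs above, with error handling simplified:

// Sketch: convert each column of a chunk.Row into a driver-friendly Go value.
args := make([]interface{}, 0, len(tableInfo.Columns))
for i, col := range tableInfo.Columns {
	if col == nil || tableInfo.ColumnsFlag[col.ID].IsGeneratedColumn() {
		continue
	}
	v, err := common.FormatColVal(row, col, i)
	if err != nil {
		return nil, err
	}
	args = append(args, v)
}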