From c06355edb95f270611fb92cf069f4ae32315930f Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Sun, 8 Sep 2024 21:10:09 +0800 Subject: [PATCH 1/2] add test for column selector and open codec --- downstreamadapter/sink/kafka_sink.go | 2 +- pkg/common/column_selector.go | 156 ++++------------- pkg/common/column_selector_test.go | 169 +++++++++++++++++++ pkg/common/row_change.go | 2 +- pkg/mounter/util.go | 6 + pkg/sink/codec/open/codec.go | 239 ++++++++++++++++++++++----- pkg/sink/codec/open/codec_test.go | 41 +++++ 7 files changed, 452 insertions(+), 163 deletions(-) create mode 100644 pkg/common/column_selector_test.go create mode 100644 pkg/sink/codec/open/codec_test.go diff --git a/downstreamadapter/sink/kafka_sink.go b/downstreamadapter/sink/kafka_sink.go index 3c4f40dcf..5d5864ebf 100644 --- a/downstreamadapter/sink/kafka_sink.go +++ b/downstreamadapter/sink/kafka_sink.go @@ -44,7 +44,7 @@ type KafkaSink struct { protocol config.Protocol - columnSelector *common.ColumnSelector + columnSelector *common.ColumnSelectors // eventRouter used to route events to the right topic and partition. eventRouter *eventrouter.EventRouter // topicManager used to manage topics. 
diff --git a/pkg/common/column_selector.go b/pkg/common/column_selector.go index 15a35f828..eda7ffcc1 100644 --- a/pkg/common/column_selector.go +++ b/pkg/common/column_selector.go @@ -16,20 +16,32 @@ package common import ( "github.com/pingcap/tidb/pkg/parser/model" filter "github.com/pingcap/tidb/pkg/util/table-filter" - "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher" - "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher/partition" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/errors" ) -type Selector struct { +type Selector interface { + Select(colInfo *model.ColumnInfo) bool +} + +type DefaultColumnSelector struct{} + +func NewDefaultColumnSelector() *DefaultColumnSelector { + return &DefaultColumnSelector{} +} + +func (d *DefaultColumnSelector) Select(colInfo *model.ColumnInfo) bool { + return true +} + +type ColumnSelector struct { tableF filter.Filter columnM filter.ColumnFilter } -func newSelector( +func newColumnSelector( rule *config.ColumnSelector, caseSensitive bool, -) (*Selector, error) { +) (*ColumnSelector, error) { tableM, err := filter.Parse(rule.Matcher) if err != nil { return nil, errors.WrapError(errors.ErrFilterRuleInvalid, err, rule.Matcher) @@ -42,154 +54,50 @@ func newSelector( return nil, errors.WrapError(errors.ErrFilterRuleInvalid, err, rule.Columns) } - return &Selector{ + return &ColumnSelector{ tableF: tableM, columnM: columnM, }, nil } // Match implements Transformer interface -func (s *Selector) Match(schema, table string) bool { +func (s *ColumnSelector) match(schema, table string) bool { return s.tableF.MatchTable(schema, table) } // Select decide whether the col should be encoded or not. 
-func (s *Selector) Select(colInfo *model.ColumnInfo) bool { +func (s *ColumnSelector) Select(colInfo *model.ColumnInfo) bool { colName := colInfo.Name.O - if s.columnM.MatchColumn(colName) { - return true - } - return false + return s.columnM.MatchColumn(colName) } -// ColumnSelector manages an array of selectors, the first selector match the given +// ColumnSelectors manages an array of selectors, the first selector match the given // event is used to select out columns. -type ColumnSelector struct { - selectors []*Selector +type ColumnSelectors struct { + selectors []*ColumnSelector } -// New return a column selector -func New(cfg *config.ReplicaConfig) (*ColumnSelector, error) { - selectors := make([]*Selector, 0, len(cfg.Sink.ColumnSelectors)) +// New return a column selectors +func New(cfg *config.ReplicaConfig) (*ColumnSelectors, error) { + selectors := make([]*ColumnSelector, 0, len(cfg.Sink.ColumnSelectors)) for _, r := range cfg.Sink.ColumnSelectors { - selector, err := newSelector(r, cfg.CaseSensitive) + selector, err := newColumnSelector(r, cfg.CaseSensitive) if err != nil { return nil, err } selectors = append(selectors, selector) } - return &ColumnSelector{ + return &ColumnSelectors{ selectors: selectors, }, nil } -func (c *ColumnSelector) GetSelector(schema, table string) *Selector { +func (c *ColumnSelectors) GetSelector(schema, table string) Selector { for _, s := range c.selectors { - if s.Match(schema, table) { + if s.match(schema, table) { return s } } - return nil -} - -// VerifyTables return the error if any given table cannot satisfy the column selector constraints. -// 1. if the column is filter out, it must not be a part of handle key or the unique key. -// 2. if the filtered out column is used in the column dispatcher, return error. 
-func (c *ColumnSelector) VerifyTables( - infos []*TableInfo, eventRouter *dispatcher.EventRouter, -) error { - if len(c.selectors) == 0 { - return nil - } - - for _, table := range infos { - for _, s := range c.selectors { - if !s.Match(table.TableName.Schema, table.TableName.Table) { - continue - } - - retainedColumns := make(map[string]struct{}) - for columnID := range table.ColumnsFlag { - columnInfo, ok := table.GetColumnInfo(columnID) - if !ok { - return errors.ErrColumnSelectorFailed.GenWithStack( - "column not found when verify the table for the column selector, table: %v, column: %s", - table.TableName, columnInfo.Name) - } - columnName := columnInfo.Name.O - if s.columnM.MatchColumn(columnName) { - retainedColumns[columnName] = struct{}{} - continue - } - - partitionDispatcher := eventRouter.GetPartitionDispatcher(table.TableName.Schema, table.TableName.Table) - switch v := partitionDispatcher.(type) { - case *partition.ColumnsDispatcher: - for _, col := range v.Columns { - if col == columnInfo.Name.O { - return errors.ErrColumnSelectorFailed.GenWithStack( - "the filtered out column is used in the column dispatcher, "+ - "table: %v, column: %s", table.TableName, columnInfo.Name) - } - } - default: - } - } - - if !verifyIndices(table, retainedColumns) { - return errors.ErrColumnSelectorFailed.GenWithStack( - "no primary key columns or unique key columns obtained after filter out, table: %+v", table.TableName) - } - } - } - return nil -} - -// verifyIndices return true if the primary key retained, -// else at least there are one unique key columns in the retained columns. -func verifyIndices(table *TableInfo, retainedColumns map[string]struct{}) bool { - primaryKeyColumns := table.GetPrimaryKeyColumnNames() - - retained := true - for _, name := range primaryKeyColumns { - if _, ok := retainedColumns[name]; !ok { - retained = false - break - } - } - // primary key columns are retained, return true. 
- if retained { - return true - } - - // at least one unique key columns are retained, return true. - for _, index := range table.Indices { - if !index.Unique { - continue - } - - retained = true - for _, col := range index.Columns { - if _, ok := retainedColumns[col.Name.O]; !ok { - retained = false - break - } - } - if retained { - return true - } - } - return false -} - -// VerifyColumn return true if the given `schema.table` column is matched. -func (c *ColumnSelector) VerifyColumn(schema, table, column string) bool { - for _, s := range c.selectors { - if !s.Match(schema, table) { - continue - } - return s.columnM.MatchColumn(column) - } - return true + return NewDefaultColumnSelector() } diff --git a/pkg/common/column_selector_test.go b/pkg/common/column_selector_test.go new file mode 100644 index 000000000..d856a435f --- /dev/null +++ b/pkg/common/column_selector_test.go @@ -0,0 +1,169 @@ +package common + +import ( + "testing" + + "github.com/pingcap/tiflow/pkg/config" + "github.com/stretchr/testify/require" +) + +func TestNewColumnSelector(t *testing.T) { + // the column selector is not set + replicaConfig := config.GetDefaultReplicaConfig() + selectors, err := New(replicaConfig) + require.NoError(t, err) + require.NotNil(t, selectors) + require.Len(t, selectors.selectors, 0) + + replicaConfig.Sink.ColumnSelectors = []*config.ColumnSelector{ + { + Matcher: []string{"test.*"}, + Columns: []string{"a", "b"}, + }, + { + Matcher: []string{"test1.*"}, + Columns: []string{"*", "!a"}, + }, + { + Matcher: []string{"test2.*"}, + Columns: []string{"co*", "!col2"}, + }, + { + Matcher: []string{"test3.*"}, + Columns: []string{"co?1"}, + }, + } + selectors, err = New(replicaConfig) + require.NoError(t, err) + require.Len(t, selectors.selectors, 4) +} + +func TestColumnSelectorGetSelector(t *testing.T) { + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Sink.ColumnSelectors = []*config.ColumnSelector{ + { + Matcher: []string{"test.*"}, + Columns: 
[]string{"a", "b"}, + }, + { + Matcher: []string{"test1.*"}, + Columns: []string{"*", "!a"}, + }, + { + Matcher: []string{"test2.*"}, + Columns: []string{"co*", "!col2"}, + }, + { + Matcher: []string{"test3.*"}, + Columns: []string{"co?1"}, + }, + } + selectors, err := New(replicaConfig) + require.NoError(t, err) + + { + selector := selectors.GetSelector("test", "t1") + tableInfo1 := BuildTableInfo("test", "t1", []*Column{ + { + Name: "a", + }, + { + Name: "b", + }, + { + Name: "c", + }, + }, nil) + for _, col := range tableInfo1.Columns { + if col.Name.O != "c" { + require.True(t, selector.Select(col)) + } else { + require.False(t, selector.Select(col)) + } + } + } + + { + selector := selectors.GetSelector("test1", "aaa") + tableInfo1 := BuildTableInfo("test1", "aaa", []*Column{ + { + Name: "a", + }, + { + Name: "b", + }, + { + Name: "c", + }, + }, nil) + for _, col := range tableInfo1.Columns { + if col.Name.O != "a" { + require.True(t, selector.Select(col)) + } else { + require.False(t, selector.Select(col)) + } + } + } + + { + selector := selectors.GetSelector("test2", "t2") + tableInfo1 := BuildTableInfo("test2", "t2", []*Column{ + { + Name: "a", + }, + { + Name: "col2", + }, + { + Name: "col1", + }, + }, nil) + for _, col := range tableInfo1.Columns { + if col.Name.O == "col1" { + require.True(t, selector.Select(col)) + } else { + require.False(t, selector.Select(col)) + } + } + } + + { + selector := selectors.GetSelector("test3", "t3") + tableInfo1 := BuildTableInfo("test3", "t3", []*Column{ + { + Name: "a", + }, + { + Name: "col2", + }, + { + Name: "col1", + }, + }, nil) + for _, col := range tableInfo1.Columns { + if col.Name.O == "col1" { + require.True(t, selector.Select(col)) + } else { + require.False(t, selector.Select(col)) + } + } + } + + { + selector := selectors.GetSelector("test4", "t4") + tableInfo1 := BuildTableInfo("test4", "t4", []*Column{ + { + Name: "a", + }, + { + Name: "col2", + }, + { + Name: "col1", + }, + }, nil) + for _, col := range 
tableInfo1.Columns { + require.True(t, selector.Select(col)) + } + } +} diff --git a/pkg/common/row_change.go b/pkg/common/row_change.go index 75ac5e6d5..f4760f82a 100644 --- a/pkg/common/row_change.go +++ b/pkg/common/row_change.go @@ -265,7 +265,7 @@ type RowEvent struct { TableInfo *TableInfo CommitTs uint64 Event RowDelta - ColumnSelector *Selector + ColumnSelector Selector Callback func() } diff --git a/pkg/mounter/util.go b/pkg/mounter/util.go index eed2420a7..10442441d 100644 --- a/pkg/mounter/util.go +++ b/pkg/mounter/util.go @@ -84,6 +84,12 @@ func (s *EventTestHelper) ApplyJob(job *timodel.Job) { job.BinlogInfo.TableInfo) } +func (s *EventTestHelper) GetTableInfo(job *timodel.Job) *common.TableInfo { + key := toTableInfosKey(job.SchemaName, job.TableName) + log.Info("get table info", zap.String("jobKey", key), zap.Any("job", job)) + return s.tableInfos[key] +} + // DDL2Job executes the DDL stmt and returns the DDL job func (s *EventTestHelper) DDL2Job(ddl string) *timodel.Job { s.tk.MustExec(ddl) diff --git a/pkg/sink/codec/open/codec.go b/pkg/sink/codec/open/codec.go index 70904dedc..d83a61a02 100644 --- a/pkg/sink/codec/open/codec.go +++ b/pkg/sink/codec/open/codec.go @@ -18,7 +18,7 @@ import ( ticommon "github.com/pingcap/tiflow/pkg/sink/codec/common" ) -func encodeRowChangedEvent(e *common.RowEvent, config *ticommon.Config, largeMessageOnlyHandleKeyColumns bool, claimCheckLocationName string) ([]byte, []byte, int, error) { +func encodeRowChangeEventWithoutCompress(e *common.RowEvent, config *ticommon.Config, largeMessageOnlyHandleKeyColumns bool, claimCheckLocationName string) ([]byte, []byte, error) { keyBuf := &bytes.Buffer{} valueBuf := &bytes.Buffer{} keyWriter := util.BorrowJSONWriter(keyBuf) @@ -38,53 +38,71 @@ func encodeRowChangedEvent(e *common.RowEvent, config *ticommon.Config, largeMes var err error if e.IsDelete() { onlyHandleKeyColumns := config.DeleteOnlyHandleKeyColumns || largeMessageOnlyHandleKeyColumns - - valueWriter.WriteArrayField("d", 
func() { - err = writeColumnFieldValues(valueWriter, e.GetPreRows(), e.TableInfo, e.ColumnSelector, onlyHandleKeyColumns) + valueWriter.WriteObject(func() { + valueWriter.WriteObjectField("d", func() { + err = writeColumnFieldValues(valueWriter, e.GetPreRows(), e.TableInfo, e.ColumnSelector, onlyHandleKeyColumns) + }) }) if err != nil { - return nil, nil, 0, err + return nil, nil, err } } else if e.IsInsert() { - valueWriter.WriteArrayField("u", func() { - err = writeColumnFieldValues(valueWriter, e.GetRows(), e.TableInfo, e.ColumnSelector, largeMessageOnlyHandleKeyColumns) + valueWriter.WriteObject(func() { + valueWriter.WriteObjectField("u", func() { + err = writeColumnFieldValues(valueWriter, e.GetRows(), e.TableInfo, e.ColumnSelector, largeMessageOnlyHandleKeyColumns) + }) }) if err != nil { - return nil, nil, 0, err + return nil, nil, err } } else if e.IsUpdate() { - valueWriter.WriteArrayField("u", func() { - err = writeColumnFieldValues(valueWriter, e.GetRows(), e.TableInfo, e.ColumnSelector, largeMessageOnlyHandleKeyColumns) - }) - if err != nil { - return nil, nil, 0, err - } - valueWriter.WriteArrayField("p", func() { - err = writeColumnFieldValues(valueWriter, e.GetPreRows(), e.TableInfo, e.ColumnSelector, largeMessageOnlyHandleKeyColumns) + valueWriter.WriteObject(func() { + valueWriter.WriteObjectField("u", func() { + err = writeColumnFieldValues(valueWriter, e.GetRows(), e.TableInfo, e.ColumnSelector, largeMessageOnlyHandleKeyColumns) + }) + if err != nil { + return + } + if !config.OnlyOutputUpdatedColumns { + valueWriter.WriteObjectField("p", func() { + err = writeColumnFieldValues(valueWriter, e.GetPreRows(), e.TableInfo, e.ColumnSelector, largeMessageOnlyHandleKeyColumns) + }) + } else { + valueWriter.WriteObjectField("p", func() { + writeUpdatedColumnFieldValues(valueWriter, e.GetPreRows(), e.GetRows(), e.TableInfo, e.ColumnSelector, largeMessageOnlyHandleKeyColumns) + }) + } }) + if err != nil { - return nil, nil, 0, err + return nil, nil, err } 
} util.ReturnJSONWriter(keyWriter) util.ReturnJSONWriter(valueWriter) - value, err := ticommon.Compress( - config.ChangefeedID, config.LargeMessageHandle.LargeMessageHandleCompression, valueBuf.Bytes(), - ) + return keyBuf.Bytes(), valueBuf.Bytes(), nil +} +func encodeRowChangedEvent(e *common.RowEvent, config *ticommon.Config, largeMessageOnlyHandleKeyColumns bool, claimCheckLocationName string) ([]byte, []byte, int, error) { + key, value, err := encodeRowChangeEventWithoutCompress(e, config, largeMessageOnlyHandleKeyColumns, claimCheckLocationName) if err != nil { return nil, nil, 0, err } - key := keyBuf.Bytes() + valueCompressed, err := ticommon.Compress( + config.ChangefeedID, config.LargeMessageHandle.LargeMessageHandleCompression, value, + ) + if err != nil { + return nil, nil, 0, err + } // for single message that is longer than max-message-bytes // 16 is the length of `keyLenByte` and `valueLenByte`, 8 is the length of `versionHead` - length := len(key) + len(value) + ticommon.MaxRecordOverhead + 16 + 8 + length := len(key) + len(valueCompressed) + ticommon.MaxRecordOverhead + 16 + 8 - return key, value, length, nil + return key, valueCompressed, length, nil } func encodeDDLEvent(e *common.DDLEvent, config *ticommon.Config) ([]byte, []byte, error) { @@ -174,8 +192,10 @@ func writeColumnFieldValue(writer *util.JSONWriter, col *timodel.ColumnInfo, row flag := *tableInfo.ColumnsFlag[col.ID] whereHandle := flag.IsHandleKey() - writer.WriteStringField("t", string(colType)) // todo:please check performance - writer.WriteBoolField("h", whereHandle) + writer.WriteIntField("t", int(colType)) // todo:please check performance + if whereHandle { + writer.WriteBoolField("h", whereHandle) + } writer.WriteUint64Field("f", uint64(flag)) // TODO:deal with nil @@ -188,53 +208,59 @@ func writeColumnFieldValue(writer *util.JSONWriter, col *timodel.ColumnInfo, row if err != nil { return nil } - writer.WriteUint64Field(col.Name.O, value) + writer.WriteUint64Field("v", value) 
case mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob: value := row.GetBytes(idx) if value == nil { value = common.EmptyBytes } - writer.WriteBase64StringField(col.Name.O, value) + writer.WriteBase64StringField("v", value) case mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString: value := row.GetBytes(idx) if value == nil { value = common.EmptyBytes } - writer.WriteStringField(col.Name.O, string(hack.String(value))) + writer.WriteStringField("v", string(hack.String(value))) case mysql.TypeEnum, mysql.TypeSet: value := row.GetEnum(idx).Value - writer.WriteUint64Field(col.Name.O, value) + writer.WriteUint64Field("v", value) case mysql.TypeNewDecimal: d := row.GetMyDecimal(idx) value := d.String() - writer.WriteStringField(col.Name.O, value) + writer.WriteStringField("v", value) case mysql.TypeFloat: value := row.GetFloat32(idx) - writer.WriteFloat32Field(col.Name.O, value) + writer.WriteFloat32Field("v", value) case mysql.TypeDouble: value := row.GetFloat64(idx) - writer.WriteFloat64Field(col.Name.O, value) + writer.WriteFloat64Field("v", value) case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeNewDate, mysql.TypeTimestamp: value := row.GetTime(idx).String() - writer.WriteStringField(col.Name.O, value) + writer.WriteStringField("v", value) case mysql.TypeDuration: value := row.GetDuration(idx, 0).String() - writer.WriteStringField(col.Name.O, value) + writer.WriteStringField("v", value) case mysql.TypeJSON: value := row.GetJSON(idx).String() - writer.WriteStringField(col.Name.O, value) + writer.WriteStringField("v", value) default: d := row.GetDatum(idx, &col.FieldType) // NOTICE: GetValue() may return some types that go sql not support, which will cause sink DML fail // Make specified convert upper if you need // Go sql support type ref to: https://github.com/golang/go/blob/go1.17.4/src/database/sql/driver/types.go#L236 value := d.GetValue() - writer.WriteAnyField(col.Name.O, value) + writer.WriteAnyField("v", value) } return nil } 
-func writeColumnFieldValues(jWriter *util.JSONWriter, row *chunk.Row, tableInfo *common.TableInfo, selector *common.Selector, onlyHandleKeyColumns bool) error { +func writeColumnFieldValues( + jWriter *util.JSONWriter, + row *chunk.Row, + tableInfo *common.TableInfo, + selector common.Selector, + onlyHandleKeyColumns bool, +) error { flag := false // flag to check if any column is written colInfo := tableInfo.Columns @@ -256,3 +282,142 @@ func writeColumnFieldValues(jWriter *util.JSONWriter, row *chunk.Row, tableInfo } return nil } + +func writeUpdatedColumnFieldValues( + jWriter *util.JSONWriter, + preRow *chunk.Row, + row *chunk.Row, + tableInfo *common.TableInfo, + selector common.Selector, + onlyHandleKeyColumns bool, +) { + // we don't need check here whether after column selector there still exists handle key column + // because writeUpdatedColumnFieldValues only can be called after successfully dealing with one row event + colInfo := tableInfo.Columns + + for idx, col := range colInfo { + if selector.Select(col) { + if onlyHandleKeyColumns && !tableInfo.ColumnsFlag[col.ID].IsHandleKey() { + continue + } + writeColumnFieldValueIfUpdated(jWriter, col, preRow, row, idx, tableInfo) + } + } +} + +func writeColumnFieldValueIfUpdated( + writer *util.JSONWriter, + col *timodel.ColumnInfo, + preRow *chunk.Row, + row *chunk.Row, + idx int, + tableInfo *common.TableInfo, +) error { + + colType := col.GetType() + flag := *tableInfo.ColumnsFlag[col.ID] + whereHandle := flag.IsHandleKey() + + writeFunc := func(writeColumnValue func()) { + writer.WriteObjectField(col.Name.O, func() { + writer.WriteIntField("t", int(colType)) // todo:please check performance + if whereHandle { + writer.WriteBoolField("h", whereHandle) + } + writer.WriteUint64Field("f", uint64(flag)) + writeColumnValue() + }) + } + + switch col.GetType() { + case mysql.TypeBit: + rowDatum := row.GetDatum(idx, &col.FieldType) + rowDatumPoint := &rowDatum + // Encode bits as integers to avoid 
pingcap/tidb#10988 (which also affects MySQL itself) + rowValue, _ := rowDatumPoint.GetBinaryLiteral().ToInt(types.DefaultStmtNoWarningContext) + + preRowDatum := preRow.GetDatum(idx, &col.FieldType) + preRowDatumPoint := &preRowDatum + // Encode bits as integers to avoid pingcap/tidb#10988 (which also affects MySQL itself) + preRowValue, _ := preRowDatumPoint.GetBinaryLiteral().ToInt(types.DefaultStmtNoWarningContext) + // if err != nil { + // return false, err + // } + + if rowValue != preRowValue { + writeFunc(func() { writer.WriteUint64Field("v", preRowValue) }) + } + case mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob: + rowValue := row.GetBytes(idx) + preRowValue := preRow.GetBytes(idx) + if !bytes.Equal(rowValue, preRowValue) { + writeFunc(func() { writer.WriteBase64StringField("v", preRowValue) }) + } + case mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString: + rowValue := row.GetBytes(idx) + preRowValue := preRow.GetBytes(idx) + if !bytes.Equal(rowValue, preRowValue) { + if preRowValue == nil { + preRowValue = common.EmptyBytes + } + writeFunc(func() { writer.WriteStringField("v", string(hack.String(preRowValue))) }) + } + case mysql.TypeEnum, mysql.TypeSet: + rowValue := row.GetEnum(idx).Value + preRowValue := preRow.GetEnum(idx).Value + if rowValue != preRowValue { + writeFunc(func() { writer.WriteUint64Field("v", preRowValue) }) + } + case mysql.TypeNewDecimal: + rowValue := row.GetMyDecimal(idx) + preRowValue := preRow.GetMyDecimal(idx) + if rowValue.Compare(preRowValue) != 0 { + writeFunc(func() { writer.WriteStringField("v", preRowValue.String()) }) + } + case mysql.TypeFloat: + rowValue := row.GetFloat32(idx) + preRowValue := preRow.GetFloat32(idx) + if rowValue != preRowValue { + writeFunc(func() { writer.WriteFloat32Field("v", preRowValue) }) + } + case mysql.TypeDouble: + rowvalue := row.GetFloat64(idx) + preRowValue := preRow.GetFloat64(idx) + if rowvalue != preRowValue { + writeFunc(func() { 
writer.WriteFloat64Field("v", preRowValue) }) + } + case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeNewDate, mysql.TypeTimestamp: + rowValue := row.GetTime(idx).String() + preRowValue := preRow.GetTime(idx).String() + if rowValue != preRowValue { + writeFunc(func() { writer.WriteStringField("v", preRowValue) }) + } + case mysql.TypeDuration: + rowValue := row.GetDuration(idx, 0) + preRowValue := preRow.GetDuration(idx, 0) + if rowValue != preRowValue { + writeFunc(func() { writer.WriteStringField("v", preRowValue.String()) }) + } + case mysql.TypeJSON: + rowValue := row.GetJSON(idx).String() + preRowValue := preRow.GetJSON(idx).String() + if rowValue != preRowValue { + writeFunc(func() { writer.WriteStringField("v", preRowValue) }) + } + default: + rowDatum := row.GetDatum(idx, &col.FieldType) + // NOTICE: GetValue() may return some types that go sql not support, which will cause sink DML fail + // Make specified convert upper if you need + // Go sql support type ref to: https://github.com/golang/go/blob/go1.17.4/src/database/sql/driver/types.go#L236 + rowValue := rowDatum.GetValue() + + preRowDatum := preRow.GetDatum(idx, &col.FieldType) + preRowValue := preRowDatum.GetValue() + + if rowValue != preRowValue { + writeFunc(func() { writer.WriteAnyField("v", preRowValue) }) + } + } + return nil + +} diff --git a/pkg/sink/codec/open/codec_test.go b/pkg/sink/codec/open/codec_test.go new file mode 100644 index 000000000..0604c3d15 --- /dev/null +++ b/pkg/sink/codec/open/codec_test.go @@ -0,0 +1,41 @@ +package open + +import ( + "testing" + + "github.com/flowbehappy/tigate/pkg/common" + "github.com/flowbehappy/tigate/pkg/mounter" + "github.com/pingcap/tiflow/pkg/config" + ticommon "github.com/pingcap/tiflow/pkg/sink/codec/common" + "github.com/stretchr/testify/require" +) + +// test 每个 type 的解码,test column selector, test callback, handle only + +// Test Column Type: TinyInt, Tinyint(null), Bool, Bool(null), SmallInt, SmallInt(null) +func TestBasicType(t *testing.T) { 
+ helper := mounter.NewEventTestHelper(t) + defer helper.Close() + + helper.Tk().MustExec("use test") + + job := helper.DDL2Job(`create table test.t(a tinyint primary key, b tinyint, c bool, d bool, e smallint, f smallint)`) + dmlEvent := helper.DML2Event("test", "t", `insert into test.t(a,c, e) values (1, true, -1)`) + require.NotNil(t, dmlEvent) + row, ok := dmlEvent.GetNextRow() + require.True(t, ok) + tableInfo := helper.GetTableInfo(job) + + rowEvent := &common.RowEvent{ + TableInfo: tableInfo, + CommitTs: 1, + Event: row, + ColumnSelector: common.NewDefaultColumnSelector(), + Callback: func() {}} + + protocolConfig := ticommon.NewConfig(config.ProtocolOpen) + key, value, err := encodeRowChangeEventWithoutCompress(rowEvent, protocolConfig, false, "") + require.NoError(t, err) + require.Equal(t, `{"ts":1,"scm":"test","tbl":"t","t":1}`, string(key)) + require.Equal(t, `{"u":{"a":{"t":1,"h":true,"f":11,"v":1},"b":{"t":1,"f":65,"v":null},"c":{"t":1,"f":65,"v":1},"d":{"t":1,"f":65,"v":null},"e":{"t":2,"f":65,"v":-1},"f":{"t":2,"f":65,"v":null}}}`, string(value)) +} From 9cfdd14fe61d827a1c636309042a032b594c7cfa Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Sun, 8 Sep 2024 21:46:44 +0800 Subject: [PATCH 2/2] update --- pkg/sink/codec/open/codec.go | 44 +++++++++---------------------- pkg/sink/codec/open/codec_test.go | 12 +++++---- 2 files changed, 19 insertions(+), 37 deletions(-) diff --git a/pkg/sink/codec/open/codec.go b/pkg/sink/codec/open/codec.go index d83a61a02..5a90ed09b 100644 --- a/pkg/sink/codec/open/codec.go +++ b/pkg/sink/codec/open/codec.go @@ -224,22 +224,20 @@ func writeColumnFieldValue(writer *util.JSONWriter, col *timodel.ColumnInfo, row case mysql.TypeEnum, mysql.TypeSet: value := row.GetEnum(idx).Value writer.WriteUint64Field("v", value) - case mysql.TypeNewDecimal: - d := row.GetMyDecimal(idx) - value := d.String() - writer.WriteStringField("v", value) - case mysql.TypeFloat: - value := row.GetFloat32(idx) - 
writer.WriteFloat32Field("v", value) - case mysql.TypeDouble: - value := row.GetFloat64(idx) - writer.WriteFloat64Field("v", value) case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeNewDate, mysql.TypeTimestamp: - value := row.GetTime(idx).String() - writer.WriteStringField("v", value) + value := row.GetTime(idx) + if value.IsZero() { + writer.WriteNullField("v") + } else { + writer.WriteStringField("v", value.String()) + } case mysql.TypeDuration: - value := row.GetDuration(idx, 0).String() - writer.WriteStringField("v", value) + value := row.GetDuration(idx, 0) + if value.ToNumber().IsZero() { + writer.WriteNullField("v") + } else { + writer.WriteStringField("v", value.String()) + } case mysql.TypeJSON: value := row.GetJSON(idx).String() writer.WriteStringField("v", value) @@ -368,24 +366,6 @@ func writeColumnFieldValueIfUpdated( if rowValue != preRowValue { writeFunc(func() { writer.WriteUint64Field("v", preRowValue) }) } - case mysql.TypeNewDecimal: - rowValue := row.GetMyDecimal(idx) - preRowValue := preRow.GetMyDecimal(idx) - if rowValue.Compare(preRowValue) != 0 { - writeFunc(func() { writer.WriteStringField("v", preRowValue.String()) }) - } - case mysql.TypeFloat: - rowValue := row.GetFloat32(idx) - preRowValue := preRow.GetFloat32(idx) - if rowValue != preRowValue { - writeFunc(func() { writer.WriteFloat32Field("v", preRowValue) }) - } - case mysql.TypeDouble: - rowvalue := row.GetFloat64(idx) - preRowValue := preRow.GetFloat64(idx) - if rowvalue != preRowValue { - writeFunc(func() { writer.WriteFloat64Field("v", preRowValue) }) - } case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeNewDate, mysql.TypeTimestamp: rowValue := row.GetTime(idx).String() preRowValue := preRow.GetTime(idx).String() diff --git a/pkg/sink/codec/open/codec_test.go b/pkg/sink/codec/open/codec_test.go index 0604c3d15..394294027 100644 --- a/pkg/sink/codec/open/codec_test.go +++ b/pkg/sink/codec/open/codec_test.go @@ -11,16 +11,17 @@ import ( ) // test 每个 type 的解码,test column 
selector, test callback, handle only - -// Test Column Type: TinyInt, Tinyint(null), Bool, Bool(null), SmallInt, SmallInt(null) +// TODO:ColumnNull 是什么类型? +// Test Column Type: TinyInt, Tinyint(null), Bool, Bool(null), SmallInt, SmallInt(null), Int, Int(null), Float, Float(nulll), Double, Double(null), +// Timestamp, Timestamp(null), BigInt, BigInt(null), MediumInt, MediumInt(null), Date, Date(null), Time, Time(null) func TestBasicType(t *testing.T) { helper := mounter.NewEventTestHelper(t) defer helper.Close() helper.Tk().MustExec("use test") - job := helper.DDL2Job(`create table test.t(a tinyint primary key, b tinyint, c bool, d bool, e smallint, f smallint)`) - dmlEvent := helper.DML2Event("test", "t", `insert into test.t(a,c, e) values (1, true, -1)`) + job := helper.DDL2Job(`create table test.t(a tinyint primary key, b tinyint, c bool, d bool, e smallint, f smallint, g int, h int, i float, j float, k double, l double, m timestamp, n timestamp, o bigint, p bigint, q mediumint, r mediumint, s date, t date, u time, v time)`) + dmlEvent := helper.DML2Event("test", "t", `insert into test.t(a,c,e,g,i,k,m,o,q,s,u) values (1, true, -1, 123, 153.123,153.123,"1973-12-30 15:30:00",123,123,"2000-01-01","23:59:59")`) require.NotNil(t, dmlEvent) row, ok := dmlEvent.GetNextRow() require.True(t, ok) @@ -37,5 +38,6 @@ func TestBasicType(t *testing.T) { key, value, err := encodeRowChangeEventWithoutCompress(rowEvent, protocolConfig, false, "") require.NoError(t, err) require.Equal(t, `{"ts":1,"scm":"test","tbl":"t","t":1}`, string(key)) - require.Equal(t, `{"u":{"a":{"t":1,"h":true,"f":11,"v":1},"b":{"t":1,"f":65,"v":null},"c":{"t":1,"f":65,"v":1},"d":{"t":1,"f":65,"v":null},"e":{"t":2,"f":65,"v":-1},"f":{"t":2,"f":65,"v":null}}}`, string(value)) + require.Equal(t, 
`{"u":{"a":{"t":1,"h":true,"f":11,"v":1},"b":{"t":1,"f":65,"v":null},"c":{"t":1,"f":65,"v":1},"d":{"t":1,"f":65,"v":null},"e":{"t":2,"f":65,"v":-1},"f":{"t":2,"f":65,"v":null},"g":{"t":3,"f":65,"v":123},"h":{"t":3,"f":65,"v":null},"i":{"t":4,"f":65,"v":153.123},"j":{"t":4,"f":65,"v":null},"k":{"t":5,"f":65,"v":153.123},"l":{"t":5,"f":65,"v":null},"m":{"t":7,"f":65,"v":"1973-12-30 15:30:00"},"n":{"t":7,"f":65,"v":null},"o":{"t":8,"f":65,"v":123},"p":{"t":8,"f":65,"v":null},"q":{"t":9,"f":65,"v":123},"r":{"t":9,"f":65,"v":null},"s":{"t":10,"f":65,"v":"2000-01-01"},"t":{"t":10,"f":65,"v":null},"u":{"t":11,"f":65,"v":"23:59:59"},"v":{"t":11,"f":65,"v":null}}}`, string(value)) + }