Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

codec: add ut for open protocol #272

Merged
merged 1 commit into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion downstreamadapter/sink/kafka_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func NewKafkaSink(changefeedID model.ChangeFeedID, sinkURI *url.URL, replicaConf
return nil, errors.Trace(err)
}

columnSelector, err := common.New(replicaConfig)
columnSelector, err := common.NewColumnSelectors(replicaConfig)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/common/column_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type ColumnSelectors struct {
}

// New return a column selectors
func New(cfg *config.ReplicaConfig) (*ColumnSelectors, error) {
func NewColumnSelectors(cfg *config.ReplicaConfig) (*ColumnSelectors, error) {
selectors := make([]*ColumnSelector, 0, len(cfg.Sink.ColumnSelectors))
for _, r := range cfg.Sink.ColumnSelectors {
selector, err := newColumnSelector(r, cfg.CaseSensitive)
Expand Down
2 changes: 1 addition & 1 deletion pkg/common/column_selector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestColumnSelectorGetSelector(t *testing.T) {
Columns: []string{"co?1"},
},
}
selectors, err := New(replicaConfig)
selectors, err := NewColumnSelectors(replicaConfig)
require.NoError(t, err)

{
Expand Down
73 changes: 59 additions & 14 deletions pkg/sink/codec/open/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/hack"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink/codec"
Expand Down Expand Up @@ -121,7 +120,7 @@ func encodeDDLEvent(e *common.DDLEvent, config *ticommon.Config) ([]byte, []byte

valueWriter.WriteObject(func() {
valueWriter.WriteStringField("q", e.Job.Query)
valueWriter.WriteStringField("t", string(e.Job.Type))
valueWriter.WriteIntField("t", int(e.Job.Type))
})

util.ReturnJSONWriter(keyWriter)
Expand Down Expand Up @@ -344,14 +343,13 @@ func writeColumnFieldValueIfUpdated(
idx int,
tableInfo *common.TableInfo,
) error {

colType := col.GetType()
flag := *tableInfo.ColumnsFlag[col.ID]
whereHandle := flag.IsHandleKey()

writeFunc := func(writeColumnValue func()) {
writer.WriteObjectField(col.Name.O, func() {
writer.WriteStringField("t", string(colType)) // todo:please check performance
writer.WriteIntField("t", int(colType))
if whereHandle {
writer.WriteBoolField("h", whereHandle)
}
Expand All @@ -360,6 +358,15 @@ func writeColumnFieldValueIfUpdated(
})
}

if row.IsNull(idx) && preRow.IsNull(idx) {
return nil
} else if preRow.IsNull(idx) && !row.IsNull(idx) {
writeFunc(func() { writer.WriteNullField("v") })
return nil
} else if !preRow.IsNull(idx) && row.IsNull(idx) {
return writeColumnFieldValue(writer, col, preRow, idx, tableInfo)
}

switch col.GetType() {
case mysql.TypeBit:
rowDatum := row.GetDatum(idx, &col.FieldType)
Expand All @@ -382,40 +389,78 @@ func writeColumnFieldValueIfUpdated(
rowValue := row.GetBytes(idx)
preRowValue := preRow.GetBytes(idx)
if !bytes.Equal(rowValue, preRowValue) {
writeFunc(func() { writer.WriteBase64StringField("v", preRowValue) })
if len(preRowValue) == 0 {
writeFunc(func() { writer.WriteNullField("v") })
} else {
writeFunc(func() { writer.WriteBase64StringField("v", preRowValue) })
}
}
case mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString:
rowValue := row.GetBytes(idx)
preRowValue := preRow.GetBytes(idx)
if !bytes.Equal(rowValue, preRowValue) {
if preRowValue == nil {
preRowValue = common.EmptyBytes
if len(preRowValue) == 0 {
writeFunc(func() { writer.WriteNullField("v") })
} else {
if flag.IsBinary() {
str := string(preRowValue)
str = strconv.Quote(str)
str = str[1 : len(str)-1]
writeFunc(func() { writer.WriteStringField("v", str) })
} else {
writeFunc(func() { writer.WriteStringField("v", string(preRowValue)) })
}
}
writeFunc(func() { writer.WriteStringField("v", string(hack.String(preRowValue))) })
}
case mysql.TypeEnum, mysql.TypeSet:
rowValue := row.GetEnum(idx).Value
preRowValue := preRow.GetEnum(idx).Value
if rowValue != preRowValue {
writeFunc(func() { writer.WriteUint64Field("v", preRowValue) })
if preRowValue == 0 {
writeFunc(func() { writer.WriteNullField("v") })
} else {
writeFunc(func() { writer.WriteUint64Field("v", preRowValue) })
}
}
case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeNewDate, mysql.TypeTimestamp:
rowValue := row.GetTime(idx).String()
preRowValue := preRow.GetTime(idx).String()
rowValue := row.GetTime(idx)
preRowValue := preRow.GetTime(idx)
if rowValue != preRowValue {
writeFunc(func() { writer.WriteStringField("v", preRowValue) })
if preRowValue.IsZero() {
writeFunc(func() { writer.WriteNullField("v") })
} else {
writeFunc(func() { writer.WriteStringField("v", preRowValue.String()) })
}
}
case mysql.TypeDuration:
rowValue := row.GetDuration(idx, 0)
preRowValue := preRow.GetDuration(idx, 0)
if rowValue != preRowValue {
writeFunc(func() { writer.WriteStringField("v", preRowValue.String()) })
if preRowValue.ToNumber().IsZero() {
writeFunc(func() { writer.WriteNullField("v") })
} else {
writeFunc(func() { writer.WriteStringField("v", preRowValue.String()) })
}
}
case mysql.TypeJSON:
rowValue := row.GetJSON(idx).String()
preRowValue := preRow.GetJSON(idx).String()
if rowValue != preRowValue {
writeFunc(func() { writer.WriteStringField("v", preRowValue) })
if preRow.GetJSON(idx).IsZero() {
writeFunc(func() { writer.WriteNullField("v") })
} else {
writeFunc(func() { writer.WriteStringField("v", preRowValue) })
}
}
case mysql.TypeNewDecimal:
rowValue := row.GetMyDecimal(idx)
preValue := preRow.GetMyDecimal(idx)
if rowValue.Compare(preValue) != 0 {
if preValue.IsZero() {
writeFunc(func() { writer.WriteNullField("v") })
} else {
writeFunc(func() { writer.WriteStringField("v", preValue.String()) })
}
}
default:
rowDatum := row.GetDatum(idx, &col.FieldType)
Expand Down
139 changes: 135 additions & 4 deletions pkg/sink/codec/open/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,144 @@ func TestDMLEvent(t *testing.T) {

}

func TestDDLEvent(t *testing.T) {}
func TestOnlyOutputUpdatedEvent(t *testing.T) {
helper := mounter.NewEventTestHelper(t)
defer helper.Close()

helper.Tk().MustExec("use test")

protocolConfig := ticommon.NewConfig(config.ProtocolOpen)
protocolConfig.OnlyOutputUpdatedColumns = true

{
job := helper.DDL2Job(`create table test.t(a tinyint primary key, b int, c decimal(10,2), d json, e char(10), f binary(10), g blob)`)
event := helper.DML2Event("test", "t", `insert into test.t values (1, 123, 123.12, '{"key1": "value1"}',"Alice",0x0102030405060708090A,0x4944330300000000)`)
eventNew := helper.DML2Event("test", "t", `update test.t set b = 456,c = 456.45 where a = 1`)
tableInfo := helper.GetTableInfo(job)

preRow, _ := event.GetNextRow()
row, _ := eventNew.GetNextRow()
row.PreRow = preRow.Row

updateRowEvent := &common.RowEvent{
TableInfo: tableInfo,
CommitTs: 1,
Event: row,
ColumnSelector: common.NewDefaultColumnSelector(),
Callback: func() {}}

_, value, err := encodeRowChangeEventWithoutCompress(updateRowEvent, protocolConfig, false, "")
require.NoError(t, err)

require.Equal(t, `{"u":{"a":{"t":1,"h":true,"f":11,"v":1},"b":{"t":3,"f":65,"v":456},"c":{"t":246,"f":65,"v":"456.45"},"d":{"t":245,"f":65,"v":"{\"key1\": \"value1\"}"},"e":{"t":254,"f":64,"v":"Alice"},"f":{"t":254,"f":65,"v":"\\x01\\x02\\x03\\x04\\x05\\x06\\a\\b\\t\\n"},"g":{"t":252,"f":65,"v":"SUQzAwAAAAA="}},"p":{"b":{"t":3,"f":65,"v":123},"c":{"t":246,"f":65,"v":"123.12"}}}`, string(value))

}
}

func TestHandleOnlyEvent(t *testing.T) {
helper := mounter.NewEventTestHelper(t)
defer helper.Close()

helper.Tk().MustExec("use test")

job := helper.DDL2Job(`create table test.t(a tinyint primary key, b int)`)

tableInfo := helper.GetTableInfo(job)
protocolConfig := ticommon.NewConfig(config.ProtocolOpen)

// Insert
dmlEvent := helper.DML2Event("test", "t", `insert into test.t values (1, 123)`)
require.NotNil(t, dmlEvent)
insertRow, ok := dmlEvent.GetNextRow()
require.True(t, ok)

insertRowEvent := &common.RowEvent{
TableInfo: tableInfo,
CommitTs: 1,
Event: insertRow,
ColumnSelector: common.NewDefaultColumnSelector(),
Callback: func() {}}

key, value, err := encodeRowChangeEventWithoutCompress(insertRowEvent, protocolConfig, true, "")
require.NoError(t, err)

require.Equal(t, `{"ts":1,"scm":"test","tbl":"t","t":1}`, string(key))
require.Equal(t, `{"u":{"a":{"t":1,"h":true,"f":11,"v":1}}}`, string(value))
}

func TestResolvedTsEvent(t *testing.T) {}
func TestDDLEvent(t *testing.T) {
helper := mounter.NewEventTestHelper(t)
defer helper.Close()

helper.Tk().MustExec("use test")

job := helper.DDL2Job(`create table test.t(a tinyint primary key, b int)`)

protocolConfig := ticommon.NewConfig(config.ProtocolOpen)

ddlEvent := &common.DDLEvent{
Job: job,
CommitTS: 1,
}

key, value, err := encodeDDLEvent(ddlEvent, protocolConfig)
require.NoError(t, err)

require.Equal(t, `{"ts":1,"scm":"test","tbl":"t","t":2}`, string(key)[16:])
require.Equal(t, `{"q":"create table test.t(a tinyint primary key, b int)","t":3}`, string(value)[8:]) // ?

}

func TestResolvedTsEvent(t *testing.T) {
key, value, err := encodeResolvedTs(12345678)
require.NoError(t, err)

require.Equal(t, `{"ts":12345678,"t":3}`, string(key)[16:])
require.Equal(t, 8, len(string(value)))

}

func TestEncodeWithColumnSelector(t *testing.T) {
helper := mounter.NewEventTestHelper(t)
defer helper.Close()
helper.Tk().MustExec("use test")

func TestHandleOnlyEvent(t *testing.T) {}
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink.ColumnSelectors = []*config.ColumnSelector{
{
Matcher: []string{"test.*"},
Columns: []string{"a*"},
},
}
selectors, err := common.NewColumnSelectors(replicaConfig)
require.NoError(t, err)
selector := selectors.GetSelector("test", "t")

job := helper.DDL2Job(`create table test.t(a tinyint primary key, b int)`)

tableInfo := helper.GetTableInfo(job)
protocolConfig := ticommon.NewConfig(config.ProtocolOpen)

dmlEvent := helper.DML2Event("test", "t", `insert into test.t values (1, 123)`)
require.NotNil(t, dmlEvent)
insertRow, ok := dmlEvent.GetNextRow()
require.True(t, ok)

func TestEncodeWithColumnSelector(t *testing.T) {}
insertRowEvent := &common.RowEvent{
TableInfo: tableInfo,
CommitTs: 1,
Event: insertRow,
ColumnSelector: selector,
Callback: func() {}}

key, value, err := encodeRowChangeEventWithoutCompress(insertRowEvent, protocolConfig, false, "")
require.NoError(t, err)

require.Equal(t, `{"ts":1,"scm":"test","tbl":"t","t":1}`, string(key))
require.Equal(t, `{"u":{"a":{"t":1,"h":true,"f":11,"v":1}}}`, string(value))

// todo: column selector 匹配后没有 handle 列报错
}

// 包括多个 message 压缩,callback 值等
func TestEncodeMessages(t *testing.T) {}
Loading