Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
  • Loading branch information
hongyunyan committed Sep 10, 2024
1 parent d920fe9 commit 1468082
Show file tree
Hide file tree
Showing 4 changed files with 279 additions and 32 deletions.
20 changes: 4 additions & 16 deletions pkg/sink/codec/open/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
ticommon "github.com/pingcap/tiflow/pkg/sink/codec/common"
)

func encodeRowChangeEventWithoutCompress(e *common.RowEvent, config *ticommon.Config, largeMessageOnlyHandleKeyColumns bool, claimCheckLocationName string) ([]byte, []byte, error) {
func encodeRowChangedEvent(e *common.RowEvent, config *ticommon.Config, largeMessageOnlyHandleKeyColumns bool, claimCheckLocationName string) ([]byte, []byte, int, error) {
keyBuf := &bytes.Buffer{}
valueBuf := &bytes.Buffer{}
keyWriter := util.BorrowJSONWriter(keyBuf)
Expand All @@ -43,18 +43,12 @@ func encodeRowChangeEventWithoutCompress(e *common.RowEvent, config *ticommon.Co
err = writeColumnFieldValues(valueWriter, e.GetPreRows(), e.TableInfo, e.ColumnSelector, onlyHandleKeyColumns)
})
})
if err != nil {
return nil, nil, err
}
} else if e.IsInsert() {
valueWriter.WriteObject(func() {
valueWriter.WriteObjectField("u", func() {
err = writeColumnFieldValues(valueWriter, e.GetRows(), e.TableInfo, e.ColumnSelector, largeMessageOnlyHandleKeyColumns)
})
})
if err != nil {
return nil, nil, err
}
} else if e.IsUpdate() {
valueWriter.WriteObject(func() {
valueWriter.WriteObjectField("u", func() {
Expand All @@ -73,24 +67,18 @@ func encodeRowChangeEventWithoutCompress(e *common.RowEvent, config *ticommon.Co
})
}
})

if err != nil {
return nil, nil, err
}
}

util.ReturnJSONWriter(keyWriter)
util.ReturnJSONWriter(valueWriter)

return keyBuf.Bytes(), valueBuf.Bytes(), nil
}

func encodeRowChangedEvent(e *common.RowEvent, config *ticommon.Config, largeMessageOnlyHandleKeyColumns bool, claimCheckLocationName string) ([]byte, []byte, int, error) {
key, value, err := encodeRowChangeEventWithoutCompress(e, config, largeMessageOnlyHandleKeyColumns, claimCheckLocationName)
if err != nil {
return nil, nil, 0, err
}

key := keyBuf.Bytes()
value := valueBuf.Bytes()

valueCompressed, err := ticommon.Compress(
config.ChangefeedID, config.LargeMessageHandle.LargeMessageHandleCompression, value,
)
Expand Down
18 changes: 8 additions & 10 deletions pkg/sink/codec/open/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestBasicType(t *testing.T) {
Callback: func() {}}

protocolConfig := ticommon.NewConfig(config.ProtocolOpen)
key, value, err := encodeRowChangeEventWithoutCompress(rowEvent, protocolConfig, false, "")
key, value, _, err := encodeRowChangedEvent(rowEvent, protocolConfig, false, "")
require.NoError(t, err)
require.Equal(t, `{"ts":1,"scm":"test","tbl":"t","t":1}`, string(key))
require.Equal(t, `{"u":{"a":{"t":1,"h":true,"f":11,"v":1},"b":{"t":1,"f":65,"v":null},"c":{"t":1,"f":65,"v":1},"d":{"t":1,"f":65,"v":null},"e":{"t":2,"f":65,"v":-1},"f":{"t":2,"f":65,"v":null},"g":{"t":3,"f":65,"v":123},"h":{"t":3,"f":65,"v":null},"i":{"t":4,"f":65,"v":153.123},"j":{"t":4,"f":65,"v":null},"k":{"t":5,"f":65,"v":153.123},"l":{"t":5,"f":65,"v":null},"m":{"t":7,"f":65,"v":"1973-12-30 15:30:00"},"n":{"t":7,"f":65,"v":null},"o":{"t":8,"f":65,"v":123},"p":{"t":8,"f":65,"v":null},"q":{"t":9,"f":65,"v":123},"r":{"t":9,"f":65,"v":null},"s":{"t":10,"f":65,"v":"2000-01-01"},"t":{"t":10,"f":65,"v":null},"u":{"t":11,"f":65,"v":"23:59:59"},"v":{"t":11,"f":65,"v":null},"w":{"t":12,"f":65,"v":"2015-12-20 23:58:58"},"x":{"t":12,"f":65,"v":null},"y":{"t":13,"f":193,"v":1970},"z":{"t":13,"f":193,"v":null},"aa":{"t":15,"f":64,"v":"测试"},"ab":{"t":15,"f":64,"v":null},"ac":{"t":15,"f":65,"v":"\\x01\\x02\\x03\\x04\\x05\\x06\\a\\b\\t\\n"},"ad":{"t":15,"f":65,"v":null},"ae":{"t":16,"f":193,"v":81},"af":{"t":16,"f":193,"v":null},"ag":{"t":245,"f":65,"v":"{\"key1\": \"value1\"}"},"ah":{"t":245,"f":65,"v":null},"ai":{"t":246,"f":65,"v":"129012.12"},"aj":{"t":246,"f":65,"v":null},"ak":{"t":247,"f":64,"v":1},"al":{"t":247,"f":64,"v":null},"am":{"t":248,"f":64,"v":2},"an":{"t":248,"f":64,"v":null},"ao":{"t":249,"f":64,"v":"NXJXTDZLK1ZkR1Y0ZEE9PQ=="},"ap":{"t":249,"f":64,"v":null},"aq":{"t":249,"f":65,"v":"iVBORw0KGgo="},"ar":{"t":249,"f":65,"v":null},"as1":{"t":250,"f":64,"v":"NXJXTDZLK1ZkR1Y0ZEE9PQ=="},"at":{"t":250,"f":64,"v":null},"au":{"t":250,"f":65,"v":"SUQzAwAAAAA="},"av":{"t":250,"f":65,"v":null},"aw":{"t":251,"f":64,"v":"NXJXTDZLK1ZkR1Y0ZEE9PQ=="},"ax":{"t":251,"f":64,"v":null},"ay":{"t":251,"f":65,"v":"UEsDBBQAAAAIAA=="},"az":{"t":251,"f":65,"v":null},"ba":{"t":252,"f":64,"v":"NXJXTDZLK1ZkR1Y0ZEE9PQ=="},"bb":{"t":252,"f":64,"v":null},"bc":{"t":252,"f":65,"v":"JVBERi0xLjQ="},"bd":{"t":252,"f":65,"v":null},"be":{"t":254,"f":64,"v":"Alice"},"bf":{"t":254,"f":64,"v":null},"bg":{"t":254,"f":65,"v":"\\x01\\x02\\x03\\x04\\x05\\x06\\a\\b\\t\\n"},"bh":{"t":254,"f":65,"v":null}}}`, string(value))
Expand Down Expand Up @@ -70,11 +70,12 @@ func TestDMLEvent(t *testing.T) {
ColumnSelector: common.NewDefaultColumnSelector(),
Callback: func() {}}

key, value, err := encodeRowChangeEventWithoutCompress(insertRowEvent, protocolConfig, false, "")
key, value, length, err := encodeRowChangedEvent(insertRowEvent, protocolConfig, false, "")
require.NoError(t, err)

require.Equal(t, `{"ts":1,"scm":"test","tbl":"t","t":1}`, string(key))
require.Equal(t, `{"u":{"a":{"t":1,"h":true,"f":11,"v":1},"b":{"t":3,"f":65,"v":123}}}`, string(value))
require.Equal(t, len(string(key))+len(string(value))+ticommon.MaxRecordOverhead+16+8, length)

// Update
dmlEvent = helper.DML2Event("test", "t", `update test.t set b = 456 where a = 1`)
Expand All @@ -90,7 +91,7 @@ func TestDMLEvent(t *testing.T) {
ColumnSelector: common.NewDefaultColumnSelector(),
Callback: func() {}}

key, value, err = encodeRowChangeEventWithoutCompress(updateRowEvent, protocolConfig, false, "")
key, value, _, err = encodeRowChangedEvent(updateRowEvent, protocolConfig, false, "")
require.NoError(t, err)

require.Equal(t, `{"ts":2,"scm":"test","tbl":"t","t":1}`, string(key))
Expand All @@ -110,7 +111,7 @@ func TestDMLEvent(t *testing.T) {
ColumnSelector: common.NewDefaultColumnSelector(),
Callback: func() {}}

key, value, err := encodeRowChangeEventWithoutCompress(updateRowEvent, protocolConfig, false, "")
key, value, _, err := encodeRowChangedEvent(updateRowEvent, protocolConfig, false, "")
require.NoError(t, err)

require.Equal(t, `{"ts":3,"scm":"test","tbl":"t","t":1}`, string(key))
Expand Down Expand Up @@ -145,7 +146,7 @@ func TestOnlyOutputUpdatedEvent(t *testing.T) {
ColumnSelector: common.NewDefaultColumnSelector(),
Callback: func() {}}

_, value, err := encodeRowChangeEventWithoutCompress(updateRowEvent, protocolConfig, false, "")
_, value, _, err := encodeRowChangedEvent(updateRowEvent, protocolConfig, false, "")
require.NoError(t, err)

require.Equal(t, `{"u":{"a":{"t":1,"h":true,"f":11,"v":1},"b":{"t":3,"f":65,"v":456},"c":{"t":246,"f":65,"v":"456.45"},"d":{"t":245,"f":65,"v":"{\"key1\": \"value1\"}"},"e":{"t":254,"f":64,"v":"Alice"},"f":{"t":254,"f":65,"v":"\\x01\\x02\\x03\\x04\\x05\\x06\\a\\b\\t\\n"},"g":{"t":252,"f":65,"v":"SUQzAwAAAAA="}},"p":{"b":{"t":3,"f":65,"v":123},"c":{"t":246,"f":65,"v":"123.12"}}}`, string(value))
Expand Down Expand Up @@ -177,7 +178,7 @@ func TestHandleOnlyEvent(t *testing.T) {
ColumnSelector: common.NewDefaultColumnSelector(),
Callback: func() {}}

key, value, err := encodeRowChangeEventWithoutCompress(insertRowEvent, protocolConfig, true, "")
key, value, _, err := encodeRowChangedEvent(insertRowEvent, protocolConfig, true, "")
require.NoError(t, err)

require.Equal(t, `{"ts":1,"scm":"test","tbl":"t","t":1}`, string(key))
Expand Down Expand Up @@ -249,14 +250,11 @@ func TestEncodeWithColumnSelector(t *testing.T) {
ColumnSelector: selector,
Callback: func() {}}

key, value, err := encodeRowChangeEventWithoutCompress(insertRowEvent, protocolConfig, false, "")
key, value, _, err := encodeRowChangedEvent(insertRowEvent, protocolConfig, false, "")
require.NoError(t, err)

require.Equal(t, `{"ts":1,"scm":"test","tbl":"t","t":1}`, string(key))
require.Equal(t, `{"u":{"a":{"t":1,"h":true,"f":11,"v":1}}}`, string(value))

// todo: column selector 匹配后没有 handle 列报错
}

// 包括多个 message 压缩,callback 值等
func TestEncodeMessages(t *testing.T) {}
10 changes: 4 additions & 6 deletions pkg/sink/codec/open/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
// One message can contain at most MaxBatchSize events, and the total size of the message cannot exceed MaxMessageBytes.
type BatchEncoder struct {
messages []*ticommon.Message
// Record the number of events that have been batched in the latest message
eventCount int
// buff the callback of the latest message
callbackBuff []func()

Expand Down Expand Up @@ -120,7 +118,7 @@ func (d *BatchEncoder) pushMessage(key, value []byte, callback func()) {
binary.BigEndian.PutUint64(keyLenByte[:], uint64(len(key)))
binary.BigEndian.PutUint64(valueLenByte[:], uint64(len(value)))

if len(d.messages) == 0 || d.messages[len(d.messages)-1].Length()+length > d.config.MaxMessageBytes || d.eventCount >= d.config.MaxBatchSize {
if len(d.messages) == 0 || d.messages[len(d.messages)-1].Length()+length > d.config.MaxMessageBytes || d.messages[len(d.messages)-1].GetRowsCount() >= d.config.MaxBatchSize {
d.finalizeCallback()
// create a new message
versionHead := make([]byte, 8)
Expand All @@ -135,20 +133,20 @@ func (d *BatchEncoder) pushMessage(key, value []byte, callback func()) {
message.Key = append(message.Key, keyLenByte[:]...)
message.Key = append(message.Key, key...)
message.Value = append(message.Value, value...)

message.IncRowsCount()
d.callbackBuff = append(d.callbackBuff, callback)
d.messages = append(d.messages, &message)
d.eventCount = 1
return
}

// append to the latest message
latestMessage := d.messages[len(d.messages)-1]
latestMessage.Key = append(latestMessage.Key, keyLenByte[:]...)
latestMessage.Key = append(latestMessage.Key, key...)
latestMessage.Value = append(latestMessage.Value, value...)
latestMessage.Value = append(latestMessage.Value, valueLenByte[:]...)
latestMessage.Value = append(latestMessage.Value, value...)
d.callbackBuff = append(d.callbackBuff, callback)
latestMessage.IncRowsCount()

}

Expand Down
Loading

0 comments on commit 1468082

Please sign in to comment.