Skip to content

Commit

Permalink
codec: Add test for column selector and open codec (#268)
Browse files Browse the repository at this point in the history
  • Loading branch information
hongyunyan authored Sep 8, 2024
1 parent f5aa01f commit 2f15b9e
Show file tree
Hide file tree
Showing 7 changed files with 443 additions and 172 deletions.
2 changes: 1 addition & 1 deletion downstreamadapter/sink/kafka_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type KafkaSink struct {

protocol config.Protocol

columnSelector *common.ColumnSelector
columnSelector *common.ColumnSelectors
// eventRouter used to route events to the right topic and partition.
eventRouter *eventrouter.EventRouter
// topicManager used to manage topics.
Expand Down
156 changes: 32 additions & 124 deletions pkg/common/column_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,32 @@ package common
import (
"github.com/pingcap/tidb/pkg/parser/model"
filter "github.com/pingcap/tidb/pkg/util/table-filter"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher/partition"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/errors"
)

type Selector struct {
type Selector interface {
Select(colInfo *model.ColumnInfo) bool
}

// DefaultColumnSelector is a Selector that keeps every column.
type DefaultColumnSelector struct{}

// NewDefaultColumnSelector returns a pointer to a DefaultColumnSelector.
func NewDefaultColumnSelector() *DefaultColumnSelector {
	return new(DefaultColumnSelector)
}

// Select always reports true; the given column info is not inspected.
func (d *DefaultColumnSelector) Select(colInfo *model.ColumnInfo) bool {
	return true
}

type ColumnSelector struct {
tableF filter.Filter
columnM filter.ColumnFilter
}

func newSelector(
func newColumnSelector(
rule *config.ColumnSelector, caseSensitive bool,
) (*Selector, error) {
) (*ColumnSelector, error) {
tableM, err := filter.Parse(rule.Matcher)
if err != nil {
return nil, errors.WrapError(errors.ErrFilterRuleInvalid, err, rule.Matcher)
Expand All @@ -42,154 +54,50 @@ func newSelector(
return nil, errors.WrapError(errors.ErrFilterRuleInvalid, err, rule.Columns)
}

return &Selector{
return &ColumnSelector{
tableF: tableM,
columnM: columnM,
}, nil
}

// Match implements Transformer interface
func (s *Selector) Match(schema, table string) bool {
func (s *ColumnSelector) match(schema, table string) bool {
return s.tableF.MatchTable(schema, table)
}

// Select decides whether the column should be encoded or not.
func (s *Selector) Select(colInfo *model.ColumnInfo) bool {
func (s *ColumnSelector) Select(colInfo *model.ColumnInfo) bool {
colName := colInfo.Name.O
if s.columnM.MatchColumn(colName) {
return true
}
return false
return s.columnM.MatchColumn(colName)
}

// ColumnSelector manages an array of selectors; the first selector that matches the given
// ColumnSelectors manages an array of selectors; the first selector that matches the given
// event is used to select out columns.
type ColumnSelector struct {
selectors []*Selector
type ColumnSelectors struct {
selectors []*ColumnSelector
}

// New returns a column selector
func New(cfg *config.ReplicaConfig) (*ColumnSelector, error) {
selectors := make([]*Selector, 0, len(cfg.Sink.ColumnSelectors))
// New returns the column selectors built from the given replica config
func New(cfg *config.ReplicaConfig) (*ColumnSelectors, error) {
selectors := make([]*ColumnSelector, 0, len(cfg.Sink.ColumnSelectors))
for _, r := range cfg.Sink.ColumnSelectors {
selector, err := newSelector(r, cfg.CaseSensitive)
selector, err := newColumnSelector(r, cfg.CaseSensitive)
if err != nil {
return nil, err
}
selectors = append(selectors, selector)
}

return &ColumnSelector{
return &ColumnSelectors{
selectors: selectors,
}, nil
}

func (c *ColumnSelector) GetSelector(schema, table string) *Selector {
func (c *ColumnSelectors) GetSelector(schema, table string) Selector {
for _, s := range c.selectors {
if s.Match(schema, table) {
if s.match(schema, table) {
return s
}
}
return nil
}

// VerifyTables returns an error if any given table cannot satisfy the column selector constraints.
// For every table, each selector whose table matcher matches is checked:
//  1. a filtered-out column must not be one of the columns used by the table's
//     columns partition dispatcher;
//  2. after filtering, verifyIndices must hold for the retained columns, i.e. the
//     primary key columns or all columns of at least one unique index are retained.
//
// It returns nil when no selectors are configured or when all tables pass.
func (c *ColumnSelector) VerifyTables(
	infos []*TableInfo, eventRouter *dispatcher.EventRouter,
) error {
	if len(c.selectors) == 0 {
		return nil
	}

	for _, table := range infos {
		for _, s := range c.selectors {
			if !s.Match(table.TableName.Schema, table.TableName.Table) {
				continue
			}

			retainedColumns := make(map[string]struct{})
			for columnID := range table.ColumnsFlag {
				columnInfo, ok := table.GetColumnInfo(columnID)
				if !ok {
					// The lookup failed, so columnInfo must not be dereferenced here;
					// report the column ID that could not be resolved instead.
					return errors.ErrColumnSelectorFailed.GenWithStack(
						"column not found when verify the table for the column selector, table: %v, column ID: %v",
						table.TableName, columnID)
				}
				columnName := columnInfo.Name.O
				if s.columnM.MatchColumn(columnName) {
					retainedColumns[columnName] = struct{}{}
					continue
				}

				// The column is filtered out: it must not be needed by a columns dispatcher.
				partitionDispatcher := eventRouter.GetPartitionDispatcher(table.TableName.Schema, table.TableName.Table)
				switch v := partitionDispatcher.(type) {
				case *partition.ColumnsDispatcher:
					for _, col := range v.Columns {
						if col == columnName {
							return errors.ErrColumnSelectorFailed.GenWithStack(
								"the filtered out column is used in the column dispatcher, "+
									"table: %v, column: %s", table.TableName, columnInfo.Name)
						}
					}
				default:
				}
			}

			if !verifyIndices(table, retainedColumns) {
				return errors.ErrColumnSelectorFailed.GenWithStack(
					"no primary key columns or unique key columns obtained after filter out, table: %+v", table.TableName)
			}
		}
	}
	return nil
}

// verifyIndices reports whether the retained columns still cover a key of the table.
// It returns true when every primary key column name is present in retainedColumns
// (which also holds when the table reports no primary key column names), or when
// all columns of at least one unique index are present; otherwise it returns false.
func verifyIndices(table *TableInfo, retainedColumns map[string]struct{}) bool {
	isRetained := func(name string) bool {
		_, found := retainedColumns[name]
		return found
	}

	// First choice: the whole primary key survived the filter.
	pkRetained := true
	for _, pkCol := range table.GetPrimaryKeyColumnNames() {
		if !isRetained(pkCol) {
			pkRetained = false
			break
		}
	}
	if pkRetained {
		return true
	}

	// Otherwise look for any unique index whose columns all survived.
	for _, idx := range table.Indices {
		if !idx.Unique {
			continue
		}

		complete := true
		for _, idxCol := range idx.Columns {
			if !isRetained(idxCol.Name.O) {
				complete = false
				break
			}
		}
		if complete {
			return true
		}
	}
	return false
}

// VerifyColumn return true if the given `schema.table` column is matched.
func (c *ColumnSelector) VerifyColumn(schema, table, column string) bool {
for _, s := range c.selectors {
if !s.Match(schema, table) {
continue
}
return s.columnM.MatchColumn(column)
}
return true
return NewDefaultColumnSelector()
}
169 changes: 169 additions & 0 deletions pkg/common/column_selector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package common

import (
"testing"

"github.com/pingcap/tiflow/pkg/config"
"github.com/stretchr/testify/require"
)

// TestNewColumnSelector checks that New builds one selector per configured rule,
// and an empty (but non-nil) set when no rules are configured.
func TestNewColumnSelector(t *testing.T) {
	cfg := config.GetDefaultReplicaConfig()

	// No column selector rules are set on the default config.
	built, err := New(cfg)
	require.NoError(t, err)
	require.NotNil(t, built)
	require.Len(t, built.selectors, 0)

	// Four rules covering plain names, exclusion, and wildcard patterns.
	rules := []*config.ColumnSelector{
		{Matcher: []string{"test.*"}, Columns: []string{"a", "b"}},
		{Matcher: []string{"test1.*"}, Columns: []string{"*", "!a"}},
		{Matcher: []string{"test2.*"}, Columns: []string{"co*", "!col2"}},
		{Matcher: []string{"test3.*"}, Columns: []string{"co?1"}},
	}
	cfg.Sink.ColumnSelectors = rules

	built, err = New(cfg)
	require.NoError(t, err)
	require.Len(t, built.selectors, 4)
}

// TestColumnSelectorGetSelector checks, for several schema/table pairs, which
// columns the selector returned by GetSelector accepts. The last case uses a
// schema that no configured rule matches and expects every column to be selected.
func TestColumnSelectorGetSelector(t *testing.T) {
	cfg := config.GetDefaultReplicaConfig()
	cfg.Sink.ColumnSelectors = []*config.ColumnSelector{
		{Matcher: []string{"test.*"}, Columns: []string{"a", "b"}},
		{Matcher: []string{"test1.*"}, Columns: []string{"*", "!a"}},
		{Matcher: []string{"test2.*"}, Columns: []string{"co*", "!col2"}},
		{Matcher: []string{"test3.*"}, Columns: []string{"co?1"}},
	}
	selectors, err := New(cfg)
	require.NoError(t, err)

	cases := []struct {
		schema, table string
		columns       []string
		// selected holds the names of the columns expected to pass Select.
		selected map[string]bool
	}{
		{"test", "t1", []string{"a", "b", "c"}, map[string]bool{"a": true, "b": true}},
		{"test1", "aaa", []string{"a", "b", "c"}, map[string]bool{"b": true, "c": true}},
		{"test2", "t2", []string{"a", "col2", "col1"}, map[string]bool{"col1": true}},
		{"test3", "t3", []string{"a", "col2", "col1"}, map[string]bool{"col1": true}},
		{"test4", "t4", []string{"a", "col2", "col1"}, map[string]bool{"a": true, "col2": true, "col1": true}},
	}

	for _, tc := range cases {
		selector := selectors.GetSelector(tc.schema, tc.table)

		cols := make([]*Column, 0, len(tc.columns))
		for _, name := range tc.columns {
			cols = append(cols, &Column{Name: name})
		}
		tableInfo := BuildTableInfo(tc.schema, tc.table, cols, nil)

		for _, col := range tableInfo.Columns {
			if tc.selected[col.Name.O] {
				require.True(t, selector.Select(col))
			} else {
				require.False(t, selector.Select(col))
			}
		}
	}
}
2 changes: 1 addition & 1 deletion pkg/common/row_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ type RowEvent struct {
TableInfo *TableInfo
CommitTs uint64
Event RowDelta
ColumnSelector *Selector
ColumnSelector Selector
Callback func()
}

Expand Down
Loading

0 comments on commit 2f15b9e

Please sign in to comment.