Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

codec: Add test for column selector and open codec #268

Merged
merged 2 commits into from
Sep 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion downstreamadapter/sink/kafka_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type KafkaSink struct {

protocol config.Protocol

columnSelector *common.ColumnSelector
columnSelector *common.ColumnSelectors
// eventRouter used to route events to the right topic and partition.
eventRouter *eventrouter.EventRouter
// topicManager used to manage topics.
Expand Down
156 changes: 32 additions & 124 deletions pkg/common/column_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,32 @@ package common
import (
"github.com/pingcap/tidb/pkg/parser/model"
filter "github.com/pingcap/tidb/pkg/util/table-filter"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher/partition"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/errors"
)

type Selector struct {
type Selector interface {
Select(colInfo *model.ColumnInfo) bool
}

type DefaultColumnSelector struct{}

func NewDefaultColumnSelector() *DefaultColumnSelector {
return &DefaultColumnSelector{}
}

func (d *DefaultColumnSelector) Select(colInfo *model.ColumnInfo) bool {
return true
}

type ColumnSelector struct {
tableF filter.Filter
columnM filter.ColumnFilter
}

func newSelector(
func newColumnSelector(
rule *config.ColumnSelector, caseSensitive bool,
) (*Selector, error) {
) (*ColumnSelector, error) {
tableM, err := filter.Parse(rule.Matcher)
if err != nil {
return nil, errors.WrapError(errors.ErrFilterRuleInvalid, err, rule.Matcher)
Expand All @@ -42,154 +54,50 @@ func newSelector(
return nil, errors.WrapError(errors.ErrFilterRuleInvalid, err, rule.Columns)
}

return &Selector{
return &ColumnSelector{
tableF: tableM,
columnM: columnM,
}, nil
}

// Match implements Transformer interface
func (s *Selector) Match(schema, table string) bool {
func (s *ColumnSelector) match(schema, table string) bool {
return s.tableF.MatchTable(schema, table)
}

// Select decide whether the col should be encoded or not.
func (s *Selector) Select(colInfo *model.ColumnInfo) bool {
func (s *ColumnSelector) Select(colInfo *model.ColumnInfo) bool {
colName := colInfo.Name.O
if s.columnM.MatchColumn(colName) {
return true
}
return false
return s.columnM.MatchColumn(colName)
}

// ColumnSelector manages an array of selectors, the first selector match the given
// ColumnSelectors manages an array of selectors, the first selector match the given
// event is used to select out columns.
type ColumnSelector struct {
selectors []*Selector
type ColumnSelectors struct {
selectors []*ColumnSelector
}

// New return a column selector
func New(cfg *config.ReplicaConfig) (*ColumnSelector, error) {
selectors := make([]*Selector, 0, len(cfg.Sink.ColumnSelectors))
// New return a column selectors
func New(cfg *config.ReplicaConfig) (*ColumnSelectors, error) {
selectors := make([]*ColumnSelector, 0, len(cfg.Sink.ColumnSelectors))
for _, r := range cfg.Sink.ColumnSelectors {
selector, err := newSelector(r, cfg.CaseSensitive)
selector, err := newColumnSelector(r, cfg.CaseSensitive)
if err != nil {
return nil, err
}
selectors = append(selectors, selector)
}

return &ColumnSelector{
return &ColumnSelectors{
selectors: selectors,
}, nil
}

func (c *ColumnSelector) GetSelector(schema, table string) *Selector {
func (c *ColumnSelectors) GetSelector(schema, table string) Selector {
for _, s := range c.selectors {
if s.Match(schema, table) {
if s.match(schema, table) {
return s
}
}
return nil
}

// VerifyTables return the error if any given table cannot satisfy the column selector constraints.
// 1. if the column is filter out, it must not be a part of handle key or the unique key.
// 2. if the filtered out column is used in the column dispatcher, return error.
func (c *ColumnSelector) VerifyTables(
infos []*TableInfo, eventRouter *dispatcher.EventRouter,
) error {
if len(c.selectors) == 0 {
return nil
}

for _, table := range infos {
for _, s := range c.selectors {
if !s.Match(table.TableName.Schema, table.TableName.Table) {
continue
}

retainedColumns := make(map[string]struct{})
for columnID := range table.ColumnsFlag {
columnInfo, ok := table.GetColumnInfo(columnID)
if !ok {
return errors.ErrColumnSelectorFailed.GenWithStack(
"column not found when verify the table for the column selector, table: %v, column: %s",
table.TableName, columnInfo.Name)
}
columnName := columnInfo.Name.O
if s.columnM.MatchColumn(columnName) {
retainedColumns[columnName] = struct{}{}
continue
}

partitionDispatcher := eventRouter.GetPartitionDispatcher(table.TableName.Schema, table.TableName.Table)
switch v := partitionDispatcher.(type) {
case *partition.ColumnsDispatcher:
for _, col := range v.Columns {
if col == columnInfo.Name.O {
return errors.ErrColumnSelectorFailed.GenWithStack(
"the filtered out column is used in the column dispatcher, "+
"table: %v, column: %s", table.TableName, columnInfo.Name)
}
}
default:
}
}

if !verifyIndices(table, retainedColumns) {
return errors.ErrColumnSelectorFailed.GenWithStack(
"no primary key columns or unique key columns obtained after filter out, table: %+v", table.TableName)
}
}
}
return nil
}

// verifyIndices return true if the primary key retained,
// else at least there are one unique key columns in the retained columns.
func verifyIndices(table *TableInfo, retainedColumns map[string]struct{}) bool {
primaryKeyColumns := table.GetPrimaryKeyColumnNames()

retained := true
for _, name := range primaryKeyColumns {
if _, ok := retainedColumns[name]; !ok {
retained = false
break
}
}
// primary key columns are retained, return true.
if retained {
return true
}

// at least one unique key columns are retained, return true.
for _, index := range table.Indices {
if !index.Unique {
continue
}

retained = true
for _, col := range index.Columns {
if _, ok := retainedColumns[col.Name.O]; !ok {
retained = false
break
}
}
if retained {
return true
}
}
return false
}

// VerifyColumn return true if the given `schema.table` column is matched.
func (c *ColumnSelector) VerifyColumn(schema, table, column string) bool {
for _, s := range c.selectors {
if !s.Match(schema, table) {
continue
}
return s.columnM.MatchColumn(column)
}
return true
return NewDefaultColumnSelector()
}
169 changes: 169 additions & 0 deletions pkg/common/column_selector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package common

import (
"testing"

"github.com/pingcap/tiflow/pkg/config"
"github.com/stretchr/testify/require"
)

func TestNewColumnSelector(t *testing.T) {
// the column selector is not set
replicaConfig := config.GetDefaultReplicaConfig()
selectors, err := New(replicaConfig)
require.NoError(t, err)
require.NotNil(t, selectors)
require.Len(t, selectors.selectors, 0)

replicaConfig.Sink.ColumnSelectors = []*config.ColumnSelector{
{
Matcher: []string{"test.*"},
Columns: []string{"a", "b"},
},
{
Matcher: []string{"test1.*"},
Columns: []string{"*", "!a"},
},
{
Matcher: []string{"test2.*"},
Columns: []string{"co*", "!col2"},
},
{
Matcher: []string{"test3.*"},
Columns: []string{"co?1"},
},
}
selectors, err = New(replicaConfig)
require.NoError(t, err)
require.Len(t, selectors.selectors, 4)
}

func TestColumnSelectorGetSelector(t *testing.T) {
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink.ColumnSelectors = []*config.ColumnSelector{
{
Matcher: []string{"test.*"},
Columns: []string{"a", "b"},
},
{
Matcher: []string{"test1.*"},
Columns: []string{"*", "!a"},
},
{
Matcher: []string{"test2.*"},
Columns: []string{"co*", "!col2"},
},
{
Matcher: []string{"test3.*"},
Columns: []string{"co?1"},
},
}
selectors, err := New(replicaConfig)
require.NoError(t, err)

{
selector := selectors.GetSelector("test", "t1")
tableInfo1 := BuildTableInfo("test", "t1", []*Column{
{
Name: "a",
},
{
Name: "b",
},
{
Name: "c",
},
}, nil)
for _, col := range tableInfo1.Columns {
if col.Name.O != "c" {
require.True(t, selector.Select(col))
} else {
require.False(t, selector.Select(col))
}
}
}

{
selector := selectors.GetSelector("test1", "aaa")
tableInfo1 := BuildTableInfo("test1", "aaa", []*Column{
{
Name: "a",
},
{
Name: "b",
},
{
Name: "c",
},
}, nil)
for _, col := range tableInfo1.Columns {
if col.Name.O != "a" {
require.True(t, selector.Select(col))
} else {
require.False(t, selector.Select(col))
}
}
}

{
selector := selectors.GetSelector("test2", "t2")
tableInfo1 := BuildTableInfo("test2", "t2", []*Column{
{
Name: "a",
},
{
Name: "col2",
},
{
Name: "col1",
},
}, nil)
for _, col := range tableInfo1.Columns {
if col.Name.O == "col1" {
require.True(t, selector.Select(col))
} else {
require.False(t, selector.Select(col))
}
}
}

{
selector := selectors.GetSelector("test3", "t3")
tableInfo1 := BuildTableInfo("test3", "t3", []*Column{
{
Name: "a",
},
{
Name: "col2",
},
{
Name: "col1",
},
}, nil)
for _, col := range tableInfo1.Columns {
if col.Name.O == "col1" {
require.True(t, selector.Select(col))
} else {
require.False(t, selector.Select(col))
}
}
}

{
selector := selectors.GetSelector("test4", "t4")
tableInfo1 := BuildTableInfo("test4", "t4", []*Column{
{
Name: "a",
},
{
Name: "col2",
},
{
Name: "col1",
},
}, nil)
for _, col := range tableInfo1.Columns {
require.True(t, selector.Select(col))
}
}
}
2 changes: 1 addition & 1 deletion pkg/common/row_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ type RowEvent struct {
TableInfo *TableInfo
CommitTs uint64
Event RowDelta
ColumnSelector *Selector
ColumnSelector Selector
Callback func()
}

Expand Down
Loading
Loading