From 8e99d89aff740ee9fa28eddcdff0b1100a0c082a Mon Sep 17 00:00:00 2001 From: lidezhu <47731263+lidezhu@users.noreply.github.com> Date: Fri, 3 Jan 2025 13:59:43 +0800 Subject: [PATCH 1/4] schemastore: fix create tables query (#773) * fix test utils * fix split create tables query --- .../persist_storage_ddl_handlers.go | 76 ++---------- .../schemastore/persist_storage_test.go | 75 ++++++++++-- .../schemastore/persist_storage_test_utils.go | 43 +++++-- logservice/schemastore/utils.go | 115 ++++++++++++++++++ 4 files changed, 217 insertions(+), 92 deletions(-) create mode 100644 logservice/schemastore/utils.go diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index 644d5bf59..8d2907ed1 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -17,16 +17,12 @@ import ( "fmt" "strings" - "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/filter" "github.com/pingcap/tidb/pkg/meta/model" - "github.com/pingcap/tidb/pkg/parser" - "github.com/pingcap/tidb/pkg/parser/ast" - "github.com/pingcap/tidb/pkg/parser/format" "go.uber.org/zap" ) @@ -419,65 +415,6 @@ func getSchemaID(tableMap map[int64]*BasicTableInfo, tableID int64) int64 { return tableInfo.SchemaID } -// transform ddl query based on sql mode. -func transformDDLJobQuery(job *model.Job) (string, error) { - p := parser.New() - // We need to use the correct SQL mode to parse the DDL query. - // Otherwise, the parser may fail to parse the DDL query. - // For example, it is needed to parse the following DDL query: - // `alter table "t" add column "c" int default 1;` - // by adding `ANSI_QUOTES` to the SQL mode. 
- p.SetSQLMode(job.SQLMode) - stmts, _, err := p.Parse(job.Query, job.Charset, job.Collate) - if err != nil { - return "", errors.Trace(err) - } - var result string - buildQuery := func(stmt ast.StmtNode) (string, error) { - var sb strings.Builder - // translate TiDB feature to special comment - restoreFlags := format.RestoreTiDBSpecialComment - // escape the keyword - restoreFlags |= format.RestoreNameBackQuotes - // upper case keyword - restoreFlags |= format.RestoreKeyWordUppercase - // wrap string with single quote - restoreFlags |= format.RestoreStringSingleQuotes - // remove placement rule - restoreFlags |= format.SkipPlacementRuleForRestore - // force disable ttl - restoreFlags |= format.RestoreWithTTLEnableOff - if err = stmt.Restore(format.NewRestoreCtx(restoreFlags, &sb)); err != nil { - return "", errors.Trace(err) - } - return sb.String(), nil - } - if len(stmts) > 1 { - results := make([]string, 0, len(stmts)) - for _, stmt := range stmts { - query, err := buildQuery(stmt) - if err != nil { - return "", errors.Trace(err) - } - results = append(results, query) - } - result = strings.Join(results, ";") - } else { - result, err = buildQuery(stmts[0]) - if err != nil { - return "", errors.Trace(err) - } - } - - log.Info("transform ddl query to result", - zap.String("DDL", job.Query), - zap.String("charset", job.Charset), - zap.String("collate", job.Collate), - zap.String("result", result)) - - return result, nil -} - // ======= // buildPersistedDDLEventFunc start // ======= @@ -536,9 +473,7 @@ func buildPersistedDDLEventForDropTable(args buildPersistedDDLEventFuncArgs) Per event := buildPersistedDDLEventCommon(args) event.CurrentSchemaName = getSchemaName(args.databaseMap, event.CurrentSchemaID) event.CurrentTableName = getTableName(args.tableMap, event.CurrentTableID) - if event.Query != "" { - event.Query = fmt.Sprintf("DROP TABLE `%s`.`%s`", event.CurrentSchemaName, event.CurrentTableName) - } + event.Query = fmt.Sprintf("DROP TABLE `%s`.`%s`", event.CurrentSchemaName, event.CurrentTableName) return event } @@ -583,7 +518,7 @@ func buildPersistedDDLEventForRenameTable(args buildPersistedDDLEventFuncArgs) P // we can use event.PrevSchemaID(even it is wrong) to update the internal state of the cdc. // TODO: not sure whether kafka sink will use event.PrevSchemaName and event.PrevTableName // But event.Query will be emit to downstream(out of cdc), we must make it correct. 
- if args.job.Query != "" { + if len(args.job.InvolvingSchemaInfo) > 0 { event.Query = fmt.Sprintf("RENAME TABLE `%s`.`%s` TO `%s`.`%s`", args.job.InvolvingSchemaInfo[0].Database, args.job.InvolvingSchemaInfo[0].Table, event.CurrentSchemaName, event.CurrentTableName) @@ -1603,7 +1538,10 @@ func buildDDLEventForCreateTables(rawEvent *PersistedDDLEvent, tableFilter filte physicalTableCount += 1 } } - querys := strings.Split(rawEvent.Query, ";") + querys, err := SplitQueries(rawEvent.Query) + if err != nil { + log.Panic("split queries failed", zap.Error(err)) + } ddlEvent.NeedAddedTables = make([]commonEvent.Table, 0, physicalTableCount) addName := make([]commonEvent.SchemaTableName, 0, logicalTableCount) resultQuerys := make([]string, 0, logicalTableCount) @@ -1633,7 +1571,7 @@ func buildDDLEventForCreateTables(rawEvent *PersistedDDLEvent, tableFilter filte ddlEvent.TableNameChange = &commonEvent.TableNameChange{ AddName: addName, } - ddlEvent.Query = strings.Join(resultQuerys, ";") + ddlEvent.Query = strings.Join(resultQuerys, "") if len(ddlEvent.NeedAddedTables) == 0 { log.Fatal("should not happen") } diff --git a/logservice/schemastore/persist_storage_test.go b/logservice/schemastore/persist_storage_test.go index 66ad1279b..14a547f3d 100644 --- a/logservice/schemastore/persist_storage_test.go +++ b/logservice/schemastore/persist_storage_test.go @@ -93,11 +93,11 @@ func TestApplyDDLJobs(t *testing.T) { nil, func() []*model.Job { return []*model.Job{ - buildCreateSchemaJobForTest(100, "test", 1000), // create schema 100 - buildCreateTableJobForTest(100, 200, "t1", 1010), // create table 200 - buildCreateTableJobForTest(100, 201, "t2", 1020), // create table 201 - buildDropTableJobForTest(100, 201, 1030, "drop table t2, t100"), // drop table 201 - buildTruncateTableJobForTest(100, 200, 202, "t1", 1040), // truncate table 200 to 202 + buildCreateSchemaJobForTest(100, "test", 1000), // create schema 100 + buildCreateTableJobForTest(100, 200, "t1", 1010), // create table 200 + buildCreateTableJobForTest(100, 201, "t2", 1020), // create table 201 + buildDropTableJobForTest(100, 201, 1030), // drop table 201 + buildTruncateTableJobForTest(100, 200, 202, "t1", 1040), // truncate table 200 to 202 } }(), map[int64]*BasicTableInfo{ @@ -701,11 +701,14 @@ func TestApplyDDLJobs(t *testing.T) { }, func() []*model.Job { return []*model.Job{ - buildCreateTableJobForTest(100, 300, "t1", 1010), // create table 300 - buildRenameTableJobForTest(105, 300, "t2", 1020, "rename table t1 to test2.t2", "test", "t1"), // rename table 300 to schema 105 + buildCreateTableJobForTest(100, 300, "t1", 1010), // create table 300 + buildRenameTableJobForTest(105, 300, "t2", 1020, &model.InvolvingSchemaInfo{ + Database: "test", + Table: "t1", + }), // rename table 300 to schema 105 // rename table 300 to schema 105 with the same name again // check comments in buildPersistedDDLEventForRenameTable to see why this would happen - buildRenameTableJobForTest(105, 300, "t2", 1030, "", "", ""), + buildRenameTableJobForTest(105, 300, "t2", 1030, nil), } }(), map[int64]*BasicTableInfo{ @@ -1025,6 +1028,12 @@ func TestApplyDDLJobs(t *testing.T) { func() []*model.Job { return []*model.Job{ buildCreateTablesJobForTest(100, []int64{301, 302, 303}, []string{"t1", "t2", "t3"}, 1010), // create table 301, 302, 303 + buildCreateTablesJobWithQueryForTest( + 100, + []int64{304, 305}, + []string{"t4", "t5"}, + "CREATE TABLE t4 (COL1 VARBINARY(10) NOT NULL, PRIMARY KEY(COL1)); CREATE TABLE t5 (COL2 ENUM('ABC','IRG','KT;J'), COL3 TINYINT(50) NOT 
NULL, PRIMARY KEY(COL3));", + 1020), // create table 304, 305, 306 with query } }(), map[int64]*BasicTableInfo{ @@ -1040,6 +1049,14 @@ func TestApplyDDLJobs(t *testing.T) { SchemaID: 100, Name: "t3", }, + 304: { + SchemaID: 100, + Name: "t4", + }, + 305: { + SchemaID: 100, + Name: "t5", + }, }, nil, map[int64]*BasicDatabaseInfo{ @@ -1049,6 +1066,8 @@ func TestApplyDDLJobs(t *testing.T) { 301: true, 302: true, 303: true, + 304: true, + 305: true, }, }, }, @@ -1056,8 +1075,10 @@ func TestApplyDDLJobs(t *testing.T) { 301: {1010}, 302: {1010}, 303: {1010}, + 304: {1020}, + 305: {1020}, }, - []uint64{1010}, + []uint64{1010, 1020}, nil, nil, []FetchTableTriggerDDLEventsTestCase{ @@ -1103,17 +1124,49 @@ func TestApplyDDLJobs(t *testing.T) { }, }, }, + { + Type: byte(model.ActionCreateTables), + FinishedTs: 1020, + Query: "CREATE TABLE `t4` (`COL1` VARBINARY(10) NOT NULL,PRIMARY KEY(`COL1`));CREATE TABLE `t5` (`COL2` ENUM('ABC','IRG','KT;J'),`COL3` TINYINT(50) NOT NULL,PRIMARY KEY(`COL3`));", + BlockedTables: &commonEvent.InfluencedTables{ + InfluenceType: commonEvent.InfluenceTypeNormal, + TableIDs: []int64{0}, + }, + NeedAddedTables: []commonEvent.Table{ + { + SchemaID: 100, + TableID: 304, + }, + { + SchemaID: 100, + TableID: 305, + }, + }, + TableNameChange: &commonEvent.TableNameChange{ + AddName: []commonEvent.SchemaTableName{ + { + SchemaName: "test", + TableName: "t4", + }, + { + SchemaName: "test", + TableName: "t5", + }, + }, + }, + }, }, }, // filter t2 and t3 { tableFilter: buildTableFilterByNameForTest("test", "t1"), startTs: 1000, - limit: 10, + limit: 1, result: []commonEvent.DDLEvent{ { Type: byte(model.ActionCreateTables), FinishedTs: 1010, + Query: "CREATE TABLE `t1` (`a` INT PRIMARY KEY);", BlockedTables: &commonEvent.InfluencedTables{ InfluenceType: commonEvent.InfluenceTypeNormal, TableIDs: []int64{0}, @@ -1679,7 +1732,7 @@ func TestRegisterTable(t *testing.T) { }, ddlJobs: func() []*model.Job { return []*model.Job{ - buildRenameTableJobForTest(50, 99, "t2", 1000, "", "", ""), // rename table 99 to t2 + buildRenameTableJobForTest(50, 99, "t2", 1000, nil), // rename table 99 to t2 buildCreateTableJobForTest(50, 100, "t3", 1010), // create table 100 buildTruncateTableJobForTest(50, 100, 101, "t3", 1020), // truncate table 100 to 101 buildCreatePartitionTableJobForTest(50, 102, "t4", []int64{201, 202, 203}, 1030), // create partition table 102 diff --git a/logservice/schemastore/persist_storage_test_utils.go b/logservice/schemastore/persist_storage_test_utils.go index 55486f9df..80d262d20 100644 --- a/logservice/schemastore/persist_storage_test_utils.go +++ b/logservice/schemastore/persist_storage_test_utils.go @@ -142,7 +142,6 @@ func mockWriteKVSnapOnDisk(db *pebble.DB, snapTs uint64, dbInfos []mockDBInfo) { func buildTableFilterByNameForTest(schemaName, tableName string) filter.Filter { filterRule := fmt.Sprintf("%s.%s", schemaName, tableName) - log.Info("filterRule", zap.String("filterRule", filterRule)) filterConfig := &config.FilterConfig{ Rules: []string{filterRule}, } @@ -203,12 +202,31 @@ func buildCreateTablesJobForTest(schemaID int64, tableIDs []int64, tableNames [] ID: id, Name: pmodel.NewCIStr(tableNames[i]), }) - querys = append(querys, fmt.Sprintf("create table %s(a int primary key)", tableNames[i])) + querys = append(querys, fmt.Sprintf("create table %s(a int primary key);", tableNames[i])) } return &model.Job{ Type: model.ActionCreateTables, SchemaID: schemaID, - Query: strings.Join(querys, ";"), + Query: strings.Join(querys, ""), + BinlogInfo: 
&model.HistoryInfo{ + MultipleTableInfos: multiTableInfos, + FinishedTS: finishedTs, + }, + } +} + +func buildCreateTablesJobWithQueryForTest(schemaID int64, tableIDs []int64, tableNames []string, query string, finishedTs uint64) *model.Job { + multiTableInfos := make([]*model.TableInfo, 0, len(tableIDs)) + for i, id := range tableIDs { + multiTableInfos = append(multiTableInfos, &model.TableInfo{ + ID: id, + Name: pmodel.NewCIStr(tableNames[i]), + }) + } + return &model.Job{ + Type: model.ActionCreateTables, + SchemaID: schemaID, + Query: query, BinlogInfo: &model.HistoryInfo{ MultipleTableInfos: multiTableInfos, FinishedTS: finishedTs, @@ -246,12 +264,11 @@ func buildCreatePartitionTablesJobForTest(schemaID int64, tableIDs []int64, tabl } } -func buildRenameTableJobForTest(schemaID, tableID int64, tableName string, finishedTs uint64, query, prevSchemaName, prevTableName string) *model.Job { - return &model.Job{ +func buildRenameTableJobForTest(schemaID, tableID int64, tableName string, finishedTs uint64, prevInfo *model.InvolvingSchemaInfo) *model.Job { + job := &model.Job{ Type: model.ActionRenameTable, SchemaID: schemaID, TableID: tableID, - Query: query, BinlogInfo: &model.HistoryInfo{ TableInfo: &model.TableInfo{ ID: tableID, @@ -259,13 +276,16 @@ func buildRenameTableJobForTest(schemaID, tableID int64, tableName string, finis }, FinishedTS: finishedTs, }, - InvolvingSchemaInfo: []model.InvolvingSchemaInfo{ + } + if prevInfo != nil { + job.InvolvingSchemaInfo = []model.InvolvingSchemaInfo{ { - Database: prevSchemaName, - Table: prevTableName, + Database: prevInfo.Database, + Table: prevInfo.Table, }, - }, + } } + return job } func buildRenamePartitionTableJobForTest(schemaID, tableID int64, tableName string, partitionIDs []int64, finishedTs uint64) *model.Job { @@ -301,7 +321,7 @@ func buildCreatePartitionTableJobForTest(schemaID, tableID int64, tableName stri return buildPartitionTableRelatedJobForTest(model.ActionCreateTable, schemaID, tableID, tableName, partitionIDs, finishedTs) } -func buildDropTableJobForTest(schemaID, tableID int64, finishedTs uint64, query string) *model.Job { +func buildDropTableJobForTest(schemaID, tableID int64, finishedTs uint64) *model.Job { return &model.Job{ Type: model.ActionDropTable, SchemaID: schemaID, @@ -309,7 +329,6 @@ func buildDropTableJobForTest(schemaID, tableID int64, finishedTs uint64, query BinlogInfo: &model.HistoryInfo{ FinishedTS: finishedTs, }, - Query: query, } } diff --git a/logservice/schemastore/utils.go b/logservice/schemastore/utils.go new file mode 100644 index 000000000..154bfebce --- /dev/null +++ b/logservice/schemastore/utils.go @@ -0,0 +1,115 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package schemastore + +import ( + "strings" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/parser" + "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/pingcap/tidb/pkg/parser/format" + "go.uber.org/zap" +) + +func SplitQueries(queries string) ([]string, error) { + p := parser.New() + stmts, warns, err := p.ParseSQL(queries) + for _, w := range warns { + log.Warn("parse sql warnning", zap.Error(w)) + } + if err != nil { + return nil, errors.Trace(err) + } + + var res []string + for _, stmt := range stmts { + var sb strings.Builder + err := stmt.Restore(&format.RestoreCtx{ + Flags: format.DefaultRestoreFlags, + In: &sb, + }) + // The (ast.Node).Restore function generates a SQL string representation of the AST (Abstract Syntax Tree) node. + // By default, the resulting SQL string does not include a trailing semicolon ";". + // Therefore, we explicitly append a semicolon here to ensure the SQL statement is complete. + sb.WriteByte(';') + if err != nil { + return nil, errors.Trace(err) + } + res = append(res, sb.String()) + } + + return res, nil +} + +// transform ddl query based on sql mode. +func transformDDLJobQuery(job *model.Job) (string, error) { + p := parser.New() + // We need to use the correct SQL mode to parse the DDL query. + // Otherwise, the parser may fail to parse the DDL query. + // For example, it is needed to parse the following DDL query: + // `alter table "t" add column "c" int default 1;` + // by adding `ANSI_QUOTES` to the SQL mode. + p.SetSQLMode(job.SQLMode) + stmts, _, err := p.Parse(job.Query, job.Charset, job.Collate) + if err != nil { + return "", errors.Trace(err) + } + var result string + buildQuery := func(stmt ast.StmtNode) (string, error) { + var sb strings.Builder + // translate TiDB feature to special comment + restoreFlags := format.RestoreTiDBSpecialComment + // escape the keyword + restoreFlags |= format.RestoreNameBackQuotes + // upper case keyword + restoreFlags |= format.RestoreKeyWordUppercase + // wrap string with single quote + restoreFlags |= format.RestoreStringSingleQuotes + // remove placement rule + restoreFlags |= format.SkipPlacementRuleForRestore + // force disable ttl + restoreFlags |= format.RestoreWithTTLEnableOff + if err = stmt.Restore(format.NewRestoreCtx(restoreFlags, &sb)); err != nil { + return "", errors.Trace(err) + } + return sb.String(), nil + } + if len(stmts) > 1 { + results := make([]string, 0, len(stmts)) + for _, stmt := range stmts { + query, err := buildQuery(stmt) + if err != nil { + return "", errors.Trace(err) + } + results = append(results, query) + } + result = strings.Join(results, ";") + } else { + result, err = buildQuery(stmts[0]) + if err != nil { + return "", errors.Trace(err) + } + } + + log.Info("transform ddl query to result", + zap.String("DDL", job.Query), + zap.String("charset", job.Charset), + zap.String("collate", job.Collate), + zap.String("result", result)) + + return result, nil +} From 28440a7fa8bfbd41817a1cd5d2b344ff6a63b964 Mon Sep 17 00:00:00 2001 From: lidezhu <47731263+lidezhu@users.noreply.github.com> Date: Fri, 3 Jan 2025 14:59:06 +0800 Subject: [PATCH 2/4] schemastore: small reorganize (#774) --- .../persist_storage_ddl_handlers.go | 107 +++++++++--------- 1 file changed, 55 insertions(+), 52 deletions(-) diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index 8d2907ed1..1cfec34fd 100644 --- 
a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -112,15 +112,6 @@ var allDDLHandlers = map[model.ActionType]*persistStorageDDLHandler{ extractTableInfoFunc: extractTableInfoFuncIgnore, buildDDLEventFunc: buildDDLEventForDropSchema, }, - model.ActionModifyTableCharsetAndCollate: { - buildPersistedDDLEventFunc: buildPersistedDDLEventForNormalDDLOnSingleTable, - updateDDLHistoryFunc: updateDDLHistoryForNormalDDLOnSingleTable, - updateSchemaMetadataFunc: updateSchemaMetadataIgnore, - iterateEventTablesFunc: iterateEventTablesForSingleTableDDL, - extractTableInfoFunc: extractTableInfoFuncForSingleTableDDL, - buildDDLEventFunc: buildDDLEventForNormalDDLOnSingleTable, - }, - model.ActionCreateTable: { buildPersistedDDLEventFunc: buildPersistedDDLEventForCreateTable, updateDDLHistoryFunc: updateDDLHistoryForAddDropTable, @@ -169,30 +160,6 @@ var allDDLHandlers = map[model.ActionType]*persistStorageDDLHandler{ extractTableInfoFunc: extractTableInfoFuncForSingleTableDDL, buildDDLEventFunc: buildDDLEventForNormalDDLOnSingleTable, }, - model.ActionAlterIndexVisibility: { - buildPersistedDDLEventFunc: buildPersistedDDLEventForNormalDDLOnSingleTable, - updateDDLHistoryFunc: updateDDLHistoryForNormalDDLOnSingleTable, - updateSchemaMetadataFunc: updateSchemaMetadataIgnore, - iterateEventTablesFunc: iterateEventTablesForSingleTableDDL, - extractTableInfoFunc: extractTableInfoFuncForSingleTableDDL, - buildDDLEventFunc: buildDDLEventForNormalDDLOnSingleTable, - }, - model.ActionAddPrimaryKey: { - buildPersistedDDLEventFunc: buildPersistedDDLEventForNormalDDLOnSingleTable, - updateDDLHistoryFunc: updateDDLHistoryForNormalDDLOnSingleTable, - updateSchemaMetadataFunc: updateSchemaMetadataIgnore, - iterateEventTablesFunc: iterateEventTablesForSingleTableDDL, - extractTableInfoFunc: extractTableInfoFuncForSingleTableDDL, - buildDDLEventFunc: buildDDLEventForNormalDDLOnSingleTable, - }, - model.ActionDropPrimaryKey: { - buildPersistedDDLEventFunc: buildPersistedDDLEventForNormalDDLOnSingleTable, - updateDDLHistoryFunc: updateDDLHistoryForNormalDDLOnSingleTable, - updateSchemaMetadataFunc: updateSchemaMetadataIgnore, - iterateEventTablesFunc: iterateEventTablesForSingleTableDDL, - extractTableInfoFunc: extractTableInfoFuncForSingleTableDDL, - buildDDLEventFunc: buildDDLEventForNormalDDLOnSingleTable, - }, model.ActionAddForeignKey: { buildPersistedDDLEventFunc: buildPersistedDDLEventForNormalDDLOnSingleTable, updateDDLHistoryFunc: updateDDLHistoryForNormalDDLOnSingleTable, @@ -249,15 +216,6 @@ var allDDLHandlers = map[model.ActionType]*persistStorageDDLHandler{ extractTableInfoFunc: extractTableInfoFuncForSingleTableDDL, buildDDLEventFunc: buildDDLEventForNormalDDLOnSingleTable, }, - model.ActionSetTiFlashReplica: { - buildPersistedDDLEventFunc: buildPersistedDDLEventForNormalDDLOnSingleTable, - updateDDLHistoryFunc: updateDDLHistoryForNormalDDLOnSingleTable, - updateSchemaMetadataFunc: updateSchemaMetadataIgnore, - iterateEventTablesFunc: iterateEventTablesForSingleTableDDL, - extractTableInfoFunc: extractTableInfoFuncForSingleTableDDL, - buildDDLEventFunc: buildDDLEventForNormalDDLOnSingleTableForTiDB, - }, - model.ActionShardRowID: { buildPersistedDDLEventFunc: buildPersistedDDLEventForNormalDDLOnSingleTable, updateDDLHistoryFunc: updateDDLHistoryForNormalDDLOnSingleTable, @@ -306,6 +264,14 @@ var allDDLHandlers = map[model.ActionType]*persistStorageDDLHandler{ extractTableInfoFunc: extractTableInfoFuncIgnore, buildDDLEventFunc: 
buildDDLEventForCreateDropView, }, + model.ActionModifyTableCharsetAndCollate: { + buildPersistedDDLEventFunc: buildPersistedDDLEventForNormalDDLOnSingleTable, + updateDDLHistoryFunc: updateDDLHistoryForNormalDDLOnSingleTable, + updateSchemaMetadataFunc: updateSchemaMetadataIgnore, + iterateEventTablesFunc: iterateEventTablesForSingleTableDDL, + extractTableInfoFunc: extractTableInfoFuncForSingleTableDDL, + buildDDLEventFunc: buildDDLEventForNormalDDLOnSingleTable, + }, model.ActionTruncateTablePartition: { buildPersistedDDLEventFunc: buildPersistedDDLEventForNormalPartitionDDL, updateDDLHistoryFunc: updateDDLHistoryForTruncatePartition, @@ -314,7 +280,7 @@ var allDDLHandlers = map[model.ActionType]*persistStorageDDLHandler{ extractTableInfoFunc: extractTableInfoFuncForTruncateAndReorganizePartition, buildDDLEventFunc: buildDDLEventForTruncateAndReorganizePartition, }, - + // TODO: model.ActionDropView model.ActionRecoverTable: { buildPersistedDDLEventFunc: buildPersistedDDLEventForCreateTable, updateDDLHistoryFunc: updateDDLHistoryForAddDropTable, @@ -323,6 +289,44 @@ var allDDLHandlers = map[model.ActionType]*persistStorageDDLHandler{ extractTableInfoFunc: extractTableInfoFuncForSingleTableDDL, buildDDLEventFunc: buildDDLEventForNewTableDDL, }, + // TODO: model.ActionModifySchemaCharsetAndCollate + // TODO: model.LockTable + // TODO: model.UnlockTable + // TODO: model.ActionRepairTable + model.ActionSetTiFlashReplica: { + buildPersistedDDLEventFunc: buildPersistedDDLEventForNormalDDLOnSingleTable, + updateDDLHistoryFunc: updateDDLHistoryForNormalDDLOnSingleTable, + updateSchemaMetadataFunc: updateSchemaMetadataIgnore, + iterateEventTablesFunc: iterateEventTablesForSingleTableDDL, + extractTableInfoFunc: extractTableInfoFuncForSingleTableDDL, + buildDDLEventFunc: buildDDLEventForNormalDDLOnSingleTableForTiDB, + }, + // TODO: model.ActionUpdateTiFlashReplicaStatus + model.ActionAddPrimaryKey: { + buildPersistedDDLEventFunc: buildPersistedDDLEventForNormalDDLOnSingleTable, + updateDDLHistoryFunc: updateDDLHistoryForNormalDDLOnSingleTable, + updateSchemaMetadataFunc: updateSchemaMetadataIgnore, + iterateEventTablesFunc: iterateEventTablesForSingleTableDDL, + extractTableInfoFunc: extractTableInfoFuncForSingleTableDDL, + buildDDLEventFunc: buildDDLEventForNormalDDLOnSingleTable, + }, + model.ActionDropPrimaryKey: { + buildPersistedDDLEventFunc: buildPersistedDDLEventForNormalDDLOnSingleTable, + updateDDLHistoryFunc: updateDDLHistoryForNormalDDLOnSingleTable, + updateSchemaMetadataFunc: updateSchemaMetadataIgnore, + iterateEventTablesFunc: iterateEventTablesForSingleTableDDL, + extractTableInfoFunc: extractTableInfoFuncForSingleTableDDL, + buildDDLEventFunc: buildDDLEventForNormalDDLOnSingleTable, + }, + + model.ActionAlterIndexVisibility: { + buildPersistedDDLEventFunc: buildPersistedDDLEventForNormalDDLOnSingleTable, + updateDDLHistoryFunc: updateDDLHistoryForNormalDDLOnSingleTable, + updateSchemaMetadataFunc: updateSchemaMetadataIgnore, + iterateEventTablesFunc: iterateEventTablesForSingleTableDDL, + extractTableInfoFunc: extractTableInfoFuncForSingleTableDDL, + buildDDLEventFunc: buildDDLEventForNormalDDLOnSingleTable, + }, model.ActionExchangeTablePartition: { buildPersistedDDLEventFunc: buildPersistedDDLEventForExchangePartition, @@ -341,6 +345,14 @@ var allDDLHandlers = map[model.ActionType]*persistStorageDDLHandler{ extractTableInfoFunc: extractTableInfoFuncForCreateTables, buildDDLEventFunc: buildDDLEventForCreateTables, }, + model.ActionMultiSchemaChange: { + 
buildPersistedDDLEventFunc: buildPersistedDDLEventForNormalDDLOnSingleTable, + updateDDLHistoryFunc: updateDDLHistoryForNormalDDLOnSingleTable, + updateSchemaMetadataFunc: updateSchemaMetadataIgnore, + iterateEventTablesFunc: iterateEventTablesForSingleTableDDL, + extractTableInfoFunc: extractTableInfoFuncForSingleTableDDL, + buildDDLEventFunc: buildDDLEventForNormalDDLOnSingleTable, + }, model.ActionReorganizePartition: { buildPersistedDDLEventFunc: buildPersistedDDLEventForNormalPartitionDDL, @@ -367,15 +379,6 @@ var allDDLHandlers = map[model.ActionType]*persistStorageDDLHandler{ extractTableInfoFunc: extractTableInfoFuncForSingleTableDDL, buildDDLEventFunc: buildDDLEventForNormalDDLOnSingleTableForTiDB, }, - - model.ActionMultiSchemaChange: { - buildPersistedDDLEventFunc: buildPersistedDDLEventForNormalDDLOnSingleTable, - updateDDLHistoryFunc: updateDDLHistoryForNormalDDLOnSingleTable, - updateSchemaMetadataFunc: updateSchemaMetadataIgnore, - iterateEventTablesFunc: iterateEventTablesForSingleTableDDL, - extractTableInfoFunc: extractTableInfoFuncForSingleTableDDL, - buildDDLEventFunc: buildDDLEventForNormalDDLOnSingleTable, - }, } func isPartitionTable(tableInfo *model.TableInfo) bool { From 3e73cc55a668ee72f7e33b41a35bc42664657d94 Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Fri, 3 Jan 2025 16:22:39 +0800 Subject: [PATCH 3/4] enable the kafka sink metrics (#778) --- pkg/metrics/init.go | 4 ++++ pkg/sink/kafka/factory.go | 8 ++++---- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/pkg/metrics/init.go b/pkg/metrics/init.go index b46ef5b56..db8506870 100644 --- a/pkg/metrics/init.go +++ b/pkg/metrics/init.go @@ -15,6 +15,8 @@ package metrics import ( "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/sink/codec" + "github.com/pingcap/ticdc/pkg/sink/kafka" "github.com/prometheus/client_golang/prometheus" ) @@ -34,4 +36,6 @@ func InitMetrics(registry *prometheus.Registry) { InitLogPullerMetrics(registry) common.InitCommonMetrics(registry) InitDynamicStreamMetrics(registry) + kafka.InitMetrics(registry) + codec.InitMetrics(registry) } diff --git a/pkg/sink/kafka/factory.go b/pkg/sink/kafka/factory.go index 500c52325..c53e802b8 100644 --- a/pkg/sink/kafka/factory.go +++ b/pkg/sink/kafka/factory.go @@ -23,7 +23,7 @@ import ( commonType "github.com/pingcap/ticdc/pkg/common" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink/codec/common" - tikafka "github.com/pingcap/tiflow/pkg/sink/kafka" + "github.com/pingcap/tiflow/pkg/sink/kafka" "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" ) @@ -31,13 +31,13 @@ import ( // Factory is used to produce all kafka components. 
type Factory interface { // AdminClient return a kafka cluster admin client - AdminClient(ctx context.Context) (tikafka.ClusterAdminClient, error) + AdminClient(ctx context.Context) (kafka.ClusterAdminClient, error) // SyncProducer creates a sync producer to writer message to kafka SyncProducer(ctx context.Context) (SyncProducer, error) // AsyncProducer creates an async producer to writer message to kafka - AsyncProducer(ctx context.Context, failpointCh chan error) (tikafka.AsyncProducer, error) + AsyncProducer(ctx context.Context, failpointCh chan error) (kafka.AsyncProducer, error) // MetricsCollector returns the kafka metrics collector - MetricsCollector(role util.Role, adminClient tikafka.ClusterAdminClient) tikafka.MetricsCollector + MetricsCollector(role util.Role, adminClient kafka.ClusterAdminClient) kafka.MetricsCollector } // FactoryCreator defines the type of factory creator. From 76c3a0b51cad5efdc20adc59fa897c0bf229401b Mon Sep 17 00:00:00 2001 From: dongmen <20351731+asddongmen@users.noreply.github.com> Date: Fri, 3 Jan 2025 16:42:53 +0800 Subject: [PATCH 4/4] test: fix_upload_ci_log3 (#775) Signed-off-by: dongmen <414110582@qq.com> --- .github/actions/upload-test-logs/action.yml | 34 ++++++++ .github/workflows/integration_test_mysql.yaml | 78 +++++-------------- 2 files changed, 53 insertions(+), 59 deletions(-) create mode 100644 .github/actions/upload-test-logs/action.yml diff --git a/.github/actions/upload-test-logs/action.yml b/.github/actions/upload-test-logs/action.yml new file mode 100644 index 000000000..e963f27d2 --- /dev/null +++ b/.github/actions/upload-test-logs/action.yml @@ -0,0 +1,34 @@ +name: 'Upload Test Logs' +description: 'Copy and upload test logs' + +runs: + using: "composite" + steps: + - name: Copy logs to hack permission + if: always() + shell: bash + run: | + DIR=$(sudo find /tmp/tidb_cdc_test/ -type d -name 'cdc_data*' -exec dirname {} \; | tail -n 1) + echo "Found the last dir: $DIR" + [ -z "$DIR" ] && exit 0 + CASE=$(basename $DIR) + mkdir -p ./logs/$CASE + cat $DIR/stdout.log || true + tail -n 10 $DIR/cdc.log || true + sudo cp -r -L $DIR/*.log ./logs/$CASE/ || true + sudo cp -r -L $DIR/sync_diff ./logs/$CASE/ || true + sudo chown -R runner ./logs + sudo tar -czvf ./logs.tar.gz ./logs + + - name: Upload logs + uses: actions/upload-artifact@v4 + if: always() + with: + name: ${{ inputs.log-name }} + path: | + ./logs.tar.gz + +inputs: + log-name: + description: 'Name of the log artifact' + required: true \ No newline at end of file diff --git a/.github/workflows/integration_test_mysql.yaml b/.github/workflows/integration_test_mysql.yaml index 7688ac9bf..c59061a82 100644 --- a/.github/workflows/integration_test_mysql.yaml +++ b/.github/workflows/integration_test_mysql.yaml @@ -71,27 +71,12 @@ jobs: run: | export TICDC_NEWARCH=true && make integration_test CASE=changefeed_reconstruct - - name: Copy logs to hack permission - if: ${{ always() }} - run: | - DIR=$(sudo find /tmp/tidb_cdc_test/ -type d -name 'cdc_data' -exec dirname {} \;) - [ -z "$DIR" ] && exit 0 - CASE=$(basename $DIR) - mkdir -p ./logs/$CASE - cat $DIR/stdout.log - tail -n 10 $DIR/cdc.log - sudo cp -r -L $DIR/{*.log} ./logs/$CASE/ - sudo cp -r -L $DIR/{sync_diff} ./logs/$CASE/ || true - sudo chown -R runner ./logs - sudo tar -czvf ./logs.tar.gz ./logs - # Update logs as artifact seems not stable, so we set `continue-on-error: true` here. 
- - name: Upload logs - uses: actions/upload-artifact@v4 - if: ${{ always() }} + - name: Upload test logs + if: always() + uses: ./.github/actions/upload-test-logs with: - name: upstream-switch-logs - path: | - ./logs.tar.gz + log-name: basic_e2e_group1 + failover_e2e_test1: runs-on: ubuntu-latest @@ -132,20 +117,12 @@ jobs: pwd && ls -l bin/ && ls -l tools/bin/ export TICDC_NEWARCH=true && make integration_test CASE=fail_over_ddl_C - - name: Copy logs to hack permission - if: ${{ always() }} - run: | - TEMP_DIR=$(mktemp -d) - mkdir -p "$TEMP_DIR" - sudo find /tmp/tidb_cdc_test -type d -name 'fail_over*' -exec sh -c 'find "{}" -type f -name "cdc*.log" -print0' \; | tar -czvf logs.tar.gz -C /tmp/tidb_cdc_test --null -T - - # Update logs as artifact seems not stable, so we set `continue-on-error: true` here. - - name: Upload logs - uses: actions/upload-artifact@v4 - if: ${{ always() }} + - name: Upload test logs + if: always() + uses: ./.github/actions/upload-test-logs with: - name: upstream-failover-logs1 - path: | - ./logs.tar.gz + log-name: failover_group1 + failover_e2e_test2: runs-on: ubuntu-latest @@ -186,20 +163,12 @@ jobs: pwd && ls -l bin/ && ls -l tools/bin/ export TICDC_NEWARCH=true && make integration_test CASE=fail_over_ddl_G - - name: Copy logs to hack permission - if: ${{ always() }} - run: | - TEMP_DIR=$(mktemp -d) - mkdir -p "$TEMP_DIR" - sudo find /tmp/tidb_cdc_test -type d -name 'fail_over*' -exec sh -c 'find "{}" -type f -name "cdc*.log" -print0' \; | tar -czvf logs.tar.gz -C /tmp/tidb_cdc_test --null -T - - # Update logs as artifact seems not stable, so we set `continue-on-error: true` here. - - name: Upload logs - uses: actions/upload-artifact@v4 - if: ${{ always() }} + - name: Upload test logs + if: always() + uses: ./.github/actions/upload-test-logs with: - name: upstream-failover-logs2 - path: | - ./logs.tar.gz + log-name: failover_group_2 + failover_e2e_test3: runs-on: ubuntu-latest @@ -225,17 +194,8 @@ jobs: pwd && ls -l bin/ && ls -l tools/bin/ export TICDC_NEWARCH=true && make integration_test CASE=fail_over_ddl_J - - name: Copy logs to hack permission - if: ${{ always() }} - run: | - TEMP_DIR=$(mktemp -d) - mkdir -p "$TEMP_DIR" - sudo find /tmp/tidb_cdc_test -type d -name 'fail_over*' -exec sh -c 'find "{}" -type f -name "cdc*.log" -print0' \; | tar -czvf logs.tar.gz -C /tmp/tidb_cdc_test --null -T - - # Update logs as artifact seems not stable, so we set `continue-on-error: true` here. - - name: Upload logs - uses: actions/upload-artifact@v4 - if: ${{ always() }} + - name: Upload test logs + if: always() + uses: ./.github/actions/upload-test-logs with: - name: upstream-failover-logs3 - path: | - ./logs.tar.gz \ No newline at end of file + log-name: failover-group3 \ No newline at end of file
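
For reference, the core change in PATCH 1/4 replaces the naive `strings.Split(rawEvent.Query, ";")` in buildDDLEventForCreateTables with the parser-based SplitQueries helper added in logservice/schemastore/utils.go, because a literal ';' inside a string value (for example ENUM('KT;J') in the new test case) would otherwise be treated as a statement boundary and produce broken sub-queries. The standalone sketch below mirrors that helper outside the schemastore package to illustrate the difference; the lowercase splitQueries copy, the main function, and the sample query are illustrative only and are not part of the patch.

package main

import (
	"fmt"
	"strings"

	"github.com/pingcap/tidb/pkg/parser"
	"github.com/pingcap/tidb/pkg/parser/format"
)

// splitQueries mirrors the SplitQueries helper from the patch: it parses the combined
// query and restores every statement separately, so a ';' inside a string literal is
// never treated as a statement boundary.
func splitQueries(queries string) ([]string, error) {
	p := parser.New()
	stmts, _, err := p.ParseSQL(queries)
	if err != nil {
		return nil, err
	}
	res := make([]string, 0, len(stmts))
	for _, stmt := range stmts {
		var sb strings.Builder
		if err := stmt.Restore(&format.RestoreCtx{Flags: format.DefaultRestoreFlags, In: &sb}); err != nil {
			return nil, err
		}
		// Restore does not emit a trailing ';', so append one to keep each statement complete.
		sb.WriteByte(';')
		res = append(res, sb.String())
	}
	return res, nil
}

func main() {
	// Sample query taken from the new test case in persist_storage_test.go.
	q := "CREATE TABLE t4 (COL1 VARBINARY(10) NOT NULL, PRIMARY KEY(COL1));" +
		"CREATE TABLE t5 (COL2 ENUM('ABC','IRG','KT;J'), COL3 TINYINT(50) NOT NULL, PRIMARY KEY(COL3));"

	// Naive splitting cuts the second statement at the ';' inside 'KT;J'.
	fmt.Println(len(strings.Split(q, ";"))) // 4 fragments, the last one empty

	// Parser-based splitting returns the two complete statements.
	parts, err := splitQueries(q)
	fmt.Println(len(parts), err) // 2 <nil>
}

Because every restored statement already carries its own trailing ';', the filtered per-table queries can be re-joined with an empty separator, which is why the patch changes strings.Join(resultQuerys, ";") to strings.Join(resultQuerys, "").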