From 9ddf008eafe09eeecb6c969efe0c6c18c14d5c5d Mon Sep 17 00:00:00 2001 From: bb7133 Date: Sun, 4 Aug 2024 22:30:48 -0700 Subject: [PATCH 1/5] add support for dump/import and BR --- go.mod | 1 + go.sum | 2 + r/example.result | 36 +++++++++++--- src/main.go | 124 +++++++++++++++++++++++++++++++++++++++++++++++ src/query.go | 2 + src/type.go | 2 + src/util.go | 19 ++++++++ t/example.test | 4 ++ 8 files changed, 184 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index dd3c41c..a45e217 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.uber.org/atomic v1.11.0 // indirect golang.org/x/sys v0.5.0 // indirect diff --git a/go.sum b/go.sum index 77d729c..de86ac2 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/defined2014/mysql v0.0.0-20231121061906-fcfacaa39f49 h1:Q3Ri7Ycix4T+Ig7I896I6w0WuCajid2SgyierI16NSo= github.com/defined2014/mysql v0.0.0-20231121061906-fcfacaa39f49/go.mod h1:5GYlY+PrT+c8FHAJTMIsyOuHUNf62KAQuRPMGssbixo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32 h1:m5ZsBa5o/0CkzZXfXLaThzKuR85SnHHetqBCpzQ30h8= github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/r/example.result b/r/example.result index 6e5cb8d..09a8d7d 100644 --- a/r/example.result +++ b/r/example.result @@ -6,8 +6,9 @@ a b SELECT 1 FROM NON_EXISTING_TABLE; Error 1146 (42S02): Table 'example.NON_EXISTING_TABLE' doesn't exist SELECT 2 FROM NON_EXISTING_TABLE; +Error 1146 (42S02): Table 'example.NON_EXISTING_TABLE' doesn't exist SELECT 3 FROM NON_EXISTING_TABLE; -Got one of the listed errors +Error 1146 (42S02): Table 'example.NON_EXISTING_TABLE' doesn't exist SELECT 4; 4 4 @@ -20,19 +21,42 @@ SELECT 6; 1 SELECT; Error 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column 1 near "1 SELECT;" 2 SELECT; +Error 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column 1 near "2 SELECT;" 3 SELECT; -Got one of the listed errors +Error 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column 1 near "3 SELECT;" explain analyze format='brief' select * from t; id estRows actRows task access object execution info operator info memory disk -TableReader 10000.00 5 root NULL time:, loops:, RU:, cop_task: {num:, max:, proc_keys:, rpc_num:, rpc_time:, copr_cache_hit_ratio:, build_task_duration:, max_distsql_concurrency:} data:TableFullScan Bytes N/A -└─TableFullScan 10000.00 5 cop[tikv] table:t tikv_task:{time:, loops:} keep order:false, stats:pseudo N/A N/A +TableReader 10000.00 5 root NULL time:, loops:, RU:, cop_task: {num:, max:, proc_keys:, tot_proc:, tot_wait:, copr_cache_hit_ratio:, build_task_duration:, max_distsql_concurrency:, rpc_info:{Cop:{num_rpc:, total_time:}} data:TableFullScan Bytes N/A +└─TableFullScan 10000.00 5 cop[tikv] table:t tikv_task:{time:, loops:, scan_detail: {total_process_keys:, total_process_keys_size:, total_keys:, get_snapshot_time:, rocksdb: {key_skipped_count:, block: {}}}, time_detail: {total_process_time:, total_wait_time:, tikv_wall_time:} keep order:false, stats:pseudo N/A N/A explain analyze select * from t; id estRows actRows task access object execution info operator info memory disk -TableReader_5 10000.00 5 root NULL time:, loops:, RU:, cop_task: {num:, max:, proc_keys:, rpc_num:, rpc_time:, copr_cache_hit_ratio:, build_task_duration:, max_distsql_concurrency:} data:TableFullScan_4 Bytes N/A -└─TableFullScan_4 10000.00 5 cop[tikv] table:t tikv_task:{time:, loops:} keep order:false, stats:pseudo N/A N/A +TableReader_5 10000.00 5 root NULL time:, loops:, RU:, cop_task: {num:, max:, proc_keys:, tot_proc:, tot_wait:, copr_cache_hit_ratio:, build_task_duration:, max_distsql_concurrency:, rpc_info:{Cop:{num_rpc:, total_time:}} data:TableFullScan_4 Bytes N/A +└─TableFullScan_4 10000.00 5 cop[tikv] table:t tikv_task:{time:, loops:, scan_detail: {total_process_keys:, total_process_keys_size:, total_keys:, get_snapshot_time:, rocksdb: {key_skipped_count:, block: {}}}, time_detail: {total_process_time:, total_wait_time:, tikv_wall_time:} keep order:false, stats:pseudo N/A N/A insert into t values (6, 6); affected rows: 1 info: +Destination Size BackupTS Queue Time Execution Time +/tmp/t_b7987183-d39f-4572-868d-75e25c8cd215 1876 451473879442653228 2024-07-29 14:56:13 2024-07-29 14:56:13 +affected rows: 0 +info: +affected rows: 0 +info: +affected rows: 0 +info: +Destination Size BackupTS Cluster TS Queue Time Execution Time +/tmp/t_b7987183-d39f-4572-868d-75e25c8cd215 1876 451473879442653228 451473880386371620 2024-07-29 14:56:17 2024-07-29 14:56:17 +affected rows: 0 +info: +affected rows: 0 +info: +affected rows: 0 +info: +affected rows: 0 +info: +Job_ID Data_Source Target_Table Table_ID Phase Status Source_File_Size Imported_Rows Result_Message Create_Time Start_Time End_Time Created_By +3 /tmp/t_6cac1a43-c66c-4af9-962f-95287fa12432/example.t.000000000.csv `example`.`td` 453 finished 30B 6 2024-07-29 14:56:17.619215 2024-07-29 14:56:18.125792 2024-07-29 14:56:19.640005 root@% +affected rows: 0 +info: DROP TABLE IF EXISTS t1; affected rows: 0 info: diff --git a/src/main.go b/src/main.go index 3da256a..17d922c 100644 --- a/src/main.go +++ b/src/main.go @@ -20,6 +20,7 @@ import ( "flag" "fmt" "os" + "os/exec" "path/filepath" "regexp" "sort" @@ -29,6 +30,7 @@ import ( "time" "github.com/defined2014/mysql" + "github.com/google/uuid" "github.com/pingcap/errors" log "github.com/sirupsen/logrus" ) @@ -47,6 +49,8 @@ var ( retryConnCount int collationDisable bool checkErr bool + pathBR string + pathDumpling string ) func init() { @@ -63,6 +67,8 @@ func init() { flag.IntVar(&retryConnCount, "retry-connection-count", 120, "The max number to retry to connect to the database.") flag.BoolVar(&checkErr, "check-error", false, "if --error ERR does not match, return error instead of just warn") flag.BoolVar(&collationDisable, "collation-disable", false, "run collation related-test with new-collation disabled") + flag.StringVar(&pathBR, "path-br", "", "Path of BR") + flag.StringVar(&pathDumpling, "path-dumpling", "", "Path of Dumpling") } const ( @@ -98,6 +104,11 @@ type ReplaceRegex struct { replace string } +type SourceAndTarget struct { + sourceTable string + targetTable string +} + type tester struct { mdb *sql.DB name string @@ -148,6 +159,12 @@ type tester struct { // replace output result through --replace_regex /\.dll/.so/ replaceRegex []*ReplaceRegex + + // backup and restore context through --backup_and_restore $BACKUP_TABLE as $RESTORE_TABLE' + backupAndRestore *SourceAndTarget + + // dump and import context through --dump_and_import $SOURCE_TABLE as $TARGET_TABLE' + dumpAndImport *SourceAndTarget } func newTester(name string) *tester { @@ -352,6 +369,58 @@ func (t *tester) addSuccess(testSuite *XUnitTestSuite, startTime *time.Time, cnt }) } +func generateBRStatements(source, target string) (string, string) { + // Generate a random UUID + uuid := uuid.NewString() + + // Create the TMP_DIR path + tmpDir := fmt.Sprintf("/tmp/%s_%s", source, uuid) + + // Generate the SQL statements + backupSQL := fmt.Sprintf("BACKUP TABLE `%s` TO '%s'", source, tmpDir) + restoreSQL := fmt.Sprintf("RESTORE TABLE `%s` FROM '%s'", source, tmpDir) + + return backupSQL, restoreSQL +} + +func (t *tester) dumpTable(source string) (string, error) { + log.Warnf("Start dumping table: %s", source) + path := "/tmp/" + source + "_" + uuid.NewString() + cmdArgs := []string{ + fmt.Sprintf("-h%s", host), + fmt.Sprintf("-P%s", port), + fmt.Sprintf("-u%s", user), + fmt.Sprintf("-T%s.%s", t.name, source), + fmt.Sprintf("-o%s", path), + "--no-header", + "--filetype", + "csv", + } + + if passwd != "" { + cmdArgs = append(cmdArgs, fmt.Sprintf("-p%s", passwd)) + } + + cmd := exec.Command(pathDumpling, cmdArgs...) + + output, err := cmd.CombinedOutput() + if err != nil { + log.Warnf("Failed executing commands: %s, output: %s)", + cmd.String(), string(output)) + return "", err + } + log.Warnf("Done executing commands: %s, output: %s)", + cmd.String(), string(output)) + return path, nil +} + +func (t *tester) importTableStmt(path, target string) string { + return fmt.Sprintf(` + IMPORT INTO %s + FROM '%s/example.t.000000000.csv' + `, target, path) +} + func (t *tester) Run() error { t.preProcess() defer t.postProcess() @@ -523,6 +592,61 @@ func (t *tester) Run() error { return errors.Annotate(err, fmt.Sprintf("Could not parse regex in --replace_regex: line: %d sql:%v", q.Line, q.Query)) } t.replaceRegex = regex + case Q_BACKUP_AND_RESTORE: + t.backupAndRestore, err = parseSourceAndTarget(q.Query) + if err != nil { + return errors.Annotate(err, fmt.Sprintf("Could not parse backup table and restore table name in --backup_and_restore, line: %d sql:%v", q.Line, q.Query)) + } + backupStmt, restoreStmt := generateBRStatements(t.backupAndRestore.sourceTable, t.backupAndRestore.targetTable) + log.WithFields(log.Fields{"stmt": backupStmt, "line": q.Line}).Warn("Backup started") + if err := t.executeStmt(backupStmt); err != nil { + return err + } + log.WithFields(log.Fields{"stmt": backupStmt, "line": q.Line}).Warn("Backup end") + tempTable := t.backupAndRestore.sourceTable + uuid.NewString() + renameStmt := fmt.Sprintf("RENAME TABLE `%s` TO `%s`", t.backupAndRestore.sourceTable, tempTable) + if err := t.executeStmt(renameStmt); err != nil { + return err + } + dupTableStmt := fmt.Sprintf("CREATE TABLE `%s` LIKE `%s`", t.backupAndRestore.sourceTable, tempTable) + if err := t.executeStmt(dupTableStmt); err != nil { + return err + } + log.WithFields(log.Fields{"stmt": restoreStmt, "line": q.Line}).Warn("Restore start") + if err := t.executeStmt(restoreStmt); err != nil { + return err + } + log.WithFields(log.Fields{"stmt": restoreStmt, "line": q.Line}).Warn("Restore end") + renameStmt = fmt.Sprintf("RENAME TABLE `%s` TO `%s`", t.backupAndRestore.sourceTable, t.backupAndRestore.targetTable) + if err := t.executeStmt(renameStmt); err != nil { + return err + } + renameStmt = fmt.Sprintf("RENAME TABLE `%s` TO `%s`", tempTable, t.backupAndRestore.sourceTable) + if err := t.executeStmt(renameStmt); err != nil { + return err + } + case Q_DUMP_AND_IMPORT: + t.dumpAndImport, err = parseSourceAndTarget(q.Query) + if err != nil { + return err + } + path, err := t.dumpTable(t.dumpAndImport.sourceTable) + if err != nil { + return err + } + + dupTableStmt := fmt.Sprintf("CREATE TABLE `%s` LIKE `%s`", t.dumpAndImport.targetTable, t.backupAndRestore.sourceTable) + if err := t.executeStmt(dupTableStmt); err != nil { + return err + } + + importStmt := t.importTableStmt(path, t.dumpAndImport.targetTable) + log.WithFields(log.Fields{"stmt": importStmt, "line": q.Line}).Warn("Import start") + if err = t.executeStmt(importStmt); err != nil { + return err + } + log.WithFields(log.Fields{"stmt": importStmt, "line": q.Line}).Warn("Restore end") + default: log.WithFields(log.Fields{"command": q.firstWord, "arguments": q.Query, "line": q.Line}).Warn("command not implemented") } diff --git a/src/query.go b/src/query.go index 6a128d8..de14658 100644 --- a/src/query.go +++ b/src/query.go @@ -124,6 +124,8 @@ const ( Q_COMMENT /* Comments, ignored. */ Q_COMMENT_WITH_COMMAND Q_EMPTY_LINE + Q_BACKUP_AND_RESTORE + Q_DUMP_AND_IMPORT ) // ParseQueries parses an array of string into an array of query object. diff --git a/src/type.go b/src/type.go index 50ea5a6..2267d65 100644 --- a/src/type.go +++ b/src/type.go @@ -114,6 +114,8 @@ var commandMap = map[string]int{ "single_query": Q_SINGLE_QUERY, "begin_concurrent": Q_BEGIN_CONCURRENT, "end_concurrent": Q_END_CONCURRENT, + "backup_and_restore": Q_BACKUP_AND_RESTORE, + "dump_and_import": Q_DUMP_AND_IMPORT, } func findType(cmdName string) int { diff --git a/src/util.go b/src/util.go index b62c64b..33b3604 100644 --- a/src/util.go +++ b/src/util.go @@ -15,6 +15,7 @@ package main import ( "database/sql" + "fmt" "regexp" "strings" "time" @@ -104,3 +105,21 @@ func ParseReplaceRegex(originalString string) ([]*ReplaceRegex, error) { } return ret, nil } + +func parseSourceAndTarget(s string) (*SourceAndTarget, error) { + s = strings.ToLower(strings.TrimSpace(s)) + + parts := strings.Split(s, "as") + if len(parts) != 2 { + return nil, errors.Errorf("Could not parse source table and target table name: %v", s) + } + + st := &SourceAndTarget{ + sourceTable: strings.TrimSpace(parts[0]), + targetTable: strings.TrimSpace(parts[1]), + } + + fmt.Printf("Parse source: %s and target: %s\n", st.sourceTable, st.targetTable) + + return st, nil +} diff --git a/t/example.test b/t/example.test index 4b6f18d..2cb4a00 100644 --- a/t/example.test +++ b/t/example.test @@ -38,6 +38,10 @@ explain analyze select * from t; --enable_info insert into t values (6, 6); +--backup_and_restore t AS tt + +--dump_and_import t AS td + DROP TABLE IF EXISTS t1; CREATE TABLE t1 (f1 INT PRIMARY KEY, f2 INT NOT NULL UNIQUE); INSERT t1 VALUES (1, 1); From 3e10b3bc279be114e8046a09f87825cf71efbfaa Mon Sep 17 00:00:00 2001 From: bb7133 Date: Tue, 8 Oct 2024 10:53:19 -0700 Subject: [PATCH 2/5] support TiCDC --- r/example.result | 31 ++--- src/main.go | 191 ++++++++++++++++++++++++++--- src/query.go | 2 + src/type.go | 4 +- src/util.go | 4 - t/br_integration.test | 11 ++ t/cdc_integration.test | 12 ++ t/dumpling_import_integration.test | 10 ++ t/example.test | 11 +- 9 files changed, 226 insertions(+), 50 deletions(-) create mode 100644 t/br_integration.test create mode 100644 t/cdc_integration.test create mode 100644 t/dumpling_import_integration.test diff --git a/r/example.result b/r/example.result index 09a8d7d..e009bb0 100644 --- a/r/example.result +++ b/r/example.result @@ -27,36 +27,14 @@ Error 1064 (42000): You have an error in your SQL syntax; check the manual that explain analyze format='brief' select * from t; id estRows actRows task access object execution info operator info memory disk TableReader 10000.00 5 root NULL time:, loops:, RU:, cop_task: {num:, max:, proc_keys:, tot_proc:, tot_wait:, copr_cache_hit_ratio:, build_task_duration:, max_distsql_concurrency:, rpc_info:{Cop:{num_rpc:, total_time:}} data:TableFullScan Bytes N/A -└─TableFullScan 10000.00 5 cop[tikv] table:t tikv_task:{time:, loops:, scan_detail: {total_process_keys:, total_process_keys_size:, total_keys:, get_snapshot_time:, rocksdb: {key_skipped_count:, block: {}}}, time_detail: {total_process_time:, total_wait_time:, tikv_wall_time:} keep order:false, stats:pseudo N/A N/A +└─TableFullScan 10000.00 5 cop[tikv] table:t tikv_task:{time:, loops:, scan_detail: {total_process_keys:, total_process_keys_size:, total_keys:, get_snapshot_time:, rocksdb: {delete_skipped_count:, key_skipped_count:, block: {}}}, time_detail: {total_process_time:, total_wait_time:, tikv_wall_time:} keep order:false, stats:pseudo N/A N/A explain analyze select * from t; id estRows actRows task access object execution info operator info memory disk TableReader_5 10000.00 5 root NULL time:, loops:, RU:, cop_task: {num:, max:, proc_keys:, tot_proc:, tot_wait:, copr_cache_hit_ratio:, build_task_duration:, max_distsql_concurrency:, rpc_info:{Cop:{num_rpc:, total_time:}} data:TableFullScan_4 Bytes N/A -└─TableFullScan_4 10000.00 5 cop[tikv] table:t tikv_task:{time:, loops:, scan_detail: {total_process_keys:, total_process_keys_size:, total_keys:, get_snapshot_time:, rocksdb: {key_skipped_count:, block: {}}}, time_detail: {total_process_time:, total_wait_time:, tikv_wall_time:} keep order:false, stats:pseudo N/A N/A +└─TableFullScan_4 10000.00 5 cop[tikv] table:t tikv_task:{time:, loops:, scan_detail: {total_process_keys:, total_process_keys_size:, total_keys:, get_snapshot_time:, rocksdb: {delete_skipped_count:, key_skipped_count:, block: {}}}, time_detail: {total_process_time:, total_wait_time:, tikv_wall_time:} keep order:false, stats:pseudo N/A N/A insert into t values (6, 6); affected rows: 1 info: -Destination Size BackupTS Queue Time Execution Time -/tmp/t_b7987183-d39f-4572-868d-75e25c8cd215 1876 451473879442653228 2024-07-29 14:56:13 2024-07-29 14:56:13 -affected rows: 0 -info: -affected rows: 0 -info: -affected rows: 0 -info: -Destination Size BackupTS Cluster TS Queue Time Execution Time -/tmp/t_b7987183-d39f-4572-868d-75e25c8cd215 1876 451473879442653228 451473880386371620 2024-07-29 14:56:17 2024-07-29 14:56:17 -affected rows: 0 -info: -affected rows: 0 -info: -affected rows: 0 -info: -affected rows: 0 -info: -Job_ID Data_Source Target_Table Table_ID Phase Status Source_File_Size Imported_Rows Result_Message Create_Time Start_Time End_Time Created_By -3 /tmp/t_6cac1a43-c66c-4af9-962f-95287fa12432/example.t.000000000.csv `example`.`td` 453 finished 30B 6 2024-07-29 14:56:17.619215 2024-07-29 14:56:18.125792 2024-07-29 14:56:19.640005 root@% -affected rows: 0 -info: DROP TABLE IF EXISTS t1; affected rows: 0 info: @@ -71,3 +49,8 @@ affected rows: 3 info: Records: 2 Duplicates: 1 Warnings: 0 1 use `test`;; +use example; +select * from t1; +f1 f2 +1 1 +2 2 diff --git a/src/main.go b/src/main.go index 17d922c..d59eeae 100644 --- a/src/main.go +++ b/src/main.go @@ -51,6 +51,15 @@ var ( checkErr bool pathBR string pathDumpling string + pathCDC string + addressCDC string + downstream string + + downStreamHost string + downStreamPort string + downStreamUser string + downStreamPassword string + downStreamDB string ) func init() { @@ -67,8 +76,11 @@ func init() { flag.IntVar(&retryConnCount, "retry-connection-count", 120, "The max number to retry to connect to the database.") flag.BoolVar(&checkErr, "check-error", false, "if --error ERR does not match, return error instead of just warn") flag.BoolVar(&collationDisable, "collation-disable", false, "run collation related-test with new-collation disabled") - flag.StringVar(&pathBR, "path-br", "", "Path of BR") - flag.StringVar(&pathDumpling, "path-dumpling", "", "Path of Dumpling") + flag.StringVar(&pathBR, "path-br", "", "Path of BR binary") + flag.StringVar(&pathDumpling, "path-dumpling", "", "Path of Dumpling binary") + flag.StringVar(&pathCDC, "path-cdc", "", "Path of TiCDC binary") + flag.StringVar(&addressCDC, "address-cdc", "127.0.0.1:8300", "Address of Server") + flag.StringVar(&downstream, "downstream", "", "Connection string of downstream TiDB cluster") } const ( @@ -165,6 +177,12 @@ type tester struct { // dump and import context through --dump_and_import $SOURCE_TABLE as $TARGET_TABLE' dumpAndImport *SourceAndTarget + + // replication checkpoint database name + replicationCheckpointDB string + + // replication checkpoint ID + replicationCheckpointID int } func newTester(name string) *tester { @@ -179,6 +197,8 @@ func newTester(name string) *tester { t.enableConcurrent = false t.enableInfo = false + t.replicationCheckpointDB = "checkpoint-" + uuid.NewString() + t.replicationCheckpointID = 0 return t } @@ -219,7 +239,7 @@ func isTiDB(db *sql.DB) bool { return true } -func (t *tester) addConnection(connName, hostName, userName, password, db string) { +func (t *tester) addConnection(connName, hostName, port, userName, password, db string) { var ( mdb *sql.DB err error @@ -285,6 +305,64 @@ func (t *tester) disconnect(connName string) { t.currConnName = default_connection } +func parseUserInfo(userInfo string) (string, string, error) { + colonIndex := strings.Index(userInfo, ":") + if colonIndex == -1 { + return "", "", fmt.Errorf("missing password in userinfo") + } + return userInfo[:colonIndex], userInfo[colonIndex+1:], nil +} + +func parseHostPort(hostPort string) (string, string, error) { + colonIndex := strings.Index(hostPort, ":") + if colonIndex == -1 { + return "", "", fmt.Errorf("missing port in host:port") + } + return hostPort[:colonIndex], hostPort[colonIndex+1:], nil +} + +func parseDownstream(connStr string) (dbname string, host string, port string, user string, password string) { + // Splitting into userinfo and network/database parts + parts := strings.SplitN(connStr, "@", 2) + if len(parts) != 2 { + fmt.Println("Invalid connection string format") + return + } + + // Parsing userinfo + userInfo := parts[0] + user, password, err := parseUserInfo(userInfo) + if err != nil { + fmt.Println("Error parsing userinfo:", err) + return + } + + // Splitting network type and database part + networkAndDB := parts[1] + networkTypeIndex := strings.Index(networkAndDB, "(") + if networkTypeIndex == -1 { + fmt.Println("Invalid connection string format: missing network type") + return + } + + // Extracting host, port, and database name + hostPortDB := networkAndDB[networkTypeIndex+1:] + hostPortDBParts := strings.SplitN(hostPortDB, ")/", 2) + if len(hostPortDBParts) != 2 { + fmt.Println("Invalid connection string format") + return + } + + host, port, err = parseHostPort(hostPortDBParts[0]) + if err != nil { + fmt.Println("Error parsing host and port:", err) + return + } + + dbname = hostPortDBParts[1] + return +} + func (t *tester) preProcess() { dbName := "test" mdb, err := OpenDBWithRetry("mysql", user+":"+passwd+"@tcp("+host+":"+port+")/"+dbName+"?time_zone=%27Asia%2FShanghai%27&allowAllFiles=true"+params, retryConnCount) @@ -313,6 +391,7 @@ func (t *tester) preProcess() { log.Fatalf("Executing create db %s err[%v]", dbName, err) } t.mdb = mdb + conn, err := initConn(mdb, user, passwd, host, dbName) if err != nil { log.Fatalf("Open db err %v", err) @@ -320,6 +399,17 @@ func (t *tester) preProcess() { t.conn[default_connection] = conn t.curr = conn t.currConnName = default_connection + + if downstream != "" { + // create replication checkpoint database + if _, err := t.mdb.Exec(fmt.Sprintf("create database if not exists `%s`", t.replicationCheckpointDB)); err != nil { + log.Fatalf("Executing create db %s err[%v]", t.replicationCheckpointDB, err) + } + + downStreamDB, downStreamHost, downStreamPort, downStreamUser, downStreamPassword = parseDownstream(downstream) + t.addConnection("downstream", downStreamHost, downStreamPort, downStreamUser, downStreamPassword, downStreamDB) + } + t.switchConnection(default_connection) } func (t *tester) postProcess() { @@ -329,6 +419,7 @@ func (t *tester) postProcess() { } t.mdb.Close() }() + t.switchConnection(default_connection) if !reserveSchema { rows, err := t.mdb.Query("show databases") if err != nil { @@ -384,6 +475,11 @@ func generateBRStatements(source, target string) (string, string) { } func (t *tester) dumpTable(source string) (string, error) { + // Check if the file exists + if _, err := os.Stat(pathDumpling); os.IsNotExist(err) { + return "", errors.New(fmt.Sprintf("path-dumpling [%s] does not exist.", pathDumpling)) + } + log.Warnf("Start dumping table: %s", source) path := "/tmp/" + source + "_" + uuid.NewString() cmdArgs := []string{ @@ -392,6 +488,8 @@ func (t *tester) dumpTable(source string) (string, error) { fmt.Sprintf("-u%s", user), fmt.Sprintf("-T%s.%s", t.name, source), fmt.Sprintf("-o%s", path), + "--output-filename-template", + "tempDump", "--no-header", "--filetype", "csv", @@ -405,9 +503,7 @@ func (t *tester) dumpTable(source string) (string, error) { output, err := cmd.CombinedOutput() if err != nil { - log.Warnf("Failed executing commands: %s, output: %s)", - cmd.String(), string(output)) - return "", err + return "", errors.Annotate(err, fmt.Sprintf("Dumpling failed: %s, output: %s.", cmd.String(), string(output))) } log.Warnf("Done executing commands: %s, output: %s)", cmd.String(), string(output)) @@ -417,10 +513,57 @@ func (t *tester) dumpTable(source string) (string, error) { func (t *tester) importTableStmt(path, target string) string { return fmt.Sprintf(` IMPORT INTO %s - FROM '%s/example.t.000000000.csv' + FROM '%s/tempDump.csv' `, target, path) } +func (t *tester) startReplication(tables string) error { + return nil +} + +func (t *tester) waitForReplicationCheckpoint() error { + curr := t.currConnName + defer t.switchConnection(curr) + + if err := t.executeStmt(fmt.Sprintf("use `%s`", t.replicationCheckpointDB)); err != nil { + return err + } + + markerTable := fmt.Sprintf("marker_%d", t.replicationCheckpointID) + if err := t.executeStmt(fmt.Sprintf("create table `%s`.`%s` (id int primary key)", t.replicationCheckpointDB, markerTable)); err != nil { + return err + } + + t.switchConnection("downstream") + + checkInterval := 1 * time.Second + queryTimeout := 10 * time.Second + + // Keep querying until the table is found + for { + ctx, cancel := context.WithTimeout(context.Background(), queryTimeout) + defer cancel() + + query := fmt.Sprintf("select * from information_schema.tables where table_schema = '%s' and table_name = '%s';", t.replicationCheckpointDB, markerTable) + rows, err := t.mdb.QueryContext(ctx, query) + if err != nil { + log.Printf("Error checking for table: %v", err) + return err + } + + if rows.Next() { + fmt.Printf("Table '%s' found!\n", markerTable) + break + } else { + fmt.Printf("Table '%s' not found. Retrying in %v...\n", markerTable, checkInterval) + } + + time.Sleep(checkInterval) + } + + return nil +} + func (t *tester) Run() error { t.preProcess() defer t.postProcess() @@ -543,7 +686,7 @@ func (t *tester) Run() error { for i := 0; i < 4; i++ { args = append(args, "") } - t.addConnection(args[0], args[1], args[2], args[3], args[4]) + t.addConnection(args[0], args[1], port, args[2], args[3], args[4]) case Q_CONNECTION: q.Query = strings.TrimSpace(q.Query) if q.Query[len(q.Query)-1] == ';' { @@ -593,16 +736,17 @@ func (t *tester) Run() error { } t.replaceRegex = regex case Q_BACKUP_AND_RESTORE: + if !isTiDB(t.mdb) { + return errors.New(fmt.Sprintf("backup_and_restore is only supported on TiDB, line: %d sql:%v", q.Line, q.Query)) + } t.backupAndRestore, err = parseSourceAndTarget(q.Query) if err != nil { return errors.Annotate(err, fmt.Sprintf("Could not parse backup table and restore table name in --backup_and_restore, line: %d sql:%v", q.Line, q.Query)) } backupStmt, restoreStmt := generateBRStatements(t.backupAndRestore.sourceTable, t.backupAndRestore.targetTable) - log.WithFields(log.Fields{"stmt": backupStmt, "line": q.Line}).Warn("Backup started") if err := t.executeStmt(backupStmt); err != nil { return err } - log.WithFields(log.Fields{"stmt": backupStmt, "line": q.Line}).Warn("Backup end") tempTable := t.backupAndRestore.sourceTable + uuid.NewString() renameStmt := fmt.Sprintf("RENAME TABLE `%s` TO `%s`", t.backupAndRestore.sourceTable, tempTable) if err := t.executeStmt(renameStmt); err != nil { @@ -612,11 +756,9 @@ func (t *tester) Run() error { if err := t.executeStmt(dupTableStmt); err != nil { return err } - log.WithFields(log.Fields{"stmt": restoreStmt, "line": q.Line}).Warn("Restore start") if err := t.executeStmt(restoreStmt); err != nil { return err } - log.WithFields(log.Fields{"stmt": restoreStmt, "line": q.Line}).Warn("Restore end") renameStmt = fmt.Sprintf("RENAME TABLE `%s` TO `%s`", t.backupAndRestore.sourceTable, t.backupAndRestore.targetTable) if err := t.executeStmt(renameStmt); err != nil { return err @@ -626,6 +768,9 @@ func (t *tester) Run() error { return err } case Q_DUMP_AND_IMPORT: + if !isTiDB(t.mdb) { + return errors.New(fmt.Sprintf("dump_and_import is only supported on TiDB, line: %d sql:%v", q.Line, q.Query)) + } t.dumpAndImport, err = parseSourceAndTarget(q.Query) if err != nil { return err @@ -634,19 +779,28 @@ func (t *tester) Run() error { if err != nil { return err } - - dupTableStmt := fmt.Sprintf("CREATE TABLE `%s` LIKE `%s`", t.dumpAndImport.targetTable, t.backupAndRestore.sourceTable) + dupTableStmt := fmt.Sprintf("CREATE TABLE `%s` LIKE `%s`", t.dumpAndImport.targetTable, t.dumpAndImport.sourceTable) if err := t.executeStmt(dupTableStmt); err != nil { return err } - importStmt := t.importTableStmt(path, t.dumpAndImport.targetTable) - log.WithFields(log.Fields{"stmt": importStmt, "line": q.Line}).Warn("Import start") if err = t.executeStmt(importStmt); err != nil { return err } - log.WithFields(log.Fields{"stmt": importStmt, "line": q.Line}).Warn("Restore end") - + case Q_REPLICATION: + if !isTiDB(t.mdb) { + return errors.New(fmt.Sprintf("replication is only supported on TiDB, line: %d sql:%v", q.Line, q.Query)) + } + if err := t.startReplication(q.Query); err != nil { + return err + } + case Q_REPLICATION_CHECKPOINT: + if !isTiDB(t.mdb) { + return errors.New(fmt.Sprintf("replication_checkpoint is only supported on TiDB, line: %d sql:%v", q.Line, q.Query)) + } + if err := t.waitForReplicationCheckpoint(); err != nil { + return err + } default: log.WithFields(log.Fields{"command": q.firstWord, "arguments": q.Query, "line": q.Line}).Warn("command not implemented") } @@ -663,7 +817,6 @@ func (t *tester) Run() error { if xmlPath != "" { t.addSuccess(&testSuite, &startTime, testCnt) } - return t.flushResult() } diff --git a/src/query.go b/src/query.go index de14658..461ab10 100644 --- a/src/query.go +++ b/src/query.go @@ -126,6 +126,8 @@ const ( Q_EMPTY_LINE Q_BACKUP_AND_RESTORE Q_DUMP_AND_IMPORT + Q_REPLICATION + Q_REPLICATION_CHECKPOINT ) // ParseQueries parses an array of string into an array of query object. diff --git a/src/type.go b/src/type.go index 2267d65..a4343c9 100644 --- a/src/type.go +++ b/src/type.go @@ -115,7 +115,9 @@ var commandMap = map[string]int{ "begin_concurrent": Q_BEGIN_CONCURRENT, "end_concurrent": Q_END_CONCURRENT, "backup_and_restore": Q_BACKUP_AND_RESTORE, - "dump_and_import": Q_DUMP_AND_IMPORT, + "dump_and_import": Q_DUMP_AND_IMPORT, + "replication_checkpoint": Q_REPLICATION_CHECKPOINT, + "replication": Q_REPLICATION, } func findType(cmdName string) int { diff --git a/src/util.go b/src/util.go index 33b3604..a677d9b 100644 --- a/src/util.go +++ b/src/util.go @@ -15,7 +15,6 @@ package main import ( "database/sql" - "fmt" "regexp" "strings" "time" @@ -118,8 +117,5 @@ func parseSourceAndTarget(s string) (*SourceAndTarget, error) { sourceTable: strings.TrimSpace(parts[0]), targetTable: strings.TrimSpace(parts[1]), } - - fmt.Printf("Parse source: %s and target: %s\n", st.sourceTable, st.targetTable) - return st, nil } diff --git a/t/br_integration.test b/t/br_integration.test new file mode 100644 index 0000000..2188841 --- /dev/null +++ b/t/br_integration.test @@ -0,0 +1,11 @@ +# Test BR and AutoIncrement + +CREATE TABLE t1 (a INT PRIMARY KEY NONCLUSTERED AUTO_INCREMENT, b INT) AUTO_ID_CACHE = 1; +INSERT INTO t1 (b) VALUES (1), (2), (3); +SHOW TABLE t1 NEXT_ROW_ID; + +--backup_and_restore t1 AS tt1 + +SHOW TABLE tt1 NEXT_ROW_ID; +INSERT INTO tt1 (b) VALUES (4), (5), (6); +SHOW TABLE tt1 NEXT_ROW_ID; \ No newline at end of file diff --git a/t/cdc_integration.test b/t/cdc_integration.test new file mode 100644 index 0000000..e0cd210 --- /dev/null +++ b/t/cdc_integration.test @@ -0,0 +1,12 @@ +# Test TiCDC replication + +CREATE TABLE t3 (a INT PRIMARY KEY, b INT, UNIQUE KEY (b)); + +INSERT INTO t3 VALUES (1, 23); +--error ER_DUP_ENTRY: Duplicate entry '23' for key 'b' +INSERT INTO t3 VALUES (11, 23); + +--replication_checkpoint +--connection downstream +--error ER_DUP_ENTRY: Duplicate entry '23' for key 'b' +INSERT INTO t3 VALUES (11, 23); diff --git a/t/dumpling_import_integration.test b/t/dumpling_import_integration.test new file mode 100644 index 0000000..27c56a7 --- /dev/null +++ b/t/dumpling_import_integration.test @@ -0,0 +1,10 @@ +# Test Lightning and AutoRandom + +CREATE TABLE t2(c BIGINT AUTO_RANDOM PRIMARY KEY, a INT, b INT); +INSERT INTO t2(a, b) VALUES (1, 1), (2, 2), (3, 3); +SELECT * FROM t2; + +--dump_and_import t2 AS tt2 + +INSERT INTO tt2(a, b) VALUES (1, 1), (2, 2), (3, 3); +SELECT * FROM tt2; \ No newline at end of file diff --git a/t/example.test b/t/example.test index 2cb4a00..8a1f389 100644 --- a/t/example.test +++ b/t/example.test @@ -38,9 +38,9 @@ explain analyze select * from t; --enable_info insert into t values (6, 6); ---backup_and_restore t AS tt +# --backup_and_restore t AS tt ---dump_and_import t AS td +# --dump_and_import t AS td DROP TABLE IF EXISTS t1; CREATE TABLE t1 (f1 INT PRIMARY KEY, f2 INT NOT NULL UNIQUE); @@ -52,3 +52,10 @@ INSERT t1 VALUES (1, 1), (1, 1) ON DUPLICATE KEY UPDATE f1 = 2, f2 = 2; --echo $a use `test`;; + +sleep 10; + +--replication_checkpoint +connection default; +use example; +select * from t1; From 26d4bb6c5b47639f4a6ec6dfe1fd846713204436 Mon Sep 17 00:00:00 2001 From: bb7133 Date: Thu, 10 Oct 2024 19:10:01 -0700 Subject: [PATCH 3/5] update go.mod for my personal repo --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index a45e217..37b9597 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/pingcap/mysql-tester +module github.com/bb7133/mysql-tester go 1.21 From 67e4fbf902623a98a859fb3d1bd39088bec8da6f Mon Sep 17 00:00:00 2001 From: bb7133 Date: Thu, 10 Oct 2024 22:55:29 -0700 Subject: [PATCH 4/5] use enableResultLog to refine the result --- src/main.go | 122 +++++++++++++++++++++++++++------------------------ src/query.go | 1 - src/type.go | 1 - 3 files changed, 64 insertions(+), 60 deletions(-) diff --git a/src/main.go b/src/main.go index d59eeae..9a78c87 100644 --- a/src/main.go +++ b/src/main.go @@ -474,6 +474,45 @@ func generateBRStatements(source, target string) (string, string) { return backupSQL, restoreSQL } +func (t *tester) handleBackupAndRestore(q query) error { + if !isTiDB(t.mdb) { + return errors.New(fmt.Sprintf("backup_and_restore is only supported on TiDB, line: %d sql:%v", q.Line, q.Query)) + } + t.enableResultLog = false + defer func() { t.enableResultLog = true }() + + var err error + t.backupAndRestore, err = parseSourceAndTarget(q.Query) + if err != nil { + return errors.Annotate(err, fmt.Sprintf("Could not parse backup table and restore table name in --backup_and_restore, line: %d sql:%v", q.Line, q.Query)) + } + backupStmt, restoreStmt := generateBRStatements(t.backupAndRestore.sourceTable, t.backupAndRestore.targetTable) + if err := t.executeStmt(backupStmt); err != nil { + return err + } + tempTable := t.backupAndRestore.sourceTable + uuid.NewString() + renameStmt := fmt.Sprintf("RENAME TABLE `%s` TO `%s`", t.backupAndRestore.sourceTable, tempTable) + if err := t.executeStmt(renameStmt); err != nil { + return err + } + dupTableStmt := fmt.Sprintf("CREATE TABLE `%s` LIKE `%s`", t.backupAndRestore.sourceTable, tempTable) + if err := t.executeStmt(dupTableStmt); err != nil { + return err + } + if err := t.executeStmt(restoreStmt); err != nil { + return err + } + renameStmt = fmt.Sprintf("RENAME TABLE `%s` TO `%s`", t.backupAndRestore.sourceTable, t.backupAndRestore.targetTable) + if err := t.executeStmt(renameStmt); err != nil { + return err + } + renameStmt = fmt.Sprintf("RENAME TABLE `%s` TO `%s`", tempTable, t.backupAndRestore.sourceTable) + if err := t.executeStmt(renameStmt); err != nil { + return err + } + return nil +} + func (t *tester) dumpTable(source string) (string, error) { // Check if the file exists if _, err := os.Stat(pathDumpling); os.IsNotExist(err) { @@ -517,7 +556,29 @@ func (t *tester) importTableStmt(path, target string) string { `, target, path) } -func (t *tester) startReplication(tables string) error { +func (t *tester) handleDumpAndImport(q query) error { + if !isTiDB(t.mdb) { + return errors.New(fmt.Sprintf("dump_and_import is only supported on TiDB, line: %d sql:%v", q.Line, q.Query)) + } + t.enableResultLog = false + defer func() { t.enableResultLog = true }() + var err error + t.dumpAndImport, err = parseSourceAndTarget(q.Query) + if err != nil { + return err + } + path, err := t.dumpTable(t.dumpAndImport.sourceTable) + if err != nil { + return err + } + dupTableStmt := fmt.Sprintf("CREATE TABLE `%s` LIKE `%s`", t.dumpAndImport.targetTable, t.dumpAndImport.sourceTable) + if err := t.executeStmt(dupTableStmt); err != nil { + return err + } + importStmt := t.importTableStmt(path, t.dumpAndImport.targetTable) + if err = t.executeStmt(importStmt); err != nil { + return err + } return nil } @@ -736,64 +797,9 @@ func (t *tester) Run() error { } t.replaceRegex = regex case Q_BACKUP_AND_RESTORE: - if !isTiDB(t.mdb) { - return errors.New(fmt.Sprintf("backup_and_restore is only supported on TiDB, line: %d sql:%v", q.Line, q.Query)) - } - t.backupAndRestore, err = parseSourceAndTarget(q.Query) - if err != nil { - return errors.Annotate(err, fmt.Sprintf("Could not parse backup table and restore table name in --backup_and_restore, line: %d sql:%v", q.Line, q.Query)) - } - backupStmt, restoreStmt := generateBRStatements(t.backupAndRestore.sourceTable, t.backupAndRestore.targetTable) - if err := t.executeStmt(backupStmt); err != nil { - return err - } - tempTable := t.backupAndRestore.sourceTable + uuid.NewString() - renameStmt := fmt.Sprintf("RENAME TABLE `%s` TO `%s`", t.backupAndRestore.sourceTable, tempTable) - if err := t.executeStmt(renameStmt); err != nil { - return err - } - dupTableStmt := fmt.Sprintf("CREATE TABLE `%s` LIKE `%s`", t.backupAndRestore.sourceTable, tempTable) - if err := t.executeStmt(dupTableStmt); err != nil { - return err - } - if err := t.executeStmt(restoreStmt); err != nil { - return err - } - renameStmt = fmt.Sprintf("RENAME TABLE `%s` TO `%s`", t.backupAndRestore.sourceTable, t.backupAndRestore.targetTable) - if err := t.executeStmt(renameStmt); err != nil { - return err - } - renameStmt = fmt.Sprintf("RENAME TABLE `%s` TO `%s`", tempTable, t.backupAndRestore.sourceTable) - if err := t.executeStmt(renameStmt); err != nil { - return err - } + t.handleBackupAndRestore(q) case Q_DUMP_AND_IMPORT: - if !isTiDB(t.mdb) { - return errors.New(fmt.Sprintf("dump_and_import is only supported on TiDB, line: %d sql:%v", q.Line, q.Query)) - } - t.dumpAndImport, err = parseSourceAndTarget(q.Query) - if err != nil { - return err - } - path, err := t.dumpTable(t.dumpAndImport.sourceTable) - if err != nil { - return err - } - dupTableStmt := fmt.Sprintf("CREATE TABLE `%s` LIKE `%s`", t.dumpAndImport.targetTable, t.dumpAndImport.sourceTable) - if err := t.executeStmt(dupTableStmt); err != nil { - return err - } - importStmt := t.importTableStmt(path, t.dumpAndImport.targetTable) - if err = t.executeStmt(importStmt); err != nil { - return err - } - case Q_REPLICATION: - if !isTiDB(t.mdb) { - return errors.New(fmt.Sprintf("replication is only supported on TiDB, line: %d sql:%v", q.Line, q.Query)) - } - if err := t.startReplication(q.Query); err != nil { - return err - } + t.handleDumpAndImport(q) case Q_REPLICATION_CHECKPOINT: if !isTiDB(t.mdb) { return errors.New(fmt.Sprintf("replication_checkpoint is only supported on TiDB, line: %d sql:%v", q.Line, q.Query)) diff --git a/src/query.go b/src/query.go index 461ab10..1ecf40c 100644 --- a/src/query.go +++ b/src/query.go @@ -126,7 +126,6 @@ const ( Q_EMPTY_LINE Q_BACKUP_AND_RESTORE Q_DUMP_AND_IMPORT - Q_REPLICATION Q_REPLICATION_CHECKPOINT ) diff --git a/src/type.go b/src/type.go index a4343c9..b346e10 100644 --- a/src/type.go +++ b/src/type.go @@ -117,7 +117,6 @@ var commandMap = map[string]int{ "backup_and_restore": Q_BACKUP_AND_RESTORE, "dump_and_import": Q_DUMP_AND_IMPORT, "replication_checkpoint": Q_REPLICATION_CHECKPOINT, - "replication": Q_REPLICATION, } func findType(cmdName string) int { From 2148bd9e5299de307244a15ed0047c953a035dc4 Mon Sep 17 00:00:00 2001 From: bb7133 Date: Wed, 23 Oct 2024 21:30:51 -0700 Subject: [PATCH 5/5] fix a bug related to BR --- src/main.go | 12 ++++++------ t/br_integration.test | 4 +--- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/src/main.go b/src/main.go index 9a78c87..1682edd 100644 --- a/src/main.go +++ b/src/main.go @@ -495,10 +495,6 @@ func (t *tester) handleBackupAndRestore(q query) error { if err := t.executeStmt(renameStmt); err != nil { return err } - dupTableStmt := fmt.Sprintf("CREATE TABLE `%s` LIKE `%s`", t.backupAndRestore.sourceTable, tempTable) - if err := t.executeStmt(dupTableStmt); err != nil { - return err - } if err := t.executeStmt(restoreStmt); err != nil { return err } @@ -797,9 +793,13 @@ func (t *tester) Run() error { } t.replaceRegex = regex case Q_BACKUP_AND_RESTORE: - t.handleBackupAndRestore(q) + if err := t.handleBackupAndRestore(q); err != nil { + return err + } case Q_DUMP_AND_IMPORT: - t.handleDumpAndImport(q) + if err := t.handleDumpAndImport(q); err != nil { + return err + } case Q_REPLICATION_CHECKPOINT: if !isTiDB(t.mdb) { return errors.New(fmt.Sprintf("replication_checkpoint is only supported on TiDB, line: %d sql:%v", q.Line, q.Query)) diff --git a/t/br_integration.test b/t/br_integration.test index 2188841..5645c37 100644 --- a/t/br_integration.test +++ b/t/br_integration.test @@ -1,11 +1,9 @@ # Test BR and AutoIncrement -CREATE TABLE t1 (a INT PRIMARY KEY NONCLUSTERED AUTO_INCREMENT, b INT) AUTO_ID_CACHE = 1; +CREATE TABLE t1 (a INT PRIMARY KEY NONCLUSTERED AUTO_INCREMENT, b INT) AUTO_ID_CACHE = 100; INSERT INTO t1 (b) VALUES (1), (2), (3); SHOW TABLE t1 NEXT_ROW_ID; --backup_and_restore t1 AS tt1 -SHOW TABLE tt1 NEXT_ROW_ID; -INSERT INTO tt1 (b) VALUES (4), (5), (6); SHOW TABLE tt1 NEXT_ROW_ID; \ No newline at end of file