Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

loader, syncer: support create/drop view #1310

Open
wants to merge 26 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
3fafe7b
save work
lance6716 Nov 27, 2020
246e4bb
add test
lance6716 Nov 27, 2020
a6878f8
add ut
lance6716 Nov 27, 2020
7412f64
Merge branch 'master' of https://github.com/pingcap/dm into support-view
lance6716 Nov 27, 2020
ba1e0b2
fix CI
lance6716 Nov 27, 2020
8089a8e
change script
lance6716 Nov 27, 2020
e431dda
fix CI
lance6716 Nov 27, 2020
0823ba2
Merge branch 'master' into support-view
GMHDBJD Nov 27, 2020
fbcd1fc
Merge branch 'master' of https://github.com/pingcap/dm into support-view
lance6716 Nov 28, 2020
7d6fa37
support "as select xxx" in view
lance6716 Nov 28, 2020
89934cc
remove debug
lance6716 Nov 28, 2020
b7f7438
remove TODO
lance6716 Nov 28, 2020
8a54a3d
try fix CI
lance6716 Nov 28, 2020
0b14140
add check and remove unused code
lance6716 Nov 28, 2020
d168716
address comment
lance6716 Dec 1, 2020
4d94b05
Merge branch 'master' into support-view
lance6716 Dec 1, 2020
16897d4
Merge branch 'master' of https://github.com/pingcap/dm into support-view
lance6716 Dec 23, 2020
ad14b9d
bring VIEW test back
lance6716 Dec 23, 2020
1fa1a7e
Merge branch 'master' of https://github.com/pingcap/dm into support-view
lance6716 Dec 30, 2020
a9fdc88
VIEW didn't need sync group
lance6716 Dec 30, 2020
5f55bf3
update dumpling
lance6716 Dec 31, 2020
a91adcb
register / remove metrics of dumpling
lance6716 Dec 31, 2020
ff9d825
Revert "register / remove metrics of dumpling"
lance6716 Dec 31, 2020
690b7e7
Revert "update dumpling"
lance6716 Dec 31, 2020
56ba826
test shard DDL mixed with VIEW
lance6716 Jan 7, 2021
1cd0dc4
improve wait time
lance6716 Jan 7, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ ErrLoadUnitNoTableFile,[code=34013:class=load-unit:scope=internal:level=high], "
ErrLoadUnitDumpDirNotFound,[code=34014:class=load-unit:scope=internal:level=high], "Message: %s does not exist or it's not a dir"
ErrLoadUnitDuplicateTableFile,[code=34015:class=load-unit:scope=internal:level=high], "Message: invalid table schema file, duplicated item - %s"
ErrLoadUnitGenBAList,[code=34016:class=load-unit:scope=internal:level=high], "Message: generate block allow list, Workaround: Please check the `block-allow-list` config in task configuration file."
ErrLoadUnitNoTableFileForView,[code=34017:class=load-unit:scope=internal:level=high], "Message: invalid view sql file, cannot find table - %s"
ErrSyncerUnitPanic,[code=36001:class=sync-unit:scope=internal:level=high], "Message: panic error: %v"
ErrSyncUnitInvalidTableName,[code=36002:class=sync-unit:scope=internal:level=high], "Message: extract table name for DML error: %s"
ErrSyncUnitTableNameQuery,[code=36003:class=sync-unit:scope=internal:level=high], "Message: table name parse error: %s"
Expand Down
1 change: 1 addition & 0 deletions dumpling/dumpling.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func (m *Dumpling) constructArgs() (*export.Config, error) {
dumpConfig.TableFilter = tableFilter
dumpConfig.CompleteInsert = true // always keep column name in `INSERT INTO` statements.
dumpConfig.Logger = m.logger.Logger
dumpConfig.NoViews = false

if cfg.Threads > 0 {
dumpConfig.Threads = cfg.Threads
Expand Down
6 changes: 6 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1468,6 +1468,12 @@ description = ""
workaround = "Please check the `block-allow-list` config in task configuration file."
tags = ["internal", "high"]

[error.DM-load-unit-34017]
message = "invalid view sql file, cannot find table - %s"
description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-sync-unit-36001]
message = "panic error: %v"
description = ""
Expand Down
162 changes: 160 additions & 2 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/parser"
tmysql "github.com/pingcap/parser/mysql"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/dm/unit"
Expand All @@ -36,6 +39,7 @@ import (
"github.com/pingcap/dm/pkg/dumpling"
fr "github.com/pingcap/dm/pkg/func-rollback"
"github.com/pingcap/dm/pkg/log"
parserpkg "github.com/pingcap/dm/pkg/parser"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"

Expand Down Expand Up @@ -384,7 +388,9 @@ type Loader struct {

// db -> tables
// table -> data files
db2Tables map[string]Tables2DataFiles
db2Tables map[string]Tables2DataFiles
// db -> views
db2Views map[string]map[string]struct{}
tableInfos map[string]*tableInfo

// for every worker goroutine, not for every data file
Expand Down Expand Up @@ -420,6 +426,7 @@ func NewLoader(cfg *config.SubTaskConfig) *Loader {
loader := &Loader{
cfg: cfg,
db2Tables: make(map[string]Tables2DataFiles),
db2Views: make(map[string]map[string]struct{}),
tableInfos: make(map[string]*tableInfo),
workerWg: new(sync.WaitGroup),
logger: log.With(zap.String("task", cfg.Name), zap.String("unit", "load")),
Expand Down Expand Up @@ -945,6 +952,44 @@ func (l *Loader) prepareTableFiles(files map[string]struct{}) error {
return nil
}

func (l *Loader) prepareViewFiles(files map[string]struct{}) error {
for file := range files {
if !strings.HasSuffix(file, "-schema-view.sql") {
continue
}

idx := strings.LastIndex(file, "-schema-view.sql")
name := file[:idx]
fields := strings.Split(name, ".")
if len(fields) != 2 {
l.logger.Warn("invalid view file", zap.String("file", file))
continue
}

db, table := fields[0], fields[1]
if l.skipSchemaAndTable(&filter.Table{Schema: db, Name: table}) {
l.logger.Warn("ignore view file", zap.String("view file", file))
continue
}
// because there's a table file for view file, we skip this check
tables := l.db2Tables[db]
if _, ok := tables[table]; !ok {
return terror.ErrLoadUnitNoTableFileForView.Generate(file)
}

views, ok := l.db2Views[db]
if !ok {
l.db2Views[db] = map[string]struct{}{}
views = l.db2Views[db]
}
views[table] = struct{}{}

l.totalFileCount.Add(1) // for view
}

return nil
}

func (l *Loader) prepareDataFiles(files map[string]struct{}) error {
var dataFilesNumber float64

Expand Down Expand Up @@ -1045,6 +1090,11 @@ func (l *Loader) prepare() error {
return err
}

// Sql file for create view
if err := l.prepareViewFiles(files); err != nil {
return err
}

// Sql file for restore data
return l.prepareDataFiles(files)
}
Expand Down Expand Up @@ -1083,6 +1133,103 @@ func (l *Loader) restoreTable(ctx context.Context, conn *DBConn, sqlFile, schema
return nil
}

// restoreView drops dummy table and create view
func (l *Loader) restoreView(ctx context.Context, conn *DBConn, sqlFile, schema, view string) error {
// dumpling will generate such a viewFile
// /*!40101 SET NAMES binary*/;
// DROP TABLE IF EXISTS `v2`;
// DROP VIEW IF EXISTS `v2`;
// SET @PREV_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT;
// SET @PREV_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS;
// SET @PREV_COLLATION_CONNECTION=@@COLLATION_CONNECTION;
// SET character_set_client = utf8;
// SET character_set_results = utf8;
// SET collation_connection = utf8_general_ci;
// CREATE ALGORITHM=UNDEFINED DEFINER="root"@"localhost" SQL SECURITY DEFINER VIEW "all_mode"."v2" AS select "all_mode"."t2"."id" AS "id" from "all_mode"."t2";
// SET character_set_client = @PREV_CHARACTER_SET_CLIENT;
// SET character_set_results = @PREV_CHARACTER_SET_RESULTS;
// SET collation_connection = @PREV_COLLATION_CONNECTION;
f, err := os.Open(sqlFile)
if err != nil {
return terror.ErrLoadUnitReadSchemaFile.Delegate(err)
}
defer f.Close()

tctx := tcontext.NewContext(ctx, l.logger)

var sqls []string
dstSchema, dstView := fetchMatchedLiteral(tctx, l.tableRouter, schema, view)
sqls = append(sqls, fmt.Sprintf("USE `%s`;", unescapePercent(dstSchema, l.logger)))
sqlMode, err := tmysql.GetSQLMode(l.cfg.SQLMode)
if err != nil {
// should not happened
return terror.ErrGetSQLModeFromStr.Generate(l.cfg.SQLMode)
}

data := make([]byte, 0, 1024*1024)
br := bufio.NewReader(f)
for {
line, err := br.ReadString('\n')
if err == io.EOF {
break
}

realLine := strings.TrimSpace(line[:len(line)-1])
if len(realLine) == 0 {
continue
}

data = append(data, []byte(realLine)...)
if data[len(data)-1] == ';' {
query := string(data)
data = data[0:0]
if strings.HasPrefix(query, "/*") && strings.HasSuffix(query, "*/;") {
continue
}

if strings.HasPrefix(query, "DROP") {
query = renameShardingTable(query, view, dstView, false)
} else if strings.HasPrefix(query, "CREATE") {
// create view statement could be complicated because it has a select
p := parser.New()
p.SetSQLMode(sqlMode)
stmt, err := p.ParseOneStmt(query, "", "")
if err != nil {
return terror.ErrLoadUnitParseStatement.Generate(query)
}

tableNames, err := parserpkg.FetchDDLTableNames(schema, stmt)
if err != nil {
return terror.WithScope(err, terror.ScopeInternal)
}
targetTableNames := make([]*filter.Table, 0, len(tableNames))
for i := range tableNames {
dstSchema, dstTable := fetchMatchedLiteral(tctx, l.tableRouter, tableNames[i].Schema, tableNames[i].Name)
tableName := &filter.Table{
Schema: dstSchema,
Name: dstTable,
}
targetTableNames = append(targetTableNames, tableName)
}
query, err = parserpkg.RenameDDLTable(stmt, targetTableNames)
if err != nil {
return terror.WithScope(err, terror.ScopeInternal)
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It sames that loader will ignore all SET sqls here. Is this by design?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

forgot 😂

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, query will be added in L1222. We just didn't rename them

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, yes. My fault. Maybe you can add a comment before L1191 🤣


l.logger.Debug("view create statement", zap.String("sql", query))

sqls = append(sqls, query)
}
}
err = conn.executeSQL(tctx, sqls)
if err != nil {
return terror.WithScope(err, terror.ScopeDownstream)
}

return nil
}

// restoreStruture creates schema or table
func (l *Loader) restoreStructure(ctx context.Context, conn *DBConn, sqlFile string, schema string, table string) error {
f, err := os.Open(sqlFile)
Expand Down Expand Up @@ -1204,6 +1351,7 @@ func (l *Loader) restoreData(ctx context.Context) error {

for _, db := range dbs {
tables := l.db2Tables[db]
views := l.db2Views[db]

// create db
dbFile := fmt.Sprintf("%s/%s-schema-create.sql", l.cfg.Dir, db)
Expand Down Expand Up @@ -1272,8 +1420,18 @@ func (l *Loader) restoreData(ctx context.Context) error {
dispatchMap[fmt.Sprintf("%s_%s_%s", db, table, file)] = j
}
}

for view := range views {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to create the view in a different database with the table?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess user may create that type of view. And dumpling will generate a "table file" for each view at view's database, I guess this logic is fine?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done in d168716

viewFile := fmt.Sprintf("%s/%s.%s-schema-view.sql", l.cfg.Dir, db, view)
l.logger.Info("start to create view", zap.String("view file", viewFile))
err := l.restoreView(ctx, dbConn, viewFile, db, view)
if err != nil {
return err
}
l.logger.Info("finish to create view", zap.String("view file", viewFile))
}
Comment on lines +1418 to +1426
Copy link
Contributor

@lichunzhu lichunzhu Nov 30, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if some views needed tables in other databases are not created yet? I think we should create views when all tables in all databases have been created correctly.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done in d168716

}
l.logger.Info("finish to create tables", zap.Duration("cost time", time.Since(begin)))
l.logger.Info("finish to create tables and views", zap.Duration("cost time", time.Since(begin)))

// a simple and naive approach to dispatch files randomly based on the feature of golang map(range by random)
for _, j := range dispatchMap {
Expand Down
64 changes: 59 additions & 5 deletions pkg/parser/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,27 @@ type tableNameExtractor struct {
}

func (tne *tableNameExtractor) Enter(in ast.Node) (ast.Node, bool) {
if t, ok := in.(*ast.TableName); ok {
tb := &filter.Table{Schema: t.Schema.L, Name: t.Name.L}
switch n := in.(type) {
case *ast.TableName:
tb := &filter.Table{Schema: n.Schema.L, Name: n.Name.L}
if tb.Schema == "" {
tb.Schema = tne.curDB
}
tne.names = append(tne.names, tb)
return in, true
case *ast.ColumnName:
tb := &filter.Table{Schema: n.Schema.L, Name: n.Table.L}
// this column has specified a table, such as
// CREATE VIEW `v1` AS SELECT `t1`.`c1` AS `c1` FROM `t1`
if tb.Name != "" {
if tb.Schema == "" {
tb.Schema = tne.curDB
}
tne.names = append(tne.names, tb)
}
return in, true
}

return in, false
}

Expand Down Expand Up @@ -107,16 +120,29 @@ func (v *tableRenameVisitor) Enter(in ast.Node) (ast.Node, bool) {
if v.hasErr {
return in, true
}
if t, ok := in.(*ast.TableName); ok {
switch n := in.(type) {
case *ast.TableName:
if v.i >= len(v.targetNames) {
v.hasErr = true
return in, true
}
t.Schema = model.NewCIStr(v.targetNames[v.i].Schema)
t.Name = model.NewCIStr(v.targetNames[v.i].Name)
n.Schema = model.NewCIStr(v.targetNames[v.i].Schema)
n.Name = model.NewCIStr(v.targetNames[v.i].Name)
v.i++
return in, true
case *ast.ColumnName:
if n.Table.L != "" {
if v.i >= len(v.targetNames) {
v.hasErr = true
return in, true
}
n.Schema = model.NewCIStr(v.targetNames[v.i].Schema)
n.Table = model.NewCIStr(v.targetNames[v.i].Name)
v.i++
}
return in, true
}

return in, false
}

Expand Down Expand Up @@ -167,6 +193,31 @@ func RenameDDLTable(stmt ast.StmtNode, targetTableNames []*filter.Table) (string
return bf.String(), nil
}

type dbNameAppender struct {
curDB model.CIStr
}

func (v *dbNameAppender) Enter(in ast.Node) (ast.Node, bool) {
switch n := in.(type) {
case *ast.TableName:
if n.Schema.O == "" {
n.Schema = v.curDB
}
return in, true
case *ast.ColumnName:
if n.Table.O != "" && n.Schema.O == "" {
n.Schema = v.curDB
}
return in, true
}

return in, false
}

func (v *dbNameAppender) Leave(in ast.Node) (ast.Node, bool) {
return in, true
}

// SplitDDL splits multiple operations in one DDL statement into multiple DDL statements
// returned DDL is formatted like StringSingleQuotes, KeyWordUppercase and NameBackQuotes
// if fail to restore, it would not restore the value of `stmt` (it changes it's values if `stmt` is one of DropTableStmt, RenameTableStmt, AlterTableStmt)
Expand Down Expand Up @@ -288,6 +339,9 @@ func SplitDDL(stmt ast.StmtNode, schema string) (sqls []string, err error) {
v.Table = table

return sqls, nil
case *ast.CreateViewStmt:
visitor := &dbNameAppender{curDB: schemaName}
v.Accept(visitor)
default:
return nil, terror.ErrUnknownTypeDDL.Generate(stmt)
}
Expand Down
Loading