diff --git a/.gitignore b/.gitignore
index 6b79b237e4..a2a2f8c23d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,4 +16,4 @@ vendor
 tidb-slow.log
 /monitoring/dashboards/dm.json
 /monitoring/rules/dm_worker.rules.yml
-mysql.*.log
\ No newline at end of file
+mysql.*.log
diff --git a/loader/loader.go b/loader/loader.go
index b9809a0fba..4255a93408 100644
--- a/loader/loader.go
+++ b/loader/loader.go
@@ -18,6 +18,7 @@ import (
 	"bytes"
 	"context"
 	"encoding/hex"
+	"fmt"
 	"io"
 	"io/ioutil"
 	"os"
@@ -26,8 +27,6 @@ import (
 	"sync"
 	"time"
 
-	"golang.org/x/sync/errgroup"
-
 	"github.com/pingcap/dm/dm/config"
 	"github.com/pingcap/dm/dm/pb"
 	"github.com/pingcap/dm/dm/unit"
@@ -1185,222 +1184,82 @@ func fetchMatchedLiteral(ctx *tcontext.Context, router *router.Table, schema, ta
 	return targetSchema, targetTable
 }
 
-// `restore Schema Job` present a data structure of schema restoring job.
-type restoreSchemaJob struct {
-	loader   *Loader
-	session  *DBConn
-	database string // database name
-	table    string // table name, empty if it's a schema of database
-	filepath string // file path of dumpped schema file
-}
-
-// `jobQueue` of schema restoring which (only) support consumptions concurrently.
-type jobQueue struct {
-	ctx           context.Context
-	msgq          chan *restoreSchemaJob // job message queue channel
-	consumerCount int                    // count of consumers
-	eg            *errgroup.Group        // err wait group of consumer's go-routines
-}
-
-// `newJobQueue` consturct a jobQueue.
-func newJobQueue(ctx context.Context, consumerCount, length int) *jobQueue {
-	eg, selfCtx := errgroup.WithContext(ctx)
-	return &jobQueue{
-		ctx:           selfCtx,
-		msgq:          make(chan *restoreSchemaJob, length),
-		consumerCount: consumerCount,
-		eg:            eg,
-	}
-}
+func (l *Loader) restoreData(ctx context.Context) error {
+	begin := time.Now()
 
-// `push` will append a job to the queue.
-func (q *jobQueue) push(job *restoreSchemaJob) error {
-	var err error
-	select {
-	case <-q.ctx.Done():
-		err = q.ctx.Err()
-	case q.msgq <- job:
+	baseConn, err := l.toDB.GetBaseConn(ctx)
+	if err != nil {
+		return err
 	}
-	return terror.WithScope(err, terror.ScopeInternal)
-}
-
-// `close` wait jobs done and close queue forever.
-func (q *jobQueue) close() error {
-	// queue is closing
-	close(q.msgq)
-	// wait until go-routines of consumption was exited
-	return q.eg.Wait()
-}
+	defer func() {
+		err2 := l.toDB.CloseBaseConn(baseConn)
+		if err2 != nil {
+			l.logger.Warn("fail to close connection", zap.Error(err2))
+		}
+	}()
 
-// `startConsumers` run multiple go-routines of job consumption with user defined handler.
-func (q *jobQueue) startConsumers(handler func(ctx context.Context, job *restoreSchemaJob) error) {
-	for i := 0; i < q.consumerCount; i++ {
-		q.eg.Go(func() error {
-			var session *DBConn
-		consumeLoop:
-			for {
-				select {
-				case <-q.ctx.Done():
-					err := q.ctx.Err()
-					return err
-				case job, active := <-q.msgq:
-					if !active {
-						break consumeLoop
-					}
-					// test condition for `job.session` means db session still could be controlled outside,
-					// it's used in unit test for now.
-					if session == nil && job.session == nil {
-						baseConn, err2 := job.loader.toDB.GetBaseConn(q.ctx)
-						if err2 != nil {
-							return err2
-						}
-						defer func(baseConn *conn.BaseConn) {
-							err2 := job.loader.toDB.CloseBaseConn(baseConn)
-							if err2 != nil {
-								job.loader.logger.Warn("fail to close connection", zap.Error(err2))
-							}
-						}(baseConn)
-						session = &DBConn{
-							cfg:      job.loader.cfg,
-							baseConn: baseConn,
-							resetBaseConnFn: func(*tcontext.Context, *conn.BaseConn) (*conn.BaseConn, error) {
-								return nil, terror.ErrDBBadConn.Generate("bad connection error restoreData")
-							},
-						}
-					}
-					if job.session == nil {
-						job.session = session
-					}
-					err := handler(q.ctx, job)
-					if err != nil {
-						return err
-					}
-				}
-			}
-			return nil
-		})
+	dbConn := &DBConn{
+		cfg:      l.cfg,
+		baseConn: baseConn,
+		resetBaseConnFn: func(*tcontext.Context, *conn.BaseConn) (*conn.BaseConn, error) {
+			return nil, terror.ErrDBBadConn.Generate("bad connection error restoreData")
+		},
 	}
-}
 
-func (l *Loader) restoreData(ctx context.Context) error {
-	begin := time.Now()
 	dispatchMap := make(map[string]*fileJob)
-	concurrency := l.cfg.PoolSize
-	// `for v := range map` would present random order
-	// `dbs` array keep same order for restore schema job generating
-	var err error
+
+	// collect dbs into a slice so schema and data restore iterate over them in one consistent order
 	dbs := make([]string, 0, len(l.db2Tables))
 	for db := range l.db2Tables {
 		dbs = append(dbs, db)
 	}
-	tctx := tcontext.NewContext(ctx, l.logger)
-	// run consumers of restore database schema queue
-	dbRestoreQueue := newJobQueue(ctx, concurrency, concurrency /** length of queue */)
-	dbRestoreQueue.startConsumers(func(ctx context.Context, job *restoreSchemaJob) error {
-		// restore database schema
-		job.loader.logger.Info("start to create schema", zap.String("schema file", job.filepath))
-		err2 := job.loader.restoreSchema(ctx, job.session, job.filepath, job.database)
-		if err2 != nil {
-			return err2
-		}
-		job.loader.logger.Info("finish to create schema", zap.String("schema file", job.filepath))
-		return nil
-	})
+	tctx := tcontext.NewContext(ctx, l.logger)
 
-	// push database schema restoring jobs to the queue
 	for _, db := range dbs {
-		schemaFile := l.cfg.Dir + "/" + db + "-schema-create.sql" // cache friendly
-		err = dbRestoreQueue.push(&restoreSchemaJob{
-			loader:   l,
-			database: db,
-			table:    "",
-			filepath: schemaFile,
-		})
+		tables := l.db2Tables[db]
+
+		// create db
+		dbFile := fmt.Sprintf("%s/%s-schema-create.sql", l.cfg.Dir, db)
+		l.logger.Info("start to create schema", zap.String("schema file", dbFile))
+		err = l.restoreSchema(ctx, dbConn, dbFile, db)
 		if err != nil {
-			break
+			return err
 		}
-	}
+		l.logger.Info("finish to create schema", zap.String("schema file", dbFile))
 
-	// check producing error
-	if err != nil {
-		runtimeErr := dbRestoreQueue.close()
-		if errors.ErrorEqual(err, context.Canceled) {
-			err = runtimeErr
+		tnames := make([]string, 0, len(tables))
+		for t := range tables {
+			tnames = append(tnames, t)
 		}
-		return err
-	}
-	// wait whole task done & close queue
-	err = dbRestoreQueue.close()
-	if err != nil {
-		return err
-	}
-
-	// run consumers of restore table schema queue
-	tblRestoreQueue := newJobQueue(ctx, concurrency, concurrency /** length of queue */)
-	tblRestoreQueue.startConsumers(func(ctx context.Context, job *restoreSchemaJob) error {
-		job.loader.logger.Info("start to create table", zap.String("table file", job.filepath))
-		err2 := job.loader.restoreTable(ctx, job.session, job.filepath, job.database, job.table)
-		if err2 != nil {
-			return err2
-		}
-		job.loader.logger.Info("finish to create table", zap.String("table file", job.filepath))
-		return nil
-	})
-
-	// push table schema restoring jobs to the queue
-tblSchemaLoop:
-	for _, db := range dbs {
-		for table := range l.db2Tables[db] {
-			schemaFile := l.cfg.Dir + "/" + db + "." + table + "-schema.sql" // cache friendly
+		for _, table := range tnames {
+			dataFiles := tables[table]
+			tableFile := l.cfg.Dir + "/" + db + "." + table + "-schema.sql" // cache friendly
 			if _, ok := l.tableInfos[tableName(db, table)]; !ok {
-				l.tableInfos[tableName(db, table)], err = parseTable(tctx, l.tableRouter, db, table, schemaFile, l.cfg.LoaderConfig.SQLMode)
+				l.tableInfos[tableName(db, table)], err = parseTable(tctx, l.tableRouter, db, table, tableFile, l.cfg.LoaderConfig.SQLMode)
 				if err != nil {
-					err = terror.Annotatef(err, "parse table %s/%s", db, table)
-					break tblSchemaLoop
+					return terror.Annotatef(err, "parse table %s/%s", db, table)
 				}
 			}
+
 			if l.checkPoint.IsTableFinished(db, table) {
 				l.logger.Info("table has finished, skip it.", zap.String("schema", db), zap.String("table", table))
 				continue
 			}
-			err = tblRestoreQueue.push(&restoreSchemaJob{
-				loader:   l,
-				database: db,
-				table:    table,
-				filepath: schemaFile,
-			})
+
+			// create table
+			l.logger.Info("start to create table", zap.String("table file", tableFile))
+			err := l.restoreTable(ctx, dbConn, tableFile, db, table)
 			if err != nil {
-				break tblSchemaLoop
+				return err
 			}
-		}
-	}
+			l.logger.Info("finish to create table", zap.String("table file", tableFile))
 
-	// check producing error
-	if err != nil {
-		runtimeErr := tblRestoreQueue.close()
-		if errors.ErrorEqual(err, context.Canceled) {
-			err = runtimeErr
-		}
-		return err
-	}
-	// wait whole task done & close queue
-	err = tblRestoreQueue.close()
-	if err != nil {
-		return err
-	}
-
-	// all schemas was restored
-	l.logger.Info("finish to create tables", zap.Duration("cost time", time.Since(begin)))
-
-	// generate restore table data file job
-	for _, db := range dbs {
-		table2DataFileMap := l.db2Tables[db]
-		for table := range table2DataFileMap {
 			restoringFiles := l.checkPoint.GetRestoringFileInfo(db, table)
 			l.logger.Debug("restoring table data", zap.String("schema", db), zap.String("table", table), zap.Reflect("data files", restoringFiles))
-			for _, file := range table2DataFileMap[table] {
+			info := l.tableInfos[tableName(db, table)]
+			for _, file := range dataFiles {
 				select {
 				case <-ctx.Done():
 					l.logger.Warn("stop generate data file job", log.ShortError(ctx.Err()))
@@ -1408,6 +1267,7 @@ tblSchemaLoop:
 				default:
 					// do nothing
 				}
+
 				l.logger.Debug("dispatch data file", zap.String("schema", db), zap.String("table", table), zap.String("data file", file))
 				offset := int64(uninitializedOffset)
@@ -1415,16 +1275,19 @@ tblSchemaLoop:
 				if ok {
 					offset = posSet[0]
 				}
-				dispatchMap[db+"_"+table+"_"+file] = &fileJob{
+
+				j := &fileJob{
 					schema:   db,
 					table:    table,
 					dataFile: file,
 					offset:   offset,
-					info:     l.tableInfos[tableName(db, table)],
+					info:     info,
 				}
+				dispatchMap[fmt.Sprintf("%s_%s_%s", db, table, file)] = j
 			}
 		}
 	}
+	l.logger.Info("finish to create tables", zap.Duration("cost time", time.Since(begin)))
 
 	// a simple and naive approach to dispatch files randomly based on the feature of golang map(range by random)
 	for _, j := range dispatchMap {
diff --git a/loader/loader_test.go b/loader/loader_test.go
deleted file mode 100644
index e124a30398..0000000000
--- a/loader/loader_test.go
+++ /dev/null
@@ -1,82 +0,0 @@
-// Copyright 2021 PingCAP, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package loader
-
-import (
-	"context"
-
-	. "github.com/pingcap/check"
-	"github.com/pingcap/errors"
-)
-
-var _ = Suite(&testLoaderSuite{})
-
-type testLoaderSuite struct{}
-
-func (*testLoaderSuite) TestJobQueue(c *C) {
-	procedure := func(ctx context.Context, jobsCount int, handler func(ctx context.Context, job *restoreSchemaJob) error) error {
-		jobQueue := newJobQueue(ctx, 16, 16)
-		jobQueue.startConsumers(handler)
-		for i := 0; i < jobsCount; i++ {
-			job := &restoreSchemaJob{
-				session: &DBConn{}, // just for testing
-			}
-			if i == jobsCount/2 {
-				job.database = "error"
-			}
-			err := jobQueue.push(job)
-			if err != nil {
-				runtimeErr := jobQueue.close()
-				if errors.ErrorEqual(err, context.Canceled) {
-					err = runtimeErr
-				}
-				return err
-			}
-		}
-		return jobQueue.close()
-	}
-
-	injectErr := errors.New("random injected error")
-	cases := []struct {
-		ctx         context.Context
-		jobsCount   int
-		handler     func(ctx context.Context, job *restoreSchemaJob) error
-		exceptedErr error
-	}{
-		{
-			ctx:       context.Background(),
-			jobsCount: 128,
-			handler: func(ctx context.Context, job *restoreSchemaJob) error {
-				if job.database == "error" {
-					return injectErr
-				}
-				return nil
-			},
-			exceptedErr: injectErr,
-		},
-		{
-			ctx:       context.Background(),
-			jobsCount: 128,
-			handler: func(ctx context.Context, job *restoreSchemaJob) error {
-				return nil
-			},
-			exceptedErr: nil,
-		},
-	}
-
-	for _, testcase := range cases {
-		err := procedure(testcase.ctx, testcase.jobsCount, testcase.handler)
-		c.Assert(err, Equals, testcase.exceptedErr)
-	}
-}
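
For context on what the deleted test covered: the removed jobQueue is a standard errgroup fan-out, a buffered channel drained by a fixed set of goroutines whose first error cancels the shared context and unblocks any blocked push. A minimal standalone sketch of that pattern under those assumptions (the job type, handler, and sizes below are stand-ins, not DM code):

// Sketch of the errgroup-based queue pattern removed in this diff.
// job and the handler are hypothetical stand-ins for restoreSchemaJob
// and the schema-restore callback.
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

type job struct{ database string }

type jobQueue struct {
	ctx  context.Context
	msgq chan *job
	eg   *errgroup.Group
}

// newJobQueue starts consumerCount goroutines draining a buffered channel;
// the first handler error cancels the group context for everyone.
func newJobQueue(ctx context.Context, consumerCount, length int, handler func(*job) error) *jobQueue {
	eg, egCtx := errgroup.WithContext(ctx)
	q := &jobQueue{ctx: egCtx, msgq: make(chan *job, length), eg: eg}
	for i := 0; i < consumerCount; i++ {
		eg.Go(func() error {
			for {
				select {
				case <-q.ctx.Done():
					return q.ctx.Err()
				case j, ok := <-q.msgq:
					if !ok {
						return nil // channel closed: all jobs consumed
					}
					if err := handler(j); err != nil {
						return err
					}
				}
			}
		})
	}
	return q
}

// push blocks until a consumer frees buffer space or the queue errors out.
func (q *jobQueue) push(j *job) error {
	select {
	case <-q.ctx.Done():
		return q.ctx.Err()
	case q.msgq <- j:
		return nil
	}
}

// close stops accepting jobs, waits for in-flight ones, and returns the
// first consumer error.
func (q *jobQueue) close() error {
	close(q.msgq)
	return q.eg.Wait()
}

func main() {
	q := newJobQueue(context.Background(), 4, 16, func(j *job) error {
		fmt.Println("restore schema for", j.database)
		return nil
	})
	for _, db := range []string{"db1", "db2", "db3"} {
		if err := q.push(&job{database: db}); err != nil {
			break
		}
	}
	if err := q.close(); err != nil {
		fmt.Println("restore failed:", err)
	}
}

Because push also selects on the group context, a failing consumer cannot deadlock a producer once the buffer fills; close then surfaces the first error via eg.Wait, which is the behavior the deleted TestJobQueue asserted with its injected error.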