Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
loader: revert #1544 to reduce risks (#1657)
Browse files Browse the repository at this point in the history
  • Loading branch information
lichunzhu authored May 8, 2021
1 parent 33832b6 commit 2707ad5
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 272 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ vendor
tidb-slow.log
/monitoring/dashboards/dm.json
/monitoring/rules/dm_worker.rules.yml
mysql.*.log
mysql.*.log
241 changes: 52 additions & 189 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"encoding/hex"
"fmt"
"io"
"io/ioutil"
"os"
Expand All @@ -26,8 +27,6 @@ import (
"sync"
"time"

"golang.org/x/sync/errgroup"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/dm/unit"
Expand Down Expand Up @@ -1185,246 +1184,110 @@ func fetchMatchedLiteral(ctx *tcontext.Context, router *router.Table, schema, ta
return targetSchema, targetTable
}

// restoreSchemaJob describes one unit of schema-restore work: creating a
// database (when table is empty) or a table, from a dumped schema SQL file.
type restoreSchemaJob struct {
loader *Loader // owning loader; supplies DB pool, config, and logger
session *DBConn // optional pre-assigned DB session; if nil the consumer creates one (used by unit tests)
database string // database name
table string // table name, empty if it's a schema of database
filepath string // file path of dumped schema file
}

// jobQueue is a bounded queue of schema-restore jobs drained by a fixed set
// of consumer goroutines. Only concurrent consumption is supported; jobs are
// produced by a single caller via push and the queue is shut down with close.
type jobQueue struct {
ctx context.Context // derived from the errgroup; cancelled when a consumer returns an error
msgq chan *restoreSchemaJob // job message queue channel
consumerCount int // number of consumer goroutines to start
eg *errgroup.Group // err wait group of consumer's go-routines
}

// newJobQueue constructs a jobQueue with consumerCount consumer slots and a
// channel buffer of the given length. The queue's context is derived from ctx
// via errgroup.WithContext, so it is cancelled as soon as any consumer
// goroutine returns a non-nil error.
func newJobQueue(ctx context.Context, consumerCount, length int) *jobQueue {
eg, selfCtx := errgroup.WithContext(ctx)
return &jobQueue{
ctx: selfCtx,
msgq: make(chan *restoreSchemaJob, length),
consumerCount: consumerCount,
eg: eg,
}
}
func (l *Loader) restoreData(ctx context.Context) error {
begin := time.Now()

// `push` will append a job to the queue.
func (q *jobQueue) push(job *restoreSchemaJob) error {
var err error
select {
case <-q.ctx.Done():
err = q.ctx.Err()
case q.msgq <- job:
baseConn, err := l.toDB.GetBaseConn(ctx)
if err != nil {
return err
}
return terror.WithScope(err, terror.ScopeInternal)
}

// close permanently closes the queue and blocks until every consumer
// goroutine has drained the remaining jobs and exited, returning the first
// error any consumer produced (nil if none). Callers must not invoke push
// after (or concurrently with) close: sending on the closed channel panics.
func (q *jobQueue) close() error {
// closing msgq signals consumers that no further jobs will arrive
close(q.msgq)
// wait until go-routines of consumption have exited; errgroup yields the first error
return q.eg.Wait()
}
defer func() {
err2 := l.toDB.CloseBaseConn(baseConn)
if err2 != nil {
l.logger.Warn("fail to close connection", zap.Error(err2))
}
}()

// `startConsumers` run multiple go-routines of job consumption with user defined handler.
func (q *jobQueue) startConsumers(handler func(ctx context.Context, job *restoreSchemaJob) error) {
for i := 0; i < q.consumerCount; i++ {
q.eg.Go(func() error {
var session *DBConn
consumeLoop:
for {
select {
case <-q.ctx.Done():
err := q.ctx.Err()
return err
case job, active := <-q.msgq:
if !active {
break consumeLoop
}
// test condition for `job.session` means db session still could be controlled outside,
// it's used in unit test for now.
if session == nil && job.session == nil {
baseConn, err2 := job.loader.toDB.GetBaseConn(q.ctx)
if err2 != nil {
return err2
}
defer func(baseConn *conn.BaseConn) {
err2 := job.loader.toDB.CloseBaseConn(baseConn)
if err2 != nil {
job.loader.logger.Warn("fail to close connection", zap.Error(err2))
}
}(baseConn)
session = &DBConn{
cfg: job.loader.cfg,
baseConn: baseConn,
resetBaseConnFn: func(*tcontext.Context, *conn.BaseConn) (*conn.BaseConn, error) {
return nil, terror.ErrDBBadConn.Generate("bad connection error restoreData")
},
}
}
if job.session == nil {
job.session = session
}
err := handler(q.ctx, job)
if err != nil {
return err
}
}
}
return nil
})
dbConn := &DBConn{
cfg: l.cfg,
baseConn: baseConn,
resetBaseConnFn: func(*tcontext.Context, *conn.BaseConn) (*conn.BaseConn, error) {
return nil, terror.ErrDBBadConn.Generate("bad connection error restoreData")
},
}
}

func (l *Loader) restoreData(ctx context.Context) error {
begin := time.Now()
dispatchMap := make(map[string]*fileJob)
concurrency := l.cfg.PoolSize
// `for v := range map` would present random order
// `dbs` array keep same order for restore schema job generating
var err error

// restore db in sort
dbs := make([]string, 0, len(l.db2Tables))
for db := range l.db2Tables {
dbs = append(dbs, db)
}
tctx := tcontext.NewContext(ctx, l.logger)

// run consumers of restore database schema queue
dbRestoreQueue := newJobQueue(ctx, concurrency, concurrency /** length of queue */)
dbRestoreQueue.startConsumers(func(ctx context.Context, job *restoreSchemaJob) error {
// restore database schema
job.loader.logger.Info("start to create schema", zap.String("schema file", job.filepath))
err2 := job.loader.restoreSchema(ctx, job.session, job.filepath, job.database)
if err2 != nil {
return err2
}
job.loader.logger.Info("finish to create schema", zap.String("schema file", job.filepath))
return nil
})
tctx := tcontext.NewContext(ctx, l.logger)

// push database schema restoring jobs to the queue
for _, db := range dbs {
schemaFile := l.cfg.Dir + "/" + db + "-schema-create.sql" // cache friendly
err = dbRestoreQueue.push(&restoreSchemaJob{
loader: l,
database: db,
table: "",
filepath: schemaFile,
})
tables := l.db2Tables[db]

// create db
dbFile := fmt.Sprintf("%s/%s-schema-create.sql", l.cfg.Dir, db)
l.logger.Info("start to create schema", zap.String("schema file", dbFile))
err = l.restoreSchema(ctx, dbConn, dbFile, db)
if err != nil {
break
return err
}
}
l.logger.Info("finish to create schema", zap.String("schema file", dbFile))

// check producing error
if err != nil {
runtimeErr := dbRestoreQueue.close()
if errors.ErrorEqual(err, context.Canceled) {
err = runtimeErr
tnames := make([]string, 0, len(tables))
for t := range tables {
tnames = append(tnames, t)
}
return err
}
// wait whole task done & close queue
err = dbRestoreQueue.close()
if err != nil {
return err
}

// run consumers of restore table schema queue
tblRestoreQueue := newJobQueue(ctx, concurrency, concurrency /** length of queue */)
tblRestoreQueue.startConsumers(func(ctx context.Context, job *restoreSchemaJob) error {
job.loader.logger.Info("start to create table", zap.String("table file", job.filepath))
err2 := job.loader.restoreTable(ctx, job.session, job.filepath, job.database, job.table)
if err2 != nil {
return err2
}
job.loader.logger.Info("finish to create table", zap.String("table file", job.filepath))
return nil
})

// push table schema restoring jobs to the queue
tblSchemaLoop:
for _, db := range dbs {
for table := range l.db2Tables[db] {
schemaFile := l.cfg.Dir + "/" + db + "." + table + "-schema.sql" // cache friendly
for _, table := range tnames {
dataFiles := tables[table]
tableFile := l.cfg.Dir + "/" + db + "." + table + "-schema.sql" // cache friendly
if _, ok := l.tableInfos[tableName(db, table)]; !ok {
l.tableInfos[tableName(db, table)], err = parseTable(tctx, l.tableRouter, db, table, schemaFile, l.cfg.LoaderConfig.SQLMode)
l.tableInfos[tableName(db, table)], err = parseTable(tctx, l.tableRouter, db, table, tableFile, l.cfg.LoaderConfig.SQLMode)
if err != nil {
err = terror.Annotatef(err, "parse table %s/%s", db, table)
break tblSchemaLoop
return terror.Annotatef(err, "parse table %s/%s", db, table)
}
}

if l.checkPoint.IsTableFinished(db, table) {
l.logger.Info("table has finished, skip it.", zap.String("schema", db), zap.String("table", table))
continue
}
err = tblRestoreQueue.push(&restoreSchemaJob{
loader: l,
database: db,
table: table,
filepath: schemaFile,
})

// create table
l.logger.Info("start to create table", zap.String("table file", tableFile))
err := l.restoreTable(ctx, dbConn, tableFile, db, table)
if err != nil {
break tblSchemaLoop
return err
}
}
}
l.logger.Info("finish to create table", zap.String("table file", tableFile))

// check producing error
if err != nil {
runtimeErr := tblRestoreQueue.close()
if errors.ErrorEqual(err, context.Canceled) {
err = runtimeErr
}
return err
}
// wait whole task done & close queue
err = tblRestoreQueue.close()
if err != nil {
return err
}

// all schemas was restored
l.logger.Info("finish to create tables", zap.Duration("cost time", time.Since(begin)))

// generate restore table data file job
for _, db := range dbs {
table2DataFileMap := l.db2Tables[db]
for table := range table2DataFileMap {
restoringFiles := l.checkPoint.GetRestoringFileInfo(db, table)
l.logger.Debug("restoring table data", zap.String("schema", db), zap.String("table", table), zap.Reflect("data files", restoringFiles))

for _, file := range table2DataFileMap[table] {
info := l.tableInfos[tableName(db, table)]
for _, file := range dataFiles {
select {
case <-ctx.Done():
l.logger.Warn("stop generate data file job", log.ShortError(ctx.Err()))
return ctx.Err()
default:
// do nothing
}

l.logger.Debug("dispatch data file", zap.String("schema", db), zap.String("table", table), zap.String("data file", file))

offset := int64(uninitializedOffset)
posSet, ok := restoringFiles[file]
if ok {
offset = posSet[0]
}
dispatchMap[db+"_"+table+"_"+file] = &fileJob{

j := &fileJob{
schema: db,
table: table,
dataFile: file,
offset: offset,
info: l.tableInfos[tableName(db, table)],
info: info,
}
dispatchMap[fmt.Sprintf("%s_%s_%s", db, table, file)] = j
}
}
}
l.logger.Info("finish to create tables", zap.Duration("cost time", time.Since(begin)))

// a simple and naive approach to dispatch files randomly based on the feature of golang map(range by random)
for _, j := range dispatchMap {
Expand Down
Loading

0 comments on commit 2707ad5

Please sign in to comment.