diff --git a/dm/worker/log.go b/dm/worker/log.go index b0919bd5b7..aa77f5ee9f 100644 --- a/dm/worker/log.go +++ b/dm/worker/log.go @@ -22,15 +22,17 @@ import ( "sync/atomic" "time" - "github.com/pingcap/dm/dm/pb" - "github.com/pingcap/dm/pkg/log" "github.com/pingcap/errors" "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/util" + + "github.com/pingcap/dm/dm/pb" + "github.com/pingcap/dm/pkg/log" ) -// ErrInValidHandler indicates we meet an invalid Putter/Getter/Deleter +// ErrInValidHandler indicates we meet an invalid dbOperator. var ErrInValidHandler = errors.New("handler is nil, please pass a leveldb.DB or leveldb.Transaction") var ( @@ -40,19 +42,14 @@ var ( GCInterval = time.Hour ) -// Putter is interface which has Put method -type Putter interface { +// dbOperator is an interface which used to do Get/Put/Delete operation on levelDB. +// It often can be an instance of leveldb.DB or leveldb.Transaction. +type dbOperator interface { + Get(key []byte, ro *opt.ReadOptions) ([]byte, error) Put(key, value []byte, opts *opt.WriteOptions) error -} - -// Deleter is interface which has Delete method -type Deleter interface { Delete(key []byte, wo *opt.WriteOptions) error -} - -// Getter is interface which has Get method -type Getter interface { - Get(key []byte, ro *opt.ReadOptions) ([]byte, error) + Write(batch *leveldb.Batch, wo *opt.WriteOptions) error + NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator } // HandledPointerKey is key of HandledPointer which point to the last handled log @@ -66,7 +63,7 @@ type Pointer struct { // MarshalBinary never return not nil err now func (p *Pointer) MarshalBinary() ([]byte, error) { data := make([]byte, 8) - binary.LittleEndian.PutUint64(data, uint64(p.Location)) + binary.BigEndian.PutUint64(data, uint64(p.Location)) return data, nil } @@ -77,18 +74,18 @@ func (p *Pointer) UnmarshalBinary(data []byte) error { return errors.Errorf("not valid length data as pointer % X", data) } - p.Location = int64(binary.LittleEndian.Uint64(data)) + p.Location = int64(binary.BigEndian.Uint64(data)) return nil } // LoadHandledPointer loads handled pointer value from kv DB -func LoadHandledPointer(db *leveldb.DB) (Pointer, error) { +func LoadHandledPointer(h dbOperator) (Pointer, error) { var p Pointer - if db == nil { + if whetherNil(h) { return p, errors.Trace(ErrInValidHandler) } - value, err := db.Get(HandledPointerKey, nil) + value, err := h.Get(HandledPointerKey, nil) if err != nil { // return zero value when not found if err == leveldb.ErrNotFound { @@ -106,6 +103,16 @@ func LoadHandledPointer(db *leveldb.DB) (Pointer, error) { return p, nil } +// ClearHandledPointer clears the handled pointer in kv DB. +func ClearHandledPointer(h dbOperator) error { + if whetherNil(h) { + return errors.Trace(ErrInValidHandler) + } + + err := h.Delete(HandledPointerKey, nil) + return errors.Annotate(err, "clear handled pointer") +} + var ( defaultGCForwardLog int64 = 10000 @@ -119,14 +126,14 @@ func DecodeTaskLogKey(key []byte) (int64, error) { return 0, errors.Errorf("not valid length data as task log key % X", key) } - return int64(binary.LittleEndian.Uint64(key[len(TaskLogPrefix):])), nil + return int64(binary.BigEndian.Uint64(key[len(TaskLogPrefix):])), nil } // EncodeTaskLogKey encodes log ID into a task log key func EncodeTaskLogKey(id int64) []byte { key := make([]byte, 8+len(TaskLogPrefix)) copy(key[:len(TaskLogPrefix)], TaskLogPrefix) - binary.LittleEndian.PutUint64(key[len(TaskLogPrefix):], uint64(id)) + binary.BigEndian.PutUint64(key[len(TaskLogPrefix):], uint64(id)) return key } @@ -138,12 +145,12 @@ type Logger struct { } // Initial initials Logger -func (logger *Logger) Initial(db *leveldb.DB) ([]*pb.TaskLog, error) { - if db == nil { +func (logger *Logger) Initial(h dbOperator) ([]*pb.TaskLog, error) { + if whetherNil(h) { return nil, errors.Trace(ErrInValidHandler) } - handledPointer, err := LoadHandledPointer(db) + handledPointer, err := LoadHandledPointer(h) if err != nil { return nil, errors.Trace(err) } @@ -154,7 +161,7 @@ func (logger *Logger) Initial(db *leveldb.DB) ([]*pb.TaskLog, error) { } logs = make([]*pb.TaskLog, 0, 4) ) - iter := db.NewIterator(util.BytesPrefix(TaskLogPrefix), nil) + iter := h.NewIterator(util.BytesPrefix(TaskLogPrefix), nil) startLocation := handledPointer.Location + 1 for ok := iter.Seek(EncodeTaskLogKey(startLocation)); ok; ok = iter.Next() { logBytes := iter.Value() @@ -170,7 +177,8 @@ func (logger *Logger) Initial(db *leveldb.DB) ([]*pb.TaskLog, error) { endPointer.Location = opLog.Id + 1 logs = append(logs, opLog) } else { - panic(fmt.Sprintf("out of sorted order from level db for task log key % X (log ID %d)", iter.Key(), opLog.Id)) + panic(fmt.Sprintf("out of sorted order from level db for task log key % X (log ID %d), start location %d, end location %d", + iter.Key(), opLog.Id, startLocation, endPointer.Location)) } } iter.Release() @@ -180,7 +188,7 @@ func (logger *Logger) Initial(db *leveldb.DB) ([]*pb.TaskLog, error) { err = iter.Error() if err != nil { - return nil, errors.Annotatef(err, "fetch logs from meta") + return nil, errors.Annotatef(err, "fetch logs from meta with handle pointer %+v", handledPointer) } logger.handledPointer = handledPointer @@ -192,7 +200,7 @@ func (logger *Logger) Initial(db *leveldb.DB) ([]*pb.TaskLog, error) { } // GetTaskLog returns task log by given log ID -func (logger *Logger) GetTaskLog(h Getter, id int64) (*pb.TaskLog, error) { +func (logger *Logger) GetTaskLog(h dbOperator, id int64) (*pb.TaskLog, error) { if whetherNil(h) { return nil, errors.Trace(ErrInValidHandler) } @@ -216,8 +224,8 @@ func (logger *Logger) GetTaskLog(h Getter, id int64) (*pb.TaskLog, error) { // ForwardTo forward handled pointer to specified ID location // not thread safe -func (logger *Logger) ForwardTo(db Putter, ID int64) error { - if whetherNil(db) { +func (logger *Logger) ForwardTo(h dbOperator, ID int64) error { + if whetherNil(h) { return errors.Trace(ErrInValidHandler) } @@ -227,7 +235,7 @@ func (logger *Logger) ForwardTo(db Putter, ID int64) error { handledPointerBytes, _ := handledPointer.MarshalBinary() - err := db.Put(HandledPointerKey, handledPointerBytes, nil) + err := h.Put(HandledPointerKey, handledPointerBytes, nil) if err != nil { return errors.Annotatef(err, "forward handled pointer to %d", ID) } @@ -237,8 +245,8 @@ func (logger *Logger) ForwardTo(db Putter, ID int64) error { } // MarkAndForwardLog marks result sucess or not in log, and forwards handledPointer -func (logger *Logger) MarkAndForwardLog(db Putter, opLog *pb.TaskLog) error { - if whetherNil(db) { +func (logger *Logger) MarkAndForwardLog(h dbOperator, opLog *pb.TaskLog) error { + if whetherNil(h) { return errors.Trace(ErrInValidHandler) } @@ -247,17 +255,17 @@ func (logger *Logger) MarkAndForwardLog(db Putter, opLog *pb.TaskLog) error { return errors.Annotatef(err, "marshal task log %+v", opLog) } - err = db.Put(EncodeTaskLogKey(opLog.Id), logBytes, nil) + err = h.Put(EncodeTaskLogKey(opLog.Id), logBytes, nil) if err != nil { return errors.Annotatef(err, "save task log %d", opLog.Id) } - return errors.Trace(logger.ForwardTo(db, opLog.Id)) + return errors.Trace(logger.ForwardTo(h, opLog.Id)) } // Append appends a task log -func (logger *Logger) Append(db Putter, opLog *pb.TaskLog) error { - if whetherNil(db) { +func (logger *Logger) Append(h dbOperator, opLog *pb.TaskLog) error { + if whetherNil(h) { return errors.Trace(ErrInValidHandler) } @@ -276,7 +284,7 @@ func (logger *Logger) Append(db Putter, opLog *pb.TaskLog) error { return errors.Annotatef(err, "marshal task log %+v", opLog) } - err = db.Put(EncodeTaskLogKey(id), logBytes, nil) + err = h.Put(EncodeTaskLogKey(id), logBytes, nil) if err != nil { return errors.Annotatef(err, "save task log %+v", opLog) } @@ -285,7 +293,7 @@ func (logger *Logger) Append(db Putter, opLog *pb.TaskLog) error { } // GC deletes useless log -func (logger *Logger) GC(ctx context.Context, db *leveldb.DB) { +func (logger *Logger) GC(ctx context.Context, h dbOperator) { ticker := time.NewTicker(GCInterval) defer ticker.Stop() for { @@ -299,13 +307,13 @@ func (logger *Logger) GC(ctx context.Context, db *leveldb.DB) { if handledPointerLocaltion > defaultGCForwardLog { gcID = handledPointerLocaltion - defaultGCForwardLog } - logger.doGC(db, gcID) + logger.doGC(h, gcID) } } } -func (logger *Logger) doGC(db *leveldb.DB, id int64) { - if db == nil { +func (logger *Logger) doGC(h dbOperator, id int64) { + if whetherNil(h) { log.Error(ErrInValidHandler) return } @@ -315,7 +323,7 @@ func (logger *Logger) doGC(db *leveldb.DB, id int64) { irange := &util.Range{ Start: EncodeTaskLogKey(0), } - iter := db.NewIterator(irange, nil) + iter := h.NewIterator(irange, nil) batch := new(leveldb.Batch) for iter.Next() { if bytes.Compare(endKey, iter.Key()) <= 0 { @@ -328,9 +336,9 @@ func (logger *Logger) doGC(db *leveldb.DB, id int64) { batch.Delete(iter.Key()) if batch.Len() == GCBatchSize { - err := db.Write(batch, nil) + err := h.Write(batch, nil) if err != nil { - log.Errorf("[task log gc] fail to delete keys from kv db %v", err) + log.Errorf("[task log gc] fail to delete keys from kv db %v until %s(% X)", err, iter.Key(), iter.Key()) } log.Infof("[task log gc] delete range [%s(% X), %s(% X)]", firstKey, firstKey, iter.Key(), iter.Key()) firstKey = firstKey[:0] @@ -340,18 +348,24 @@ func (logger *Logger) doGC(db *leveldb.DB, id int64) { iter.Release() err := iter.Error() if err != nil { - log.Errorf("[task log gc] query logs from meta error %v", err) + log.Errorf("[task log gc] query logs from meta error %v in range [%s(% X), %s(% X))", + err, firstKey, firstKey, endKey, endKey) } if batch.Len() > 0 { log.Infof("[task log gc] delete range [%s(% X), %s(% X))", firstKey, firstKey, endKey, endKey) - err := db.Write(batch, nil) + err := h.Write(batch, nil) if err != nil { log.Errorf("[task log gc] fail to delete keys from kv db %v", err) } } } +// ClearOperationLog clears the task operation log. +func ClearOperationLog(h dbOperator) error { + return errors.Annotate(clearByPrefix(h, TaskLogPrefix), "clear task operation log") +} + // **************** task meta oepration *************** // // TaskMetaPrefix is prefix of task meta key @@ -369,8 +383,8 @@ func EncodeTaskMetaKey(name string) []byte { } // LoadTaskMetas loads all task metas from kv db -func LoadTaskMetas(db *leveldb.DB) (map[string]*pb.TaskMeta, error) { - if db == nil { +func LoadTaskMetas(h dbOperator) (map[string]*pb.TaskMeta, error) { + if whetherNil(h) { return nil, errors.Trace(ErrInValidHandler) } @@ -379,7 +393,7 @@ func LoadTaskMetas(db *leveldb.DB) (map[string]*pb.TaskMeta, error) { err error ) - iter := db.NewIterator(util.BytesPrefix(TaskMetaPrefix), nil) + iter := h.NewIterator(util.BytesPrefix(TaskMetaPrefix), nil) for iter.Next() { taskBytes := iter.Value() task := &pb.TaskMeta{} @@ -398,14 +412,14 @@ func LoadTaskMetas(db *leveldb.DB) (map[string]*pb.TaskMeta, error) { err = iter.Error() if err != nil { - return nil, errors.Annotatef(err, "fetch tasks from meta") + return nil, errors.Annotatef(err, "fetch tasks from meta with prefix % X", TaskMetaPrefix) } return tasks, nil } // SetTaskMeta saves task meta into kv db -func SetTaskMeta(h Putter, task *pb.TaskMeta) error { +func SetTaskMeta(h dbOperator, task *pb.TaskMeta) error { if whetherNil(h) { return errors.Trace(ErrInValidHandler) } @@ -429,7 +443,7 @@ func SetTaskMeta(h Putter, task *pb.TaskMeta) error { } // GetTaskMeta returns task meta by given name -func GetTaskMeta(h Getter, name string) (*pb.TaskMeta, error) { +func GetTaskMeta(h dbOperator, name string) (*pb.TaskMeta, error) { if whetherNil(h) { return nil, errors.Trace(ErrInValidHandler) } @@ -449,7 +463,7 @@ func GetTaskMeta(h Getter, name string) (*pb.TaskMeta, error) { } // DeleteTaskMeta delete task meta from kv DB -func DeleteTaskMeta(h Deleter, name string) error { +func DeleteTaskMeta(h dbOperator, name string) error { if whetherNil(h) { return errors.Trace(ErrInValidHandler) } @@ -462,6 +476,11 @@ func DeleteTaskMeta(h Deleter, name string) error { return nil } +// ClearTaskMeta clears all task meta in kv DB. +func ClearTaskMeta(h dbOperator) error { + return errors.Annotate(clearByPrefix(h, TaskMetaPrefix), "clear task meta") +} + // VerifyTaskMeta verify legality of take meta func VerifyTaskMeta(task *pb.TaskMeta) error { if task == nil { @@ -505,3 +524,36 @@ func CloneTaskLog(log *pb.TaskLog) *pb.TaskLog { func whetherNil(handler interface{}) bool { return handler == nil || reflect.ValueOf(handler).IsNil() } + +// clearByPrefix clears all keys with the specified prefix. +func clearByPrefix(h dbOperator, prefix []byte) error { + if whetherNil(h) { + return errors.Trace(ErrInValidHandler) + } + + var err error + iter := h.NewIterator(util.BytesPrefix(prefix), nil) + batch := new(leveldb.Batch) + for iter.Next() { + batch.Delete(iter.Key()) + if batch.Len() >= GCBatchSize { + err = h.Write(batch, nil) + if err != nil { + iter.Release() + return errors.Annotatef(err, "delete kv with prefix % X until % X", prefix, iter.Key()) + } + log.Infof("[worker log] delete kv with prefix % X until % X", prefix, iter.Key()) + batch.Reset() + } + } + iter.Release() + err = iter.Error() + if err != nil { + return errors.Annotatef(err, "iterate kv with prefix % X", prefix) + } + + if batch.Len() > 0 { + err = h.Write(batch, nil) + } + return errors.Annotatef(err, "clear kv with prefix % X", prefix) +} diff --git a/dm/worker/log_test.go b/dm/worker/log_test.go index ae30cf4495..1bf8199126 100644 --- a/dm/worker/log_test.go +++ b/dm/worker/log_test.go @@ -14,7 +14,12 @@ package worker import ( + "bytes" + . "github.com/pingcap/check" + "github.com/pingcap/errors" + "github.com/syndtr/goleveldb/leveldb" + "github.com/pingcap/dm/dm/pb" ) @@ -35,7 +40,7 @@ func (t *testLog) TestPointer(c *C) { c.Assert(np.UnmarshalBinary([]byte("xx")), NotNil) } -func (t *testLog) TestLoadHandledPointer(c *C) { +func (t *testLog) TestHandledPointer(c *C) { p, err := LoadHandledPointer(nil) c.Assert(err, Equals, ErrInValidHandler) c.Assert(p.Location, Equals, int64(0)) @@ -59,6 +64,20 @@ func (t *testLog) TestLoadHandledPointer(c *C) { c.Assert(db.Put(HandledPointerKey, []byte("xx"), nil), IsNil) _, err = LoadHandledPointer(db) c.Assert(err, ErrorMatches, ".*not valid length data as.*") + + // clear the handled pointer + txn, err := db.OpenTransaction() + c.Assert(err, IsNil) + c.Assert(ClearHandledPointer(txn), IsNil) + c.Assert(txn.Commit(), IsNil) + + // try load handled pointer again + p, err = LoadHandledPointer(db) + c.Assert(err, IsNil) + c.Assert(p.Location, Equals, int64(0)) + + // clear with nil txn + c.Assert(errors.Cause(ClearHandledPointer(nil)), Equals, ErrInValidHandler) } func (t *testLog) TestTaskLogKey(c *C) { @@ -69,6 +88,11 @@ func (t *testLog) TestTaskLogKey(c *C) { _, err = DecodeTaskLogKey([]byte("xx")) c.Assert(err, ErrorMatches, ".*not valid length data as.*") + + // test compare + b23 := EncodeTaskLogKey(23) + b534 := EncodeTaskLogKey(534) + c.Assert(bytes.Compare(b23, b534), Less, 0) } func (t *testLog) TestTaskLog(c *C) { @@ -188,6 +212,14 @@ func (t *testLog) TestTaskLog(c *C) { c.Assert(logs, DeepEquals, []*pb.TaskLog{taskLog3}) c.Assert(logger.handledPointer.Location, Equals, int64(2)) c.Assert(logger.endPointer.Location, Equals, int64(4)) + + // clear operation log + c.Assert(ClearOperationLog(db), IsNil) + + // try initial again + logs, err = logger.Initial(db) + c.Assert(err, IsNil) + c.Assert(logs, HasLen, 0) } func (t *testLog) TestTaskLogGC(c *C) { @@ -310,4 +342,24 @@ func (t *testLog) TestTaskMeta(c *C) { c.Assert(err, NotNil) t2, err = GetTaskMeta(db, "task2") c.Assert(err, NotNil) + + // add some task meta again + c.Assert(SetTaskMeta(db, testTask1Meta), IsNil) + c.Assert(SetTaskMeta(db, testTask2Meta), IsNil) + c.Assert(SetTaskMeta(db, testTask3Meta), IsNil) + + // clear task meta + GCBatchSize = 2 // < 3 + c.Assert(ClearTaskMeta(db), IsNil) + + // try to get task meta back + _, err = GetTaskMeta(db, "task1") + c.Assert(errors.Cause(err), Equals, leveldb.ErrNotFound) + _, err = GetTaskMeta(db, "task2") + c.Assert(errors.Cause(err), Equals, leveldb.ErrNotFound) + _, err = GetTaskMeta(db, "task3") + c.Assert(errors.Cause(err), Equals, leveldb.ErrNotFound) + + // clear with nil txn + c.Assert(errors.Cause(ClearTaskMeta(nil)), Equals, ErrInValidHandler) } diff --git a/dm/worker/meta_test.go b/dm/worker/meta_test.go index 5a1867d9cf..e132d68d48 100644 --- a/dm/worker/meta_test.go +++ b/dm/worker/meta_test.go @@ -39,6 +39,13 @@ var ( } testTask2Meta *pb.TaskMeta testTask2Bytes []byte + + testTask3 = &config.SubTaskConfig{ + Name: "task3", + SourceID: "replica-1", + } + testTask3Meta *pb.TaskMeta + testTask3Bytes []byte ) type testMeta struct{} @@ -69,6 +76,16 @@ func testSetUpDB(c *C) (*leveldb.DB, string) { Task: testTask2Bytes, } + testTask3Str, err := testTask3.Toml() + c.Assert(err, IsNil) + testTask3Bytes = []byte(testTask3Str) + testTask3Meta = &pb.TaskMeta{ + Op: pb.TaskOp_Start, + Name: testTask3.Name, + Stage: pb.Stage_New, + Task: testTask3Bytes, + } + dir := c.MkDir() dbDir := path.Join(dir, "kv") db, err := openDB(dbDir, defaultKVConfig) diff --git a/dm/worker/upgrade.go b/dm/worker/upgrade.go new file mode 100644 index 0000000000..0f567962db --- /dev/null +++ b/dm/worker/upgrade.go @@ -0,0 +1,211 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package worker + +import ( + "encoding/json" + "os" + + "github.com/pingcap/errors" + "github.com/syndtr/goleveldb/leveldb" + + "github.com/pingcap/dm/pkg/log" + "github.com/pingcap/dm/pkg/utils" +) + +const ( + // The current internal version number of DM-worker used when upgrading from an older version, and it's different from the release version. + // NOTE: +1 when an incompatible problem is introduced. + currentWorkerInternalNo uint64 = 1 +) + +var ( + // The key used when saving the version of DM-worker + dmWorkerVersionKey = []byte("!DM-worker!version") + // The current version of DM-worker. + currentWorkerVersion = newVersion(currentWorkerInternalNo, utils.ReleaseVersion) + // The default previous version of DM-worker if no valid version exists in DB before the upgrade. + defaultPreviousWorkerVersion = newVersion(0, "None") + // all versions exists in the history. + workerVersion1 = newVersion(1, "v1.0.0-alpha") +) + +// The version of DM-worker used when upgrading from an older version. +type version struct { + InternalNo uint64 `json:"internal-no"` // internal version number + ReleaseVersion string `json:"release-version"` // release version, like `v1.0.0` +} + +// newVersion creates a new instance of version. +func newVersion(internalNo uint64, releaseVersion string) version { + return version{ + InternalNo: internalNo, + ReleaseVersion: releaseVersion, + } +} + +// compare compares the version with another version. +// NOTE: also compare `ReleaseVersion` when needed. +func (v *version) compare(other version) int { + if v.InternalNo < other.InternalNo { + return -1 + } else if v.InternalNo == other.InternalNo { + return 0 + } + return 1 +} + +// String implements Stringer.String. +func (v version) String() string { + data, err := v.MarshalBinary() + if err != nil { + log.Errorf("[worker upgrade] marshal version (internal-no: %d, release-version: %s) to binary error %v", + v.InternalNo, v.ReleaseVersion, err) + return "" + } + return string(data) +} + +// MarshalBinary implements encoding.BinaryMarshal. +func (v *version) MarshalBinary() ([]byte, error) { + return json.Marshal(v) +} + +// UnmarshalBinary implements encoding.BinaryMarshal. +func (v *version) UnmarshalBinary(data []byte) error { + return json.Unmarshal(data, v) +} + +// loadVersion loads the version of DM-worker from the levelDB. +func loadVersion(h dbOperator) (ver version, err error) { + if whetherNil(h) { + return ver, errors.Trace(ErrInValidHandler) + } + + data, err := h.Get(dmWorkerVersionKey, nil) + if err != nil { + if err == leveldb.ErrNotFound { + log.Warnf("[worker upgrade] no version found in levelDB, default %s used", defaultPreviousWorkerVersion) + return defaultPreviousWorkerVersion, nil + } + return ver, errors.Annotatef(err, "load version with key %v from levelDB", dmWorkerVersionKey) + } + err = ver.UnmarshalBinary(data) + return ver, errors.Annotatef(err, "unmarshal version from data % X", data) +} + +// saveVersion saves the version of DM-worker into the levelDB. +func saveVersion(h dbOperator, ver version) error { + if whetherNil(h) { + return errors.Trace(ErrInValidHandler) + } + + data, err := ver.MarshalBinary() + if err != nil { + return errors.Annotatef(err, "marshal version %s to binary data", ver) + } + + err = h.Put(dmWorkerVersionKey, data, nil) + return errors.Annotatef(err, "save version %v into levelDB with key %v", ver, dmWorkerVersionKey) +} + +// tryUpgrade tries to upgrade from an older version. +func tryUpgrade(dbDir string) error { + // 1. check the DB directory + notExist := false + fs, err := os.Stat(dbDir) + if err != nil { + if os.IsNotExist(err) { + notExist = true + } else { + return errors.Annotatef(err, "get stat for %s", dbDir) + } + } else if !fs.IsDir() { // should be a directory + return errors.NotValidf("directory %s for DB", dbDir) + } + + // 2. open the kv DB + db, err := openDB(dbDir, defaultKVConfig) + if err != nil { + return errors.Annotatef(err, "open DB for %s", dbDir) + } + defer func() { + err = db.Close() + if err != nil { + log.Errorf("[worker upgrade] close DB fail %v", err) + } + }() + + if notExist { + log.Infof("[worker upgrade] no previous operation log exists, no need to upgrade") + // still need to save the current version version + currVer := currentWorkerVersion + err = saveVersion(db, currVer) + return errors.Annotatef(err, "save current version %s into DB %s", currVer, dbDir) + } + + // 3. load previous version + prevVer, err := loadVersion(db) + if err != nil { + return errors.Annotatef(err, "load previous version from DB %s", dbDir) + } + log.Infof("[worker upgrade] the previous version is %s", prevVer) + + // 4. check needing to upgrade + currVer := currentWorkerVersion + if prevVer.compare(currVer) == 0 { + log.Infof("[worker upgrade] the previous and current versions both are %s, no need to upgrade", prevVer) + return nil + } else if prevVer.compare(currVer) > 0 { + return errors.Errorf("the previous version %s is newer than current %s, automatic downgrade is not supported now, please handle it manually", prevVer, currVer) + } + + // 5. upgrade from previous version to +1, +2, ... + if prevVer.compare(workerVersion1) < 0 { + err = upgradeToVer1(db) + if err != nil { + return errors.Annotatef(err, "upgrade to version %s", workerVersion1) + } + } + + // 6. save current version after upgrade done + err = saveVersion(db, currVer) + return errors.Annotatef(err, "save current version %s into DB %s", currVer, dbDir) +} + +// upgradeToVer1 upgrades from version 0 to version 1. +// before this version, we use `LittleEndian` to encode/decode operation log ID, but it's not correct when scanning operation log by log ID. +// so, if upgrading from previous version to this one, we need to: +// 1. remove all operation log in the levelDB +// 2. reset handled pointer +// 3. remove all task meta in the levelDB +// and let user to restart all necessary tasks. +func upgradeToVer1(db *leveldb.DB) error { + log.Infof("[worker upgrade] upgrading to version %s", workerVersion1) + err := ClearOperationLog(db) + if err != nil { + return errors.Annotatef(err, "upgrade to version %s", workerVersion1) + } + err = ClearHandledPointer(db) + if err != nil { + return errors.Annotatef(err, "upgrade to version %s", workerVersion1) + } + err = ClearTaskMeta(db) + if err != nil { + return errors.Annotatef(err, "upgrade to version %s", workerVersion1) + } + + log.Warnf("[worker upgrade] upgraded to version %s, please restart all necessary tasks manually", workerVersion1) + return nil +} diff --git a/dm/worker/upgrade_test.go b/dm/worker/upgrade_test.go new file mode 100644 index 0000000000..fd9469471c --- /dev/null +++ b/dm/worker/upgrade_test.go @@ -0,0 +1,219 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package worker + +import ( + "fmt" + "io/ioutil" + "os" + "path" + "path/filepath" + + . "github.com/pingcap/check" + "github.com/pingcap/errors" + "github.com/syndtr/goleveldb/leveldb" + + "github.com/pingcap/dm/dm/pb" + "github.com/pingcap/dm/pkg/utils" +) + +type testUpgrade struct{} + +var _ = Suite(&testUpgrade{}) + +func (t *testUpgrade) TestIntervalVersion(c *C) { + currVer := currentWorkerVersion + c.Assert(currVer.InternalNo, Equals, currentWorkerInternalNo) + c.Assert(currVer.ReleaseVersion, Equals, utils.ReleaseVersion) + c.Assert(currVer.String(), Matches, fmt.Sprintf(".*%d.*", currentWorkerInternalNo)) + c.Assert(currVer.String(), Matches, fmt.Sprintf(".*%s.*", utils.ReleaseVersion)) + + // marshal and unmarshal + data, err := currVer.MarshalBinary() + c.Assert(err, IsNil) + var currVer2 version + c.Assert(currVer2.UnmarshalBinary(data), IsNil) + c.Assert(currVer2, DeepEquals, currVer) + + // compare by internal version number. + c.Assert(currVer.compare(newVersion(currentWorkerInternalNo-1, utils.ReleaseVersion)), Equals, 1) + c.Assert(currVer.compare(newVersion(currentWorkerInternalNo, utils.ReleaseVersion)), Equals, 0) + c.Assert(currVer.compare(newVersion(currentWorkerInternalNo+1, utils.ReleaseVersion)), Equals, -1) +} + +func (t *testUpgrade) openTestDB(c *C, dbDir string) *leveldb.DB { + if dbDir == "" { + dbDir = path.Join(c.MkDir(), "kv") + } + db, err := openDB(dbDir, defaultKVConfig) + if err != nil { + c.Fatalf("fail to open leveldb %v", err) + } + return db +} + +func (t *testUpgrade) TestLoadSaveInternalVersion(c *C) { + var ( + db *leveldb.DB + ver1234 = newVersion(1234, "v1.0.0") + ) + + // load with nil DB + _, err := loadVersion(nil) + c.Assert(errors.Cause(err), Equals, ErrInValidHandler) + _, err = loadVersion(db) + c.Assert(errors.Cause(err), Equals, ErrInValidHandler) + + // save with nil DB + err = saveVersion(nil, ver1234) + c.Assert(errors.Cause(err), Equals, ErrInValidHandler) + err = saveVersion(db, ver1234) + c.Assert(errors.Cause(err), Equals, ErrInValidHandler) + + // open DB + db = t.openTestDB(c, "") + defer db.Close() + + // load but no data exist + verLoad, err := loadVersion(db) + c.Assert(err, IsNil) + c.Assert(verLoad, DeepEquals, defaultPreviousWorkerVersion) + + // save into DB + err = saveVersion(db, ver1234) + c.Assert(err, IsNil) + + // load back + verLoad, err = loadVersion(db) + c.Assert(err, IsNil) + c.Assert(verLoad, DeepEquals, ver1234) +} + +func (t *testUpgrade) TestTryUpgrade(c *C) { + // DB directory not exists, no need to upgrade + dbDir := "./path-not-exists" + err := tryUpgrade(dbDir) + c.Assert(err, IsNil) + c.Assert(os.RemoveAll(dbDir), IsNil) + + // DB directory is a file path, invalid + tDir := c.MkDir() + dbDir = filepath.Join(tDir, "file-not-dir") + err = ioutil.WriteFile(dbDir, nil, 0600) + c.Assert(err, IsNil) + err = tryUpgrade(dbDir) + c.Assert(err, ErrorMatches, ".*directory.*for DB.*") + + // valid DB directory + dbDir = tDir + + // previousVer == currentVer, no need to upgrade + prevVer := currentWorkerVersion + t.verifyUpgrade(c, dbDir, + func() { + t.saveVerToDB(c, dbDir, prevVer) + }, func() { + currVer := t.loadVerFromDB(c, dbDir) + c.Assert(currVer, DeepEquals, prevVer) + }) + + // previousVer > currentVer, no need to upgrade, and can not automatic downgrade now + prevVer = newVersion(currentWorkerInternalNo+1, currentWorkerVersion.ReleaseVersion) + t.saveVerToDB(c, dbDir, prevVer) + c.Assert(tryUpgrade(dbDir), ErrorMatches, ".*automatic downgrade is not supported now, please handle it manually") + c.Assert(t.loadVerFromDB(c, dbDir), DeepEquals, prevVer) +} + +func (t *testUpgrade) TestUpgradeToVer1(c *C) { + dbDir := c.MkDir() + t.verifyUpgrade(c, dbDir, + func() { + t.prepareBeforeUpgradeVer1(c, dbDir) + }, func() { + t.verifyAfterUpgradeVer1(c, dbDir) + }) +} + +func (t *testUpgrade) prepareBeforeUpgradeVer1(c *C, dbDir string) { + db := t.openTestDB(c, dbDir) + defer db.Close() + + // 1. add some operation log into levelDB and set handled pointer + logger := new(Logger) + c.Assert(logger.MarkAndForwardLog(db, &pb.TaskLog{ + Id: 100, + Task: testTask1Meta, + }), IsNil) + c.Assert(logger.MarkAndForwardLog(db, &pb.TaskLog{ + Id: 200, + Task: testTask2Meta, + }), IsNil) + c.Assert(logger.MarkAndForwardLog(db, &pb.TaskLog{ + Id: 300, + Task: testTask3Meta, + }), IsNil) + c.Assert(logger.handledPointer.Location, Equals, int64(300)) + c.Assert(logger.endPointer.Location, Equals, int64(0)) + + // 2. add some task meta into levelDB + c.Assert(SetTaskMeta(db, testTask1Meta), IsNil) + c.Assert(SetTaskMeta(db, testTask2Meta), IsNil) + t1, err := GetTaskMeta(db, "task1") + c.Assert(err, IsNil) + c.Assert(t1, DeepEquals, testTask1Meta) + t2, err := GetTaskMeta(db, "task2") + c.Assert(err, IsNil) + c.Assert(t2, DeepEquals, testTask2Meta) +} + +func (t *testUpgrade) verifyAfterUpgradeVer1(c *C, dbDir string) { + db := t.openTestDB(c, dbDir) + defer db.Close() + + // 1. verify operation log and handled pointer + logger := new(Logger) + logs, err := logger.Initial(db) + c.Assert(err, IsNil) + c.Assert(logs, HasLen, 0) + c.Assert(logger.handledPointer.Location, Equals, int64(0)) + c.Assert(logger.endPointer.Location, Equals, int64(1)) + + // 2. verify task meta + _, err = GetTaskMeta(db, "task1") + c.Assert(errors.Cause(err), Equals, leveldb.ErrNotFound) + _, err = GetTaskMeta(db, "task2") + c.Assert(errors.Cause(err), Equals, leveldb.ErrNotFound) +} + +func (t *testUpgrade) saveVerToDB(c *C, dbDir string, ver version) { + db := t.openTestDB(c, dbDir) + defer db.Close() + err := saveVersion(db, ver) + c.Assert(err, IsNil) +} + +func (t *testUpgrade) loadVerFromDB(c *C, dbDir string) version { + db := t.openTestDB(c, dbDir) + defer db.Close() + ver, err := loadVersion(db) + c.Assert(err, IsNil) + return ver +} + +func (t *testUpgrade) verifyUpgrade(c *C, dir string, before func(), after func()) { + before() + err := tryUpgrade(dir) + c.Assert(err, IsNil) + after() +} diff --git a/dm/worker/worker.go b/dm/worker/worker.go index c0cd1a01a2..5ed672ba9a 100644 --- a/dm/worker/worker.go +++ b/dm/worker/worker.go @@ -80,8 +80,14 @@ func NewWorker(cfg *Config) (*Worker, error) { } w.relayPurger = purger - // open kv db + // try upgrade from an older version dbDir := path.Join(w.cfg.MetaDir, "kv") + err = tryUpgrade(dbDir) + if err != nil { + return nil, errors.Annotatef(err, "try to upgrade from any older version to %s", currentWorkerVersion) + } + + // open kv db w.db, err = openDB(dbDir, defaultKVConfig) if err != nil { return nil, errors.Trace(err) diff --git a/go.mod b/go.mod index 65ee82e213..ff51bdee2f 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,6 @@ require ( github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef // indirect github.com/golang/mock v1.3.1 github.com/golang/protobuf v1.3.1 - github.com/golang/snappy v0.0.1 // indirect github.com/google/btree v1.0.0 // indirect github.com/gorilla/mux v1.7.2 // indirect github.com/grpc-ecosystem/grpc-gateway v1.9.1 // indirect @@ -48,7 +47,7 @@ require ( github.com/soheilhy/cmux v0.1.4 github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/cobra v0.0.4 - github.com/syndtr/goleveldb v1.0.0 + github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965 github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect github.com/uber-go/atomic v1.4.0 // indirect github.com/uber/jaeger-client-go v2.16.0+incompatible // indirect @@ -57,11 +56,12 @@ require ( go.etcd.io/bbolt v1.3.3 // indirect go.uber.org/atomic v1.4.0 // indirect go.uber.org/zap v1.10.0 // indirect - golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 // indirect - golang.org/x/sys v0.0.0-20190613124609-5ed2794edfdc + golang.org/x/net v0.0.0-20190620200207-3b0461eec859 // indirect + golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0 golang.org/x/text v0.3.2 // indirect golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 - golang.org/x/tools v0.0.0-20190613204242-ed0dc450797f // indirect + golang.org/x/tools v0.0.0-20190624222133-a101b041ded4 // indirect + google.golang.org/appengine v1.4.0 // indirect google.golang.org/genproto v0.0.0-20190611190212-a7e196e89fd3 // indirect google.golang.org/grpc v1.21.1 gopkg.in/natefinch/lumberjack.v2 v2.0.0 diff --git a/go.sum b/go.sum index 5a22c3a5bb..7c20979944 100644 --- a/go.sum +++ b/go.sum @@ -291,8 +291,8 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/struCoder/pidusage v0.1.2/go.mod h1:pWBlW3YuSwRl6h7R5KbvA4N8oOqe9LjaKW5CwT1SPjI= -github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE= -github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= +github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965 h1:1oFLiOyVl+W7bnBzGhf7BbIv9loSFQcieWWYIjLqcAw= +github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965/go.mod h1:9OrXJhf154huy1nPWmuSrkgjPUtUNhA+Zmy+6AESzuA= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU= github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 h1:lYIiVDtZnyTWlNwiAxLj0bbpTcx1BWCFhXjfsvmPdNc= github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= @@ -354,8 +354,8 @@ golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 h1:dfGZHvZk057jK2MCeWus/TowKpJ8y4AmooUzdBSR9GU= -golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -371,8 +371,8 @@ golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190109145017-48ac38b7c8cb/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190613124609-5ed2794edfdc h1:x+/QxSNkVFAC+v4pL1f6mZr1z+qgi+FoR8ccXZPVC10= -golang.org/x/sys v0.0.0-20190613124609-5ed2794edfdc/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0 h1:HyfiK1WMnHj5FXFXatD+Qs1A/xC2Run6RzeW1SyHxpc= +golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= @@ -387,10 +387,11 @@ golang.org/x/tools v0.0.0-20190130214255-bb1329dc71a0/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190425150028-36563e24a262 h1:qsl9y/CJx34tuA7QCPNp86JNJe4spst6Ff8MjvPUdPg= golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= -golang.org/x/tools v0.0.0-20190613204242-ed0dc450797f h1:+zypR5600WBcnJgA2nzZAsBlM8cArEGa8dhhiNE4u3w= -golang.org/x/tools v0.0.0-20190613204242-ed0dc450797f/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= +golang.org/x/tools v0.0.0-20190624222133-a101b041ded4 h1:1mMox4TgefDwqluYCv677yNXwlfTkija4owZve/jr78= +golang.org/x/tools v0.0.0-20190624222133-a101b041ded4/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= google.golang.org/appengine v1.1.0 h1:igQkv0AAhEIvTEpD5LIpAfav2eeVO9HBTjvKHVJPRSs= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0 h1:/wp5JvzpHIxhs/dumFmF7BXTf3Z+dd4uXta4kVyO508= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=