Skip to content

Commit

Permalink
Add a test for checkpoint in checksum
Browse files Browse the repository at this point in the history
  • Loading branch information
morgo committed Sep 2, 2024
1 parent 7488738 commit 4c974e0
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 34 deletions.
59 changes: 31 additions & 28 deletions pkg/checksum/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,23 @@ import (

type Checker struct {
sync.Mutex
table *table.TableInfo
newTable *table.TableInfo
concurrency int
feed *repl.Client
db *sql.DB
trxPool *dbconn.TrxPool
isInvalid bool
chunker table.Chunker
startTime time.Time
ExecTime time.Duration
recentValue interface{} // used for status
dbConfig *dbconn.DBConfig
logger loggers.Advanced
fixDifferences bool
differencesFound atomic.Uint64
recopyLock sync.Mutex
isResumeFromCheckpoint bool
table *table.TableInfo
newTable *table.TableInfo
concurrency int
feed *repl.Client
db *sql.DB
trxPool *dbconn.TrxPool
isInvalid bool
chunker table.Chunker
startTime time.Time
ExecTime time.Duration
recentValue interface{} // used for status
dbConfig *dbconn.DBConfig
logger loggers.Advanced
fixDifferences bool
differencesFound atomic.Uint64
recopyLock sync.Mutex
isResume bool
}

type CheckerConfig struct {
Expand Down Expand Up @@ -87,16 +87,16 @@ func NewChecker(db *sql.DB, tbl, newTable *table.TableInfo, feed *repl.Client, c
}
}
checksum := &Checker{
table: tbl,
newTable: newTable,
concurrency: config.Concurrency,
db: db,
feed: feed,
chunker: chunker,
dbConfig: config.DBConfig,
logger: config.Logger,
fixDifferences: config.FixDifferences,
isResumeFromCheckpoint: config.Watermark != "",
table: tbl,
newTable: newTable,
concurrency: config.Concurrency,
db: db,
feed: feed,
chunker: chunker,
dbConfig: config.DBConfig,
logger: config.Logger,
fixDifferences: config.FixDifferences,
isResume: config.Watermark != "",
}
return checksum, nil
}
Expand Down Expand Up @@ -170,6 +170,9 @@ func (c *Checker) RecentValue() string {
}

func (c *Checker) GetLowWatermark() (string, error) {
if c.chunker == nil {
return "", errors.New("chunker not initialized")
}
return c.chunker.GetLowWatermark()
}

Expand Down Expand Up @@ -380,7 +383,7 @@ func (c *Checker) Run(ctx context.Context) error {
// Open the chunker if it's not open.
// It will already be open if this is a resume from checkpoint.
// This is a little annoying, but just the way the chunker API works.
if !c.isResumeFromCheckpoint {
if !c.isResume {
if err := c.chunker.Open(); err != nil {
return err
}
Expand Down
33 changes: 33 additions & 0 deletions pkg/checksum/checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,3 +258,36 @@ func TestChangeDataTypeDatetime(t *testing.T) {
assert.NoError(t, err)
assert.NoError(t, checker.Run(context.Background())) // fails
}

func TestFromWatermark(t *testing.T) {
testutils.RunSQL(t, "DROP TABLE IF EXISTS tfromwatermark, _tfromwatermark_new, _tfromwatermark_chkpnt")
testutils.RunSQL(t, "CREATE TABLE tfromwatermark (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE _tfromwatermark_new (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE _tfromwatermark_chkpnt (a INT)") // for binlog advancement
testutils.RunSQL(t, "INSERT INTO tfromwatermark VALUES (1, 2, 3)")
testutils.RunSQL(t, "INSERT INTO _tfromwatermark_new VALUES (1, 2, 3)")

db, err := dbconn.New(testutils.DSN(), dbconn.NewDBConfig())
assert.NoError(t, err)

t1 := table.NewTableInfo(db, "test", "tfromwatermark")
assert.NoError(t, t1.SetInfo(context.TODO()))
t2 := table.NewTableInfo(db, "test", "_tfromwatermark_new")
assert.NoError(t, t2.SetInfo(context.TODO()))
logger := logrus.New()

cfg, err := mysql.ParseDSN(testutils.DSN())
assert.NoError(t, err)
feed := repl.NewClient(db, cfg.Addr, t1, t2, cfg.User, cfg.Passwd, &repl.ClientConfig{
Logger: logger,
Concurrency: 4,
TargetBatchTime: time.Second,
})
assert.NoError(t, feed.Run())

config := NewCheckerDefaultConfig()
config.Watermark = "{\"Key\":[\"a\"],\"ChunkSize\":1000,\"LowerBound\":{\"Value\": [\"2\"],\"Inclusive\":true},\"UpperBound\":{\"Value\": [\"3\"],\"Inclusive\":false}}"
checker, err := NewChecker(db, t1, t2, feed, config)
assert.NoError(t, err)
assert.NoError(t, checker.Run(context.Background()))
}
12 changes: 7 additions & 5 deletions pkg/migration/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const (

// These are really consts, but set to var for testing.
var (
checkpointDumpInterval = 2 * time.Second
checkpointDumpInterval = 50 * time.Second
tableStatUpdateInterval = 5 * time.Minute
statusInterval = 30 * time.Second
sentinelCheckInterval = 1 * time.Second
Expand Down Expand Up @@ -917,10 +917,12 @@ func (r *Runner) dumpCheckpoint(ctx context.Context) error {
var checksumWatermark string
if r.getCurrentState() >= stateChecksum {
r.checkerLock.Lock()
checksumWatermark, err = r.checker.GetLowWatermark()
r.checkerLock.Unlock()
if err != nil {
return err
defer r.checkerLock.Unlock()
if r.checker != nil {
checksumWatermark, err = r.checker.GetLowWatermark()
if err != nil {
return err
}
}
}
copyRows := atomic.LoadUint64(&r.copier.CopyRowsCount)
Expand Down
73 changes: 72 additions & 1 deletion pkg/migration/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,77 @@ func TestCheckpointRestore(t *testing.T) {
assert.True(t, r2.usedResumeFromCheckpoint)
}

func TestCheckpointResumeDuringChecksum(t *testing.T) {
tbl := `CREATE TABLE cptresume (
id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
id2 INT NOT NULL,
pad VARCHAR(100) NOT NULL default 0)`
cfg, err := mysql.ParseDSN(testutils.DSN())
assert.NoError(t, err)
testutils.RunSQL(t, `DROP TABLE IF EXISTS cptresume, _cptresume_new, _cptresume_chkpnt, _cptresume_sentinel`)
testutils.RunSQL(t, tbl)
testutils.RunSQL(t, `CREATE TABLE _cptresume_sentinel (id INT NOT NULL PRIMARY KEY)`)
testutils.RunSQL(t, `insert into cptresume (id2,pad) SELECT 1, REPEAT('a', 100) FROM dual`)
testutils.RunSQL(t, `insert into cptresume (id2,pad) SELECT 1, REPEAT('a', 100) FROM cptresume`)
testutils.RunSQL(t, `insert into cptresume (id2,pad) SELECT 1, REPEAT('a', 100) FROM cptresume a JOIN cptresume b JOIN cptresume c`)

r, err := NewRunner(&Migration{
Host: cfg.Addr,
Username: cfg.User,
Password: cfg.Passwd,
Database: cfg.DBName,
Threads: 4,
TargetChunkTime: 100 * time.Millisecond,
Table: "cptresume",
Alter: "ENGINE=InnoDB",
Checksum: true,
})
assert.NoError(t, err)

// Call r.Run() with our context in a go-routine.
// When we see that we are waiting on the sentinel table,
// we then manually start the first bits of checksum, and then close()
// We should be able to resume from the checkpoint into the checksum state.
ctx, cancel := context.WithCancel(context.Background())
go func() {
err := r.Run(ctx)
assert.Error(t, err)
}()
for {
// Wait for the sentinel table.
if r.getCurrentState() >= stateWaitingOnSentinelTable {
break
}
time.Sleep(time.Millisecond)
}

assert.NoError(t, r.checksum(context.TODO())) // run the checksum, the original Run is blocked on sentinel.
assert.NoError(t, r.dumpCheckpoint(context.TODO())) // dump a checkpoint with the watermark.
cancel() // unblock the original waiting on sentinel.
assert.NoError(t, r.Close()) // close the run.

// drop the sentinel table.
testutils.RunSQL(t, `DROP TABLE _cptresume_sentinel`)

// Start again as a new runner,
r2, err := NewRunner(&Migration{
Host: cfg.Addr,
Username: cfg.User,
Password: cfg.Passwd,
Database: cfg.DBName,
Threads: 4,
TargetChunkTime: 100 * time.Millisecond,
Table: "cptresume",
Alter: "ENGINE=InnoDB",
Checksum: true,
})
assert.NoError(t, err)
err = r2.Run(context.Background())
assert.NoError(t, err)
assert.True(t, r2.usedResumeFromCheckpoint)
assert.NotEmpty(t, r2.checksumWatermark) // it had a checksum watermark
}

func TestCheckpointDifferentRestoreOptions(t *testing.T) {
tbl := `CREATE TABLE cpt1difft1 (
id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
Expand Down Expand Up @@ -2674,7 +2745,7 @@ func TestResumeFromCheckpointE2EWithManualSentinel(t *testing.T) {

go func() {
err := runner.Run(ctx)
assert.ErrorContains(t, err, "context canceled") // it gets interrupted as soon as there is a checkpoint saved.
assert.Error(t, err) // it gets interrupted as soon as there is a checkpoint saved.
}()

// wait until a checkpoint is saved (which means copy is in progress)
Expand Down

0 comments on commit 4c974e0

Please sign in to comment.