Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

.*: add compactor for syncer #2261

Open
wants to merge 113 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 108 commits
Commits
Show all changes
113 commits
Select commit Hold shift + click to select a range
52e9f13
refine sync dml
GMHDBJD Aug 29, 2021
96008ac
fix bug
GMHDBJD Aug 29, 2021
b54a927
fix data race
GMHDBJD Aug 30, 2021
53de6ce
fix data race
GMHDBJD Aug 30, 2021
22dad39
fix test
GMHDBJD Aug 30, 2021
9606c90
update
GMHDBJD Aug 30, 2021
57145f9
update
GMHDBJD Aug 30, 2021
90f17bc
fix test
GMHDBJD Aug 30, 2021
34d4989
fix test
GMHDBJD Aug 30, 2021
687d4e0
fix ut
GMHDBJD Aug 30, 2021
0ae0f33
fix it
GMHDBJD Aug 30, 2021
3163999
save work
GMHDBJD Aug 31, 2021
6f1fe91
fix wait
GMHDBJD Aug 31, 2021
4851cd4
fix
GMHDBJD Sep 1, 2021
b7cf22d
Merge remote-tracking branch 'upstream/master' into refineSyncDML
GMHDBJD Sep 7, 2021
30be49c
update causality and dml worker
GMHDBJD Sep 6, 2021
ff65707
Merge remote-tracking branch 'upstream/master' into refineSyncDML
GMHDBJD Sep 7, 2021
2ca637c
refine
GMHDBJD Sep 7, 2021
9b5c1fe
fix ut
GMHDBJD Sep 7, 2021
735a087
Merge branch 'master' into refineSyncDML
GMHDBJD Sep 7, 2021
cb7a298
fix causality
GMHDBJD Sep 8, 2021
53ed0e5
Merge branch 'master' into refineSyncDML
GMHDBJD Sep 8, 2021
1401f4d
refine queue size metrics
GMHDBJD Sep 8, 2021
5da87a1
fix
GMHDBJD Sep 8, 2021
f7392fb
Merge branch 'master' into refineSyncDML
GMHDBJD Sep 8, 2021
06dda5d
debug ci
GMHDBJD Sep 8, 2021
f31328e
fix
GMHDBJD Sep 8, 2021
f02faeb
revert
GMHDBJD Sep 8, 2021
7c44aec
add test back
GMHDBJD Sep 8, 2021
22c9374
update metrics
GMHDBJD Sep 8, 2021
52b41d1
Merge remote-tracking branch 'upstream/master' into refineSyncDML
GMHDBJD Sep 9, 2021
1709363
refine dml param
GMHDBJD Sep 15, 2021
b5cf1b8
save work
GMHDBJD Sep 15, 2021
4f1fede
save work
GMHDBJD Sep 18, 2021
d09d41e
save work
GMHDBJD Sep 20, 2021
e28579a
Merge remote-tracking branch 'upstream/master' into refineSyncDML
GMHDBJD Sep 20, 2021
2f466f2
Merge branch 'refineSyncDML' into compactAndMerge
GMHDBJD Sep 20, 2021
2ffe397
save work
GMHDBJD Sep 20, 2021
f5675e0
save work
GMHDBJD Sep 21, 2021
9f6ba77
save work
GMHDBJD Sep 21, 2021
d444563
save work
GMHDBJD Sep 21, 2021
88534ba
save work
GMHDBJD Sep 21, 2021
100215a
save work
GMHDBJD Sep 26, 2021
a3fe7e6
Merge remote-tracking branch 'upstream/master' into refineSyncDML
GMHDBJD Sep 27, 2021
4eaba28
add more comment
GMHDBJD Sep 27, 2021
ddd4f64
reorder
GMHDBJD Sep 27, 2021
17e1e15
review causality
GMHDBJD Sep 27, 2021
5054276
fix
GMHDBJD Sep 27, 2021
f61e238
update
GMHDBJD Sep 27, 2021
1f80f27
review dml_worker
GMHDBJD Sep 27, 2021
b0f3053
Merge remote-tracking branch 'upstream/master' into refineSyncDML
GMHDBJD Sep 28, 2021
26c7ec8
update channel size
GMHDBJD Sep 28, 2021
0fd5543
Merge branch 'master' into refineSyncDML
GMHDBJD Sep 28, 2021
4c3a896
Merge branch 'master' into refineSyncDML
GMHDBJD Sep 28, 2021
13bed05
address comment
GMHDBJD Sep 30, 2021
0daf78e
Merge remote-tracking branch 'upstream/master' into refineSyncDML
GMHDBJD Sep 30, 2021
dd10d2a
fix typo
GMHDBJD Sep 30, 2021
68be61e
wrap causality
GMHDBJD Oct 8, 2021
f71704f
Merge remote-tracking branch 'upstream/master' into refineSyncDML
GMHDBJD Oct 8, 2021
e5b8f7d
address comment
GMHDBJD Oct 9, 2021
b6a2cd3
Merge branch 'master' into refineSyncDML
GMHDBJD Oct 9, 2021
6a37915
address comment
GMHDBJD Oct 9, 2021
ce8b8f4
Merge branch 'master' into refineSyncDML
GMHDBJD Oct 9, 2021
85718b1
Merge branch 'compactAndMerge' into refactorDML
GMHDBJD Oct 10, 2021
b7b1b9e
save work
GMHDBJD Oct 14, 2021
9ee5a7a
Merge remote-tracking branch 'upstream/master' into refineSyncDML
GMHDBJD Oct 15, 2021
fe9e286
Merge branch 'refineSyncDML' into refactorDML
GMHDBJD Oct 15, 2021
e9dbabf
update causality
GMHDBJD Oct 17, 2021
a679f69
remove waittime
GMHDBJD Oct 17, 2021
dc358f5
remove flush count
GMHDBJD Oct 18, 2021
594ecfb
Merge branch 'refineSyncDML' into refactorDML
GMHDBJD Oct 18, 2021
3f4b63e
refactor dml
GMHDBJD Oct 18, 2021
1bc65b9
remove flush count
GMHDBJD Oct 18, 2021
6164cf1
fix ci
GMHDBJD Oct 18, 2021
9d48744
Merge branch 'refineSyncDML' into refactorDML
GMHDBJD Oct 18, 2021
c944bde
fix ci
GMHDBJD Oct 18, 2021
451891a
Merge branch 'master' into refineSyncDML
GMHDBJD Oct 18, 2021
b5f504b
address comment
GMHDBJD Oct 19, 2021
c3c7d2a
fix fmt
GMHDBJD Oct 19, 2021
87ba1fb
address comment
GMHDBJD Oct 19, 2021
0518536
Merge branch 'refineSyncDML' into refactorDML
GMHDBJD Oct 19, 2021
6e6e872
add unit test
GMHDBJD Oct 19, 2021
26ee3ca
Merge remote-tracking branch 'upstream/master' into refactorDML
GMHDBJD Oct 19, 2021
b331e99
update
GMHDBJD Oct 19, 2021
ec6789d
fix typo
GMHDBJD Oct 19, 2021
ee93b7a
update comment
GMHDBJD Oct 19, 2021
5251c94
update comment
GMHDBJD Oct 19, 2021
401e560
address comment
GMHDBJD Oct 20, 2021
ebb36fb
address comment
GMHDBJD Oct 21, 2021
d99bb92
add no primary/unique key test
GMHDBJD Oct 21, 2021
9993694
Merge branch 'master' into refactorDML
GMHDBJD Oct 21, 2021
7b6ee43
address comment
GMHDBJD Oct 25, 2021
e79e1bd
add compactor
GMHDBJD Oct 25, 2021
9c9912f
Merge remote-tracking branch 'upstream/master' into compactor
GMHDBJD Oct 25, 2021
ed9c978
fix test
GMHDBJD Oct 26, 2021
6219e49
test in chaos
GMHDBJD Oct 26, 2021
e711fdd
fix
GMHDBJD Oct 26, 2021
be7271c
revert
GMHDBJD Oct 26, 2021
1e00cd9
update comment
GMHDBJD Oct 26, 2021
373266c
add ut
GMHDBJD Oct 26, 2021
89d02f7
Update compactor_test.go
GMHDBJD Oct 26, 2021
77beb5d
Merge branch 'master' into newCompactor
GMHDBJD Oct 26, 2021
4d7f6ab
update channel size
GMHDBJD Oct 27, 2021
0c89e36
Update syncer/compactor.go
GMHDBJD Oct 27, 2021
9395b6a
Update syncer/compactor.go
GMHDBJD Oct 27, 2021
716c9c2
update safemode
GMHDBJD Oct 27, 2021
d51e49a
test safmode
GMHDBJD Oct 27, 2021
b30f06a
fix lint
GMHDBJD Oct 27, 2021
1bbc72d
Merge branch 'master' into newCompactor
GMHDBJD Oct 27, 2021
c5d5a22
Update dm/config/task.go
GMHDBJD Oct 28, 2021
965b2ce
address comment
GMHDBJD Oct 28, 2021
3d653a9
address comment
GMHDBJD Oct 28, 2021
a734fe0
Merge branch 'master' into newCompactor
GMHDBJD Oct 28, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions chaos/cases/conf/task-optimistic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,24 @@ mysql-instances:
black-white-list: "instance"
mydumper-thread: 4
loader-thread: 16
syncer-thread: 16
syncer-config-name: "global"
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
-
source-id: "replica-02"
black-white-list: "instance"
mydumper-thread: 4
loader-thread: 16
syncer-thread: 16
syncer-config-name: "global"
-
source-id: "replica-03"
black-white-list: "instance"
mydumper-thread: 4
loader-thread: 16
syncer-thread: 16
syncer-config-name: "global"

black-white-list:
instance:
do-dbs: ["db_optimistic"]

syncers:
global:
compact: true
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
10 changes: 7 additions & 3 deletions chaos/cases/conf/task-pessimistic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,24 @@ mysql-instances:
black-white-list: "instance"
mydumper-thread: 4
loader-thread: 16
syncer-thread: 16
syncer-config-name: "global"
-
source-id: "replica-02"
black-white-list: "instance"
mydumper-thread: 4
loader-thread: 16
syncer-thread: 16
syncer-config-name: "global"
-
source-id: "replica-03"
black-white-list: "instance"
mydumper-thread: 4
loader-thread: 16
syncer-thread: 16
syncer-config-name: "global"

black-white-list:
instance:
do-dbs: ["db_pessimistic"]

syncers:
global:
compact: true
6 changes: 5 additions & 1 deletion chaos/cases/conf/task-single.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ mysql-instances:
black-white-list: "instance"
mydumper-thread: 4
loader-thread: 16
syncer-thread: 16
syncer-config-name: "global"

black-white-list:
instance:
do-dbs: ["db_single"]

syncers:
global:
compact: true
9 changes: 7 additions & 2 deletions chaos/cases/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,12 @@ func newTask(ctx context.Context, cli pb.MasterClient, taskFile string, schema s
sourceConns = make([]*dbConn, 0, len(taskCfg.MySQLInstances))
res = make(results, 0, len(taskCfg.MySQLInstances))
)
for i := range taskCfg.MySQLInstances { // only use necessary part of sources.
for i, m := range taskCfg.MySQLInstances { // only use necessary part of sources.
// reset Syncer, otherwise will report ERROR 20017
if len(m.SyncerConfigName) > 0 && m.Syncer != nil {
m.Syncer = nil
}

cfg := sourcesCfg[i]
db, err2 := conn.DefaultDBProvider.Apply(cfg)
if err2 != nil {
Expand Down Expand Up @@ -270,7 +275,7 @@ func (t *task) genFullData() error {

// createTask does `start-task` operation.
func (t *task) createTask() error {
t.logger.Info("starting the task")
t.logger.Info("starting the task", zap.String("task cfg", t.taskCfg.String()))
resp, err := t.cli.StartTask(t.ctx, &pb.StartTaskRequest{
Task: t.taskCfg.String(),
})
Expand Down
95 changes: 69 additions & 26 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ type SyncerConfig struct {
Batch int `yaml:"batch" toml:"batch" json:"batch"`
QueueSize int `yaml:"queue-size" toml:"queue-size" json:"queue-size"`
// checkpoint flush interval in seconds.
CheckpointFlushInterval int `yaml:"checkpoint-flush-interval" toml:"checkpoint-flush-interval" json:"checkpoint-flush-interval"`
CheckpointFlushInterval int `yaml:"checkpoint-flush-interval" toml:"checkpoint-flush-interval" json:"checkpoint-flush-interval"`
Compact bool `yaml:"compact" toml:"compact" json:"compact"`

// deprecated
MaxRetry int `yaml:"max-retry" toml:"max-retry" json:"max-retry"`
Expand Down Expand Up @@ -857,34 +858,76 @@ func NewMySQLInstancesForDowngrade(mysqlInstances []*MySQLInstance) []*MySQLInst
return mysqlInstancesForDowngrade
}

// SyncerConfigForDowngrade is the base configuration for syncer in v2.0.
// This config is used for downgrade(config export) from a higher dmctl version.
// When we add any new config item into SyncerConfig, we should update it also.
type SyncerConfigForDowngrade struct {
MetaFile string `yaml:"meta-file"`
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
WorkerCount int `yaml:"worker-count"`
Batch int `yaml:"batch"`
QueueSize int `yaml:"queue-size"`
CheckpointFlushInterval int `yaml:"checkpoint-flush-interval"`
MaxRetry int `yaml:"max-retry"`
AutoFixGTID bool `yaml:"auto-fix-gtid"`
EnableGTID bool `yaml:"enable-gtid"`
DisableCausality bool `yaml:"disable-detect"`
SafeMode bool `yaml:"safe-mode"`
EnableANSIQuotes bool `yaml:"enable-ansi-quotes"`

Compact bool `yaml:"compact,omitempty"`
}

// NewSyncerConfigs converts SyncerConfig to SyncerConfigForDowngrade.
GMHDBJD marked this conversation as resolved.
Show resolved Hide resolved
func NewSyncerConfigs(syncerConfigs map[string]*SyncerConfig) map[string]*SyncerConfigForDowngrade {
syncerConfigsForDowngrade := make(map[string]*SyncerConfigForDowngrade, len(syncerConfigs))
for configName, syncerConfig := range syncerConfigs {
newSyncerConfig := &SyncerConfigForDowngrade{
MetaFile: syncerConfig.MetaFile,
WorkerCount: syncerConfig.WorkerCount,
Batch: syncerConfig.Batch,
QueueSize: syncerConfig.QueueSize,
CheckpointFlushInterval: syncerConfig.CheckpointFlushInterval,
MaxRetry: syncerConfig.MaxRetry,
AutoFixGTID: syncerConfig.AutoFixGTID,
EnableGTID: syncerConfig.EnableGTID,
DisableCausality: syncerConfig.DisableCausality,
SafeMode: syncerConfig.SafeMode,
EnableANSIQuotes: syncerConfig.EnableANSIQuotes,
Compact: syncerConfig.Compact,
}
syncerConfigsForDowngrade[configName] = newSyncerConfig
}
return syncerConfigsForDowngrade
}

// TaskConfigForDowngrade is the base configuration for task in v2.0.
// This config is used for downgrade(config export) from a higher dmctl version.
// When we add any new config item into SourceConfig, we should update it also.
type TaskConfigForDowngrade struct {
Name string `yaml:"name"`
TaskMode string `yaml:"task-mode"`
IsSharding bool `yaml:"is-sharding"`
ShardMode string `yaml:"shard-mode"`
IgnoreCheckingItems []string `yaml:"ignore-checking-items"`
MetaSchema string `yaml:"meta-schema"`
EnableHeartbeat bool `yaml:"enable-heartbeat"`
HeartbeatUpdateInterval int `yaml:"heartbeat-update-interval"`
HeartbeatReportInterval int `yaml:"heartbeat-report-interval"`
Timezone string `yaml:"timezone"`
CaseSensitive bool `yaml:"case-sensitive"`
TargetDB *DBConfig `yaml:"target-database"`
OnlineDDLScheme string `yaml:"online-ddl-scheme"`
Routes map[string]*router.TableRule `yaml:"routes"`
Filters map[string]*bf.BinlogEventRule `yaml:"filters"`
ColumnMappings map[string]*column.Rule `yaml:"column-mappings"`
BWList map[string]*filter.Rules `yaml:"black-white-list"`
BAList map[string]*filter.Rules `yaml:"block-allow-list"`
Mydumpers map[string]*MydumperConfig `yaml:"mydumpers"`
Loaders map[string]*LoaderConfig `yaml:"loaders"`
Syncers map[string]*SyncerConfig `yaml:"syncers"`
CleanDumpFile bool `yaml:"clean-dump-file"`
EnableANSIQuotes bool `yaml:"ansi-quotes"`
RemoveMeta bool `yaml:"remove-meta"`
Name string `yaml:"name"`
TaskMode string `yaml:"task-mode"`
IsSharding bool `yaml:"is-sharding"`
ShardMode string `yaml:"shard-mode"`
IgnoreCheckingItems []string `yaml:"ignore-checking-items"`
MetaSchema string `yaml:"meta-schema"`
EnableHeartbeat bool `yaml:"enable-heartbeat"`
HeartbeatUpdateInterval int `yaml:"heartbeat-update-interval"`
HeartbeatReportInterval int `yaml:"heartbeat-report-interval"`
Timezone string `yaml:"timezone"`
CaseSensitive bool `yaml:"case-sensitive"`
TargetDB *DBConfig `yaml:"target-database"`
OnlineDDLScheme string `yaml:"online-ddl-scheme"`
Routes map[string]*router.TableRule `yaml:"routes"`
Filters map[string]*bf.BinlogEventRule `yaml:"filters"`
ColumnMappings map[string]*column.Rule `yaml:"column-mappings"`
BWList map[string]*filter.Rules `yaml:"black-white-list"`
BAList map[string]*filter.Rules `yaml:"block-allow-list"`
Mydumpers map[string]*MydumperConfig `yaml:"mydumpers"`
Loaders map[string]*LoaderConfig `yaml:"loaders"`
Syncers map[string]*SyncerConfigForDowngrade `yaml:"syncers"`
CleanDumpFile bool `yaml:"clean-dump-file"`
EnableANSIQuotes bool `yaml:"ansi-quotes"`
RemoveMeta bool `yaml:"remove-meta"`
// new config item
MySQLInstances []*MySQLInstanceForDowngrade `yaml:"mysql-instances"`
ExprFilter map[string]*ExpressionFilter `yaml:"expression-filter,omitempty"`
Expand Down Expand Up @@ -916,7 +959,7 @@ func NewTaskConfigForDowngrade(taskConfig *TaskConfig) *TaskConfigForDowngrade {
BAList: taskConfig.BAList,
Mydumpers: taskConfig.Mydumpers,
Loaders: taskConfig.Loaders,
Syncers: taskConfig.Syncers,
Syncers: NewSyncerConfigs(taskConfig.Syncers),
CleanDumpFile: taskConfig.CleanDumpFile,
EnableANSIQuotes: taskConfig.EnableANSIQuotes,
RemoveMeta: taskConfig.RemoveMeta,
Expand Down
15 changes: 15 additions & 0 deletions dm/config/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,21 @@ func cloneValues(dest, src reflect.Value) {
srcType = srcType.Elem()
}

if destType.Kind() == reflect.Map {
destMap := reflect.MakeMap(destType)
for _, k := range src.MapKeys() {
if src.MapIndex(k).Type().Kind() == reflect.Ptr {
newVal := reflect.New(destType.Elem().Elem())
cloneValues(newVal, src.MapIndex(k))
destMap.SetMapIndex(k, newVal)
} else {
cloneValues(destMap.MapIndex(k).Addr(), src.MapIndex(k).Addr())
}
}
dest.Set(destMap)
return
}

if destType.Kind() == reflect.Slice {
slice := reflect.MakeSlice(destType, src.Len(), src.Cap())
for i := 0; i < src.Len(); i++ {
Expand Down
Loading