Merge remote-tracking
wk989898 committed Jan 2, 2025
2 parents 393fd0b + 79e5a97 commit 561c359
Showing 11 changed files with 476 additions and 75 deletions.
9 changes: 7 additions & 2 deletions .github/workflows/integration_test_mysql.yaml
@@ -130,13 +130,18 @@ jobs:
run: |
pwd && ls -l bin/ && ls -l tools/bin/
export TICDC_NEWARCH=true && make integration_test CASE=fail_over_ddl_C
- name: Test fail_over_ddl_D
run: |
pwd && ls -l bin/ && ls -l tools/bin/
export TICDC_NEWARCH=true && make integration_test CASE=fail_over_ddl_D
- name: Test fail_over_ddl_E
run: |
pwd && ls -l bin/ && ls -l tools/bin/
export TICDC_NEWARCH=true && make integration_test CASE=fail_over_ddl_E
- name: Test fail_over_ddl_D
- name: Test fail_over_ddl_F
run: |
pwd && ls -l bin/ && ls -l tools/bin/
export TICDC_NEWARCH=true && make integration_test CASE=fail_over_ddl_D
export TICDC_NEWARCH=true && make integration_test CASE=fail_over_ddl_F
6 changes: 5 additions & 1 deletion downstreamadapter/dispatcher/dispatcher.go
@@ -225,6 +225,10 @@ func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.Dispat
if pendingEvent != nil && action.CommitTs == pendingEvent.GetCommitTs() && blockStatus == heartbeatpb.BlockStage_WAITING {
d.blockEventStatus.updateBlockStage(heartbeatpb.BlockStage_WRITING)
if action.Action == heartbeatpb.Action_Write {
failpoint.Inject("WaitBeforeWrite", func() {
// we use the failpoint to ensure the ddl event is not written to downstream before the other node finishes restarting
time.Sleep(30 * time.Second)
})
failpoint.Inject("BlockBeforeWrite", nil)
err := d.AddBlockEventToSink(pendingEvent)
if err != nil {
@@ -239,7 +243,7 @@ func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.Dispat
return
}
failpoint.Inject("BlockReportAfterWrite", nil)
failpoint.Inject("WaitWhileAfterWriteBeforeReport", func() {
failpoint.Inject("WaitBeforeReport", func() {
time.Sleep(30 * time.Second)
})
} else {
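A rough, self-contained illustration of the failpoint pattern used in the hunks above (which rename WaitWhileAfterWriteBeforeReport to WaitBeforeReport and add a WaitBeforeWrite delay). This is a sketch, not the ticdc code: writeBlockEvent and its body are made up for the example, and the Inject marker only fires after the sources are rewritten by failpoint-ctl (e.g. make failpoint-enable) and the failpoint is enabled at runtime.

package main

import (
	"fmt"
	"time"

	"github.com/pingcap/failpoint"
)

// writeBlockEvent is a stand-in for the dispatcher's DDL write path.
func writeBlockEvent() {
	failpoint.Inject("WaitBeforeWrite", func() {
		// Hold the DDL event back so that, in the fail-over tests, the other
		// node has time to finish restarting before the event reaches downstream.
		time.Sleep(30 * time.Second)
	})
	fmt.Println("event written to downstream")
}

func main() {
	writeBlockEvent()
}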
13 changes: 0 additions & 13 deletions logservice/schemastore/ddl_job_fetcher.go
@@ -14,7 +14,6 @@
package schemastore

import (
"context"
"math"
"sync"

@@ -30,8 +29,6 @@ import (
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pdutil"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)

@@ -55,8 +52,6 @@ type ddlJobFetcher struct {

func newDDLJobFetcher(
subClient *logpuller.SubscriptionClient,
pdCli pd.Client,
pdClock pdutil.Clock,
kvStorage kv.Storage,
startTs uint64,
cacheDDLEvent func(ddlEvent DDLJobWithCommitTs),
@@ -86,14 +81,6 @@
return ddlJobFetcher
}

func (p *ddlJobFetcher) run(ctx context.Context) error {
return nil
}

func (p *ddlJobFetcher) close(ctx context.Context) error {
return nil
}

func (p *ddlJobFetcher) tryAdvanceResolvedTs(subID logpuller.SubscriptionID, newResolvedTs uint64) {
p.resolvedTsTracker.Lock()
defer p.resolvedTsTracker.Unlock()
6 changes: 3 additions & 3 deletions logservice/schemastore/disk_format.go
@@ -273,9 +273,9 @@ func loadAndApplyDDLHistory(
defer snapIter.Close()
for snapIter.First(); snapIter.Valid(); snapIter.Next() {
ddlEvent := unmarshalPersistedDDLEvent(snapIter.Value())
if shouldSkipDDL(&ddlEvent, databaseMap, tableMap) {
continue
}
// Note: no need to skip ddl here
// 1. for create table and create tables, we always store the ddl with the smaller commit ts, so the ddl won't conflict with the tables in the gc snapshot.
// 2. other ddls that should be ignored have already been filtered out before being written to disk.
handler, ok := allDDLHandlers[model.ActionType(ddlEvent.Type)]
if !ok {
log.Panic("unknown ddl type", zap.Any("ddlType", ddlEvent.Type), zap.String("query", ddlEvent.Query))
74 changes: 33 additions & 41 deletions logservice/schemastore/persist_storage.go
@@ -654,6 +654,11 @@ func (p *persistentStorage) persistUpperBoundPeriodically(ctx context.Context) e
func (p *persistentStorage) handleDDLJob(job *model.Job) error {
p.mu.Lock()

if shouldSkipDDL(job, p.tableMap) {
p.mu.Unlock()
return nil
}

handler, ok := allDDLHandlers[job.Type]
if !ok {
log.Panic("unknown ddl type", zap.Any("ddlType", job.Type), zap.String("query", job.Query))
@@ -665,11 +670,6 @@ func (p *persistentStorage) handleDDLJob(job *model.Job) error {
partitionMap: p.partitionMap,
})

if shouldSkipDDL(&ddlEvent, p.databaseMap, p.tableMap) {
p.mu.Unlock()
return nil
}

p.mu.Unlock()

// Note: need write ddl event to disk before update ddl history,
@@ -708,53 +708,45 @@ func (p *persistentStorage) handleDDLJob(job *model.Job) error {
return nil
}

func shouldSkipDDL(
event *PersistedDDLEvent,
databaseMap map[int64]*BasicDatabaseInfo,
tableMap map[int64]*BasicTableInfo,
) bool {
switch model.ActionType(event.Type) {
// TODO: add some comment to explain why and when we should skip ActionCreateSchema/ActionCreateTable
case model.ActionCreateSchema:
if _, ok := databaseMap[event.CurrentSchemaID]; ok {
log.Warn("database already exists. ignore DDL ",
zap.String("DDL", event.Query),
zap.Int64("jobID", event.ID),
zap.Int64("schemaID", event.CurrentSchemaID),
zap.Uint64("finishTs", event.FinishedTs),
zap.Int64("jobSchemaVersion", event.SchemaVersion))
return true
}
func shouldSkipDDL(job *model.Job, tableMap map[int64]*BasicTableInfo) bool {
switch model.ActionType(job.Type) {
// Skipping ActionCreateTable and ActionCreateTables when the table already exists:
// 1. It is possible to receive ActionCreateTable and ActionCreateTables multiple times,
// and filtering duplicates in a generic way is challenging.
// (SchemaVersion checks are unreliable because versions might not be strictly ordered in some cases.)
// 2. ActionCreateTable and ActionCreateTables for the same table may have different commit ts.
// One of these actions could be garbage collected, leaving the table present in the snapshot.
// Therefore, the only reliable way to determine if a later DDL operation is redundant
// is by verifying whether the table already exists.
case model.ActionCreateTable:
// Note: partition table's logical table id is also in tableMap
if _, ok := tableMap[event.CurrentTableID]; ok {
if _, ok := tableMap[job.BinlogInfo.TableInfo.ID]; ok {
log.Warn("table already exists. ignore DDL",
zap.String("DDL", event.Query),
zap.Int64("jobID", event.ID),
zap.Int64("schemaID", event.CurrentSchemaID),
zap.Int64("tableID", event.CurrentTableID),
zap.Uint64("finishTs", event.FinishedTs),
zap.Int64("jobSchemaVersion", event.SchemaVersion))
zap.String("DDL", job.Query),
zap.Int64("jobID", job.ID),
zap.Int64("schemaID", job.SchemaID),
zap.Int64("tableID", job.BinlogInfo.TableInfo.ID),
zap.Uint64("finishTs", job.BinlogInfo.FinishedTS),
zap.Int64("jobSchemaVersion", job.BinlogInfo.SchemaVersion))
return true
}
case model.ActionAlterTableAttributes,
model.ActionAlterTablePartitionAttributes:
// Note: these ddls do not seem useful to sync to downstream
return true
case model.ActionCreateTables:
// For duplicate create tables ddl jobs, the tables in the job should be the same, so checking the first table is enough
if _, ok := tableMap[event.MultipleTableInfos[0].ID]; ok {
if _, ok := tableMap[job.BinlogInfo.MultipleTableInfos[0].ID]; ok {
log.Warn("table already exists. ignore DDL",
zap.String("DDL", event.Query),
zap.Int64("jobID", event.ID),
zap.Int64("schemaID", event.CurrentSchemaID),
zap.Int64("tableID", event.CurrentTableID),
zap.Uint64("finishTs", event.FinishedTs),
zap.Int64("jobSchemaVersion", event.SchemaVersion))
zap.String("DDL", job.Query),
zap.Int64("jobID", job.ID),
zap.Int64("schemaID", job.SchemaID),
zap.Int64("tableID", job.BinlogInfo.MultipleTableInfos[0].ID),
zap.Uint64("finishTs", job.BinlogInfo.FinishedTS),
zap.Int64("jobSchemaVersion", job.BinlogInfo.SchemaVersion))
return true
}
// DDLs ignored
case model.ActionAlterTableAttributes,
model.ActionAlterTablePartitionAttributes:
return true
}
// Note: create tables don't need to be ignored, because we won't receive it twice
return false
}

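The comments in the new shouldSkipDDL above explain why duplicate ActionCreateTable / ActionCreateTables jobs are detected by table existence rather than by schema version. A minimal, self-contained sketch of that idea follows; the types and names are illustrative, not the real tableMap / BasicTableInfo.

package main

import "fmt"

type tableInfo struct{ name string }

// shouldSkipCreateTable reports whether a create-table job for tableID is redundant,
// i.e. the table is already tracked, regardless of the job's schema version.
func shouldSkipCreateTable(tableID int64, tracked map[int64]*tableInfo) bool {
	_, exists := tracked[tableID]
	return exists
}

func main() {
	tracked := map[int64]*tableInfo{101: {name: "t1"}}

	// First delivery of the job: table 102 is unknown, so the job is applied.
	fmt.Println(shouldSkipCreateTable(102, tracked)) // false
	tracked[102] = &tableInfo{name: "t2"}

	// The same job delivered again: skipped because the table already exists.
	fmt.Println(shouldSkipCreateTable(102, tracked)) // true
}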
4 changes: 2 additions & 2 deletions logservice/schemastore/persist_storage_ddl_handlers.go
@@ -1090,7 +1090,7 @@ func buildDDLEventCommon(rawEvent *PersistedDDLEvent, _ filter.Filter, tiDBOnly
}

func buildDDLEventForCreateSchema(rawEvent *PersistedDDLEvent, tableFilter filter.Filter) commonEvent.DDLEvent {
ddlEvent := buildDDLEventCommon(rawEvent, tableFilter, false)
ddlEvent := buildDDLEventCommon(rawEvent, tableFilter, WithoutTiDBOnly)
ddlEvent.BlockedTables = &commonEvent.InfluencedTables{
InfluenceType: commonEvent.InfluenceTypeNormal,
TableIDs: []int64{heartbeatpb.DDLSpan.TableID},
@@ -1099,7 +1099,7 @@ func buildDDLEventForDropSchema(rawEvent *PersistedDDLEvent, tableFilter filte
}

func buildDDLEventForDropSchema(rawEvent *PersistedDDLEvent, tableFilter filter.Filter) commonEvent.DDLEvent {
ddlEvent := buildDDLEventCommon(rawEvent, tableFilter, false)
ddlEvent := buildDDLEventCommon(rawEvent, tableFilter, WithoutTiDBOnly)
ddlEvent.BlockedTables = &commonEvent.InfluencedTables{
InfluenceType: commonEvent.InfluenceTypeDB,
SchemaID: rawEvent.CurrentSchemaID,
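The two hunks above swap the bare boolean argument false for the named constant WithoutTiDBOnly in the buildDDLEventCommon calls. A small sketch of that readability pattern, assuming WithTiDBOnly / WithoutTiDBOnly are plain bool constants (the real definitions live elsewhere in the ticdc code base):

package main

import "fmt"

const (
	WithTiDBOnly    = true
	WithoutTiDBOnly = false
)

type ddlEvent struct{ tiDBOnly bool }

func buildDDLEventCommon(tiDBOnly bool) ddlEvent {
	return ddlEvent{tiDBOnly: tiDBOnly}
}

func main() {
	// buildDDLEventCommon(false) is easy to misread at the call site;
	// the named constant states the intent directly.
	e := buildDDLEventCommon(WithoutTiDBOnly)
	fmt.Println(e.tiDBOnly) // false
}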
13 changes: 4 additions & 9 deletions logservice/schemastore/schema_store.go
@@ -104,8 +104,6 @@ func New(
zap.Int64("schemaVersion", s.schemaVersion))
s.ddlJobFetcher = newDDLJobFetcher(
subClient,
pdCli,
pdClock,
kvStorage,
upperBound.ResolvedTs,
s.writeDDLEvent,
@@ -123,18 +121,13 @@ func (s *schemaStore) Run(ctx context.Context) error {
eg.Go(func() error {
return s.updateResolvedTsPeriodically(ctx)
})
eg.Go(func() error {
return s.ddlJobFetcher.run(ctx)
})

return eg.Wait()
}

func (s *schemaStore) Close(ctx context.Context) error {
log.Info("schema store closed")
if err := s.dataStorage.close(); err != nil {
log.Warn("failed to close data storage", zap.Error(err))
}
return s.ddlJobFetcher.close(ctx)
return s.dataStorage.close()
}

func (s *schemaStore) updateResolvedTsPeriodically(ctx context.Context) error {
@@ -174,7 +167,9 @@ func (s *schemaStore) updateResolvedTsPeriodically(ctx context.Context) error {
}
log.Info("handle ddl job",
zap.Int64("schemaID", event.Job.SchemaID),
zap.String("schemaName", event.Job.SchemaName),
zap.Int64("tableID", event.Job.TableID),
zap.String("tableName", event.Job.TableName),
zap.Any("type", event.Job.Type),
zap.String("job", event.Job.Query),
zap.Int64("jobSchemaVersion", event.Job.BinlogInfo.SchemaVersion),
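schemaStore.Run above keeps its errgroup structure but no longer registers the removed no-op ddlJobFetcher.run goroutine, and Close now simply closes the data storage. The sketch below is a minimal illustration of that errgroup pattern with a made-up loop body, not the real updateResolvedTsPeriodically:

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// updateResolvedTsPeriodically is illustrative; it just ticks until the context ends.
func updateResolvedTsPeriodically(ctx context.Context) error {
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			fmt.Println("advance resolved ts")
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	eg, ctx := errgroup.WithContext(ctx)
	eg.Go(func() error { return updateResolvedTsPeriodically(ctx) })
	// A goroutine that became a no-op (like the old ddlJobFetcher.run)
	// is simply no longer registered here.

	if err := eg.Wait(); err != nil {
		fmt.Println("run finished:", err)
	}
}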
8 changes: 4 additions & 4 deletions tests/integration_tests/fail_over_ddl_C/run.sh
@@ -78,7 +78,7 @@ function failOverCaseC-1() {

sleep 10

export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/scheduler/StopBalanceScheduler=return(true);github.com/pingcap/ticdc/downstreamadapter/dispatcher/WaitWhileAfterWriteBeforeReport=return(true)'
export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/scheduler/StopBalanceScheduler=return(true);github.com/pingcap/ticdc/downstreamadapter/dispatcher/WaitBeforeReport=return(true)'

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "0-1" --addr "127.0.0.1:8300"

@@ -139,7 +139,7 @@ function failOverCaseC-2() {

sleep 10

export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/scheduler/StopBalanceScheduler=return(true);github.com/pingcap/ticdc/downstreamadapter/dispatcher/WaitWhileAfterWriteBeforeReport=return(true)'
export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/scheduler/StopBalanceScheduler=return(true);github.com/pingcap/ticdc/downstreamadapter/dispatcher/WaitBeforeReport=return(true)'

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "0-1" --addr "127.0.0.1:8300"

@@ -208,7 +208,7 @@ function failOverCaseC-3() {

sleep 10

export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/scheduler/StopBalanceScheduler=return(true);github.com/pingcap/ticdc/downstreamadapter/dispatcher/WaitWhileAfterWriteBeforeReport=return(true)'
export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/scheduler/StopBalanceScheduler=return(true);github.com/pingcap/ticdc/downstreamadapter/dispatcher/WaitBeforeReport=return(true)'

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "0-1" --addr "127.0.0.1:8300"

@@ -278,7 +278,7 @@ function failOverCaseC-5() {

sleep 10

export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/scheduler/StopBalanceScheduler=return(true);github.com/pingcap/ticdc/downstreamadapter/dispatcher/WaitWhileAfterWriteBeforeReport=return(true)'
export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/scheduler/StopBalanceScheduler=return(true);github.com/pingcap/ticdc/downstreamadapter/dispatcher/WaitBeforeReport=return(true)'

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "0-1" --addr "127.0.0.1:8300"

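The GO_FAILPOINTS export in the cases above turns on two failpoints for the cdc server process, written as semicolon-separated path=term pairs. The sketch below is a rough in-process equivalent, assuming the standard pingcap/failpoint Enable/Disable API; the two paths are copied from run.sh and the rest is illustrative. In the real test the env variable is exported before run_cdc_server starts the binary, which is what makes the failpoints visible to that process.

package main

import (
	"log"

	"github.com/pingcap/failpoint"
)

func main() {
	// In-process equivalent of:
	// export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/scheduler/StopBalanceScheduler=return(true);github.com/pingcap/ticdc/downstreamadapter/dispatcher/WaitBeforeReport=return(true)'
	paths := []string{
		"github.com/pingcap/ticdc/pkg/scheduler/StopBalanceScheduler",
		"github.com/pingcap/ticdc/downstreamadapter/dispatcher/WaitBeforeReport",
	}
	for _, fp := range paths {
		if err := failpoint.Enable(fp, "return(true)"); err != nil {
			log.Fatal(err)
		}
		defer failpoint.Disable(fp)
	}
	// ... exercise code that was built with the failpoint markers activated;
	// the enabled failpoints fire only inside this process.
}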
28 changes: 28 additions & 0 deletions tests/integration_tests/fail_over_ddl_F/conf/diff_config.toml
@@ -0,0 +1,28 @@
# diff Configuration.
check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/fail_over_ddl_F/sync_diff/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["fail_over_ddl_test2.*", "fail_over_ddl_test.*"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.tidb0]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
36 changes: 36 additions & 0 deletions tests/integration_tests/fail_over_ddl_F/data/prepare.sql
@@ -0,0 +1,36 @@
create database `fail_over_ddl_test2`;
use `fail_over_ddl_test2`;

create table t1 (
id INT AUTO_INCREMENT PRIMARY KEY,
val INT DEFAULT 0,
col0 INT NOT NULL
);

create table t2 (
id INT AUTO_INCREMENT PRIMARY KEY,
val INT DEFAULT 0,
col0 INT NOT NULL
);

INSERT INTO t1 (val, col0) VALUES (1, 1);
INSERT INTO t1 (val, col0) VALUES (2, 2);
INSERT INTO t1 (val, col0) VALUES (3, 3);
INSERT INTO t1 (val, col0) VALUES (4, 4);
INSERT INTO t1 (val, col0) VALUES (5, 5);
INSERT INTO t1 (val, col0) VALUES (6, 6);
INSERT INTO t1 (val, col0) VALUES (7, 7);
INSERT INTO t1 (val, col0) VALUES (8, 8);
INSERT INTO t1 (val, col0) VALUES (9, 9);
INSERT INTO t1 (val, col0) VALUES (10, 10);

INSERT INTO t2 (val, col0) VALUES (1, 1);
INSERT INTO t2 (val, col0) VALUES (2, 2);
INSERT INTO t2 (val, col0) VALUES (3, 3);
INSERT INTO t2 (val, col0) VALUES (4, 4);
INSERT INTO t2 (val, col0) VALUES (5, 5);
INSERT INTO t2 (val, col0) VALUES (6, 6);
INSERT INTO t2 (val, col0) VALUES (7, 7);
INSERT INTO t2 (val, col0) VALUES (8, 8);
INSERT INTO t2 (val, col0) VALUES (9, 9);
INSERT INTO t2 (val, col0) VALUES (10, 10);