
Commit 3d1ddcc
Merge remote-tracking branch 'upstream/master' into add-unit-test-for-ds
asddongmen committed Jan 2, 2025
2 parents 68c0f13 + 15926d5
Showing 40 changed files with 3,602 additions and 1,666 deletions.
20 changes: 20 additions & 0 deletions .github/workflows/integration_test_mysql.yaml
@@ -126,3 +126,23 @@ jobs:
run: |
pwd && ls -l bin/ && ls -l tools/bin/
export TICDC_NEWARCH=true && make integration_test CASE=fail_over_ddl_B
- name: Test fail_over_ddl_C
run: |
pwd && ls -l bin/ && ls -l tools/bin/
export TICDC_NEWARCH=true && make integration_test CASE=fail_over_ddl_C
- name: Test fail_over_ddl_D
run: |
pwd && ls -l bin/ && ls -l tools/bin/
export TICDC_NEWARCH=true && make integration_test CASE=fail_over_ddl_D
- name: Test fail_over_ddl_E
run: |
pwd && ls -l bin/ && ls -l tools/bin/
export TICDC_NEWARCH=true && make integration_test CASE=fail_over_ddl_E
- name: Test fail_over_ddl_F
run: |
pwd && ls -l bin/ && ls -l tools/bin/
export TICDC_NEWARCH=true && make integration_test CASE=fail_over_ddl_F
8 changes: 8 additions & 0 deletions downstreamadapter/dispatcher/dispatcher.go
@@ -225,6 +225,11 @@ func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.DispatcherStatus) {
if pendingEvent != nil && action.CommitTs == pendingEvent.GetCommitTs() && blockStatus == heartbeatpb.BlockStage_WAITING {
d.blockEventStatus.updateBlockStage(heartbeatpb.BlockStage_WRITING)
if action.Action == heartbeatpb.Action_Write {
failpoint.Inject("WaitBeforeWrite", func() {
// use this failpoint to ensure the ddl event is not written to the downstream before the other node finishes restarting
time.Sleep(30 * time.Second)
})
failpoint.Inject("BlockBeforeWrite", nil)
err := d.AddBlockEventToSink(pendingEvent)
if err != nil {
select {
@@ -238,6 +243,9 @@ func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.DispatcherStatus) {
return
}
failpoint.Inject("BlockReportAfterWrite", nil)
failpoint.Inject("WaitBeforeReport", func() {
time.Sleep(30 * time.Second)
})
} else {
d.PassBlockEventToSink(pendingEvent)
}
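For context: failpoint.Inject is a no-op marker until the binary is rewritten with failpoints enabled (failpoint-ctl or an equivalent make target) and the named failpoint is activated at run time. A minimal sketch of how a unit test might activate WaitBeforeWrite — the package import path and test name below are assumptions for illustration:

package dispatcher_test

import (
	"testing"

	"github.com/pingcap/failpoint"
)

func TestFailOverWithDelayedWrite(t *testing.T) {
	// The full failpoint name is the package import path plus the name
	// passed to failpoint.Inject; this path is an assumption.
	fp := "github.com/pingcap/ticdc/downstreamadapter/dispatcher/WaitBeforeWrite"
	// "return(true)" makes the failpoint fire; the injected func ignores
	// the value, so any term that evaluates successfully would do.
	if err := failpoint.Enable(fp, "return(true)"); err != nil {
		t.Fatal(err)
	}
	defer func() { _ = failpoint.Disable(fp) }()
	// ... restart the other node while the DDL write is held for 30s ...
}

In the integration-test workflow above, the same failpoints are typically switched on by exporting GO_FAILPOINTS to the server process before it starts.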
13 changes: 0 additions & 13 deletions logservice/schemastore/ddl_job_fetcher.go
@@ -14,7 +14,6 @@
package schemastore

import (
"context"
"math"
"sync"

@@ -30,8 +29,6 @@ import (
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pdutil"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)

@@ -55,8 +52,6 @@ type ddlJobFetcher struct {

func newDDLJobFetcher(
subClient *logpuller.SubscriptionClient,
pdCli pd.Client,
pdClock pdutil.Clock,
kvStorage kv.Storage,
startTs uint64,
cacheDDLEvent func(ddlEvent DDLJobWithCommitTs),
@@ -86,14 +81,6 @@ func newDDLJobFetcher(
return ddlJobFetcher
}

func (p *ddlJobFetcher) run(ctx context.Context) error {
return nil
}

func (p *ddlJobFetcher) close(ctx context.Context) error {
return nil
}

func (p *ddlJobFetcher) tryAdvanceResolvedTs(subID logpuller.SubscriptionID, newResolvedTs uint64) {
p.resolvedTsTracker.Lock()
defer p.resolvedTsTracker.Unlock()
45 changes: 29 additions & 16 deletions logservice/schemastore/disk_format.go
@@ -273,21 +273,27 @@ func loadAndApplyDDLHistory(
defer snapIter.Close()
for snapIter.First(); snapIter.Valid(); snapIter.Next() {
ddlEvent := unmarshalPersistedDDLEvent(snapIter.Value())
if shouldSkipDDL(&ddlEvent, databaseMap, tableMap) {
continue
}
if tableTriggerDDLHistory, err = updateDDLHistory(
&ddlEvent,
databaseMap,
tableMap,
partitionMap,
tablesDDLHistory,
tableTriggerDDLHistory); err != nil {
log.Panic("updateDDLHistory error", zap.Error(err))
}
if err := updateDatabaseInfoAndTableInfo(&ddlEvent, databaseMap, tableMap, partitionMap); err != nil {
log.Panic("updateDatabaseInfo error", zap.Error(err))
// Note: there is no need to skip ddls here:
// 1. for create table and create tables, we always store the ddl with the smaller commit ts, so the ddl won't conflict with the tables in the gc snapshot.
// 2. other ddls that should be ignored were already filtered out before being written to disk.
handler, ok := allDDLHandlers[model.ActionType(ddlEvent.Type)]
if !ok {
log.Panic("unknown ddl type", zap.Any("ddlType", ddlEvent.Type), zap.String("query", ddlEvent.Query))
}
tableTriggerDDLHistory = handler.updateDDLHistoryFunc(updateDDLHistoryFuncArgs{
ddlEvent: &ddlEvent,
databaseMap: databaseMap,
tableMap: tableMap,
partitionMap: partitionMap,
tablesDDLHistory: tablesDDLHistory,
tableTriggerDDLHistory: tableTriggerDDLHistory,
})
handler.updateSchemaMetadataFunc(updateSchemaMetadataFuncArgs{
event: &ddlEvent,
databaseMap: databaseMap,
tableMap: tableMap,
partitionMap: partitionMap,
})
}

return tablesDDLHistory, tableTriggerDDLHistory, nil
@@ -585,9 +591,16 @@ func loadAllPhysicalTablesAtTs(
defer snapIter.Close()
for snapIter.First(); snapIter.Valid(); snapIter.Next() {
ddlEvent := unmarshalPersistedDDLEvent(snapIter.Value())
if err := updateDatabaseInfoAndTableInfo(&ddlEvent, databaseMap, tableMap, partitionMap); err != nil {
log.Panic("updateDatabaseInfo error", zap.Error(err))
handler, ok := allDDLHandlers[model.ActionType(ddlEvent.Type)]
if !ok {
log.Panic("unknown ddl type", zap.Any("ddlType", ddlEvent.Type), zap.String("query", ddlEvent.Query))
}
handler.updateSchemaMetadataFunc(updateSchemaMetadataFuncArgs{
event: &ddlEvent,
databaseMap: databaseMap,
tableMap: tableMap,
partitionMap: partitionMap,
})
}
log.Info("after load tables from ddl",
zap.Int("tableMapLen", len(tableMap)),
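Both loops above now dispatch through the shared allDDLHandlers table instead of duplicating a switch over model.ActionType. A minimal sketch of the presumed shape of that table — the field types, return types, and handler names below are assumptions, not the repository's exact definitions:

// ddlHandler bundles the per-action callbacks referenced in this diff.
type ddlHandler struct {
	// appends the event to the DDL histories and returns the updated
	// table-trigger history (return type assumed)
	updateDDLHistoryFunc func(args updateDDLHistoryFuncArgs) []uint64
	// applies the event to databaseMap, tableMap, and partitionMap
	updateSchemaMetadataFunc func(args updateSchemaMetadataFuncArgs)
	// resolves the table info for one physical table; the bool reports
	// that the table was deleted by this event
	extractTableInfoFunc func(event *PersistedDDLEvent, tableID int64) (*common.TableInfo, bool)
}

// One entry per supported DDL action; an unknown action type hits the
// log.Panic at the call sites above. Handler names are hypothetical.
var allDDLHandlers = map[model.ActionType]ddlHandler{
	model.ActionCreateTable: {
		updateDDLHistoryFunc:     updateDDLHistoryForAddDropTable,
		updateSchemaMetadataFunc: updateSchemaMetadataForNewTable,
		extractTableInfoFunc:     extractTableInfoForCreateTable,
	},
	// ... remaining action types ...
}

Keeping the three callbacks together per action type means loadAndApplyDDLHistory, loadAllPhysicalTablesAtTs, and doApplyDDL stay consistent when support for a new DDL type is added.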
146 changes: 11 additions & 135 deletions logservice/schemastore/multi_version.go
@@ -209,8 +209,6 @@ func (v *versionedTableInfoStore) applyDDL(event *PersistedDDLEvent) {

// lock must be hold by the caller
func (v *versionedTableInfoStore) doApplyDDL(event *PersistedDDLEvent) {
// TODO: add a unit test
// TODO: whether need add schema version check
if len(v.infos) != 0 && event.FinishedTs <= v.infos[len(v.infos)-1].version {
log.Warn("already applied ddl, ignore it.",
zap.Int64("tableID", v.tableID),
@@ -219,98 +217,7 @@ func (v *versionedTableInfoStore) doApplyDDL(event *PersistedDDLEvent) {
zap.Int("infosLen", len(v.infos)))
return
}
appendTableInfo := func() {
info := common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, event.TableInfo)
v.infos = append(v.infos, &tableInfoItem{version: uint64(event.FinishedTs), info: info})
}

switch model.ActionType(event.Type) {
case model.ActionCreateTable:
if len(v.infos) == 1 {
// table info may be in the snapshot, cannot filter the redundant job in this case
log.Warn("ignore create table job",
zap.Int64("tableID", int64(v.tableID)),
zap.String("query", event.Query),
zap.Uint64("finishedTS", event.FinishedTs))
break
}
assertEmpty(v.infos, event)
appendTableInfo()
case model.ActionDropSchema:
// ignore
case model.ActionDropTable:
v.deleteVersion = uint64(event.FinishedTs)
case model.ActionAddColumn,
model.ActionDropColumn:
assertNonEmpty(v.infos, event)
appendTableInfo()
case model.ActionTruncateTable:
if isPartitionTable(event.TableInfo) {
createTable := false
for _, partition := range getAllPartitionIDs(event.TableInfo) {
if v.tableID == partition {
createTable = true
break
}
}
if createTable {
log.Info("create table for truncate table")
appendTableInfo()
} else {
v.deleteVersion = uint64(event.FinishedTs)
}
} else {
if v.tableID == event.CurrentTableID {
appendTableInfo()
} else {
if v.tableID != event.PrevTableID {
log.Panic("should not happen")
}
v.deleteVersion = uint64(event.FinishedTs)
}
}
case model.ActionRenameTable:
assertNonEmpty(v.infos, event)
appendTableInfo()
case model.ActionAddTablePartition:
newCreatedIDs := getCreatedIDs(event.PrevPartitions, getAllPartitionIDs(event.TableInfo))
for _, partition := range newCreatedIDs {
if v.tableID == partition {
appendTableInfo()
break
}
}
case model.ActionDropTablePartition:
droppedIDs := getDroppedIDs(event.PrevPartitions, getAllPartitionIDs(event.TableInfo))
for _, partition := range droppedIDs {
if v.tableID == partition {
v.deleteVersion = uint64(event.FinishedTs)
break
}
}
case model.ActionCreateView:
// create view is added to every table's ddl history, so it will be read when building the store; just ignore it
case model.ActionTruncateTablePartition:
physicalIDs := getAllPartitionIDs(event.TableInfo)
droppedIDs := getDroppedIDs(event.PrevPartitions, physicalIDs)
dropped := false
for _, partition := range droppedIDs {
if v.tableID == partition {
v.deleteVersion = uint64(event.FinishedTs)
dropped = true
break
}
}
if !dropped {
newCreatedIDs := getCreatedIDs(event.PrevPartitions, physicalIDs)
for _, partition := range newCreatedIDs {
if v.tableID == partition {
appendTableInfo()
break
}
}
}
case model.ActionExchangeTablePartition:
if model.ActionType(event.Type) == model.ActionExchangeTablePartition {
assertNonEmpty(v.infos, event)
columnSchema := v.infos[len(v.infos)-1].info.ShadowCopyColumnSchema()
// the previous normal table
Expand All @@ -337,48 +244,17 @@ func (v *versionedTableInfoStore) doApplyDDL(event *PersistedDDLEvent) {
)
v.infos = append(v.infos, &tableInfoItem{version: uint64(event.FinishedTs), info: tableInfo})
}
case model.ActionCreateTables:
assertEmpty(v.infos, event)
for _, tableInfo := range event.MultipleTableInfos {
if isPartitionTable(tableInfo) {
for _, partitionID := range getAllPartitionIDs(tableInfo) {
if v.tableID == partitionID {
info := common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, tableInfo)
v.infos = append(v.infos, &tableInfoItem{version: uint64(event.FinishedTs), info: info})
break
}
}
} else {
if v.tableID == tableInfo.ID {
info := common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, tableInfo)
v.infos = append(v.infos, &tableInfoItem{version: uint64(event.FinishedTs), info: info})
break
}
}
}
case model.ActionReorganizePartition:
physicalIDs := getAllPartitionIDs(event.TableInfo)
droppedIDs := getDroppedIDs(event.PrevPartitions, physicalIDs)
dropped := false
for _, partition := range droppedIDs {
if v.tableID == partition {
v.deleteVersion = uint64(event.FinishedTs)
dropped = true
break
}
} else {
// TODO: add func to check invariant for every ddl type
handler, ok := allDDLHandlers[model.ActionType(event.Type)]
if !ok {
log.Panic("unknown ddl type", zap.Any("ddlType", event.Type), zap.String("query", event.Query))
}
if !dropped {
newCreatedIDs := getCreatedIDs(event.PrevPartitions, physicalIDs)
for _, partition := range newCreatedIDs {
if v.tableID == partition {
appendTableInfo()
break
}
}
tableInfo, deleted := handler.extractTableInfoFunc(event, v.tableID)
if tableInfo != nil {
v.infos = append(v.infos, &tableInfoItem{version: uint64(event.FinishedTs), info: tableInfo})
} else if deleted {
v.deleteVersion = uint64(event.FinishedTs)
}
default:
log.Panic("not supported ddl type",
zap.Any("ddlType", event.Type),
zap.String("DDL", event.Query))
}
}
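The (tableInfo, deleted) pair returned by extractTableInfoFunc carries what the removed switch used to decide inline: a non-nil info appends a new version for v.tableID, and deleted reports that the table is gone at event.FinishedTs. A sketch of two per-action variants mirroring the removed branches — names and exact signatures are assumptions:

// Mirrors the old ActionDropTable branch: no new info, table deleted.
func extractTableInfoForDropTable(event *PersistedDDLEvent, tableID int64) (*common.TableInfo, bool) {
	return nil, true
}

// Mirrors the old ActionAddColumn/ActionDropColumn branches: wrap the
// updated table info carried by the event.
func extractTableInfoForColumnChange(event *PersistedDDLEvent, tableID int64) (*common.TableInfo, bool) {
	return common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, event.TableInfo), false
}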
