Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
hongyunyan committed Dec 30, 2024
1 parent 6d1f1ea commit 0516fe7
Showing 1 changed file with 18 additions and 45 deletions.
63 changes: 18 additions & 45 deletions pkg/sink/mysql/mysql_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,12 @@ func (w *MysqlWriter) SendDDLTs(event *commonEvent.DDLEvent) error {
case commonEvent.InfluenceTypeNormal:
dropTableIds = append(dropTableIds, dropTables.TableIDs...)
case commonEvent.InfluenceTypeDB:
ids := w.tableSchemaStore.GetTableIdsByDB(dropTables.SchemaID)
// for drop table, we will never delete the item of table trigger, so we get normal table ids for the schemaID.
ids := w.tableSchemaStore.GetNormalTableIdsByDB(dropTables.SchemaID)
dropTableIds = append(dropTableIds, ids...)
case commonEvent.InfluenceTypeAll:
ids := w.tableSchemaStore.GetAllTableIds()
// for drop table, we will never delete the item of table trigger, so we get normal table ids for the schemaID.
ids := w.tableSchemaStore.GetAllNormalTableIds()
dropTableIds = append(dropTableIds, ids...)
}
}
Expand Down Expand Up @@ -312,6 +314,7 @@ func (w *MysqlWriter) SendDDLTs(event *commonEvent.DDLEvent) error {
builder.WriteString(" ON DUPLICATE KEY UPDATE ddl_ts=VALUES(ddl_ts), created_at=CURRENT_TIMESTAMP;")

query := builder.String()
log.Debug("send ddl ts table query", zap.String("query", query))

_, err = tx.Exec(query)
if err != nil {
Expand Down Expand Up @@ -350,6 +353,7 @@ func (w *MysqlWriter) SendDDLTs(event *commonEvent.DDLEvent) error {

builder.WriteString(")")
query := builder.String()
log.Debug("send ddl ts table query", zap.String("query", query))

_, err = tx.Exec(query)
if err != nil {
Expand All @@ -367,13 +371,11 @@ func (w *MysqlWriter) SendDDLTs(event *commonEvent.DDLEvent) error {

}

// CheckStartTsList return the startTs list for each table in the tableIDs list.
// GetStartTsList return the startTs list for each table in the tableIDs list.
// For each table,
// If no ddl-ts-v1 table, startTs = 0; -- means the downstream is new
// If have ddl-ts-v1 table, but no row for the changefeed with tableID = 0, startTs = 0; -- means the changefeed is new.
// if have row for the changefeed with tableID = 0, but no this row, startTs = -1; -- means the table is dropped.(we won't check startTs for a table before it created)
// otherwise, startTs = ddl-ts value
func (w *MysqlWriter) CheckStartTsList(tableIDs []int64) ([]int64, error) {
// If no ddl-ts-v1 table or no the row for the table , startTs = 0; -- means the table is new.
// Otherwise, startTs = ddl-ts value.
func (w *MysqlWriter) GetStartTsList(tableIDs []int64) ([]int64, error) {
retStartTsList := make([]int64, len(tableIDs))
tableIdIdxMap := make(map[int64]int, 0)
for i, tableID := range tableIDs {
Expand Down Expand Up @@ -431,43 +433,6 @@ func (w *MysqlWriter) CheckStartTsList(tableIDs []int64) ([]int64, error) {
retStartTsList[tableIdIdxMap[tableId]] = ddlTs
}

// if all table does't have this field,
// we need to check have row for tableID = 0(table trigger event dispatcher)
// If changefeed has tables, it must have row for tableID = 0;
if count == 0 {
builder.Reset()
builder.WriteString("SELECT ddl_ts FROM ")
builder.WriteString(filter.TiCDCSystemSchema)
builder.WriteString(".")
builder.WriteString(filter.DDLTsTable)
builder.WriteString(" WHERE (ticdc_cluster_id, changefeed, table_id) IN (")

builder.WriteString("('")
builder.WriteString(ticdcClusterID)
builder.WriteString("', '")
builder.WriteString(changefeedID)
builder.WriteString("', ")
builder.WriteString(strconv.FormatInt(0, 10))
builder.WriteString(")")
builder.WriteString(")")
query = builder.String()

rows, err = w.db.Query(query)
if err != nil {
return retStartTsList, cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, fmt.Sprintf("failed to check ddl ts table; Query is %s", query)))
}

defer rows.Close()
if rows.Next() {
// means has record for tableID = 0, but not for this table
for idx, startTs := range retStartTsList {
if startTs == 0 {
retStartTsList[idx] = -1
}
}
}
}

return retStartTsList, nil
}

Expand Down Expand Up @@ -498,6 +463,14 @@ func (w *MysqlWriter) RemoveDDLTsItem() error {

_, err = tx.Exec(query)
if err != nil {
if apperror.IsTableNotExistsErr(err) {
// If this table is not existed, this means the changefeed has not table, so we just return nil.
log.Info("ddl ts table is not found when RemoveDDLTsItem",
zap.String("namespace", w.ChangefeedID.Namespace()),
zap.String("changefeedID", w.ChangefeedID.Name()),
zap.Error(err))
return nil
}
log.Error("failed to delete ddl ts item ", zap.Error(err))
err2 := tx.Rollback()
if err2 != nil {
Expand Down

0 comments on commit 0516fe7

Please sign in to comment.