diff --git a/pkg/sink/mysql/mysql_writer.go b/pkg/sink/mysql/mysql_writer.go index 7894048f..00d5397e 100644 --- a/pkg/sink/mysql/mysql_writer.go +++ b/pkg/sink/mysql/mysql_writer.go @@ -272,10 +272,12 @@ func (w *MysqlWriter) SendDDLTs(event *commonEvent.DDLEvent) error { case commonEvent.InfluenceTypeNormal: dropTableIds = append(dropTableIds, dropTables.TableIDs...) case commonEvent.InfluenceTypeDB: - ids := w.tableSchemaStore.GetTableIdsByDB(dropTables.SchemaID) + // for drop table, we will never delete the item of table trigger, so we get normal table ids for the schemaID. + ids := w.tableSchemaStore.GetNormalTableIdsByDB(dropTables.SchemaID) dropTableIds = append(dropTableIds, ids...) case commonEvent.InfluenceTypeAll: - ids := w.tableSchemaStore.GetAllTableIds() + // for drop table, we will never delete the item of table trigger, so we get normal table ids for the schemaID. + ids := w.tableSchemaStore.GetAllNormalTableIds() dropTableIds = append(dropTableIds, ids...) } } @@ -312,6 +314,7 @@ func (w *MysqlWriter) SendDDLTs(event *commonEvent.DDLEvent) error { builder.WriteString(" ON DUPLICATE KEY UPDATE ddl_ts=VALUES(ddl_ts), created_at=CURRENT_TIMESTAMP;") query := builder.String() + log.Debug("send ddl ts table query", zap.String("query", query)) _, err = tx.Exec(query) if err != nil { @@ -350,6 +353,7 @@ func (w *MysqlWriter) SendDDLTs(event *commonEvent.DDLEvent) error { builder.WriteString(")") query := builder.String() + log.Debug("send ddl ts table query", zap.String("query", query)) _, err = tx.Exec(query) if err != nil { @@ -367,13 +371,11 @@ func (w *MysqlWriter) SendDDLTs(event *commonEvent.DDLEvent) error { } -// CheckStartTsList return the startTs list for each table in the tableIDs list. +// GetStartTsList return the startTs list for each table in the tableIDs list. // For each table, -// If no ddl-ts-v1 table, startTs = 0; -- means the downstream is new -// If have ddl-ts-v1 table, but no row for the changefeed with tableID = 0, startTs = 0; -- means the changefeed is new. -// if have row for the changefeed with tableID = 0, but no this row, startTs = -1; -- means the table is dropped.(we won't check startTs for a table before it created) -// otherwise, startTs = ddl-ts value -func (w *MysqlWriter) CheckStartTsList(tableIDs []int64) ([]int64, error) { +// If no ddl-ts-v1 table or no the row for the table , startTs = 0; -- means the table is new. +// Otherwise, startTs = ddl-ts value. +func (w *MysqlWriter) GetStartTsList(tableIDs []int64) ([]int64, error) { retStartTsList := make([]int64, len(tableIDs)) tableIdIdxMap := make(map[int64]int, 0) for i, tableID := range tableIDs { @@ -431,43 +433,6 @@ func (w *MysqlWriter) CheckStartTsList(tableIDs []int64) ([]int64, error) { retStartTsList[tableIdIdxMap[tableId]] = ddlTs } - // if all table does't have this field, - // we need to check have row for tableID = 0(table trigger event dispatcher) - // If changefeed has tables, it must have row for tableID = 0; - if count == 0 { - builder.Reset() - builder.WriteString("SELECT ddl_ts FROM ") - builder.WriteString(filter.TiCDCSystemSchema) - builder.WriteString(".") - builder.WriteString(filter.DDLTsTable) - builder.WriteString(" WHERE (ticdc_cluster_id, changefeed, table_id) IN (") - - builder.WriteString("('") - builder.WriteString(ticdcClusterID) - builder.WriteString("', '") - builder.WriteString(changefeedID) - builder.WriteString("', ") - builder.WriteString(strconv.FormatInt(0, 10)) - builder.WriteString(")") - builder.WriteString(")") - query = builder.String() - - rows, err = w.db.Query(query) - if err != nil { - return retStartTsList, cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, fmt.Sprintf("failed to check ddl ts table; Query is %s", query))) - } - - defer rows.Close() - if rows.Next() { - // means has record for tableID = 0, but not for this table - for idx, startTs := range retStartTsList { - if startTs == 0 { - retStartTsList[idx] = -1 - } - } - } - } - return retStartTsList, nil } @@ -498,6 +463,14 @@ func (w *MysqlWriter) RemoveDDLTsItem() error { _, err = tx.Exec(query) if err != nil { + if apperror.IsTableNotExistsErr(err) { + // If this table is not existed, this means the changefeed has not table, so we just return nil. + log.Info("ddl ts table is not found when RemoveDDLTsItem", + zap.String("namespace", w.ChangefeedID.Namespace()), + zap.String("changefeedID", w.ChangefeedID.Name()), + zap.Error(err)) + return nil + } log.Error("failed to delete ddl ts item ", zap.Error(err)) err2 := tx.Rollback() if err2 != nil {