kafka: refactor the ddl and dml worker (#888)
* refactor the kafka dml worker

* remove protocol from the ddl worker fields

* tiny adjust the ddl worker

* tiny adjust the ddl worker

* rewrite ddl worker write block event method

* no need to fetch partition number if no need to broadcast

* finish refactor the worker

* fix close components
3AceShowHand authored Jan 16, 2025
1 parent fa2267b commit e1fcd8c
Showing 10 changed files with 87 additions and 109 deletions.
4 changes: 2 additions & 2 deletions downstreamadapter/sink/helper/eventrouter/event_router.go
@@ -124,8 +124,8 @@ func (s *EventRouter) GetActiveTopics(activeTables []*commonEvent.SchemaTableName
return topics
}

// GetPartitionForRowChange returns the target partition for row changes.
func (s *EventRouter) GetPartitionGeneratorForRowChange(tableInfo *common.TableInfo) partition.PartitionGenerator {
// GetPartitionGenerator returns the target partition by the table information.
func (s *EventRouter) GetPartitionGenerator(tableInfo *common.TableInfo) partition.PartitionGenerator {
return s.GetPartitionDispatcher(tableInfo.GetSchemaName(), tableInfo.GetTableName())
}

10 changes: 5 additions & 5 deletions downstreamadapter/sink/helper/eventrouter/event_router_test.go
@@ -203,7 +203,7 @@ func TestGetPartitionForRowChange(t *testing.T) {
tableInfo := &common.TableInfo{
TableName: common.TableName{Schema: "test_default1", Table: "table"},
}
partitionGenerator := d.GetPartitionGeneratorForRowChange(tableInfo)
partitionGenerator := d.GetPartitionGenerator(tableInfo)
p, _, err := partitionGenerator.GeneratePartitionIndexAndKey(&commonEvent.RowChange{}, 16, tableInfo, 0)
require.NoError(t, err)
require.Equal(t, int32(14), p)
@@ -212,7 +212,7 @@ func TestGetPartitionForRowChange(t *testing.T) {
tableInfo = &common.TableInfo{
TableName: common.TableName{Schema: "test_default2", Table: "table"},
}
partitionGenerator = d.GetPartitionGeneratorForRowChange(tableInfo)
partitionGenerator = d.GetPartitionGenerator(tableInfo)
p, _, err = partitionGenerator.GeneratePartitionIndexAndKey(&commonEvent.RowChange{}, 16, tableInfo, 0)
require.NoError(t, err)
require.Equal(t, int32(0), p)
@@ -221,7 +221,7 @@ func TestGetPartitionForRowChange(t *testing.T) {
tableInfo = &common.TableInfo{
TableName: common.TableName{Schema: "test_table", Table: "table"},
}
partitionGenerator = d.GetPartitionGeneratorForRowChange(tableInfo)
partitionGenerator = d.GetPartitionGenerator(tableInfo)
p, _, err = partitionGenerator.GeneratePartitionIndexAndKey(&commonEvent.RowChange{}, 16, tableInfo, 1)
require.NoError(t, err)
require.Equal(t, int32(15), p)
@@ -246,7 +246,7 @@ func TestGetPartitionForRowChange(t *testing.T) {
row, ok := dmlEvent.GetNextRow()
require.True(t, ok)

partitionGenerator = d.GetPartitionGeneratorForRowChange(dmlEvent.TableInfo)
partitionGenerator = d.GetPartitionGenerator(dmlEvent.TableInfo)
p, _, err = partitionGenerator.GeneratePartitionIndexAndKey(&row, 10, dmlEvent.TableInfo, 2)
require.NoError(t, err)
require.Equal(t, int32(9), p)
@@ -255,7 +255,7 @@ func TestGetPartitionForRowChange(t *testing.T) {
tableInfo = &common.TableInfo{
TableName: common.TableName{Schema: "a", Table: "table"},
}
partitionGenerator = d.GetPartitionGeneratorForRowChange(tableInfo)
partitionGenerator = d.GetPartitionGenerator(tableInfo)
p, _, err = partitionGenerator.GeneratePartitionIndexAndKey(&commonEvent.RowChange{}, 2, tableInfo, 1)
require.NoError(t, err)
require.Equal(t, int32(1), p)
20 changes: 12 additions & 8 deletions downstreamadapter/sink/kafka_sink.go
@@ -58,8 +58,12 @@ func (s *KafkaSink) SinkType() common.SinkType {

func verifyKafkaSink(ctx context.Context, changefeedID common.ChangeFeedID, uri *url.URL, sinkConfig *config.SinkConfig) error {
components, _, err := worker.GetKafkaSinkComponent(ctx, changefeedID, uri, sinkConfig)
components.AdminClient.Close()
components.TopicManager.Close()
if components.AdminClient != nil {
components.AdminClient.Close()
}
if components.TopicManager != nil {
components.TopicManager.Close()
}
return err
}

@@ -144,22 +148,22 @@ func (s *KafkaSink) IsNormal() bool {
}

func (s *KafkaSink) AddDMLEvent(event *commonEvent.DMLEvent) {
s.dmlWorker.GetEventChan() <- event
s.dmlWorker.AddDMLEvent(event)
}

func (s *KafkaSink) PassBlockEvent(event commonEvent.BlockEvent) {
event.PostFlush()
}

func (s *KafkaSink) WriteBlockEvent(event commonEvent.BlockEvent) error {
switch event := event.(type) {
switch v := event.(type) {
case *commonEvent.DDLEvent:
if event.TiDBOnly {
if v.TiDBOnly {
// run callback directly and return
event.PostFlush()
v.PostFlush()
return nil
}
err := s.ddlWorker.WriteBlockEvent(s.ctx, event)
err := s.ddlWorker.WriteBlockEvent(s.ctx, v)
if err != nil {
atomic.StoreUint32(&s.isNormal, 0)
return errors.Trace(err)
@@ -179,7 +183,7 @@ func (s *KafkaSink) WriteBlockEvent(event commonEvent.BlockEvent) error {
}

func (s *KafkaSink) AddCheckpointTs(ts uint64) {
s.ddlWorker.GetCheckpointTsChan() <- ts
s.ddlWorker.AddCheckpoint(ts)
}

func (s *KafkaSink) SetTableSchemaStore(tableSchemaStore *util.TableSchemaStore) {
27 changes: 13 additions & 14 deletions downstreamadapter/worker/helper.go
@@ -17,14 +17,13 @@ import (
"context"
"net/url"

"github.com/pingcap/errors"
"github.com/pingcap/ticdc/downstreamadapter/sink/helper"
"github.com/pingcap/ticdc/downstreamadapter/sink/helper/eventrouter"
"github.com/pingcap/ticdc/downstreamadapter/sink/helper/topicmanager"
commonType "github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/common/columnselector"
ticonfig "github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/sink/codec"
"github.com/pingcap/ticdc/pkg/sink/codec/common"
"github.com/pingcap/ticdc/pkg/sink/kafka"
@@ -47,28 +46,28 @@ type KafkaComponent struct {
func getKafkaSinkComponentWithFactory(ctx context.Context,
changefeedID commonType.ChangeFeedID,
sinkURI *url.URL,
sinkConfig *ticonfig.SinkConfig,
sinkConfig *config.SinkConfig,
factoryCreator kafka.FactoryCreator,
) (KafkaComponent, ticonfig.Protocol, error) {
) (KafkaComponent, config.Protocol, error) {
kafkaComponent := KafkaComponent{}
protocol, err := helper.GetProtocol(utils.GetOrZero(sinkConfig.Protocol))
if err != nil {
return kafkaComponent, ticonfig.ProtocolUnknown, errors.Trace(err)
return kafkaComponent, config.ProtocolUnknown, errors.Trace(err)
}

options := kafka.NewOptions()
if err = options.Apply(changefeedID, sinkURI, sinkConfig); err != nil {
return kafkaComponent, protocol, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
return kafkaComponent, protocol, errors.WrapError(errors.ErrKafkaInvalidConfig, err)
}

kafkaComponent.Factory, err = factoryCreator(ctx, options, changefeedID)
if err != nil {
return kafkaComponent, protocol, cerror.WrapError(cerror.ErrKafkaNewProducer, err)
return kafkaComponent, protocol, errors.WrapError(errors.ErrKafkaNewProducer, err)
}

kafkaComponent.AdminClient, err = kafkaComponent.Factory.AdminClient()
if err != nil {
return kafkaComponent, protocol, cerror.WrapError(cerror.ErrKafkaNewProducer, err)
return kafkaComponent, protocol, errors.WrapError(errors.ErrKafkaNewProducer, err)
}

// We must close adminClient when this func return cause by an error
Expand All @@ -85,7 +84,7 @@ func getKafkaSinkComponentWithFactory(ctx context.Context,
}
// adjust the option configuration before creating the kafka client
if err = kafka.AdjustOptions(ctx, kafkaComponent.AdminClient, options, topic); err != nil {
return kafkaComponent, protocol, cerror.WrapError(cerror.ErrKafkaNewProducer, err)
return kafkaComponent, protocol, errors.WrapError(errors.ErrKafkaNewProducer, err)
}

kafkaComponent.TopicManager, err = topicmanager.GetTopicManagerAndTryCreateTopic(
@@ -128,8 +127,8 @@ func GetKafkaSinkComponent(
ctx context.Context,
changefeedID commonType.ChangeFeedID,
sinkURI *url.URL,
sinkConfig *ticonfig.SinkConfig,
) (KafkaComponent, ticonfig.Protocol, error) {
sinkConfig *config.SinkConfig,
) (KafkaComponent, config.Protocol, error) {
factoryCreator := kafka.NewSaramaFactory
if utils.GetOrZero(sinkConfig.EnableKafkaSinkV2) {
factoryCreator = v2.NewFactory
@@ -141,7 +140,7 @@ func GetKafkaSinkComponentForTest(
ctx context.Context,
changefeedID commonType.ChangeFeedID,
sinkURI *url.URL,
sinkConfig *ticonfig.SinkConfig,
) (KafkaComponent, ticonfig.Protocol, error) {
sinkConfig *config.SinkConfig,
) (KafkaComponent, config.Protocol, error) {
return getKafkaSinkComponentWithFactory(ctx, changefeedID, sinkURI, sinkConfig, kafka.NewMockFactory)
}
64 changes: 22 additions & 42 deletions downstreamadapter/worker/kafka_ddl_worker.go
@@ -17,29 +17,27 @@ import (
"context"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/downstreamadapter/sink/helper/eventrouter"
"github.com/pingcap/ticdc/downstreamadapter/sink/helper/topicmanager"
"github.com/pingcap/ticdc/downstreamadapter/worker/producer"
commonType "github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/sink/codec/common"
"github.com/pingcap/ticdc/pkg/sink/util"
"go.uber.org/zap"
)

// worker will send messages to the DML producer on a batch basis.
// KafkaDDLWorker handle DDL and checkpoint event
type KafkaDDLWorker struct {
// changeFeedID indicates this sink belongs to which processor(changefeed).
changeFeedID commonType.ChangeFeedID
// protocol indicates the protocol used by this sink.
protocol config.Protocol
checkpointTsChan chan uint64

encoder common.EventEncoder
checkpointTsChan chan uint64
encoder common.EventEncoder
// eventRouter used to route events to the right topic and partition.
eventRouter *eventrouter.EventRouter
// topicManager used to manage topics.
@@ -75,7 +73,7 @@ func getDDLDispatchRule(protocol config.Protocol) DDLDispatchRule {
return PartitionAll
}

// newWorker creates a new flush worker.
// NewKafkaDDLWorker return a ddl worker instance.
func NewKafkaDDLWorker(
id commonType.ChangeFeedID,
protocol config.Protocol,
@@ -87,7 +85,6 @@ func NewKafkaDDLWorker(
) *KafkaDDLWorker {
return &KafkaDDLWorker{
changeFeedID: id,
protocol: protocol,
encoder: encoder,
producer: producer,
eventRouter: eventRouter,
@@ -102,49 +99,27 @@ func (w *KafkaDDLWorker) Run(ctx context.Context) error {
return w.encodeAndSendCheckpointEvents(ctx)
}

func (w *KafkaDDLWorker) GetCheckpointTsChan() chan<- uint64 {
return w.checkpointTsChan
func (w *KafkaDDLWorker) AddCheckpoint(ts uint64) {
w.checkpointTsChan <- ts
}

func (w *KafkaDDLWorker) SetTableSchemaStore(tableSchemaStore *util.TableSchemaStore) {
w.tableSchemaStore = tableSchemaStore
}

func (w *KafkaDDLWorker) WriteBlockEvent(ctx context.Context, event *event.DDLEvent) error {
messages := make([]*common.Message, 0)
topics := make([]string, 0)

// Some ddl event may be multi-events, we need to split it into multiple messages.
// Such as rename table test.table1 to test.table10, test.table2 to test.table20
if event.IsMultiEvents() {
subEvents := event.GetSubEvents()
for _, subEvent := range subEvents {
message, err := w.encoder.EncodeDDLEvent(&subEvent)
if err != nil {
return errors.Trace(err)
}
topic := w.eventRouter.GetTopicForDDL(&subEvent)
messages = append(messages, message)
topics = append(topics, topic)
}
} else {
message, err := w.encoder.EncodeDDLEvent(event)
if err != nil {
return errors.Trace(err)
}
topic := w.eventRouter.GetTopicForDDL(event)
messages = append(messages, message)
topics = append(topics, topic)
}

for i, message := range messages {
topic := topics[i]
partitionNum, err := w.topicManager.GetPartitionNum(ctx, topic)
for _, e := range event.GetEvents() {
message, err := w.encoder.EncodeDDLEvent(e)
if err != nil {
return errors.Trace(err)
}
topic := w.eventRouter.GetTopicForDDL(e)

if w.partitionRule == PartitionAll {
partitionNum, err := w.topicManager.GetPartitionNum(ctx, topic)
if err != nil {
return errors.Trace(err)
}
err = w.statistics.RecordDDLExecution(func() error {
return w.producer.SyncBroadcastMessage(ctx, topic, partitionNum, message)
})
@@ -171,6 +146,11 @@ func (w *KafkaDDLWorker) encodeAndSendCheckpointEvents(ctx context.Context) error {
metrics.CheckpointTsMessageCount.DeleteLabelValues(w.changeFeedID.Namespace(), w.changeFeedID.Name())
}()

var (
msg *common.Message
partitionNum int32
err error
)
for {
select {
case <-ctx.Done():
@@ -184,7 +164,7 @@ func (w *KafkaDDLWorker) encodeAndSendCheckpointEvents(ctx context.Context) error {
}
start := time.Now()

msg, err := w.encoder.EncodeCheckpointEvent(ts)
msg, err = w.encoder.EncodeCheckpointEvent(ts)
if err != nil {
return errors.Trace(err)
}
@@ -198,7 +178,7 @@ func (w *KafkaDDLWorker) encodeAndSendCheckpointEvents(ctx context.Context) error {
// This will be compatible with the old behavior.
if len(tableNames) == 0 {
topic := w.eventRouter.GetDefaultTopic()
partitionNum, err := w.topicManager.GetPartitionNum(ctx, topic)
partitionNum, err = w.topicManager.GetPartitionNum(ctx, topic)
if err != nil {
return errors.Trace(err)
}
@@ -211,7 +191,7 @@ func (w *KafkaDDLWorker) encodeAndSendCheckpointEvents(ctx context.Context) error {
} else {
topics := w.eventRouter.GetActiveTopics(tableNames)
for _, topic := range topics {
partitionNum, err := w.topicManager.GetPartitionNum(ctx, topic)
partitionNum, err = w.topicManager.GetPartitionNum(ctx, topic)
if err != nil {
return errors.Trace(err)
}
4 changes: 2 additions & 2 deletions downstreamadapter/worker/kafka_ddl_worker_test.go
@@ -122,8 +122,8 @@ func TestWriteCheckpointTs(t *testing.T) {
tableSchemaStore := util.NewTableSchemaStore([]*heartbeatpb.SchemaInfo{}, common.KafkaSinkType)
ddlWorker.SetTableSchemaStore(tableSchemaStore)

ddlWorker.GetCheckpointTsChan() <- 1
ddlWorker.GetCheckpointTsChan() <- 2
ddlWorker.AddCheckpoint(1)
ddlWorker.AddCheckpoint(2)

time.Sleep(1 * time.Second)
