Revert "sink : downstream support kafka sink (pingcap#274)"
This reverts commit c27cb1a.
CharlesCheung96 committed Sep 11, 2024
1 parent 8af397e commit e01852e
Showing 43 changed files with 154 additions and 4,294 deletions.
@@ -27,6 +27,7 @@ import (
 	"github.com/flowbehappy/tigate/downstreamadapter/dispatcher"
 	"github.com/flowbehappy/tigate/downstreamadapter/eventcollector"
 	"github.com/flowbehappy/tigate/downstreamadapter/sink"
+	"github.com/flowbehappy/tigate/downstreamadapter/writer"
 	"github.com/flowbehappy/tigate/eventpb"
 	"github.com/flowbehappy/tigate/heartbeatpb"
 	"github.com/flowbehappy/tigate/pkg/common"
@@ -130,12 +131,13 @@ func NewEventDispatcherManager(changefeedID model.ChangeFeedID,
 }
 
 func (e *EventDispatcherManager) InitSink() error {
-	sink, err := sink.NewSink(e.config, e.changefeedID)
+	cfg, db, err := writer.NewMysqlConfigAndDB(e.config.SinkURI)
 	if err != nil {
-		log.Error("create sink failed", zap.Error(err))
+		log.Error("create mysql sink failed", zap.Error(err))
 		return err
 	}
-	e.sink = sink
+
+	e.sink = sink.NewMysqlSink(e.changefeedID, 16, cfg, db)
 	return nil
 }
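For orientation, the wiring this hunk restores can be sketched standalone. This is a hypothetical sketch, not repository code: it assumes writer.NewMysqlConfigAndDB accepts a parsed *url.URL (as it is called in the removed sink.NewSink near the end of this diff) and reuses the worker count of 16 that InitSink hard-codes above; the DSN is illustrative.

package main

import (
	"net/url"

	"github.com/flowbehappy/tigate/downstreamadapter/sink"
	"github.com/flowbehappy/tigate/downstreamadapter/writer"
	"github.com/pingcap/tiflow/cdc/model"
)

// buildMysqlSink mirrors the reverted InitSink: parse the sink URI, open the
// MySQL config and connection, then start a MysqlSink with 16 workers.
func buildMysqlSink(changefeedID model.ChangeFeedID, rawURI string) (sink.Sink, error) {
	sinkURI, err := url.Parse(rawURI) // e.g. "mysql://root@127.0.0.1:4000/" (illustrative)
	if err != nil {
		return nil, err
	}
	cfg, db, err := writer.NewMysqlConfigAndDB(sinkURI)
	if err != nil {
		return nil, err
	}
	return sink.NewMysqlSink(changefeedID, 16, cfg, db), nil
}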
42 changes: 19 additions & 23 deletions downstreamadapter/sink/helper/eventrouter/event_router.go
@@ -17,9 +17,9 @@ import (
 	"github.com/flowbehappy/tigate/downstreamadapter/sink/helper/eventrouter/partition"
 	"github.com/flowbehappy/tigate/downstreamadapter/sink/helper/eventrouter/topic"
 	"github.com/flowbehappy/tigate/pkg/common"
-	ticonfig "github.com/flowbehappy/tigate/pkg/config"
 	"github.com/pingcap/log"
 	tableFilter "github.com/pingcap/tidb/pkg/util/table-filter"
+	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/pkg/config"
 	cerror "github.com/pingcap/tiflow/pkg/errors"
 )
@@ -38,11 +38,11 @@ type EventRouter struct {
 }
 
 // NewEventRouter creates a new EventRouter.
-func NewEventRouter(sinkConfig *ticonfig.SinkConfig, protocol config.Protocol, defaultTopic, scheme string) (*EventRouter, error) {
+func NewEventRouter(cfg *config.ReplicaConfig, protocol config.Protocol, defaultTopic, scheme string) (*EventRouter, error) {
 	// If an event does not match any dispatching rules in the config file,
 	// it will be dispatched by the default partition dispatcher and
 	// static topic dispatcher because it matches *.* rule.
-	ruleConfigs := append(sinkConfig.DispatchRules, &ticonfig.DispatchRule{
+	ruleConfigs := append(cfg.Sink.DispatchRules, &config.DispatchRule{
 		Matcher:       []string{"*.*"},
 		PartitionRule: "default",
 		TopicRule:     "",
@@ -55,7 +55,7 @@ func NewEventRouter(sinkConfig *ticonfig.SinkConfig, protocol config.Protocol, defaultTopic, scheme string) (*EventRouter, error) {
 		if err != nil {
 			return nil, cerror.WrapError(cerror.ErrFilterRuleInvalid, err, ruleConfig.Matcher)
 		}
-		if !sinkConfig.CaseSensitive {
+		if !cfg.CaseSensitive {
 			f = tableFilter.CaseInsensitive(f)
 		}
@@ -81,25 +81,21 @@ func (s *EventRouter) GetTopicForRowChange(tableInfo *common.TableInfo) string {
 }
 
 // GetTopicForDDL returns the target topic for DDL.
-func (s *EventRouter) GetTopicForDDL(ddl *common.DDLEvent) string {
-	schema := ddl.Job.SchemaName
-	table := ddl.Job.TableName
-
-	// TODO: fix this
-	//var schema, table string
-	// if ddl.PreTableInfo != nil {
-	// 	if ddl.PreTableInfo.TableName.Table == "" {
-	// 		return s.defaultTopic
-	// 	}
-	// 	schema = ddl.PreTableInfo.TableName.Schema
-	// 	table = ddl.PreTableInfo.TableName.Table
-	// } else {
-	// 	if ddl.TableInfo.TableName.Table == "" {
-	// 		return s.defaultTopic
-	// 	}
-	// 	schema = ddl.TableInfo.TableName.Schema
-	// 	table = ddl.TableInfo.TableName.Table
-	// }
+func (s *EventRouter) GetTopicForDDL(ddl *model.DDLEvent) string {
+	var schema, table string
+	if ddl.PreTableInfo != nil {
+		if ddl.PreTableInfo.TableName.Table == "" {
+			return s.defaultTopic
+		}
+		schema = ddl.PreTableInfo.TableName.Schema
+		table = ddl.PreTableInfo.TableName.Table
+	} else {
+		if ddl.TableInfo.TableName.Table == "" {
+			return s.defaultTopic
+		}
+		schema = ddl.TableInfo.TableName.Schema
+		table = ddl.TableInfo.TableName.Table
+	}
 
 	topicGenerator := s.matchTopicGenerator(schema, table)
 	return topicGenerator.Substitute(schema, table)
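The restored logic prefers PreTableInfo, so a DDL that changes a table's identity (for example a rename) routes by the pre-DDL name, while a DDL with no table name (such as CREATE DATABASE) falls back to the default topic. A hypothetical sketch using only the fields this hunk touches; router is an EventRouter built as in the previous sketch:

// Sketch: a rename-style DDL carries both the old and new table identity.
ddl := &model.DDLEvent{
	PreTableInfo: &model.TableInfo{
		TableName: model.TableName{Schema: "db1", Table: "users"}, // name before the DDL
	},
	TableInfo: &model.TableInfo{
		TableName: model.TableName{Schema: "db1", Table: "users_v2"}, // name after the DDL
	},
}
topic := router.GetTopicForDDL(ddl) // resolves via ("db1", "users"), the pre-DDL name
_ = topic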
@@ -19,7 +19,6 @@ import (
 	"sync"
 	"time"
 
-	tikafka "github.com/flowbehappy/tigate/pkg/sink/kafka"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/model"
@@ -45,7 +44,7 @@ type kafkaTopicManager struct {
 
 	admin kafka.ClusterAdminClient
 
-	cfg *tikafka.AutoCreateTopicConfig
+	cfg *kafka.AutoCreateTopicConfig
 
 	topics sync.Map
 
@@ -60,7 +59,7 @@ func GetTopicManagerAndTryCreateTopic(
 	ctx context.Context,
 	changefeedID model.ChangeFeedID,
 	topic string,
-	topicCfg *tikafka.AutoCreateTopicConfig,
+	topicCfg *kafka.AutoCreateTopicConfig,
 	adminClient kafka.ClusterAdminClient,
 ) (TopicManager, error) {
 	topicManager := newKafkaTopicManager(
@@ -80,7 +79,7 @@ func newKafkaTopicManager(
 	defaultTopic string,
 	changefeedID model.ChangeFeedID,
 	admin kafka.ClusterAdminClient,
-	cfg *tikafka.AutoCreateTopicConfig,
+	cfg *kafka.AutoCreateTopicConfig,
 ) *kafkaTopicManager {
 	mgr := &kafkaTopicManager{
 		defaultTopic: defaultTopic,
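These hunks only swap the tigate fork of AutoCreateTopicConfig for tiflow's. A hypothetical call sketch against the signature shown above; the field names assume tiflow's kafka.AutoCreateTopicConfig, and ctx, changefeedID, and adminClient would come from the surrounding sink setup as in kafka_sink.go below:

topicCfg := &kafka.AutoCreateTopicConfig{
	AutoCreate:        true, // create the topic if it is missing
	PartitionNum:      3,
	ReplicationFactor: 1,
}
mgr, err := topicmanager.GetTopicManagerAndTryCreateTopic(
	ctx, changefeedID, "events", topicCfg, adminClient,
)
if err != nil {
	return err // metadata fetch or topic creation failed
}
_ = mgr // later used by the sink to resolve partition counts per topic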
51 changes: 15 additions & 36 deletions downstreamadapter/sink/kafka_sink.go
@@ -23,21 +23,17 @@ import (
 	"github.com/flowbehappy/tigate/downstreamadapter/worker"
 	"github.com/flowbehappy/tigate/downstreamadapter/worker/dmlproducer"
 	"github.com/flowbehappy/tigate/pkg/common"
-	ticonfig "github.com/flowbehappy/tigate/pkg/config"
 	"github.com/flowbehappy/tigate/pkg/sink/codec"
-	"github.com/flowbehappy/tigate/pkg/sink/kafka"
-	v2 "github.com/flowbehappy/tigate/pkg/sink/kafka/v2"
-	tiutils "github.com/flowbehappy/tigate/pkg/sink/util"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/model"
-	"github.com/pingcap/tiflow/cdc/sink/ddlsink/mq/ddlproducer"
 	timetrics "github.com/pingcap/tiflow/cdc/sink/metrics"
 	"github.com/pingcap/tiflow/cdc/sink/util"
+	"github.com/pingcap/tiflow/pkg/config"
 	cerror "github.com/pingcap/tiflow/pkg/errors"
 	"github.com/pingcap/tiflow/pkg/sink"
-	tikafka "github.com/pingcap/tiflow/pkg/sink/kafka"
+	"github.com/pingcap/tiflow/pkg/sink/kafka"
+	v2 "github.com/pingcap/tiflow/pkg/sink/kafka/v2"
 	utils "github.com/pingcap/tiflow/pkg/util"
 	"go.uber.org/atomic"
 	"go.uber.org/zap"
@@ -55,35 +51,34 @@ type KafkaSink struct {
 	// It is also responsible for creating topics.
 	topicManager topicmanager.TopicManager
 
-	dmlWorker   *worker.KafkaWorker
-	ddlWorker   *worker.KafkaDDLWorker
-	adminClient tikafka.ClusterAdminClient
+	worker      *worker.KafkaWorker
+	adminClient kafka.ClusterAdminClient
 	scheme      string
 
 	ctx    context.Context
 	cancel context.CancelCauseFunc // todo?
 }
 
-func NewKafkaSink(changefeedID model.ChangeFeedID, sinkURI *url.URL, sinkConfig *ticonfig.SinkConfig) (*KafkaSink, error) {
+func NewKafkaSink(changefeedID model.ChangeFeedID, sinkURI *url.URL, replicaConfig *config.ReplicaConfig) (*KafkaSink, error) {
 	ctx, cancel := context.WithCancelCause(context.Background())
 	topic, err := util.GetTopic(sinkURI)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
 	scheme := sink.GetScheme(sinkURI)
-	protocol, err := util.GetProtocol(utils.GetOrZero(sinkConfig.Protocol))
+	protocol, err := util.GetProtocol(utils.GetOrZero(replicaConfig.Sink.Protocol))
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
 
 	options := kafka.NewOptions()
-	if err := options.Apply(changefeedID, sinkURI, sinkConfig); err != nil {
+	if err := options.Apply(changefeedID, sinkURI, replicaConfig); err != nil {
 		return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
 	}
 
 	// todo
 	factoryCreator := kafka.NewSaramaFactory
-	if utils.GetOrZero(sinkConfig.EnableKafkaSinkV2) {
+	if utils.GetOrZero(replicaConfig.Sink.EnableKafkaSinkV2) {
 		factoryCreator = v2.NewFactory
 	}
 
@@ -97,17 +92,17 @@ func NewKafkaSink(changefeedID model.ChangeFeedID, sinkURI *url.URL, sinkConfig *ticonfig.SinkConfig) (*KafkaSink, error) {
 		return nil, cerror.WrapError(cerror.ErrKafkaNewProducer, err)
 	}
 
-	eventRouter, err := eventrouter.NewEventRouter(sinkConfig, protocol, topic, scheme)
+	eventRouter, err := eventrouter.NewEventRouter(replicaConfig, protocol, topic, scheme)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
 
-	columnSelector, err := common.NewColumnSelectors(sinkConfig)
+	columnSelector, err := common.NewColumnSelectors(replicaConfig)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
 
-	encoderConfig, err := tiutils.GetEncoderConfig(changefeedID, sinkURI, protocol, sinkConfig, options.MaxMessageBytes)
+	encoderConfig, err := util.GetEncoderConfig(changefeedID, sinkURI, protocol, replicaConfig, options.MaxMessageBytes)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -120,10 +115,10 @@ func NewKafkaSink(changefeedID model.ChangeFeedID, sinkURI *url.URL, sinkConfig *ticonfig.SinkConfig) (*KafkaSink, error) {
 
 	metricsCollector := factory.MetricsCollector(utils.RoleProcessor, adminClient)
 	dmlProducer := dmlproducer.NewKafkaDMLProducer(ctx, changefeedID, asyncProducer, metricsCollector)
-	encoderGroup := codec.NewEncoderGroup(ctx, sinkConfig, encoderConfig, changefeedID)
+	encoderGroup := codec.NewEncoderGroup(ctx, replicaConfig.Sink, encoderConfig, changefeedID)
 
 	statistics := timetrics.NewStatistics(changefeedID, sink.RowSink)
-	dmlWorker := worker.NewKafkaWorker(changefeedID, protocol, dmlProducer, encoderGroup, statistics)
+	worker := worker.NewKafkaWorker(changefeedID, protocol, dmlProducer, encoderGroup, statistics)
 
 	topicManager, err := topicmanager.GetTopicManagerAndTryCreateTopic(
 		ctx,
@@ -139,27 +134,13 @@ func NewKafkaSink(changefeedID model.ChangeFeedID, sinkURI *url.URL, sinkConfig *ticonfig.SinkConfig) (*KafkaSink, error) {
 		}
 		return nil, err
 	}
-
-	encoder, err := codec.NewEventEncoder(ctx, encoderConfig)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-
-	syncProducer, err := factory.SyncProducer(ctx)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-	ddlProducer := ddlproducer.NewKafkaDDLProducer(ctx, changefeedID, syncProducer)
-	ddlWorker := worker.NewKafkaDDLWorker(changefeedID, protocol, ddlProducer, encoder, statistics)
-
 	return &KafkaSink{
 		ctx:            ctx,
 		cancel:         cancel,
 		topicManager:   topicManager,
 		eventRouter:    eventRouter,
 		columnSelector: columnSelector,
-		dmlWorker:      dmlWorker,
-		ddlWorker:      ddlWorker,
+		worker:         worker,
 	}, nil
 }
 
@@ -206,7 +187,7 @@ func (s *KafkaSink) AddDMLEvent(event *common.DMLEvent, tableProgress *types.TableProgress) {
 		return
 	}
 
-	s.dmlWorker.GetEventChan() <- &common.MQRowEvent{
+	s.worker.GetEventChan() <- &common.MQRowEvent{
 		Key: model.TopicPartitionKey{
 			Topic:     topic,
 			Partition: index,
@@ -230,8 +211,6 @@ func (s *KafkaSink) PassDDLAndSyncPointEvent(event *common.DDLEvent, tableProgress *types.TableProgress) {
 }
 
 func (s *KafkaSink) AddDDLAndSyncPointEvent(event *common.DDLEvent, tableProgress *types.TableProgress) {
-	tableProgress.Add(event)
-	s.ddlWorker.GetEventChan() <- event
 }
 
 func (s *KafkaSink) Close() {
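Net effect in this file: the DDL producer and worker are gone and AddDDLAndSyncPointEvent is left empty, so the reverted KafkaSink handles DML only. A hypothetical construction sketch using just the signatures visible in this diff; the URI and config values are illustrative:

sinkURI, _ := url.Parse("kafka://127.0.0.1:9092/events?protocol=open-protocol") // illustrative
replicaConfig := config.GetDefaultReplicaConfig()

ks, err := NewKafkaSink(changefeedID, sinkURI, replicaConfig)
if err != nil {
	return err // invalid kafka options, producer or admin client failure, ...
}
defer ks.Close()

ks.AddDMLEvent(dmlEvent, tableProgress)             // routed, encoded, produced asynchronously
ks.AddDDLAndSyncPointEvent(ddlEvent, tableProgress) // no-op after this revert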
29 changes: 0 additions & 29 deletions downstreamadapter/sink/sink.go
@@ -14,17 +14,8 @@
 package sink
 
 import (
-	"net/url"
-
 	"github.com/flowbehappy/tigate/downstreamadapter/sink/types"
-	"github.com/flowbehappy/tigate/downstreamadapter/writer"
 	"github.com/flowbehappy/tigate/pkg/common"
-	"github.com/flowbehappy/tigate/pkg/config"
-	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/model"
-	cerror "github.com/pingcap/tiflow/pkg/errors"
-	"github.com/pingcap/tiflow/pkg/sink"
-	"go.uber.org/zap"
 )
 
 type Sink interface {
@@ -38,23 +29,3 @@ type Sink interface {
 	// GetCheckpointTs(tableSpan *common.TableSpan) (uint64, bool)
 	Close()
 }
-
-func NewSink(config *config.ChangefeedConfig, changefeedID model.ChangeFeedID) (Sink, error) {
-	sinkURI, err := url.Parse(config.SinkURI)
-	if err != nil {
-		return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
-	}
-	scheme := sink.GetScheme(sinkURI)
-	switch scheme {
-	case sink.MySQLScheme, sink.MySQLSSLScheme, sink.TiDBScheme, sink.TiDBSSLScheme:
-		cfg, db, err := writer.NewMysqlConfigAndDB(sinkURI)
-		if err != nil {
-			log.Error("create mysql sink failed", zap.Error(err))
-			return nil, err
-		}
-		return NewMysqlSink(changefeedID, 16, cfg, db), nil
-	case sink.KafkaScheme, sink.KafkaSSLScheme:
-		return NewKafkaSink(changefeedID, sinkURI, config.SinkConfig)
-	}
-	return nil, nil
-}
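One subtlety of the deleted factory: a URI whose scheme matched no case returned (nil, nil), success with a nil sink. A hypothetical caller-side guard for the pre-revert API (newSinkChecked and the use of fmt are invented for illustration):

func newSinkChecked(cfg *config.ChangefeedConfig, id model.ChangeFeedID) (Sink, error) {
	s, err := NewSink(cfg, id)
	if err != nil {
		return nil, err
	}
	if s == nil { // scheme fell through the switch above
		return nil, fmt.Errorf("unsupported sink scheme in URI: %q", cfg.SinkURI)
	}
	return s, nil
}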

