Skip to content

Commit

Permalink
DBZ-7301 Inject the SnapshotterService
Browse files Browse the repository at this point in the history
  • Loading branch information
mfvitale authored and jpechane committed Feb 6, 2024
1 parent 1af5e85 commit e12c0e0
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.util.Clock;

/**
Expand All @@ -29,28 +30,37 @@ public class VitessChangeEventSourceFactory implements ChangeEventSourceFactory<
private final Clock clock;
private final VitessDatabaseSchema schema;
private final ReplicationConnection replicationConnection;
private final SnapshotterService snapshotterService;

public VitessChangeEventSourceFactory(
VitessConnectorConfig connectorConfig,
ErrorHandler errorHandler,
EventDispatcher<VitessPartition, TableId> dispatcher,
Clock clock,
VitessDatabaseSchema schema,
ReplicationConnection replicationConnection) {
ReplicationConnection replicationConnection, SnapshotterService snapshotterService) {
this.connectorConfig = connectorConfig;
this.errorHandler = errorHandler;
this.dispatcher = dispatcher;
this.clock = clock;
this.schema = schema;
this.replicationConnection = replicationConnection;
this.snapshotterService = snapshotterService;
}

@Override
public SnapshotChangeEventSource<VitessPartition, VitessOffsetContext> getSnapshotChangeEventSource(SnapshotProgressListener<VitessPartition> snapshotProgressListener,
NotificationService<VitessPartition, VitessOffsetContext> notificationService) {
// A dummy SnapshotChangeEventSource, snapshot is skipped.
return new VitessSnapshotChangeEventSource(
connectorConfig, new DefaultMainConnectionProvidingConnectionFactory<>(() -> null), dispatcher, schema, clock, null, notificationService);
connectorConfig,
new DefaultMainConnectionProvidingConnectionFactory<>(() -> null),
dispatcher,
schema,
clock,
null,
notificationService,
snapshotterService);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.slf4j.LoggerFactory;

import io.debezium.annotation.VisibleForTesting;
import io.debezium.bean.StandardBeanNames;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
Expand All @@ -36,6 +37,7 @@
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaFactory;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Clock;
import io.debezium.util.LoggingContext;
Expand Down Expand Up @@ -74,9 +76,16 @@ protected ChangeEventSourceCoordinator<VitessPartition, VitessOffsetContext> sta
// Mapped Diagnostic Context (MDC) logging
LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);

// Manual Bean Registration
connectorConfig.getBeanRegistry().add(StandardBeanNames.CONFIGURATION, config);
connectorConfig.getBeanRegistry().add(StandardBeanNames.CONNECTOR_CONFIG, connectorConfig);
connectorConfig.getBeanRegistry().add(StandardBeanNames.DATABASE_SCHEMA, schema);

// Service providers
registerServiceProviders(connectorConfig.getServiceRegistry());

final SnapshotterService snapshotterService = connectorConfig.getServiceRegistry().tryGetService(SnapshotterService.class);

try {
if (previousOffset == null) {
LOGGER.info("No previous offset found");
Expand Down Expand Up @@ -119,12 +128,13 @@ protected ChangeEventSourceCoordinator<VitessPartition, VitessOffsetContext> sta
VitessConnector.class,
connectorConfig,
new VitessChangeEventSourceFactory(
connectorConfig, errorHandler, dispatcher, clock, schema, replicationConnection),
connectorConfig, errorHandler, dispatcher, clock, schema, replicationConnection, snapshotterService),
connectorConfig.offsetStoragePerTask() ? new VitessChangeEventSourceMetricsFactory() : new DefaultChangeEventSourceMetricsFactory<>(),
dispatcher,
schema,
null,
notificationService);
notificationService,
snapshotterService);

coordinator.start(taskContext, this.queue, metadataProvider);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.util.Clock;

/** Always skip snapshot for now */
Expand All @@ -33,15 +34,16 @@ public VitessSnapshotChangeEventSource(
VitessDatabaseSchema schema,
Clock clock,
SnapshotProgressListener<VitessPartition> snapshotProgressListener,
NotificationService<VitessPartition, VitessOffsetContext> notificationService) {
NotificationService<VitessPartition, VitessOffsetContext> notificationService, SnapshotterService snapshotterService) {
super(
connectorConfig,
connectionFactory,
schema,
dispatcher,
clock,
snapshotProgressListener,
notificationService);
notificationService,
snapshotterService);
}

@Override
Expand Down

0 comments on commit e12c0e0

Please sign in to comment.