diff --git a/src/main/java/io/debezium/connector/vitess/VitessChangeEventSourceFactory.java b/src/main/java/io/debezium/connector/vitess/VitessChangeEventSourceFactory.java index 3a71cf7f..3804997f 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessChangeEventSourceFactory.java +++ b/src/main/java/io/debezium/connector/vitess/VitessChangeEventSourceFactory.java @@ -15,6 +15,7 @@ import io.debezium.pipeline.source.spi.SnapshotProgressListener; import io.debezium.pipeline.source.spi.StreamingChangeEventSource; import io.debezium.relational.TableId; +import io.debezium.snapshot.SnapshotterService; import io.debezium.util.Clock; /** @@ -29,6 +30,7 @@ public class VitessChangeEventSourceFactory implements ChangeEventSourceFactory< private final Clock clock; private final VitessDatabaseSchema schema; private final ReplicationConnection replicationConnection; + private final SnapshotterService snapshotterService; public VitessChangeEventSourceFactory( VitessConnectorConfig connectorConfig, @@ -36,13 +38,14 @@ public VitessChangeEventSourceFactory( EventDispatcher dispatcher, Clock clock, VitessDatabaseSchema schema, - ReplicationConnection replicationConnection) { + ReplicationConnection replicationConnection, SnapshotterService snapshotterService) { this.connectorConfig = connectorConfig; this.errorHandler = errorHandler; this.dispatcher = dispatcher; this.clock = clock; this.schema = schema; this.replicationConnection = replicationConnection; + this.snapshotterService = snapshotterService; } @Override @@ -50,7 +53,14 @@ public SnapshotChangeEventSource getSnapsh NotificationService notificationService) { // A dummy SnapshotChangeEventSource, snapshot is skipped. return new VitessSnapshotChangeEventSource( - connectorConfig, new DefaultMainConnectionProvidingConnectionFactory<>(() -> null), dispatcher, schema, clock, null, notificationService); + connectorConfig, + new DefaultMainConnectionProvidingConnectionFactory<>(() -> null), + dispatcher, + schema, + clock, + null, + notificationService, + snapshotterService); } @Override diff --git a/src/main/java/io/debezium/connector/vitess/VitessConnectorTask.java b/src/main/java/io/debezium/connector/vitess/VitessConnectorTask.java index fbc84132..28900359 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessConnectorTask.java +++ b/src/main/java/io/debezium/connector/vitess/VitessConnectorTask.java @@ -18,6 +18,7 @@ import org.slf4j.LoggerFactory; import io.debezium.annotation.VisibleForTesting; +import io.debezium.bean.StandardBeanNames; import io.debezium.config.CommonConnectorConfig; import io.debezium.config.Configuration; import io.debezium.config.Field; @@ -36,6 +37,7 @@ import io.debezium.relational.TableId; import io.debezium.schema.SchemaFactory; import io.debezium.schema.SchemaNameAdjuster; +import io.debezium.snapshot.SnapshotterService; import io.debezium.spi.topic.TopicNamingStrategy; import io.debezium.util.Clock; import io.debezium.util.LoggingContext; @@ -74,9 +76,16 @@ protected ChangeEventSourceCoordinator sta // Mapped Diagnostic Context (MDC) logging LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME); + // Manual Bean Registration + connectorConfig.getBeanRegistry().add(StandardBeanNames.CONFIGURATION, config); + connectorConfig.getBeanRegistry().add(StandardBeanNames.CONNECTOR_CONFIG, connectorConfig); + connectorConfig.getBeanRegistry().add(StandardBeanNames.DATABASE_SCHEMA, schema); + // Service providers registerServiceProviders(connectorConfig.getServiceRegistry()); + final SnapshotterService snapshotterService = connectorConfig.getServiceRegistry().tryGetService(SnapshotterService.class); + try { if (previousOffset == null) { LOGGER.info("No previous offset found"); @@ -119,12 +128,13 @@ protected ChangeEventSourceCoordinator sta VitessConnector.class, connectorConfig, new VitessChangeEventSourceFactory( - connectorConfig, errorHandler, dispatcher, clock, schema, replicationConnection), + connectorConfig, errorHandler, dispatcher, clock, schema, replicationConnection, snapshotterService), connectorConfig.offsetStoragePerTask() ? new VitessChangeEventSourceMetricsFactory() : new DefaultChangeEventSourceMetricsFactory<>(), dispatcher, schema, null, - notificationService); + notificationService, + snapshotterService); coordinator.start(taskContext, this.queue, metadataProvider); diff --git a/src/main/java/io/debezium/connector/vitess/VitessSnapshotChangeEventSource.java b/src/main/java/io/debezium/connector/vitess/VitessSnapshotChangeEventSource.java index 9bdb77b4..a238c387 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessSnapshotChangeEventSource.java +++ b/src/main/java/io/debezium/connector/vitess/VitessSnapshotChangeEventSource.java @@ -21,6 +21,7 @@ import io.debezium.relational.Table; import io.debezium.relational.TableId; import io.debezium.schema.SchemaChangeEvent; +import io.debezium.snapshot.SnapshotterService; import io.debezium.util.Clock; /** Always skip snapshot for now */ @@ -33,7 +34,7 @@ public VitessSnapshotChangeEventSource( VitessDatabaseSchema schema, Clock clock, SnapshotProgressListener snapshotProgressListener, - NotificationService notificationService) { + NotificationService notificationService, SnapshotterService snapshotterService) { super( connectorConfig, connectionFactory, @@ -41,7 +42,8 @@ public VitessSnapshotChangeEventSource( dispatcher, clock, snapshotProgressListener, - notificationService); + notificationService, + snapshotterService); } @Override