Skip to content

Commit

Permalink
DBZ-8325 Emit DDL events
Browse files Browse the repository at this point in the history
  • Loading branch information
twthorn committed Oct 15, 2024
1 parent 2cf4caa commit dfc2008
Show file tree
Hide file tree
Showing 33 changed files with 727 additions and 73 deletions.
13 changes: 13 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,19 @@
<artifactId>debezium-revapi</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-binlog</artifactId>
<version>${version.debezium}</version>
</dependency>


<!-- Testing -->
<dependency>
<groupId>ch.qos.logback</groupId>
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/io/debezium/connector/vitess/SourceInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,8 @@ public String toString() {
+ restartVgtid
+ '}';
}

public String table() {
return tableId == null ? null : tableId.table();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
package io.debezium.connector.vitess;

import io.debezium.connector.vitess.connection.ReplicationConnection;
import io.debezium.jdbc.DefaultMainConnectionProvidingConnectionFactory;
import io.debezium.connector.vitess.jdbc.VitessConnection;
import io.debezium.jdbc.MainConnectionProvidingConnectionFactory;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
Expand All @@ -31,15 +32,18 @@ public class VitessChangeEventSourceFactory implements ChangeEventSourceFactory<
private final VitessDatabaseSchema schema;
private final ReplicationConnection replicationConnection;
private final SnapshotterService snapshotterService;
private final MainConnectionProvidingConnectionFactory<VitessConnection> connectionFactory;

public VitessChangeEventSourceFactory(
VitessConnectorConfig connectorConfig,
MainConnectionProvidingConnectionFactory<VitessConnection> connectionFactory,
ErrorHandler errorHandler,
EventDispatcher<VitessPartition, TableId> dispatcher,
Clock clock,
VitessDatabaseSchema schema,
ReplicationConnection replicationConnection, SnapshotterService snapshotterService) {
this.connectorConfig = connectorConfig;
this.connectionFactory = connectionFactory;
this.errorHandler = errorHandler;
this.dispatcher = dispatcher;
this.clock = clock;
Expand All @@ -54,11 +58,11 @@ public SnapshotChangeEventSource<VitessPartition, VitessOffsetContext> getSnapsh
// A dummy SnapshotChangeEventSource, snapshot is skipped.
return new VitessSnapshotChangeEventSource(
connectorConfig,
new DefaultMainConnectionProvidingConnectionFactory<>(() -> null),
this.connectionFactory,
dispatcher,
schema,
clock,
null,
snapshotProgressListener,
notificationService,
snapshotterService);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.debezium.config.Field;
import io.debezium.config.Field.ValidationOutput;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.connector.mysql.charset.MySqlCharsetRegistryServiceProvider;
import io.debezium.connector.vitess.connection.VitessTabletType;
import io.debezium.connector.vitess.pipeline.txmetadata.ShardEpochMap;
import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionMetadataFactory;
Expand All @@ -39,22 +40,51 @@
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.ColumnFilterMode;
import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Collect;

/**
* Vitess connector configuration, including its specific configurations and the common
* configurations from Debezium.
*/
public class VitessConnectorConfig extends RelationalDatabaseConnectorConfig {
public class VitessConnectorConfig extends HistorizedRelationalDatabaseConnectorConfig {

public static final String CSV_DELIMITER = ",";

private static final Logger LOGGER = LoggerFactory.getLogger(VitessConnectorConfig.class);

private static final String VITESS_CONFIG_GROUP_PREFIX = "vitess.";
private static final int DEFAULT_VTGATE_PORT = 15_991;
private static final int DEFAULT_VTGATE_JDBC_PORT = 15_306;

/**
* Set of all built-in database names that will generally be ignored by the connector.
*/
protected static final Set<String> BUILT_IN_DB_NAMES = Collect.unmodifiableSet(
"mysql", "performance_schema", "sys", "information_schema");

@Override
public JdbcConfiguration getJdbcConfig() {
JdbcConfiguration jdbcConfiguration = super.getJdbcConfig();
JdbcConfiguration updatedConfig = JdbcConfiguration.adapt(jdbcConfiguration.edit()
.with(JdbcConfiguration.DATABASE, getKeyspace())
.with(JdbcConfiguration.PORT, getVtgateJdbcPort())
.build());
return updatedConfig;
}

@Override
protected HistoryRecordComparator getHistoryRecordComparator() {
return new HistoryRecordComparator() {

};
}

/**
* The set of predefined SnapshotMode options or aliases.
Expand All @@ -66,6 +96,8 @@ public enum SnapshotMode implements EnumeratedValue {
*/
INITIAL("initial"),

NO_DATA("no_data"),

/**
* Never perform an initial snapshot and only receive new data changes.
*/
Expand Down Expand Up @@ -203,6 +235,15 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue
.withValidation(Field::isInteger)
.withDescription("Port of the Vitess VTGate gRPC server.");

public static final Field VTGATE_JDBC_PORT = Field.create(DATABASE_CONFIG_PREFIX + "jdbc." + JdbcConfiguration.PORT)
.withDisplayName("Vitess JDBC database port")
.withType(Type.INT)
.withWidth(Width.SHORT)
.withDefault(DEFAULT_VTGATE_JDBC_PORT)
.withImportance(ConfigDef.Importance.HIGH)
.withValidation(Field::isInteger)
.withDescription("Port of the Vitess VTGate JDBC server.");

public static final Field VTGATE_USER = Field.create(DATABASE_CONFIG_PREFIX + JdbcConfiguration.USER)
.withDisplayName("User")
.withType(Type.STRING)
Expand Down Expand Up @@ -470,6 +511,7 @@ private static int validateTimePrecisionMode(Configuration config, Field field,
GTID,
VTGATE_HOST,
VTGATE_PORT,
VTGATE_JDBC_PORT,
VTGATE_USER,
VTGATE_PASSWORD,
TABLET_TYPE,
Expand Down Expand Up @@ -536,11 +578,16 @@ public static ConfigDef configDef() {

public VitessConnectorConfig(Configuration config) {
super(
VitessConnector.class,
config,
null, x -> x.schema() + "." + x.table(),
Tables.TableFilter.fromPredicate(VitessConnectorConfig::isNotBuiltInTable),
x -> x.schema() + "." + x.table(),
true,
-1,
ColumnFilterMode.SCHEMA,
true);
false);

getServiceRegistry().registerServiceProvider(new MySqlCharsetRegistryServiceProvider());
}

@Override
Expand Down Expand Up @@ -649,6 +696,10 @@ public int getVtgatePort() {
return getConfig().getInteger(VTGATE_PORT);
}

public int getVtgateJdbcPort() {
return getConfig().getInteger(VTGATE_JDBC_PORT);
}

public String getVtgateUsername() {
return getConfig().getString(VTGATE_USER);
}
Expand Down Expand Up @@ -754,6 +805,29 @@ public Heartbeat createHeartbeat(TopicNamingStrategy topicNamingStrategy, Schema
return new VitessHeartbeatImpl(getHeartbeatInterval(), topicNamingStrategy.heartbeatTopic(), getLogicalName(), schemaNameAdjuster);
}

/**
* Checks whether the {@link TableId} refers to a built-in table.
*
* @param tableId the relational table identifier, should not be null
* @return true if the reference refers to a built-in table
*/
public static boolean isNotBuiltInTable(TableId tableId) {
return !isBuiltInDatabase(tableId.catalog());
}

/**
* Check whether the specified database name is a built-in database.
*
* @param databaseName the database name to check
* @return true if the database is a built-in database; false otherwise
*/
public static boolean isBuiltInDatabase(String databaseName) {
if (databaseName == null) {
return false;
}
return BUILT_IN_DB_NAMES.contains(databaseName.toLowerCase());
}

public BigIntUnsignedHandlingMode getBigIntUnsgnedHandlingMode() {
return BigIntUnsignedHandlingMode.parse(getConfig().getString(BIGINT_UNSIGNED_HANDLING_MODE),
BIGINT_UNSIGNED_HANDLING_MODE.defaultValueAsString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.connector.vitess.connection.ReplicationConnection;
import io.debezium.connector.vitess.connection.VitessReplicationConnection;
import io.debezium.connector.vitess.jdbc.VitessConnection;
import io.debezium.connector.vitess.metrics.VitessChangeEventSourceMetricsFactory;
import io.debezium.jdbc.DefaultMainConnectionProvidingConnectionFactory;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.MainConnectionProvidingConnectionFactory;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
Expand All @@ -37,6 +41,7 @@
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.service.spi.ServiceRegistry;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.snapshot.SnapshotterServiceProvider;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Clock;
import io.debezium.util.LoggingContext;
Expand Down Expand Up @@ -121,13 +126,18 @@ protected ChangeEventSourceCoordinator<VitessPartition, VitessOffsetContext> sta
NotificationService<VitessPartition, VitessOffsetContext> notificationService = new NotificationService<>(getNotificationChannels(),
connectorConfig, SchemaFactory.get(), dispatcher::enqueueNotification);

JdbcConfiguration jdbcConfig = connectorConfig.getJdbcConfig();

MainConnectionProvidingConnectionFactory<VitessConnection> connectionFactory = new DefaultMainConnectionProvidingConnectionFactory<>(
() -> new VitessConnection(jdbcConfig));

ChangeEventSourceCoordinator<VitessPartition, VitessOffsetContext> coordinator = new ChangeEventSourceCoordinator<>(
previousOffsets,
errorHandler,
VitessConnector.class,
connectorConfig,
new VitessChangeEventSourceFactory(
connectorConfig, errorHandler, dispatcher, clock, schema, replicationConnection, snapshotterService),
connectorConfig, connectionFactory, errorHandler, dispatcher, clock, schema, replicationConnection, snapshotterService),
connectorConfig.offsetStoragePerTask() ? new VitessChangeEventSourceMetricsFactory() : new DefaultChangeEventSourceMetricsFactory<>(),
dispatcher,
schema,
Expand Down Expand Up @@ -225,6 +235,7 @@ protected Iterable<Field> getAllConfigurationFields() {
@Override
protected void registerServiceProviders(ServiceRegistry serviceRegistry) {

serviceRegistry.registerServiceProvider(new SnapshotterServiceProvider());
serviceRegistry.registerServiceProvider(new PostProcessorRegistryServiceProvider());
}
}
Loading

0 comments on commit dfc2008

Please sign in to comment.