diff --git a/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java b/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java index 27fef4a6..dd8e2fdd 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java +++ b/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java @@ -23,6 +23,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.annotation.VisibleForTesting; import io.debezium.config.CommonConnectorConfig; import io.debezium.config.ConfigDefinition; import io.debezium.config.Configuration; @@ -60,8 +61,11 @@ public class VitessConnectorConfig extends HistorizedRelationalDatabaseConnector private static final Logger LOGGER = LoggerFactory.getLogger(VitessConnectorConfig.class); private static final String VITESS_CONFIG_GROUP_PREFIX = "vitess."; + private static final String JDBC_CONFIG_PREFIX = "jdbc."; private static final int DEFAULT_VTGATE_PORT = 15_991; - private static final int DEFAULT_VTGATE_JDBC_PORT = 15_306; + + @VisibleForTesting + protected static final int DEFAULT_VTGATE_JDBC_PORT = 15_306; /** * Set of all built-in database names that will generally be ignored by the connector. @@ -73,6 +77,8 @@ public class VitessConnectorConfig extends HistorizedRelationalDatabaseConnector public JdbcConfiguration getJdbcConfig() { JdbcConfiguration jdbcConfiguration = super.getJdbcConfig(); JdbcConfiguration updatedConfig = JdbcConfiguration.adapt(jdbcConfiguration.edit() + .with(JdbcConfiguration.USER, getVtgateJdbcUsername()) + .with(JdbcConfiguration.PASSWORD, getVtgateJdbcPassword()) .with(JdbcConfiguration.DATABASE, getKeyspace()) .with(JdbcConfiguration.PORT, getVtgateJdbcPort()) .build()); @@ -100,6 +106,8 @@ public enum SnapshotMode implements EnumeratedValue { CONFIGURATION_BASED("configuration_based"), + RECOVERY("recovery"), + /** * Never perform an initial snapshot and only receive new data changes. */ @@ -237,15 +245,29 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue .withValidation(Field::isInteger) .withDescription("Port of the Vitess VTGate gRPC server."); - public static final Field VTGATE_JDBC_PORT = Field.create(DATABASE_CONFIG_PREFIX + "jdbc." + JdbcConfiguration.PORT) + public static final Field VTGATE_JDBC_PORT = Field.create(DATABASE_CONFIG_PREFIX + JDBC_CONFIG_PREFIX + JdbcConfiguration.PORT) .withDisplayName("Vitess JDBC database port") .withType(Type.INT) .withWidth(Width.SHORT) .withDefault(DEFAULT_VTGATE_JDBC_PORT) - .withImportance(ConfigDef.Importance.HIGH) + .withImportance(ConfigDef.Importance.MEDIUM) .withValidation(Field::isInteger) .withDescription("Port of the Vitess VTGate JDBC server."); + public static final Field VTGATE_JDBC_USER = Field.create(DATABASE_CONFIG_PREFIX + JDBC_CONFIG_PREFIX + JdbcConfiguration.USER) + .withDisplayName("Vitess JDBC database user") + .withType(Type.INT) + .withWidth(Width.SHORT) + .withImportance(ConfigDef.Importance.MEDIUM) + .withDescription("Username of the Vitess VTGate JDBC server."); + + public static final Field VTGATE_JDBC_PASSWORD = Field.create(DATABASE_CONFIG_PREFIX + JDBC_CONFIG_PREFIX + JdbcConfiguration.PASSWORD) + .withDisplayName("Vitess JDBC database password") + .withType(Type.INT) + .withWidth(Width.SHORT) + .withImportance(ConfigDef.Importance.MEDIUM) + .withDescription("Password of the Vitess VTGate JDBC server."); + public static final Field VTGATE_USER = Field.create(DATABASE_CONFIG_PREFIX + JdbcConfiguration.USER) .withDisplayName("User") .withType(Type.STRING) @@ -715,10 +737,30 @@ public String getVtgateUsername() { return getConfig().getString(VTGATE_USER); } + public String getVtgateJdbcUsername() { + String jdbcUsername = getConfig().getString(VTGATE_JDBC_USER); + if (jdbcUsername != null) { + return jdbcUsername; + } + else { + return getConfig().getString(VTGATE_USER); + } + } + public String getVtgatePassword() { return getConfig().getString(VTGATE_PASSWORD); } + public String getVtgateJdbcPassword() { + String jdbcPassword = getConfig().getString(VTGATE_JDBC_PASSWORD); + if (jdbcPassword != null) { + return jdbcPassword; + } + else { + return getConfig().getString(VTGATE_PASSWORD); + } + } + public String getTabletType() { return getConfig().getString(TABLET_TYPE); } diff --git a/src/test/java/io/debezium/connector/vitess/TestHelper.java b/src/test/java/io/debezium/connector/vitess/TestHelper.java index 99d7afbc..8a83a82b 100644 --- a/src/test/java/io/debezium/connector/vitess/TestHelper.java +++ b/src/test/java/io/debezium/connector/vitess/TestHelper.java @@ -67,8 +67,8 @@ public class TestHelper { private static final String VTGATE_HOST = "localhost"; private static final int VTGATE_PORT = 15991; // Use the same username and password for vtgate and vtctld - private static final String USERNAME = "vitess"; - private static final String PASSWORD = "vitess_password"; + public static final String USERNAME = "vitess"; + public static final String PASSWORD = "vitess_password"; protected static final String VGTID_JSON_NO_PKS_TEMPLATE = "[" + "{\"keyspace\":\"%s\",\"shard\":\"%s\",\"gtid\":\"%s\"}," + @@ -165,8 +165,8 @@ public static Configuration.Builder defaultConfig(boolean hasMultipleShards, .with(VitessConnectorConfig.VTGATE_HOST, VTGATE_HOST) .with(VitessConnectorConfig.VTGATE_PORT, VTGATE_PORT) .with(VitessConnectorConfig.VTGATE_USER, USERNAME) - .with(VitessConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class) .with(VitessConnectorConfig.VTGATE_PASSWORD, PASSWORD) + .with(VitessConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class) .with(EmbeddedEngineConfig.WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_MS, 5000) .with(VitessConnectorConfig.POLL_INTERVAL_MS, 100); if (!Strings.isNullOrEmpty(tableInclude)) { diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorConfigTest.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorConfigTest.java index b9b8d7a5..86cfd0e7 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessConnectorConfigTest.java +++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorConfigTest.java @@ -19,6 +19,7 @@ import io.debezium.config.Configuration; import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionMetadataFactory; import io.debezium.heartbeat.Heartbeat; +import io.debezium.jdbc.JdbcConfiguration; import io.debezium.schema.DefaultTopicNamingStrategy; import io.debezium.schema.SchemaNameAdjuster; @@ -189,4 +190,48 @@ public void shouldValidateInheritEpochWithOrderedTransactionMetadata() { assertThat(inputs.size()).isEqualTo(0); } + @Test + public void shouldGetJdbcConfigNoPort() { + Integer grpcPort = 124; + Configuration configuration = TestHelper.defaultConfig() + .with(VitessConnectorConfig.VTGATE_PORT, grpcPort) + .build(); + VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration); + JdbcConfiguration jdbcConfiguration = connectorConfig.getJdbcConfig(); + assertThat(jdbcConfiguration.getPort()).isEqualTo(VitessConnectorConfig.DEFAULT_VTGATE_JDBC_PORT); + } + + @Test + public void shouldGetJdbcConfig() { + String expectedUser = "jdbc_user"; + String expectedPassword = "jdbc_password"; + Integer expectedPort = 123; + Integer grpcPort = 124; + Configuration configuration = TestHelper.defaultConfig() + .with(VitessConnectorConfig.VTGATE_JDBC_USER, expectedUser) + .with(VitessConnectorConfig.VTGATE_JDBC_PASSWORD, expectedPassword) + .with(VitessConnectorConfig.VTGATE_JDBC_PORT, expectedPort) + .with(VitessConnectorConfig.VTGATE_PORT, grpcPort) + .build(); + VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration); + JdbcConfiguration jdbcConfiguration = connectorConfig.getJdbcConfig(); + assertThat(jdbcConfiguration.getUser()).isEqualTo(expectedUser); + assertThat(jdbcConfiguration.getPassword()).isEqualTo(expectedPassword); + assertThat(jdbcConfiguration.getPort()).isEqualTo(expectedPort); + } + + @Test + public void shouldGetJdbcWithFallbacks() { + String expectedUser = "other_user"; + String expectedPassword = "other_password"; + Configuration configuration = TestHelper.defaultConfig() + .with(VitessConnectorConfig.VTGATE_USER, expectedUser) + .with(VitessConnectorConfig.VTGATE_PASSWORD, expectedPassword) + .build(); + VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration); + JdbcConfiguration jdbcConfiguration = connectorConfig.getJdbcConfig(); + assertThat(jdbcConfiguration.getUser()).isEqualTo(expectedUser); + assertThat(jdbcConfiguration.getPassword()).isEqualTo(expectedPassword); + } + } diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java index e8235aee..d2a392a2 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java +++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java @@ -253,7 +253,7 @@ public void shouldMigrateToEnabledSchemaChanges() throws Exception { .with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table) .with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NEVER) .with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH) - .with(VitessConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class), true); + .with(VitessConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class), true); assertConnectorIsRunning(); TestHelper.execute("INSERT INTO ddl_table (id) VALUES (1);", TEST_SHARDED_KEYSPACE); @@ -278,10 +278,7 @@ public void shouldMigrateToEnabledSchemaChanges() throws Exception { .with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table) // Enable schema change .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) - .with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.CONFIGURATION_BASED) - .with(VitessConnectorConfig.SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_ON_SCHEMA_ERROR, true) - .with(VitessConnectorConfig.SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_SCHEMA, true) - .with(VitessConnectorConfig.SNAPSHOT_MODE_CONFIGURATION_BASED_START_STREAM, true) + .with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.RECOVERY) // Other configs from before .with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH) .with(VitessConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class), true); @@ -372,6 +369,10 @@ public void shouldSnapshotSchemaAndReceiveSchemaEventsSharded() throws Exception .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) .with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table) .with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NO_DATA) + .with(VitessConnectorConfig.VTGATE_JDBC_PORT, VitessConnectorConfig.DEFAULT_VTGATE_JDBC_PORT) + // Although the creds are identical make sure it works with setting them explicitly + .with(VitessConnectorConfig.VTGATE_JDBC_USER, TestHelper.USERNAME) + .with(VitessConnectorConfig.VTGATE_JDBC_PASSWORD, TestHelper.PASSWORD) .with(VitessConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class), true); assertConnectorIsRunning();