Skip to content

Commit

Permalink
DBZ-8325 Support separate jdbc config & creds
Browse files Browse the repository at this point in the history
  • Loading branch information
twthorn committed Oct 25, 2024
1 parent f4935f9 commit 57c9c39
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.annotation.VisibleForTesting;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.ConfigDefinition;
import io.debezium.config.Configuration;
Expand Down Expand Up @@ -60,8 +61,11 @@ public class VitessConnectorConfig extends HistorizedRelationalDatabaseConnector
private static final Logger LOGGER = LoggerFactory.getLogger(VitessConnectorConfig.class);

private static final String VITESS_CONFIG_GROUP_PREFIX = "vitess.";
private static final String JDBC_CONFIG_PREFIX = "jdbc.";
private static final int DEFAULT_VTGATE_PORT = 15_991;
private static final int DEFAULT_VTGATE_JDBC_PORT = 15_306;

@VisibleForTesting
protected static final int DEFAULT_VTGATE_JDBC_PORT = 15_306;

/**
* Set of all built-in database names that will generally be ignored by the connector.
Expand All @@ -73,6 +77,8 @@ public class VitessConnectorConfig extends HistorizedRelationalDatabaseConnector
public JdbcConfiguration getJdbcConfig() {
JdbcConfiguration jdbcConfiguration = super.getJdbcConfig();
JdbcConfiguration updatedConfig = JdbcConfiguration.adapt(jdbcConfiguration.edit()
.with(JdbcConfiguration.USER, getVtgateJdbcUsername())
.with(JdbcConfiguration.PASSWORD, getVtgateJdbcPassword())
.with(JdbcConfiguration.DATABASE, getKeyspace())
.with(JdbcConfiguration.PORT, getVtgateJdbcPort())
.build());
Expand Down Expand Up @@ -100,6 +106,8 @@ public enum SnapshotMode implements EnumeratedValue {

CONFIGURATION_BASED("configuration_based"),

RECOVERY("recovery"),

/**
* Never perform an initial snapshot and only receive new data changes.
*/
Expand Down Expand Up @@ -237,15 +245,29 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue
.withValidation(Field::isInteger)
.withDescription("Port of the Vitess VTGate gRPC server.");

public static final Field VTGATE_JDBC_PORT = Field.create(DATABASE_CONFIG_PREFIX + "jdbc." + JdbcConfiguration.PORT)
public static final Field VTGATE_JDBC_PORT = Field.create(DATABASE_CONFIG_PREFIX + JDBC_CONFIG_PREFIX + JdbcConfiguration.PORT)
.withDisplayName("Vitess JDBC database port")
.withType(Type.INT)
.withWidth(Width.SHORT)
.withDefault(DEFAULT_VTGATE_JDBC_PORT)
.withImportance(ConfigDef.Importance.HIGH)
.withImportance(ConfigDef.Importance.MEDIUM)
.withValidation(Field::isInteger)
.withDescription("Port of the Vitess VTGate JDBC server.");

public static final Field VTGATE_JDBC_USER = Field.create(DATABASE_CONFIG_PREFIX + JDBC_CONFIG_PREFIX + JdbcConfiguration.USER)
.withDisplayName("Vitess JDBC database user")
.withType(Type.INT)
.withWidth(Width.SHORT)
.withImportance(ConfigDef.Importance.MEDIUM)
.withDescription("Username of the Vitess VTGate JDBC server.");

public static final Field VTGATE_JDBC_PASSWORD = Field.create(DATABASE_CONFIG_PREFIX + JDBC_CONFIG_PREFIX + JdbcConfiguration.PASSWORD)
.withDisplayName("Vitess JDBC database password")
.withType(Type.INT)
.withWidth(Width.SHORT)
.withImportance(ConfigDef.Importance.MEDIUM)
.withDescription("Password of the Vitess VTGate JDBC server.");

public static final Field VTGATE_USER = Field.create(DATABASE_CONFIG_PREFIX + JdbcConfiguration.USER)
.withDisplayName("User")
.withType(Type.STRING)
Expand Down Expand Up @@ -715,10 +737,30 @@ public String getVtgateUsername() {
return getConfig().getString(VTGATE_USER);
}

public String getVtgateJdbcUsername() {
String jdbcUsername = getConfig().getString(VTGATE_JDBC_USER);
if (jdbcUsername != null) {
return jdbcUsername;
}
else {
return getConfig().getString(VTGATE_USER);
}
}

public String getVtgatePassword() {
return getConfig().getString(VTGATE_PASSWORD);
}

public String getVtgateJdbcPassword() {
String jdbcPassword = getConfig().getString(VTGATE_JDBC_PASSWORD);
if (jdbcPassword != null) {
return jdbcPassword;
}
else {
return getConfig().getString(VTGATE_PASSWORD);
}
}

public String getTabletType() {
return getConfig().getString(TABLET_TYPE);
}
Expand Down
6 changes: 3 additions & 3 deletions src/test/java/io/debezium/connector/vitess/TestHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ public class TestHelper {
private static final String VTGATE_HOST = "localhost";
private static final int VTGATE_PORT = 15991;
// Use the same username and password for vtgate and vtctld
private static final String USERNAME = "vitess";
private static final String PASSWORD = "vitess_password";
public static final String USERNAME = "vitess";
public static final String PASSWORD = "vitess_password";

protected static final String VGTID_JSON_NO_PKS_TEMPLATE = "[" +
"{\"keyspace\":\"%s\",\"shard\":\"%s\",\"gtid\":\"%s\"}," +
Expand Down Expand Up @@ -165,8 +165,8 @@ public static Configuration.Builder defaultConfig(boolean hasMultipleShards,
.with(VitessConnectorConfig.VTGATE_HOST, VTGATE_HOST)
.with(VitessConnectorConfig.VTGATE_PORT, VTGATE_PORT)
.with(VitessConnectorConfig.VTGATE_USER, USERNAME)
.with(VitessConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class)
.with(VitessConnectorConfig.VTGATE_PASSWORD, PASSWORD)
.with(VitessConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class)
.with(EmbeddedEngineConfig.WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_MS, 5000)
.with(VitessConnectorConfig.POLL_INTERVAL_MS, 100);
if (!Strings.isNullOrEmpty(tableInclude)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.debezium.config.Configuration;
import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionMetadataFactory;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.schema.DefaultTopicNamingStrategy;
import io.debezium.schema.SchemaNameAdjuster;

Expand Down Expand Up @@ -189,4 +190,48 @@ public void shouldValidateInheritEpochWithOrderedTransactionMetadata() {
assertThat(inputs.size()).isEqualTo(0);
}

@Test
public void shouldGetJdbcConfigNoPort() {
Integer grpcPort = 124;
Configuration configuration = TestHelper.defaultConfig()
.with(VitessConnectorConfig.VTGATE_PORT, grpcPort)
.build();
VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration);
JdbcConfiguration jdbcConfiguration = connectorConfig.getJdbcConfig();
assertThat(jdbcConfiguration.getPort()).isEqualTo(VitessConnectorConfig.DEFAULT_VTGATE_JDBC_PORT);
}

@Test
public void shouldGetJdbcConfig() {
String expectedUser = "jdbc_user";
String expectedPassword = "jdbc_password";
Integer expectedPort = 123;
Integer grpcPort = 124;
Configuration configuration = TestHelper.defaultConfig()
.with(VitessConnectorConfig.VTGATE_JDBC_USER, expectedUser)
.with(VitessConnectorConfig.VTGATE_JDBC_PASSWORD, expectedPassword)
.with(VitessConnectorConfig.VTGATE_JDBC_PORT, expectedPort)
.with(VitessConnectorConfig.VTGATE_PORT, grpcPort)
.build();
VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration);
JdbcConfiguration jdbcConfiguration = connectorConfig.getJdbcConfig();
assertThat(jdbcConfiguration.getUser()).isEqualTo(expectedUser);
assertThat(jdbcConfiguration.getPassword()).isEqualTo(expectedPassword);
assertThat(jdbcConfiguration.getPort()).isEqualTo(expectedPort);
}

@Test
public void shouldGetJdbcWithFallbacks() {
String expectedUser = "other_user";
String expectedPassword = "other_password";
Configuration configuration = TestHelper.defaultConfig()
.with(VitessConnectorConfig.VTGATE_USER, expectedUser)
.with(VitessConnectorConfig.VTGATE_PASSWORD, expectedPassword)
.build();
VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration);
JdbcConfiguration jdbcConfiguration = connectorConfig.getJdbcConfig();
assertThat(jdbcConfiguration.getUser()).isEqualTo(expectedUser);
assertThat(jdbcConfiguration.getPassword()).isEqualTo(expectedPassword);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ public void shouldMigrateToEnabledSchemaChanges() throws Exception {
.with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table)
.with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NEVER)
.with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH)
.with(VitessConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class), true);
.with(VitessConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class), true);
assertConnectorIsRunning();

TestHelper.execute("INSERT INTO ddl_table (id) VALUES (1);", TEST_SHARDED_KEYSPACE);
Expand All @@ -278,10 +278,7 @@ public void shouldMigrateToEnabledSchemaChanges() throws Exception {
.with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table)
// Enable schema change
.with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.CONFIGURATION_BASED)
.with(VitessConnectorConfig.SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_ON_SCHEMA_ERROR, true)
.with(VitessConnectorConfig.SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_SCHEMA, true)
.with(VitessConnectorConfig.SNAPSHOT_MODE_CONFIGURATION_BASED_START_STREAM, true)
.with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.RECOVERY)
// Other configs from before
.with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH)
.with(VitessConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class), true);
Expand Down Expand Up @@ -372,6 +369,10 @@ public void shouldSnapshotSchemaAndReceiveSchemaEventsSharded() throws Exception
.with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table)
.with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NO_DATA)
.with(VitessConnectorConfig.VTGATE_JDBC_PORT, VitessConnectorConfig.DEFAULT_VTGATE_JDBC_PORT)
// Although the creds are identical make sure it works with setting them explicitly
.with(VitessConnectorConfig.VTGATE_JDBC_USER, TestHelper.USERNAME)
.with(VitessConnectorConfig.VTGATE_JDBC_PASSWORD, TestHelper.PASSWORD)
.with(VitessConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class),
true);
assertConnectorIsRunning();
Expand Down

0 comments on commit 57c9c39

Please sign in to comment.