Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[log](improve)Add printing parameters and move the verification forward #295

Merged
merged 7 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;

import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
Expand All @@ -42,9 +43,9 @@ public class CdcTools {
private static final List<String> EMPTY_KEYS = Collections.singletonList("password");

public static void main(String[] args) throws Exception {
System.out.println("Input args: " + Arrays.asList(args) + ".\n");
String operation = args[0].toLowerCase();
String[] opArgs = Arrays.copyOfRange(args, 1, args.length);
System.out.println();
switch (operation) {
case MYSQL_SYNC_DATABASE:
createMySQLSyncDatabase(opArgs);
Expand All @@ -66,6 +67,7 @@ public static void main(String[] args) throws Exception {

private static void createMySQLSyncDatabase(String[] opArgs) throws Exception {
MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
Preconditions.checkArgument(params.has("mysql-conf"));
Map<String, String> mysqlMap = getConfigMap(params, "mysql-conf");
Configuration mysqlConfig = Configuration.fromMap(mysqlMap);
DatabaseSync databaseSync = new MysqlDatabaseSync();
Expand All @@ -74,6 +76,7 @@ private static void createMySQLSyncDatabase(String[] opArgs) throws Exception {

private static void createOracleSyncDatabase(String[] opArgs) throws Exception {
MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
Preconditions.checkArgument(params.has("oracle-conf"));
Map<String, String> oracleMap = getConfigMap(params, "oracle-conf");
Configuration oracleConfig = Configuration.fromMap(oracleMap);
DatabaseSync databaseSync = new OracleDatabaseSync();
Expand All @@ -82,6 +85,7 @@ private static void createOracleSyncDatabase(String[] opArgs) throws Exception {

private static void createPostgresSyncDatabase(String[] opArgs) throws Exception {
MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
Preconditions.checkArgument(params.has("postgres-conf"));
Map<String, String> postgresMap = getConfigMap(params, "postgres-conf");
Configuration postgresConfig = Configuration.fromMap(postgresMap);
DatabaseSync databaseSync = new PostgresDatabaseSync();
Expand All @@ -90,6 +94,7 @@ private static void createPostgresSyncDatabase(String[] opArgs) throws Exception

private static void createSqlServerSyncDatabase(String[] opArgs) throws Exception {
MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
Preconditions.checkArgument(params.has("sqlserver-conf"));
Map<String, String> postgresMap = getConfigMap(params, "sqlserver-conf");
Configuration postgresConfig = Configuration.fromMap(postgresMap);
DatabaseSync databaseSync = new SqlServerDatabaseSync();
Expand All @@ -115,6 +120,7 @@ private static void syncDatabase(
boolean useNewSchemaChange = params.has("use-new-schema-change");
boolean singleSink = params.has("single-sink");

Preconditions.checkArgument(params.has("sink-conf"));
Map<String, String> sinkMap = getConfigMap(params, "sink-conf");
Map<String, String> tableMap = getConfigMap(params, "table-conf");
Configuration sinkConfig = Configuration.fromMap(sinkMap);
Expand Down Expand Up @@ -149,7 +155,13 @@ private static void syncDatabase(

private static Map<String, String> getConfigMap(MultipleParameterTool params, String key) {
if (!params.has(key)) {
return new HashMap<>();
System.out.println(
"Can not find key ["
+ key
+ "] from args: "
+ params.toMap().toString()
+ ".\n");
return null;
}

Map<String, String> map = new HashMap<>();
Expand All @@ -163,7 +175,8 @@ private static Map<String, String> getConfigMap(MultipleParameterTool params, St
continue;
}

System.err.println("Invalid " + key + " " + param + ".\n");
System.out.println("Invalid " + key + " " + param + ".\n");
return null;
}
return map;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
Expand Down Expand Up @@ -441,7 +442,9 @@ public DatabaseSync setMultiToOneTarget(String multiToOneTarget) {
}

public DatabaseSync setTableConfig(Map<String, String> tableConfig) {
this.tableConfig = tableConfig;
if (!CollectionUtil.isNullOrEmpty(tableConfig)) {
this.tableConfig = tableConfig;
}
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,20 @@
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerLoggerFactory;

import java.net.MalformedURLException;
import java.net.InetAddress;
import java.net.URL;
import java.net.URLClassLoader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Stream;
Expand Down Expand Up @@ -107,7 +112,7 @@ public static GenericContainer createDorisContainer() {
return container;
}

protected static void initializeJdbcConnection() throws SQLException, MalformedURLException {
protected static void initializeJdbcConnection() throws Exception {
URLClassLoader urlClassLoader =
new URLClassLoader(
new URL[] {new URL(DRIVER_JAR)}, DorisTestBase.class.getClassLoader());
Expand All @@ -124,6 +129,7 @@ protected static void initializeJdbcConnection() throws SQLException, MalformedU
} while (!isBeReady(resultSet, Duration.ofSeconds(1L)));
}
LOG.info("Connected to Doris successfully...");
printClusterStatus();
}

private static boolean isBeReady(ResultSet rs, Duration duration) throws SQLException {
Expand All @@ -135,4 +141,28 @@ private static boolean isBeReady(ResultSet rs, Duration duration) throws SQLExce
}
return false;
}

protected static void printClusterStatus() throws Exception {
LOG.info("Current machine IP: {}", InetAddress.getLocalHost());
try (Statement statement = connection.createStatement()) {
ResultSet showFrontends = statement.executeQuery("show frontends");
LOG.info("Frontends status: {}", convertList(showFrontends));
ResultSet showBackends = statement.executeQuery("show backends");
LOG.info("Backends status: {}", convertList(showBackends));
}
}

private static List<Map> convertList(ResultSet rs) throws SQLException {
List<Map> list = new ArrayList<>();
ResultSetMetaData metaData = rs.getMetaData();
int columnCount = metaData.getColumnCount();
while (rs.next()) {
Map<String, Object> rowData = new HashMap<>();
for (int i = 1; i <= columnCount; i++) {
rowData.put(metaData.getColumnName(i), rs.getObject(i));
}
list.add(rowData);
}
return list;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public static void stopMySQLContainers() {

@Test
public void testMySQL2Doris() throws Exception {
printClusterStatus();
initializeMySQLTable();
JobClient jobClient = submitJob();
// wait 2 times checkpoint
Expand Down Expand Up @@ -173,6 +174,7 @@ public void testMySQL2Doris() throws Exception {

@Test
public void testAutoAddTable() throws Exception {
printClusterStatus();
initializeMySQLTable();
initializeDorisTable();
JobClient jobClient = submitJob();
Expand Down
Loading