From 7f08826a53119914e06cfb0bbe4057c038c156cf Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Wed, 11 Sep 2024 14:30:17 +0800 Subject: [PATCH 1/3] update --- .../flink/tools/cdc/DatabaseSyncConfig.java | 2 -- .../flink/tools/cdc/DorisTableConfig.java | 12 +++++---- .../container/e2e/Doris2DorisE2ECase.java | 26 +++---------------- 3 files changed, 10 insertions(+), 30 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSyncConfig.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSyncConfig.java index 6c78d5cd4..e1a089ff5 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSyncConfig.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSyncConfig.java @@ -69,8 +69,6 @@ public class DatabaseSyncConfig { public static final String SINGLE_SINK = "single-sink"; ////////// doris-table-conf ////////// public static final String TABLE_CONF = "table-conf"; - public static final String REPLICATION_NUM = "replication_num"; - public static final String TABLE_BUCKETS = "table-buckets"; ////////// date-converter-conf ////////// public static final String CONVERTERS = "converters"; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java index 912ed6982..6318fc8a5 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java @@ -26,8 +26,11 @@ import java.util.Objects; public class DorisTableConfig implements Serializable { - private static final String LIGHT_SCHEMA_CHANGE = "light_schema_change"; + public static final String LIGHT_SCHEMA_CHANGE = "light_schema_change"; // PROPERTIES parameter in doris table creation statement. such as: replication_num=1. + public static final String REPLICATION_NUM = "replication_num"; + public static final String TABLE_BUCKETS = "table-buckets"; + private final Map tableProperties; // The specific parameters extracted from --table-conf need to be parsed and integrated into the // doris table creation statement. such as: table-buckets="tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50". @@ -48,10 +51,9 @@ public DorisTableConfig(Map tableConfig) { if (!tableConfig.containsKey(LIGHT_SCHEMA_CHANGE)) { tableConfig.put(LIGHT_SCHEMA_CHANGE, Boolean.toString(true)); } - if (tableConfig.containsKey(DatabaseSyncConfig.TABLE_BUCKETS)) { - this.tableBuckets = - buildTableBucketMap(tableConfig.get(DatabaseSyncConfig.TABLE_BUCKETS)); - tableConfig.remove(DatabaseSyncConfig.TABLE_BUCKETS); + if (tableConfig.containsKey(TABLE_BUCKETS)) { + this.tableBuckets = buildTableBucketMap(tableConfig.get(TABLE_BUCKETS)); + tableConfig.remove(TABLE_BUCKETS); } tableProperties = tableConfig; } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java index 4b4e3b26a..e180872af 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java @@ -17,18 +17,15 @@ package org.apache.doris.flink.container.e2e; +import org.apache.doris.flink.container.AbstractContainerTestBase; +import org.apache.doris.flink.container.ContainerUtils; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; - -import org.apache.doris.flink.container.AbstractE2EService; -import org.apache.doris.flink.container.ContainerUtils; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,19 +34,12 @@ import java.util.List; import java.util.UUID; -public class Doris2DorisE2ECase extends AbstractE2EService { +public class Doris2DorisE2ECase extends AbstractContainerTestBase { private static final Logger LOG = LoggerFactory.getLogger(Doris2DorisE2ECase.class); private static final String DATABASE_SOURCE = "test_doris2doris_source"; private static final String DATABASE_SINK = "test_doris2doris_sink"; private static final String TABLE = "test_tbl"; - @Before - public void setUp() throws InterruptedException { - LOG.info("Doris2DorisE2ECase attempting to acquire semaphore."); - SEMAPHORE.acquire(); - LOG.info("Doris2DorisE2ECase semaphore acquired."); - } - @Test public void testDoris2Doris() throws Exception { LOG.info("Start executing the test case of doris to doris."); @@ -163,14 +153,4 @@ private void initializeDorisTable() { ContainerUtils.executeSQLStatement(getDorisQueryConnection(), LOG, sinkInitSql); LOG.info("Initialization of doris table successful."); } - - @After - public void close() { - try { - // Ensure that semaphore is always released - } finally { - LOG.info("Doris2DorisE2ECase releasing semaphore."); - SEMAPHORE.release(); - } - } } From e69035cccb190ecf3cedc43c325646b24b20a5eb Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Wed, 11 Sep 2024 14:33:16 +0800 Subject: [PATCH 2/3] update --- .../apache/doris/flink/container/e2e/Doris2DorisE2ECase.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java index e180872af..f7b3bee7a 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java @@ -17,14 +17,15 @@ package org.apache.doris.flink.container.e2e; -import org.apache.doris.flink.container.AbstractContainerTestBase; -import org.apache.doris.flink.container.ContainerUtils; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; + +import org.apache.doris.flink.container.AbstractContainerTestBase; +import org.apache.doris.flink.container.ContainerUtils; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; From 0c7663a942a9c91fc46947331c974a18c62a8519 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Wed, 11 Sep 2024 14:37:16 +0800 Subject: [PATCH 3/3] update --- .../apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java | 4 ++-- .../doris/flink/tools/cdc/CdcMongoSyncDatabaseCase.java | 4 ++-- .../doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java | 4 ++-- .../doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java | 4 ++-- .../doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java | 4 ++-- .../doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java | 4 ++-- 6 files changed, 12 insertions(+), 12 deletions(-) diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java index 0666cb9d4..055365570 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java @@ -65,8 +65,8 @@ public static void main(String[] args) throws Exception { Configuration sinkConf = Configuration.fromMap(sinkConfig); Map tableConfig = new HashMap<>(); - tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1"); - tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50"); + tableConfig.put(DorisTableConfig.REPLICATION_NUM, "1"); + tableConfig.put(DorisTableConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50"); String includingTables = "FULL_TYPES"; String excludingTables = null; String multiToOneOrigin = null; diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMongoSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMongoSyncDatabaseCase.java index 09006b2cb..ffc8a75d8 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMongoSyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMongoSyncDatabaseCase.java @@ -68,8 +68,8 @@ public static void main(String[] args) throws Exception { Configuration sinkConf = Configuration.fromMap(sinkConfig); Map tableConfig = new HashMap<>(); - tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1"); - tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS, ".*:1"); + tableConfig.put(DorisTableConfig.REPLICATION_NUM, "1"); + tableConfig.put(DorisTableConfig.TABLE_BUCKETS, ".*:1"); String includingTables = "cdc_test"; String excludingTables = ""; String multiToOneOrigin = "a_.*|b_.*"; diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java index c430ea87b..e85e888fc 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java @@ -67,8 +67,8 @@ public static void main(String[] args) throws Exception { Configuration sinkConf = Configuration.fromMap(sinkConfig); Map tableConfig = new HashMap<>(); - tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1"); - tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50"); + tableConfig.put(DorisTableConfig.REPLICATION_NUM, "1"); + tableConfig.put(DorisTableConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50"); // String includingTables = "tbl1|tbl2|tbl3"; String includingTables = "a_.*|b_.*|c"; String excludingTables = ""; diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java index a47212c8e..92600ffd6 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java @@ -62,8 +62,8 @@ public static void main(String[] args) throws Exception { Configuration sinkConf = Configuration.fromMap(sinkConfig); Map tableConfig = new HashMap<>(); - tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1"); - tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50"); + tableConfig.put(DorisTableConfig.REPLICATION_NUM, "1"); + tableConfig.put(DorisTableConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50"); String includingTables = "a_.*|b_.*|c"; String excludingTables = ""; String multiToOneOrigin = "a_.*|b_.*"; diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java index 99892e022..331840117 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java @@ -66,8 +66,8 @@ public static void main(String[] args) throws Exception { Configuration sinkConf = Configuration.fromMap(sinkConfig); Map tableConfig = new HashMap<>(); - tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1"); - tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50"); + tableConfig.put(DorisTableConfig.REPLICATION_NUM, "1"); + tableConfig.put(DorisTableConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50"); String includingTables = "a_.*|b_.*|c"; String excludingTables = ""; String multiToOneOrigin = "a_.*|b_.*"; diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java index ca6a3121b..7a1cf276a 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java @@ -65,8 +65,8 @@ public static void main(String[] args) throws Exception { Configuration sinkConf = Configuration.fromMap(sinkConfig); Map tableConfig = new HashMap<>(); - tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1"); - tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50"); + tableConfig.put(DorisTableConfig.REPLICATION_NUM, "1"); + tableConfig.put(DorisTableConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50"); String includingTables = "a_.*|b_.*|c"; String excludingTables = ""; String multiToOneOrigin = "a_.*|b_.*";