Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Chore] Split some case logic #487

Merged
merged 3 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ public class DatabaseSyncConfig {
public static final String SINGLE_SINK = "single-sink";
////////// doris-table-conf //////////
public static final String TABLE_CONF = "table-conf";
public static final String REPLICATION_NUM = "replication_num";
public static final String TABLE_BUCKETS = "table-buckets";

////////// date-converter-conf //////////
public static final String CONVERTERS = "converters";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@
import java.util.Objects;

public class DorisTableConfig implements Serializable {
private static final String LIGHT_SCHEMA_CHANGE = "light_schema_change";
public static final String LIGHT_SCHEMA_CHANGE = "light_schema_change";
// PROPERTIES parameter in doris table creation statement. such as: replication_num=1.
public static final String REPLICATION_NUM = "replication_num";
public static final String TABLE_BUCKETS = "table-buckets";

private final Map<String, String> tableProperties;
// The specific parameters extracted from --table-conf need to be parsed and integrated into the
// doris table creation statement. such as: table-buckets="tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50".
Expand All @@ -48,10 +51,9 @@ public DorisTableConfig(Map<String, String> tableConfig) {
if (!tableConfig.containsKey(LIGHT_SCHEMA_CHANGE)) {
tableConfig.put(LIGHT_SCHEMA_CHANGE, Boolean.toString(true));
}
if (tableConfig.containsKey(DatabaseSyncConfig.TABLE_BUCKETS)) {
this.tableBuckets =
buildTableBucketMap(tableConfig.get(DatabaseSyncConfig.TABLE_BUCKETS));
tableConfig.remove(DatabaseSyncConfig.TABLE_BUCKETS);
if (tableConfig.containsKey(TABLE_BUCKETS)) {
this.tableBuckets = buildTableBucketMap(tableConfig.get(TABLE_BUCKETS));
tableConfig.remove(TABLE_BUCKETS);
}
tableProperties = tableConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import org.apache.doris.flink.container.AbstractE2EService;
import org.apache.doris.flink.container.AbstractContainerTestBase;
import org.apache.doris.flink.container.ContainerUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -37,19 +35,12 @@
import java.util.List;
import java.util.UUID;

public class Doris2DorisE2ECase extends AbstractE2EService {
public class Doris2DorisE2ECase extends AbstractContainerTestBase {
private static final Logger LOG = LoggerFactory.getLogger(Doris2DorisE2ECase.class);
private static final String DATABASE_SOURCE = "test_doris2doris_source";
private static final String DATABASE_SINK = "test_doris2doris_sink";
private static final String TABLE = "test_tbl";

@Before
public void setUp() throws InterruptedException {
LOG.info("Doris2DorisE2ECase attempting to acquire semaphore.");
SEMAPHORE.acquire();
LOG.info("Doris2DorisE2ECase semaphore acquired.");
}

@Test
public void testDoris2Doris() throws Exception {
LOG.info("Start executing the test case of doris to doris.");
Expand Down Expand Up @@ -163,14 +154,4 @@ private void initializeDorisTable() {
ContainerUtils.executeSQLStatement(getDorisQueryConnection(), LOG, sinkInitSql);
LOG.info("Initialization of doris table successful.");
}

@After
public void close() {
try {
// Ensure that semaphore is always released
} finally {
LOG.info("Doris2DorisE2ECase releasing semaphore.");
SEMAPHORE.release();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ public static void main(String[] args) throws Exception {
Configuration sinkConf = Configuration.fromMap(sinkConfig);

Map<String, String> tableConfig = new HashMap<>();
tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1");
tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
tableConfig.put(DorisTableConfig.REPLICATION_NUM, "1");
tableConfig.put(DorisTableConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
String includingTables = "FULL_TYPES";
String excludingTables = null;
String multiToOneOrigin = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ public static void main(String[] args) throws Exception {
Configuration sinkConf = Configuration.fromMap(sinkConfig);

Map<String, String> tableConfig = new HashMap<>();
tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1");
tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS, ".*:1");
tableConfig.put(DorisTableConfig.REPLICATION_NUM, "1");
tableConfig.put(DorisTableConfig.TABLE_BUCKETS, ".*:1");
String includingTables = "cdc_test";
String excludingTables = "";
String multiToOneOrigin = "a_.*|b_.*";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ public static void main(String[] args) throws Exception {
Configuration sinkConf = Configuration.fromMap(sinkConfig);

Map<String, String> tableConfig = new HashMap<>();
tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1");
tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
tableConfig.put(DorisTableConfig.REPLICATION_NUM, "1");
tableConfig.put(DorisTableConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
// String includingTables = "tbl1|tbl2|tbl3";
String includingTables = "a_.*|b_.*|c";
String excludingTables = "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ public static void main(String[] args) throws Exception {
Configuration sinkConf = Configuration.fromMap(sinkConfig);

Map<String, String> tableConfig = new HashMap<>();
tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1");
tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
tableConfig.put(DorisTableConfig.REPLICATION_NUM, "1");
tableConfig.put(DorisTableConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
String includingTables = "a_.*|b_.*|c";
String excludingTables = "";
String multiToOneOrigin = "a_.*|b_.*";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ public static void main(String[] args) throws Exception {
Configuration sinkConf = Configuration.fromMap(sinkConfig);

Map<String, String> tableConfig = new HashMap<>();
tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1");
tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
tableConfig.put(DorisTableConfig.REPLICATION_NUM, "1");
tableConfig.put(DorisTableConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
String includingTables = "a_.*|b_.*|c";
String excludingTables = "";
String multiToOneOrigin = "a_.*|b_.*";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ public static void main(String[] args) throws Exception {
Configuration sinkConf = Configuration.fromMap(sinkConfig);

Map<String, String> tableConfig = new HashMap<>();
tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1");
tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
tableConfig.put(DorisTableConfig.REPLICATION_NUM, "1");
tableConfig.put(DorisTableConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
String includingTables = "a_.*|b_.*|c";
String excludingTables = "";
String multiToOneOrigin = "a_.*|b_.*";
Expand Down
Loading