From eed8f3762b58ea37cb912dd7a532936308c59940 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Tue, 19 Dec 2023 16:50:09 +0800 Subject: [PATCH] add mysql doris e2ecase --- .github/workflows/run-e2ecase.yml | 44 +++ flink-doris-connector/pom.xml | 6 + .../org/apache/doris/flink/DorisTestBase.java | 4 +- .../flink/tools/cdc/MySQLDorisE2ECase.java | 331 ++++++++++++++++++ 4 files changed, 383 insertions(+), 2 deletions(-) create mode 100644 .github/workflows/run-e2ecase.yml create mode 100644 flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java diff --git a/.github/workflows/run-e2ecase.yml b/.github/workflows/run-e2ecase.yml new file mode 100644 index 000000000..ad76a3fa7 --- /dev/null +++ b/.github/workflows/run-e2ecase.yml @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +--- +name: Run E2ECases +on: + pull_request: + push: + +jobs: + build-extension: + name: "Run E2ECases" + runs-on: ubuntu-latest + defaults: + run: + shell: bash + steps: + - name: Checkout + uses: actions/checkout@master + + - name: Setup java + uses: actions/setup-java@v2 + with: + distribution: adopt + java-version: '8' + + - name: Run E2ECases + run: | + cd flink-doris-connector && mvn test -Dtest="*E2ECase" + diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index 76156adf7..a54562e7a 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -332,6 +332,12 @@ under the License. ${testcontainers.version} test + + org.testcontainers + mysql + ${testcontainers.version} + test + diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java index 8ad4130bd..520d3d2ae 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java @@ -49,8 +49,8 @@ public abstract class DorisTestBase { protected static final String DORIS_12_DOCKER_IMAGE = "adamlee489/doris:1.2.7.1_x86"; private static final String DRIVER_JAR = "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar"; - private static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; - private static final String URL = "jdbc:mysql://%s:9030"; + protected static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; + protected static final String URL = "jdbc:mysql://%s:9030"; protected static final String USERNAME = "root"; protected static final String PASSWORD = ""; protected static final GenericContainer DORIS_CONTAINER = createDorisContainer(); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java new file mode 100644 index 000000000..0246e3930 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java @@ -0,0 +1,331 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+package org.apache.doris.flink.tools.cdc;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.apache.doris.flink.DorisTestBase;
+import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.api.common.JobStatus.RUNNING;
+
+/**
+ * End-to-end case that synchronizes a MySQL database into Doris through {@link
+ * MysqlDatabaseSync}.
+ *
+ * <p>The case covers:
+ *
+ * <ol>
+ *   <li>Automatic table creation.
+ *   <li>Schema change event synchronization.
+ *   <li>Synchronization of addition, deletion and modification events.
+ *   <li>CDC multi-table writing.
+ * </ol>
+ */
+public class MySQLDorisE2ECase extends DorisTestBase {
+    protected static final Logger LOG = LoggerFactory.getLogger(MySQLDorisE2ECase.class);
+    private static final String DATABASE = "test";
+    private static final String MYSQL_USER = "root";
+    private static final String MYSQL_PASSWD = "123456";
+    private static final String TABLE_1 = "tbl1";
+    private static final String TABLE_2 = "tbl2";
+    private static final String TABLE_3 = "tbl3";
+    private static final List<String> TABLES =
+            Collections.unmodifiableList(Arrays.asList(TABLE_1, TABLE_2, TABLE_3));
+
+    /** Port MySQL listens on inside the container. */
+    private static final int MYSQL_PORT = 3306;
+
+    /** Fixed pause before each verification: two times the 10s checkpoint interval of the job. */
+    private static final long SYNC_WAIT_MILLIS = 20000L;
+
+    /** Reads every row of the three synchronized tables; formatted by {@link #checkResult}. */
+    private static final String UNION_ALL_QUERY =
+            "select * from %s.%s union all select * from %s.%s union all select * from %s.%s order by 1";
+
+    /** DDL shared by the three source tables; takes the database and the table name. */
+    private static final String CREATE_TABLE_SQL =
+            "CREATE TABLE %s.%s ( \n"
+                    + "`name` varchar(256) primary key,\n"
+                    + "`age` int\n"
+                    + ")";
+
+    // The image tag is pinned: a bare "mysql" resolves to "latest", so the server version
+    // under test would silently change from one run to the next.
+    private static final MySQLContainer<?> MYSQL_CONTAINER =
+            new MySQLContainer<>("mysql:8.0")
+                    .withDatabaseName(DATABASE)
+                    .withUsername(MYSQL_USER)
+                    .withPassword(MYSQL_PASSWD);
+
+    /** Starts the MySQL container with its default time zone set to Asia/Shanghai. */
+    @BeforeClass
+    public static void startMySQLContainers() {
+        MYSQL_CONTAINER.setCommand("--default-time-zone=Asia/Shanghai");
+        LOG.info("Starting MySQL containers...");
+        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+        LOG.info("MySQL Containers are started.");
+    }
+
+    /** Stops the MySQL container started by {@link #startMySQLContainers()}. */
+    @AfterClass
+    public static void stopMySQLContainers() {
+        LOG.info("Stopping MySQL containers...");
+        MYSQL_CONTAINER.stop();
+        LOG.info("MySQL Containers are stopped.");
+    }
+
+    /**
+     * Runs the whole scenario: stock data, then incremental insert/update/delete, then a schema
+     * change on {@code tbl1}, checking the content of the sink tables after each phase.
+     *
+     * @throws Exception if a database call fails, the job does not start, or a check fails
+     */
+    @Test
+    public void testMySQL2Doris() throws Exception {
+        initializeMySQLTable();
+        JobClient jobClient = submitJob();
+        try {
+            // wait 2 times checkpoint
+            Thread.sleep(SYNC_WAIT_MILLIS);
+            Set<List<Object>> expected =
+                    Stream.<List<Object>>of(
+                                    Arrays.asList("doris_1", 1),
+                                    Arrays.asList("doris_2", 2),
+                                    Arrays.asList("doris_3", 3))
+                            .collect(Collectors.toSet());
+            checkResult(expected, UNION_ALL_QUERY, 2);
+
+            // add incremental data
+            executeOnMySQL(
+                    String.format("insert into %s.%s values ('doris_1_1',10)", DATABASE, TABLE_1),
+                    String.format("insert into %s.%s values ('doris_2_1',11)", DATABASE, TABLE_2),
+                    String.format("insert into %s.%s values ('doris_3_1',12)", DATABASE, TABLE_3),
+                    String.format(
+                            "update %s.%s set age=18 where name='doris_1'", DATABASE, TABLE_1),
+                    String.format("delete from %s.%s where name='doris_2'", DATABASE, TABLE_2));
+
+            Thread.sleep(SYNC_WAIT_MILLIS);
+            Set<List<Object>> expected2 =
+                    Stream.<List<Object>>of(
+                                    Arrays.asList("doris_1", 18),
+                                    Arrays.asList("doris_1_1", 10),
+                                    Arrays.asList("doris_2_1", 11),
+                                    Arrays.asList("doris_3", 3),
+                                    Arrays.asList("doris_3_1", 12))
+                            .collect(Collectors.toSet());
+            checkResult(expected2, UNION_ALL_QUERY, 2);
+
+            // mock schema change
+            executeOnMySQL(
+                    String.format(
+                            "alter table %s.%s add column c1 varchar(128)", DATABASE, TABLE_1),
+                    String.format("alter table %s.%s drop column age", DATABASE, TABLE_1));
+            // NOTE(review): presumably this pause lets the DDL reach Doris before a row that
+            // uses the new column is written - confirm.
+            Thread.sleep(SYNC_WAIT_MILLIS);
+            executeOnMySQL(
+                    String.format(
+                            "insert into %s.%s values ('doris_1_1_1','c1_val')",
+                            DATABASE, TABLE_1));
+
+            Thread.sleep(SYNC_WAIT_MILLIS);
+            Set<List<Object>> expected3 =
+                    Stream.<List<Object>>of(
+                                    Arrays.asList("doris_1", null),
+                                    Arrays.asList("doris_1_1", null),
+                                    Arrays.asList("doris_1_1_1", "c1_val"))
+                            .collect(Collectors.toSet());
+            checkResult(expected3, "select * from %s.%s order by 1", 2);
+        } finally {
+            // Cancel on every path so that a failed check does not leave the job running.
+            jobClient.cancel().get();
+        }
+    }
+
+    /**
+     * Runs {@code query} through the inherited {@code connection} and asserts that the returned
+     * rows equal {@code expected}.
+     *
+     * <p>Rows are compared as a set, so neither the order of the result nor the iteration order
+     * of {@code expected} matters.
+     *
+     * @param expected the rows the query must return, each row a list of column values
+     * @param query a format string with up to six {@code %s} placeholders, filled in order with
+     *     the database and table name of {@code tbl1}, {@code tbl2} and {@code tbl3}; surplus
+     *     arguments are ignored by {@link String#format(String, Object...)}
+     * @param columnSize how many leading columns of every row are read
+     * @throws Exception if the query cannot be executed
+     */
+    public void checkResult(Set<List<Object>> expected, String query, int columnSize)
+            throws Exception {
+        String sql =
+                String.format(query, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
+        Set<List<Object>> actual = new HashSet<>();
+        // NOTE(review): "connection" is not declared in this class; it is expected to be the
+        // connection to Doris provided by DorisTestBase - confirm.
+        try (Statement sinkStatement = connection.createStatement();
+                ResultSet sinkResultSet = sinkStatement.executeQuery(sql)) {
+            while (sinkResultSet.next()) {
+                List<Object> row = new ArrayList<>(columnSize);
+                for (int i = 1; i <= columnSize; i++) {
+                    row.add(sinkResultSet.getObject(i));
+                }
+                actual.add(row);
+            }
+        }
+        Assertions.assertEquals(expected, actual);
+    }
+
+    /**
+     * Builds the MySQL-to-Doris synchronization job for the three test tables, submits it
+     * asynchronously and waits up to 10 seconds for it to report {@code RUNNING}.
+     *
+     * @return the client of the submitted job
+     * @throws Exception if the job cannot be built or does not reach {@code RUNNING} in time
+     */
+    public JobClient submitJob() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        Map<String, String> flinkMap = new HashMap<>();
+        flinkMap.put("execution.checkpointing.interval", "10s");
+        flinkMap.put("pipeline.operator-chaining", "false");
+        flinkMap.put("parallelism.default", "1");
+        env.configure(Configuration.fromMap(flinkMap));
+
+        Map<String, String> mysqlConfig = new HashMap<>();
+        mysqlConfig.put("database-name", DATABASE);
+        mysqlConfig.put("hostname", MYSQL_CONTAINER.getHost());
+        mysqlConfig.put("port", String.valueOf(MYSQL_CONTAINER.getMappedPort(MYSQL_PORT)));
+        mysqlConfig.put("username", MYSQL_USER);
+        mysqlConfig.put("password", MYSQL_PASSWD);
+        mysqlConfig.put("server-time-zone", "Asia/Shanghai");
+        Configuration config = Configuration.fromMap(mysqlConfig);
+
+        Map<String, String> sinkConfig = new HashMap<>();
+        sinkConfig.put("fenodes", getFenodes());
+        sinkConfig.put("username", USERNAME);
+        sinkConfig.put("password", PASSWORD);
+        sinkConfig.put("jdbc-url", String.format(DorisTestBase.URL, DORIS_CONTAINER.getHost()));
+        // A random label prefix per submission.
+        sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
+        Configuration sinkConf = Configuration.fromMap(sinkConfig);
+
+        Map<String, String> tableConfig = new HashMap<>();
+        tableConfig.put("replication_num", "1");
+
+        String includingTables = String.join("|", TABLES);
+        String excludingTables = "";
+        DatabaseSync databaseSync = new MysqlDatabaseSync();
+        databaseSync
+                .setEnv(env)
+                .setDatabase(DATABASE)
+                .setConfig(config)
+                .setIncludingTables(includingTables)
+                .setExcludingTables(excludingTables)
+                .setIgnoreDefaultValue(false)
+                .setSinkConfig(sinkConf)
+                .setTableConfig(tableConfig)
+                .setCreateTableOnly(false)
+                .setNewSchemaChange(true)
+                .create();
+        databaseSync.build();
+        JobClient jobClient = env.executeAsync();
+        waitForJobStatus(
+                jobClient,
+                Collections.singletonList(RUNNING),
+                Deadline.fromNow(Duration.ofSeconds(10)));
+        return jobClient;
+    }
+
+    /**
+     * Recreates {@code tbl1}, {@code tbl2} and {@code tbl3} in the MySQL test database and
+     * inserts one stock row into each of them.
+     *
+     * @throws Exception if a statement fails
+     */
+    public void initializeMySQLTable() throws Exception {
+        List<String> statements = new ArrayList<>();
+        statements.add(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE));
+        for (String table : TABLES) {
+            statements.add(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table));
+        }
+        for (String table : TABLES) {
+            statements.add(String.format(CREATE_TABLE_SQL, DATABASE, table));
+        }
+        // mock stock data
+        statements.add(String.format("insert into %s.%s values ('doris_1',1)", DATABASE, TABLE_1));
+        statements.add(String.format("insert into %s.%s values ('doris_2',2)", DATABASE, TABLE_2));
+        statements.add(String.format("insert into %s.%s values ('doris_3',3)", DATABASE, TABLE_3));
+        executeOnMySQL(statements.toArray(new String[0]));
+    }
+
+    /** Executes the given statements in order on one connection to the MySQL container. */
+    private static void executeOnMySQL(String... sqls) throws Exception {
+        try (Connection mysqlConnection =
+                        DriverManager.getConnection(
+                                MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
+                Statement statement = mysqlConnection.createStatement()) {
+            for (String sql : sqls) {
+                statement.execute(sql);
+            }
+        }
+    }
+
+    /**
+     * Polls the job every 100 ms until its status is one of {@code expectedStatus}.
+     *
+     * @param client the client of the job to watch
+     * @param expectedStatus the statuses that end the wait successfully
+     * @param deadline the point in time after which the wait is abandoned
+     * @throws IllegalStateException if the job reaches a terminal state that is not expected
+     * @throws TimeoutException if the deadline passes first
+     * @throws Exception if the job status cannot be fetched
+     */
+    public static void waitForJobStatus(
+            JobClient client, List<JobStatus> expectedStatus, Deadline deadline) throws Exception {
+        waitUntilCondition(
+                () -> {
+                    JobStatus currentStatus = client.getJobStatus().get();
+                    if (expectedStatus.contains(currentStatus)) {
+                        return true;
+                    }
+                    if (!currentStatus.isTerminalState()) {
+                        return false;
+                    }
+                    // The job can no longer change state: fetch its result so that a failure
+                    // cause, if there is one, is attached to the reported error.
+                    try {
+                        client.getJobExecutionResult().get();
+                    } catch (Exception e) {
+                        throw new IllegalStateException(
+                                String.format(
+                                        "Job has entered %s state, but expecting %s",
+                                        currentStatus, expectedStatus),
+                                e);
+                    }
+                    throw new IllegalStateException(
+                            String.format(
+                                    "Job has entered a terminal state %s, but expecting %s",
+                                    currentStatus, expectedStatus));
+                },
+                deadline,
+                100L,
+                "Condition was not met in given timeout.");
+    }
+
+    /**
+     * Evaluates {@code condition} repeatedly until it yields {@code true} or {@code timeout}
+     * expires.
+     *
+     * <p>The method returns as soon as the condition holds. The deadline is only consulted
+     * before each evaluation, so a condition that turns true just as the deadline passes is
+     * still reported as a success.
+     *
+     * @param condition the check to evaluate; exceptions it throws are propagated
+     * @param timeout the deadline after which no further evaluation is attempted
+     * @param retryIntervalMillis pause between two evaluations, capped by the remaining time
+     * @param errorMsg message of the {@link TimeoutException}
+     * @throws TimeoutException if the deadline expires before the condition holds
+     * @throws Exception if {@code condition} throws or the waiting thread is interrupted
+     */
+    public static void waitUntilCondition(
+            SupplierWithException<Boolean, Exception> condition,
+            Deadline timeout,
+            long retryIntervalMillis,
+            String errorMsg)
+            throws Exception {
+        while (timeout.hasTimeLeft()) {
+            if (condition.get()) {
+                return;
+            }
+            long timeLeft = Math.max(0L, timeout.timeLeft().toMillis());
+            Thread.sleep(Math.min(retryIntervalMillis, timeLeft));
+        }
+        throw new TimeoutException(errorMsg);
+    }
+}