From 38cc58bd34fd0ab0f6e9d93647f40f6e1e855f02 Mon Sep 17 00:00:00 2001
From: wtt <30461027+wtt40122@users.noreply.github.com>
Date: Fri, 12 Jan 2024 13:23:27 +0800
Subject: [PATCH] feat: Database support for Doris operations (#776)
---
.../plugin/datasource/DatasourcePlugin.java | 2 +-
.../docean-plugin-storage/pom.xml | 24 ++
.../storage-doris/pom.xml | 50 ++++
.../java/run/mone/doris/DorisService.java | 239 ++++++++++++++++++
.../java/run/mone/doris/DorisStreamLoad.java | 151 +++++++++++
.../main/java/run/mone/doris/HttpUtil.java | 19 ++
.../java/run/mone/doris/DorisServiceTest.java | 72 ++++++
.../run/mone/doris/DorisStreamLoadTest.java | 107 ++++++++
jcommon/docean-plugin/pom.xml | 114 +++++----
.../docean/adapter/LongDefaultAdapter.java | 5 +
10 files changed, 726 insertions(+), 57 deletions(-)
create mode 100644 jcommon/docean-plugin/docean-plugin-storage/pom.xml
create mode 100644 jcommon/docean-plugin/docean-plugin-storage/storage-doris/pom.xml
create mode 100644 jcommon/docean-plugin/docean-plugin-storage/storage-doris/src/main/java/run/mone/doris/DorisService.java
create mode 100644 jcommon/docean-plugin/docean-plugin-storage/storage-doris/src/main/java/run/mone/doris/DorisStreamLoad.java
create mode 100644 jcommon/docean-plugin/docean-plugin-storage/storage-doris/src/main/java/run/mone/doris/HttpUtil.java
create mode 100644 jcommon/docean-plugin/docean-plugin-storage/storage-doris/src/test/java/run/mone/doris/DorisServiceTest.java
create mode 100644 jcommon/docean-plugin/docean-plugin-storage/storage-doris/src/test/java/run/mone/doris/DorisStreamLoadTest.java
diff --git a/jcommon/docean-plugin/docean-plugin-datasource/src/main/java/com/xiaomi/youpin/docean/plugin/datasource/DatasourcePlugin.java b/jcommon/docean-plugin/docean-plugin-datasource/src/main/java/com/xiaomi/youpin/docean/plugin/datasource/DatasourcePlugin.java
index e84435105..a39f87997 100644
--- a/jcommon/docean-plugin/docean-plugin-datasource/src/main/java/com/xiaomi/youpin/docean/plugin/datasource/DatasourcePlugin.java
+++ b/jcommon/docean-plugin/docean-plugin-datasource/src/main/java/com/xiaomi/youpin/docean/plugin/datasource/DatasourcePlugin.java
@@ -186,7 +186,7 @@ private DatasourceConfig generateDatasourceConfig(String prefix, Config c) {
config.setDefaultInitialPoolSize(Integer.valueOf(c.get(prefix + "db_pool_size", "1")));
config.setDefaultMaxPoolSize(Integer.valueOf(c.get(prefix + "db_pool_size", "1")));
config.setDefaultMinPoolSize(Integer.valueOf(c.get(prefix + "db_pool_size", "1")));
- config.setDriverClass("com.mysql.jdbc.Driver");
+ config.setDriverClass(c.get(prefix + "db_driver", "com.mysql.jdbc.Driver"));
return config;
}
diff --git a/jcommon/docean-plugin/docean-plugin-storage/pom.xml b/jcommon/docean-plugin/docean-plugin-storage/pom.xml
new file mode 100644
index 000000000..0895477fe
--- /dev/null
+++ b/jcommon/docean-plugin/docean-plugin-storage/pom.xml
@@ -0,0 +1,24 @@
+
+
+ 4.0.0
+
+ run.mone
+ docean-plugin
+ 1.4-jdk21-SNAPSHOT
+
+
+ docean-plugin-storage
+ pom
+
+ storage-doris
+
+
+
+ 21
+ 21
+ UTF-8
+
+
+
\ No newline at end of file
diff --git a/jcommon/docean-plugin/docean-plugin-storage/storage-doris/pom.xml b/jcommon/docean-plugin/docean-plugin-storage/storage-doris/pom.xml
new file mode 100644
index 000000000..b5dd672ae
--- /dev/null
+++ b/jcommon/docean-plugin/docean-plugin-storage/storage-doris/pom.xml
@@ -0,0 +1,50 @@
+
+
+ 4.0.0
+
+ run.mone
+ docean-plugin-storage
+ 1.4-jdk21-SNAPSHOT
+
+
+ storage-doris
+
+
+ 21
+ 21
+ UTF-8
+
+
+
+
+ com.zaxxer
+ HikariCP
+ 5.1.0
+
+
+ org.mariadb.jdbc
+ mariadb-java-client
+ 3.1.4
+ test
+
+
+ org.apache.httpcomponents
+ httpclient
+ 4.5.14
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+
+ 21
+
+
+
+
+
\ No newline at end of file
diff --git a/jcommon/docean-plugin/docean-plugin-storage/storage-doris/src/main/java/run/mone/doris/DorisService.java b/jcommon/docean-plugin/docean-plugin-storage/storage-doris/src/main/java/run/mone/doris/DorisService.java
new file mode 100644
index 000000000..449f86ab6
--- /dev/null
+++ b/jcommon/docean-plugin/docean-plugin-storage/storage-doris/src/main/java/run/mone/doris/DorisService.java
@@ -0,0 +1,239 @@
+package run.mone.doris;
+
+import com.google.common.collect.Lists;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+import java.util.stream.Collectors;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * @author wtt
+ * @version 1.0
+ * @description
+ * @date 2024/1/8 10:23
+ */
+@Slf4j
+public class DorisService {
+
+ private HikariDataSource dataSource;
+
+ private Map>> bufferMap = new ConcurrentHashMap<>();
+ private Map> tableMap = new ConcurrentHashMap<>();
+
+ private ScheduledExecutorService scheduledExecutorService;
+
+ private ExecutorService executorService;
+
+ private Long flushIntervalMillSeconds = 1000L;
+
+ @Setter
+ private Integer stream_load_port = 8030;
+
+ private static final String DEFAULT_DRIVER_NAME = "org.mariadb.jdbc.Driver";
+
+ public DorisService(String url, String user, String password) {
+ this(DEFAULT_DRIVER_NAME, url, user, password);
+ }
+
+ public DorisService(String driver, String url, String user, String password) {
+ this.dataSource = getDatasource(driver, url, user, password);
+
+ executorService = Executors.newVirtualThreadPerTaskExecutor();
+ scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+
+ scheduledExecutorService.scheduleAtFixedRate(this::flush, 1000, flushIntervalMillSeconds, TimeUnit.MILLISECONDS);
+ }
+
+ private HikariDataSource getDatasource(String driver, String url, String user, String password) {
+ HikariConfig config = new HikariConfig();
+ config.setDriverClassName(driver);
+ config.setJdbcUrl(url);
+ config.setUsername(user);
+ config.setPassword(password);
+ config.setMaximumPoolSize(30);
+ config.setConnectionTimeout(SECONDS.toMillis(30));
+ config.setConnectionTestQuery("SELECT 1");
+// config.setLeakDetectionThreshold(10000); // 设置为30秒
+
+ return new HikariDataSource(config);
+ }
+
+ public boolean createTable(String createSql) {
+ try (Connection connection = dataSource.getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute(createSql);
+
+ } // Automatically closes statement
+ catch (SQLException e) {
+ throw new RuntimeException("createTable error:" + e.getMessage());
+ }
+ return true;
+ }
+
+ public boolean updateTable(String updateSql) {
+ try (Connection connection = dataSource.getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute(updateSql);
+
+ } // Automatically closes statement
+ catch (SQLException e) {
+ throw new RuntimeException("updateTable error:" + e.getMessage());
+ }
+ return true;
+ }
+
+ public List getColumnList(String tableName) {
+ List columnList = Lists.newArrayList();
+ try {
+ Connection connection = dataSource.getConnection();
+ DatabaseMetaData metaData = connection.getMetaData();
+ try (ResultSet resultSet = metaData.getColumns(null, null, tableName, null)) {
+ while (resultSet.next()) {
+ String columnName = resultSet.getString("COLUMN_NAME");
+ columnList.add(columnName);
+ }
+ }
+ } catch (Exception e) {
+ log.error("getColumnList error,tableName:{}", tableName, e);
+ }
+ return columnList;
+ }
+
+ public boolean deleteTable(String deleteSql) {
+ try (Connection connection = dataSource.getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute(deleteSql);
+
+ } // Automatically closes statement
+ catch (SQLException e) {
+ throw new RuntimeException("deleteTable error:" + e.getMessage());
+ }
+ return true;
+ }
+
+ private void processBatch(Connection connection, String tableName, List columnList, List