Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optimize insert in java sdk #3525

Merged
merged 11 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@
}
Type.DataType type = metaData.getSchema().get(idx).getDataType();
if (type == Type.DataType.kVarchar || type == Type.DataType.kString) {
if (settedValue.at(idx)) {
return false;

Check warning on line 217 in java/openmldb-common/src/main/java/com/_4paradigm/openmldb/common/codec/FlexibleRowBuilder.java

View check run for this annotation

Codecov / codecov/patch

java/openmldb-common/src/main/java/com/_4paradigm/openmldb/common/codec/FlexibleRowBuilder.java#L217

Added line #L217 was not covered by tests
}
if (idx != metaData.getStrIdxList().get(curStrIdx)) {
if (stringValueCache == null) {
stringValueCache = new TreeMap<>();
Expand Down
5 changes: 5 additions & 0 deletions java/openmldb-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@
<artifactId>snappy-java</artifactId>
<version>1.1.7.2</version>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.9.3</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.testng/testng -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,53 +18,34 @@

import static com._4paradigm.openmldb.sdk.impl.Util.sqlTypeToString;

import com._4paradigm.openmldb.DataType;
import com._4paradigm.openmldb.Schema;
import com._4paradigm.openmldb.common.Pair;
import com._4paradigm.openmldb.sdk.Common;
import com._4paradigm.openmldb.sdk.Schema;

import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.List;

public class SQLInsertMetaData implements ResultSetMetaData {

private final List<DataType> schema;
private final Schema realSchema;
private final List<Pair<Long, Integer>> idx;
private final Schema schema;
private final List<Integer> holeIdx;

public SQLInsertMetaData(List<DataType> schema,
Schema realSchema,
List<Pair<Long, Integer>> idx) {
public SQLInsertMetaData(Schema schema, List<Integer> holeIdx) {
this.schema = schema;
this.realSchema = realSchema;
this.idx = idx;
this.holeIdx = holeIdx;
}

private void checkSchemaNull() throws SQLException {
if (schema == null) {
throw new SQLException("schema is null");
}
}

private void checkIdx(int i) throws SQLException {
if (i <= 0) {
private void check(int i) throws SQLException {
if (i < 0) {
throw new SQLException("index underflow");
}
if (i > schema.size()) {
if (i >= holeIdx.size()) {
throw new SQLException("index overflow");
}
}

public void check(int i) throws SQLException {
checkIdx(i);
checkSchemaNull();
}

@Override
public int getColumnCount() throws SQLException {
checkSchemaNull();
return schema.size();
return holeIdx.size();
}

@Override
Expand Down Expand Up @@ -93,9 +74,10 @@

@Override
public int isNullable(int i) throws SQLException {
check(i);
Long index = idx.get(i - 1).getKey();
if (realSchema.IsColumnNotNull(index)) {
int realIdx = i - 1;
check(realIdx);
boolean nullable = schema.isNullable(holeIdx.get(realIdx));

Check warning on line 79 in java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/jdbc/SQLInsertMetaData.java

View check run for this annotation

Codecov / codecov/patch

java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/jdbc/SQLInsertMetaData.java#L77-L79

Added lines #L77 - L79 were not covered by tests
if (!nullable) {
return columnNoNulls;
} else {
return columnNullable;
Expand All @@ -122,9 +104,9 @@

@Override
public String getColumnName(int i) throws SQLException {
check(i);
Long index = idx.get(i - 1).getKey();
return realSchema.GetColumnName(index);
int realIdx = i - 1;
check(realIdx);
return schema.getColumnName(holeIdx.get(realIdx));
}

@Override
Expand Down Expand Up @@ -159,9 +141,9 @@

@Override
public int getColumnType(int i) throws SQLException {
check(i);
Long index = idx.get(i - 1).getKey();
return Common.type2SqlType(realSchema.GetColumnType(index));
int realIdx = i - 1;
check(realIdx);
return schema.getColumnType(holeIdx.get(realIdx));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,12 @@
spInfo.setDbName(procedureInfo.GetDbName());
spInfo.setProName(procedureInfo.GetSpName());
spInfo.setSql(procedureInfo.GetSql());
spInfo.setInputSchema(convertSchema(procedureInfo.GetInputSchema()));
spInfo.setOutputSchema(convertSchema(procedureInfo.GetOutputSchema()));
com._4paradigm.openmldb.Schema inputSchema = procedureInfo.GetInputSchema();
spInfo.setInputSchema(convertSchema(inputSchema));
inputSchema.delete();
com._4paradigm.openmldb.Schema outputSchema = procedureInfo.GetOutputSchema();
spInfo.setOutputSchema(convertSchema(outputSchema));
outputSchema.delete();

Check warning on line 179 in java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/Common.java

View check run for this annotation

Codecov / codecov/patch

java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/Common.java#L174-L179

Added lines #L174 - L179 were not covered by tests
spInfo.setMainTable(procedureInfo.GetMainTable());
spInfo.setInputTables(procedureInfo.GetTables());
spInfo.setInputDbs(procedureInfo.GetDbs());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@
if (resultSet != null) {
resultSet.delete();
}
queryFuture.delete();
queryFuture = null;

Check warning on line 78 in java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/QueryFuture.java

View check run for this annotation

Codecov / codecov/patch

java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/QueryFuture.java#L77-L78

Added lines #L77 - L78 were not covered by tests
logger.error("call procedure failed: {}", msg);
throw new ExecutionException(new SqlException("call procedure failed: " + msg));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,13 @@ public interface SqlExecutor {
@Deprecated
java.sql.ResultSet executeSQL(String db, String sql);

@Deprecated
SQLInsertRow getInsertRow(String db, String sql);

@Deprecated
SQLInsertRows getInsertRows(String db, String sql);

@Deprecated
ResultSet executeSQLRequest(String db, String sql, SQLRequestRow row);

Statement getStatement();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com._4paradigm.openmldb.sdk.impl;

import com._4paradigm.openmldb.common.zk.ZKClient;
import com._4paradigm.openmldb.proto.NS;
import com._4paradigm.openmldb.sdk.SqlException;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;

import java.util.*;
import java.util.concurrent.TimeUnit;

public class InsertPreparedStatementCache {

private Cache<AbstractMap.SimpleImmutableEntry<String, String>, InsertPreparedStatementMeta> cache;

private ZKClient zkClient;
private NodeCache nodeCache;
private String tablePath;

public InsertPreparedStatementCache(int cacheSize, ZKClient zkClient) throws SqlException {
cache = Caffeine.newBuilder().maximumSize(cacheSize).build();
this.zkClient = zkClient;
if (zkClient != null) {
tablePath = zkClient.getConfig().getNamespace() + "/table/db_table_data";
nodeCache = new NodeCache(zkClient.getClient(), zkClient.getConfig().getNamespace() + "/table/notify");
try {
nodeCache.start();
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
checkAndInvalid();
}
});
} catch (Exception e) {
throw new SqlException("NodeCache exception: " + e.getMessage());

Check warning on line 37 in java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementCache.java

View check run for this annotation

Codecov / codecov/patch

java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementCache.java#L36-L37

Added lines #L36 - L37 were not covered by tests
}
}
}

public InsertPreparedStatementMeta get(String db, String sql) {
return cache.getIfPresent(new AbstractMap.SimpleImmutableEntry<>(db, sql));
}

public void put(String db, String sql, InsertPreparedStatementMeta meta) {
cache.put(new AbstractMap.SimpleImmutableEntry<>(db, sql), meta);
}

public void checkAndInvalid() throws Exception {
if (!zkClient.checkExists(tablePath)) {
return;

Check warning on line 52 in java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementCache.java

View check run for this annotation

Codecov / codecov/patch

java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementCache.java#L52

Added line #L52 was not covered by tests
}
List<String> children = zkClient.getChildren(tablePath);
Map<AbstractMap.SimpleImmutableEntry<String, String>, InsertPreparedStatementMeta> view = cache.asMap();
Map<AbstractMap.SimpleImmutableEntry<String, String>, Integer> tableMap = new HashMap<>();
for (String path : children) {
byte[] bytes = zkClient.getClient().getData().forPath(tablePath + "/" + path);
NS.TableInfo tableInfo = NS.TableInfo.parseFrom(bytes);
tableMap.put(new AbstractMap.SimpleImmutableEntry<>(tableInfo.getDb(), tableInfo.getName()), tableInfo.getTid());
}
Iterator<Map.Entry<AbstractMap.SimpleImmutableEntry<String, String>, InsertPreparedStatementMeta>> iterator
= view.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<AbstractMap.SimpleImmutableEntry<String, String>, InsertPreparedStatementMeta> entry = iterator.next();
String db = entry.getKey().getKey();
InsertPreparedStatementMeta meta = entry.getValue();
String name = meta.getName();
Integer tid = tableMap.get(new AbstractMap.SimpleImmutableEntry<>(db, name));
if (tid != null && tid != meta.getTid()) {
cache.invalidate(entry.getKey());

Check warning on line 71 in java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementCache.java

View check run for this annotation

Codecov / codecov/patch

java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementCache.java#L71

Added line #L71 was not covered by tests
}
}
}
}
Loading
Loading