From cb2f0bd56af9dd10e444c6543c88d7df701697a1 Mon Sep 17 00:00:00 2001 From: huangwei Date: Fri, 1 Jul 2022 15:45:04 +0800 Subject: [PATCH] feat: support custom order in insert stmt --- .../com/_4paradigm/openmldb/common/Pair.java | 104 +++ .../openmldb/jdbc/SQLInsertMetaData.java | 19 +- .../sdk/impl/InsertPreparedStatementImpl.java | 43 +- .../openmldb/jdbc/JDBCDriverTest.java | 12 +- .../openmldb/jdbc/SQLRouterSmokeTest.java | 33 +- src/sdk/sql_cluster_router.cc | 88 +-- src/sdk/sql_cluster_router.h | 119 ++-- src/sdk/sql_cluster_test.cc | 36 +- src/sdk/sql_insert_row.cc | 34 +- src/sdk/sql_insert_row.h | 76 +- src/sdk/sql_router_test.cc | 662 +++++++++--------- 11 files changed, 710 insertions(+), 516 deletions(-) create mode 100644 java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/common/Pair.java diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/common/Pair.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/common/Pair.java new file mode 100644 index 00000000000..e17b2979226 --- /dev/null +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/common/Pair.java @@ -0,0 +1,104 @@ +package com._4paradigm.openmldb.common; + +import java.io.Serializable; + +public class Pair implements Serializable { + + /** + * Key of this Pair. + */ + private K key; + + /** + * Gets the key for this pair. + * + * @return key for this pair + */ + public K getKey() { + return key; + } + + /** + * Value of this this Pair. + */ + private V value; + + /** + * Gets the value for this pair. + * + * @return value for this pair + */ + public V getValue() { + return value; + } + + /** + * Creates a new pair + * + * @param key The key for this pair + * @param value The value to use for this pair + */ + public Pair(K key, V value) { + this.key = key; + this.value = value; + } + + /** + *

String representation of this + * Pair.

+ * + *

The default name/value delimiter '=' is always used.

+ * + * @return String representation of this Pair + */ + @Override + public String toString() { + return key + "=" + value; + } + + /** + *

Generate a hash code for this Pair.

+ * + *

The hash code is calculated using both the name and + * the value of the Pair.

+ * + * @return hash code for this Pair + */ + @Override + public int hashCode() { + // name's hashCode is multiplied by an arbitrary prime number (13) + // in order to make sure there is a difference in the hashCode between + // these two parameters: + // name: a value: aa + // name: aa value: a + return key.hashCode() * 13 + (value == null ? 0 : value.hashCode()); + } + + /** + *

Test this Pair for equality with another + * Object.

+ * + *

If the Object to be tested is not a + * Pair or is null, then this method + * returns false.

+ * + *

Two Pairs are considered equal if and only if + * both the names and values are equal.

+ * + * @param o the Object to test for + * equality with this Pair + * @return true if the given Object is + * equal to this Pair else false + */ + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o instanceof Pair) { + Pair pair = (Pair) o; + if (key != null ? !key.equals(pair.key) : pair.key != null) return false; + if (value != null ? !value.equals(pair.value) : pair.value != null) return false; + return true; + } + return false; + } +} \ No newline at end of file diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/jdbc/SQLInsertMetaData.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/jdbc/SQLInsertMetaData.java index b8bdea50726..e4ccd903146 100644 --- a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/jdbc/SQLInsertMetaData.java +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/jdbc/SQLInsertMetaData.java @@ -16,8 +16,11 @@ package com._4paradigm.openmldb.jdbc; +import static com._4paradigm.openmldb.sdk.impl.Util.sqlTypeToString; + import com._4paradigm.openmldb.DataType; import com._4paradigm.openmldb.Schema; +import com._4paradigm.openmldb.common.Pair; import com._4paradigm.openmldb.sdk.Common; import java.sql.ResultSetMetaData; @@ -28,10 +31,11 @@ public class SQLInsertMetaData implements ResultSetMetaData { private final List schema; private final Schema realSchema; - private final List idx; + private final List> idx; + public SQLInsertMetaData(List schema, Schema realSchema, - List idx) { + List> idx) { this.schema = schema; this.realSchema = realSchema; this.idx = idx; @@ -90,7 +94,7 @@ public boolean isCurrency(int i) throws SQLException { @Override public int isNullable(int i) throws SQLException { check(i); - int index = idx.get(i - 1); + Long index = idx.get(i - 1).getKey(); if (realSchema.IsColumnNotNull(index)) { return columnNoNulls; } else { @@ -119,7 +123,7 @@ public String getColumnLabel(int i) throws SQLException { @Override public String getColumnName(int i) throws SQLException { check(i); - int index = idx.get(i - 1); + Long index = idx.get(i - 1).getKey(); return realSchema.GetColumnName(index); } @@ -156,14 +160,13 @@ public String getCatalogName(int i) throws SQLException { @Override public int getColumnType(int i) throws SQLException { check(i); - DataType dataType = schema.get(i - 1); - return Common.type2SqlType(dataType); + Long index = idx.get(i - 1).getKey(); + return Common.type2SqlType(realSchema.GetColumnType(index)); } @Override - @Deprecated public String getColumnTypeName(int i) throws SQLException { - throw new SQLException("current do not support this method"); + return sqlTypeToString(getColumnType(i)); } @Override diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementImpl.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementImpl.java index ecd4b30952c..01e39c4b835 100644 --- a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementImpl.java +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementImpl.java @@ -18,6 +18,7 @@ import com._4paradigm.openmldb.*; +import com._4paradigm.openmldb.common.Pair; import com._4paradigm.openmldb.jdbc.SQLInsertMetaData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +33,7 @@ import java.sql.Date; import java.sql.ResultSet; import java.util.*; +import java.util.stream.Collectors; public class InsertPreparedStatementImpl implements PreparedStatement { public static final Charset CHARSET = StandardCharsets.UTF_8; @@ -48,7 +50,8 @@ public class InsertPreparedStatementImpl implements PreparedStatement { private final List currentDatas; private final List currentDatasType; private final List hasSet; - private final List scehmaIdxs; + // stmt insert idx -> real table schema idx + private final List> schemaIdxes; private boolean closed = false; private boolean closeOnComplete = false; @@ -63,17 +66,24 @@ public InsertPreparedStatementImpl(String db, String sql, SQLRouter router) thro this.currentSchema = tempRow.GetSchema(); VectorUint32 idxes = tempRow.GetHoleIdx(); + // In stmt order, if no columns in stmt, in schema order + // We'll sort it to schema order later, so needs the map + schemaIdxes = new ArrayList<>(idxes.size()); + for (int i = 0; i < idxes.size(); i++) { + schemaIdxes.add(new Pair<>(idxes.get(i), i)); + } + currentDatas = new ArrayList<>(idxes.size()); currentDatasType = new ArrayList<>(idxes.size()); hasSet = new ArrayList<>(idxes.size()); - scehmaIdxs = new ArrayList<>(idxes.size()); - for (int i = 0; i < idxes.size(); i++) { - long idx = idxes.get(i); + // CurrentDatas and Type order is consistent with insert stmt. We'll do appending in schema order when build + // row. + for (Pair pair : schemaIdxes) { + Long idx = pair.getKey(); DataType type = currentSchema.GetColumnType(idx); currentDatasType.add(type); currentDatas.add(null); hasSet.add(false); - scehmaIdxs.add(i); } } @@ -118,14 +128,14 @@ private void checkIdx(int i) throws SQLException { if (i <= 0) { throw new SQLException("error sqe number"); } - if (i > scehmaIdxs.size()) { + if (i > schemaIdxes.size()) { throw new SQLException("out of data range"); } } private void checkType(int i, DataType type) throws SQLException { if (currentDatasType.get(i - 1) != type) { - throw new SQLException("data type not match"); + throw new SQLException("data type not match, expect " + currentDatasType.get(i - 1) + ", actual " + type); } } @@ -206,7 +216,7 @@ public void setBigDecimal(int i, BigDecimal bigDecimal) throws SQLException { } private boolean checkNotAllowNull(int i) { - long idx = this.scehmaIdxs.get(i - 1); + Long idx = this.schemaIdxes.get(i - 1).getKey(); return this.currentSchema.IsColumnNotNull(idx); } @@ -300,22 +310,26 @@ public void setObject(int i, Object o, int i1) throws SQLException { private void buildRow() throws SQLException { SQLInsertRow currentRow = getSQLInsertRow(); - boolean ok = currentRow.Init(stringsLen); if (!ok) { throw new SQLException("init row failed"); } - for (int i = 0; i < currentDatasType.size(); i++) { - Object data = currentDatas.get(i); + // SQLInsertRow::AppendXXX order is the schema order(skip the no-hole columns) + List> sortedIdxes = schemaIdxes.stream().sorted(Comparator.comparing(Pair::getKey)) + .collect(Collectors.toList()); + + for (Pair sortedIdx : sortedIdxes) { + Integer currentDataIdx = sortedIdx.getValue(); + Object data = currentDatas.get(currentDataIdx); if (data == null) { ok = currentRow.AppendNULL(); } else { - DataType curType = currentDatasType.get(i); + DataType curType = currentDatasType.get(currentDataIdx); if (DataType.kTypeBool.equals(curType)) { ok = currentRow.AppendBool((boolean) data); } else if (DataType.kTypeDate.equals(curType)) { - java.sql.Date date = (java.sql.Date) data; + Date date = (Date) data; ok = currentRow.AppendDate(date.getYear() + 1900, date.getMonth() + 1, date.getDate()); } else if (DataType.kTypeDouble.equals(curType)) { ok = currentRow.AppendDouble((double) data); @@ -423,9 +437,8 @@ public void setArray(int i, Array array) throws SQLException { } @Override - @Deprecated public ResultSetMetaData getMetaData() throws SQLException { - return new SQLInsertMetaData(this.currentDatasType, this.currentSchema, this.scehmaIdxs); + return new SQLInsertMetaData(this.currentDatasType, this.currentSchema, this.schemaIdxes); } @Override diff --git a/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/JDBCDriverTest.java b/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/JDBCDriverTest.java index 07a0e41c1af..8a2fa0a276a 100644 --- a/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/JDBCDriverTest.java +++ b/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/JDBCDriverTest.java @@ -23,6 +23,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.sql.Types; import java.util.ArrayList; import java.util.List; import java.util.stream.IntStream; @@ -183,7 +184,7 @@ public void testForKafkaConnector() throws SQLException { String tableName = "kafka_test"; stmt = connection.createStatement(); try { - stmt.execute(String.format("create table if not exists %s(c1 int, c2 string)", tableName)); + stmt.execute(String.format("create table if not exists %s(c1 int, c2 string, c3 timestamp)", tableName)); } catch (Exception e) { Assert.fail(); } @@ -198,6 +199,15 @@ public void testForKafkaConnector() throws SQLException { pstmt.setFetchSize(100); pstmt.addBatch(); + insertSql = "INSERT INTO " + + tableName + + "(`c3`,`c2`) VALUES(?,?)"; + pstmt = connection.prepareStatement(insertSql); + Assert.assertEquals(pstmt.getMetaData().getColumnCount(), 2); + // index starts from 1 + Assert.assertEquals(pstmt.getMetaData().getColumnType(2), Types.VARCHAR); + Assert.assertEquals(pstmt.getMetaData().getColumnName(2), "c2"); + try { stmt = connection.prepareStatement("DELETE FROM " + tableName + " WHERE c1=1"); diff --git a/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/SQLRouterSmokeTest.java b/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/SQLRouterSmokeTest.java index 29e9cae2758..63b5f43ce57 100644 --- a/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/SQLRouterSmokeTest.java +++ b/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/SQLRouterSmokeTest.java @@ -31,6 +31,7 @@ import org.testng.collections.Maps; import java.sql.PreparedStatement; +import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Types; import java.util.ArrayList; @@ -71,7 +72,7 @@ public class SQLRouterSmokeTest { @DataProvider(name = "executor") public Object[] executor() { - return new Object[] {clusterExecutor, standaloneExecutor}; + return new Object[]{clusterExecutor, standaloneExecutor}; } @Test(dataProvider = "executor") @@ -153,7 +154,7 @@ public void testSmoke(SqlExecutor router) { Assert.assertEquals("kTypeString", rs3.GetInternalSchema().GetColumnType(0).toString()); List col2InsertRes = new ArrayList<>(); - while(rs3.next()) { + while (rs3.next()) { col2InsertRes.add(rs3.getString(1)); } Collections.sort(col2InsertRes); @@ -319,7 +320,8 @@ public void testInsertPreparedState(SqlExecutor router) { router.dropDB(dbname); boolean ok = router.createDB(dbname); Assert.assertTrue(ok); - String ddl = "create table tsql1010 ( col1 bigint, col2 date, col3 string, col4 string, col5 int, index(key=col3, ts=col1));"; + String ddl = "create table tsql1010 ( col1 bigint, col2 date, col3 string, col4 string, col5 int," + + " index(key=col3, ts=col1));"; // create table ok = router.executeDDL(dbname, ddl); Assert.assertTrue(ok); @@ -334,11 +336,11 @@ public void testInsertPreparedState(SqlExecutor router) { ok = router.executeInsert(dbname, fullInsert); Assert.assertTrue(ok); Object[][] datas = new Object[][]{ - {1000l, d1, "guangdong", "广州", 1}, - {1001l, d2, "jiangsu", "nanjing", 2}, - {1002l, d3, "sandong", "jinan", 3}, - {1003l, d4, "zhejiang", "hangzhou", 4}, - {1004l, d5, "henan", "zhenzhou", 5}, + {1000L, d1, "guangdong", "广州", 1}, + {1001L, d2, "jiangsu", "nanjing", 2}, + {1002L, d3, "sandong", "jinan", 3}, + {1003L, d4, "zhejiang", "hangzhou", 4}, + {1004L, d5, "henan", "zhenzhou", 5}, }; // insert placeholder String date2 = String.format("%s-%s-%s", d2.getYear() + 1900, d2.getMonth() + 1, d2.getDate()); @@ -352,15 +354,22 @@ public void testInsertPreparedState(SqlExecutor router) { } ok = impl.execute(); Assert.assertTrue(ok); - insert = "insert into tsql1010 values(1002, ?, ?, 'jinan', 3);"; + + // custom insert order + insert = "insert into tsql1010 (col1, col3, col2, col4, col5) values (1002, ?, ?, 'jinan', 3);"; PreparedStatement impl2 = router.getInsertPreparedStmt(dbname, insert); + ResultSetMetaData metaData = impl2.getMetaData(); + Assert.assertEquals(metaData.getColumnCount(), 2); + Assert.assertEquals(metaData.getColumnName(1), "col3"); + Assert.assertEquals(metaData.getColumnType(1), Types.VARCHAR); + Assert.assertEquals(metaData.getColumnTypeName(1), "string"); try { - impl2.setString(1, "c"); + impl2.setString(2, "c"); } catch (Exception e) { Assert.assertEquals("data type not match", e.getMessage()); } - impl2.setDate(1, d3); - impl2.setString(2, "sandong"); + impl2.setString(1, "sandong"); + impl2.setDate(2, d3); ok = impl2.execute(); Assert.assertTrue(ok); diff --git a/src/sdk/sql_cluster_router.cc b/src/sdk/sql_cluster_router.cc index 91f63f6df48..41ebacb7a9c 100644 --- a/src/sdk/sql_cluster_router.cc +++ b/src/sdk/sql_cluster_router.cc @@ -342,20 +342,25 @@ std::shared_ptr SQLClusterRouter::GetInsertRow(const std::string& if (cache) { status->code = 0; return std::make_shared(cache->table_info, cache->column_schema, cache->default_map, - cache->str_length); + cache->str_length, cache->hole_idx_arr); } std::shared_ptr<::openmldb::nameserver::TableInfo> table_info; DefaultValueMap default_map; uint32_t str_length = 0; - if (!GetInsertInfo(db, sql, status, &table_info, &default_map, &str_length)) { + std::vector stmt_column_idx_arr; + if (!GetInsertInfo(db, sql, status, &table_info, &default_map, &str_length, &stmt_column_idx_arr)) { status->code = 1; LOG(WARNING) << "get insert information failed"; return {}; } - cache = std::make_shared(table_info, default_map, str_length, 0); + cache = std::make_shared( + table_info, default_map, str_length, + SQLInsertRow::GetHoleIdxArr(default_map, stmt_column_idx_arr, openmldb::sdk::ConvertToSchema(table_info))); SetCache(db, sql, hybridse::vm::kBatchMode, cache); - return std::make_shared(table_info, cache->column_schema, default_map, str_length); + return std::make_shared(table_info, cache->column_schema, default_map, str_length, + cache->hole_idx_arr); } + bool SQLClusterRouter::GetMultiRowInsertInfo(const std::string& db, const std::string& sql, ::hybridse::sdk::Status* status, std::shared_ptr<::openmldb::nameserver::TableInfo>* table_info, @@ -460,17 +465,19 @@ bool SQLClusterRouter::GetMultiRowInsertInfo(const std::string& db, const std::s } return true; } + bool SQLClusterRouter::GetInsertInfo(const std::string& db, const std::string& sql, ::hybridse::sdk::Status* status, std::shared_ptr<::openmldb::nameserver::TableInfo>* table_info, - DefaultValueMap* default_map, uint32_t* str_length) { - if (status == NULL || table_info == NULL || default_map == NULL || str_length == NULL) { + DefaultValueMap* default_map, uint32_t* str_length, + std::vector* stmt_column_idx_in_table) { + if (status == nullptr || table_info == nullptr || default_map == nullptr || str_length == nullptr) { LOG(WARNING) << "insert info is null" << sql; return false; } ::hybridse::node::NodeManager nm; ::hybridse::plan::PlanNodeList plans; bool ok = GetSQLPlan(sql, &nm, &plans); - if (!ok || plans.size() == 0) { + if (!ok || plans.empty()) { LOG(WARNING) << "fail to get sql plan with sql " << sql; status->msg = "fail to get sql plan with"; return false; @@ -483,7 +490,7 @@ bool SQLClusterRouter::GetInsertInfo(const std::string& db, const std::string& s } auto* iplan = dynamic_cast<::hybridse::node::InsertPlanNode*>(plan); const ::hybridse::node::InsertStmt* insert_stmt = iplan->GetInsertNode(); - if (insert_stmt == NULL) { + if (insert_stmt == nullptr) { LOG(WARNING) << "insert stmt is null"; status->msg = "insert stmt is null"; return false; @@ -494,6 +501,7 @@ bool SQLClusterRouter::GetInsertInfo(const std::string& db, const std::string& s LOG(WARNING) << status->msg; return false; } + // std::map column_map; for (size_t j = 0; j < insert_stmt->columns_.size(); ++j) { const std::string& col_name = insert_stmt->columns_[j]; @@ -506,6 +514,7 @@ bool SQLClusterRouter::GetInsertInfo(const std::string& db, const std::string& s return false; } column_map.insert(std::make_pair(i, j)); + stmt_column_idx_in_table->emplace_back(i); find_flag = true; break; } @@ -526,7 +535,7 @@ bool SQLClusterRouter::GetInsertInfo(const std::string& db, const std::string& s return true; } -DefaultValueMap SQLClusterRouter::GetDefaultMap(std::shared_ptr<::openmldb::nameserver::TableInfo> table_info, +DefaultValueMap SQLClusterRouter::GetDefaultMap(const std::shared_ptr<::openmldb::nameserver::TableInfo>& table_info, const std::map& column_map, ::hybridse::node::ExprListNode* row, uint32_t* str_length) { if (row == nullptr || str_length == nullptr) { @@ -570,7 +579,7 @@ DefaultValueMap SQLClusterRouter::GetDefaultMap(std::shared_ptr<::openmldb::name } if (hybridse::node::kExprPrimary == row->children_.at(i)->GetExprType()) { - ::hybridse::node::ConstNode* primary = dynamic_cast<::hybridse::node::ConstNode*>(row->children_.at(i)); + auto* primary = dynamic_cast<::hybridse::node::ConstNode*>(row->children_.at(i)); std::shared_ptr<::hybridse::node::ConstNode> val; if (primary->IsNull()) { if (column.not_null()) { @@ -653,17 +662,21 @@ std::shared_ptr SQLClusterRouter::GetInsertRows(const std::string if (cache) { status->code = 0; return std::make_shared(cache->table_info, cache->column_schema, cache->default_map, - cache->str_length); + cache->str_length, cache->hole_idx_arr); } std::shared_ptr<::openmldb::nameserver::TableInfo> table_info; DefaultValueMap default_map; uint32_t str_length = 0; - if (!GetInsertInfo(db, sql, status, &table_info, &default_map, &str_length)) { + std::vector stmt_column_idx_arr; + if (!GetInsertInfo(db, sql, status, &table_info, &default_map, &str_length, &stmt_column_idx_arr)) { return {}; } - cache = std::make_shared(table_info, default_map, str_length); + cache = std::make_shared( + table_info, default_map, str_length, + SQLInsertRow::GetHoleIdxArr(default_map, stmt_column_idx_arr, openmldb::sdk::ConvertToSchema(table_info))); SetCache(db, sql, hybridse::vm::kBatchMode, cache); - return std::make_shared(table_info, cache->column_schema, default_map, str_length); + return std::make_shared(table_info, cache->column_schema, default_map, str_length, + cache->hole_idx_arr); } bool SQLClusterRouter::ExecuteDDL(const std::string& db, const std::string& sql, hybridse::sdk::Status* status) { @@ -1575,13 +1588,11 @@ std::shared_ptr SQLClusterRouter::HandleSQLCmd(const h } std::vector vec = {fun_info.name(), openmldb::type::DataType_Name(fun_info.return_type()).substr(1), - arg_type, - is_aggregate, - fun_info.file()}; + arg_type, is_aggregate, fun_info.file()}; lines.push_back(vec); } - return ResultSetSQL::MakeResultSet( - {"Name", "Return_type", "Arg_type", "Is_aggregate", "File"}, lines, status); + return ResultSetSQL::MakeResultSet({"Name", "Return_type", "Arg_type", "Is_aggregate", "File"}, lines, + status); } case hybridse::node::kCmdDropFunction: { std::string name = cmd_node->GetArgs()[0]; @@ -2161,9 +2172,8 @@ ::openmldb::base::Status SQLClusterRouter::ExecuteOfflineQuery(const std::string return taskmanager_client_ptr->RunBatchAndShow(sql, config, default_db, sync_job, job_info); } -::openmldb::base::Status SQLClusterRouter::ExecuteOfflineQueryGetOutput(const std::string& sql, - const std::map& config, - const std::string& default_db, +::openmldb::base::Status SQLClusterRouter::ExecuteOfflineQueryGetOutput( + const std::string& sql, const std::map& config, const std::string& default_db, std::string& output) { auto taskmanager_client_ptr = cluster_sdk_->GetTaskManagerClient(); if (!taskmanager_client_ptr) { @@ -3110,8 +3120,8 @@ hybridse::sdk::Status SQLClusterRouter::GetNewIndex( // update ttl auto ns_ptr = cluster_sdk_->GetNsClient(); std::string err; - bool ok = ns_ptr->UpdateTTL(table_name, type, new_abs_ttl, new_lat_ttl, - column_key.index_name(), err); + bool ok = + ns_ptr->UpdateTTL(table_name, type, new_abs_ttl, new_lat_ttl, column_key.index_name(), err); if (!ok) { return {::hybridse::common::StatusCode::kCmdError, "update ttl failed"}; } @@ -3119,7 +3129,7 @@ hybridse::sdk::Status SQLClusterRouter::GetNewIndex( } } else { column_key.set_index_name("INDEX_" + std::to_string(cur_index_num + add_index_num) + "_" + - std::to_string(::baidu::common::timer::now_time())); + std::to_string(::baidu::common::timer::now_time())); add_index_num++; new_indexs.emplace_back(column_key); } @@ -3199,8 +3209,7 @@ hybridse::sdk::Status SQLClusterRouter::AddNewIndex( } hybridse::sdk::Status SQLClusterRouter::HandleLongWindows( - const hybridse::node::DeployPlanNode* deploy_node, - const std::set>& table_pair, + const hybridse::node::DeployPlanNode* deploy_node, const std::set>& table_pair, const std::string& select_sql) { auto iter = deploy_node->Options()->find(hybridse::vm::LONG_WINDOWS); std::string long_window_param = ""; @@ -3266,14 +3275,12 @@ hybridse::sdk::Status SQLClusterRouter::HandleLongWindows( // insert pre-aggr meta info to meta table std::string aggr_col = lw.aggr_col_ == "*" ? "" : lw.aggr_col_; auto aggr_table = - absl::StrCat("pre_", base_db, "_", deploy_node->Name(), "_", - lw.window_name_, "_", lw.aggr_func_, "_", aggr_col, - lw.filter_col_.empty() ? "" : "_" + lw.filter_col_); - std::string insert_sql = - absl::StrCat("insert into ", meta_db, ".", meta_table, " values('" + aggr_table, "', '", aggr_db, - "', '", base_db, "', '", base_table, "', '", lw.aggr_func_, "', '", lw.aggr_col_, "', '", - lw.partition_col_, "', '", lw.order_col_, "', '", lw.bucket_size_, "', '", - lw.filter_col_, "');"); + absl::StrCat("pre_", base_db, "_", deploy_node->Name(), "_", lw.window_name_, "_", lw.aggr_func_, "_", + aggr_col, lw.filter_col_.empty() ? "" : "_" + lw.filter_col_); + std::string insert_sql = absl::StrCat( + "insert into ", meta_db, ".", meta_table, " values('" + aggr_table, "', '", aggr_db, "', '", base_db, + "', '", base_table, "', '", lw.aggr_func_, "', '", lw.aggr_col_, "', '", lw.partition_col_, "', '", + lw.order_col_, "', '", lw.bucket_size_, "', '", lw.filter_col_, "');"); bool ok = ExecuteInsert("", insert_sql, &status); if (!ok) { return {base::ReturnCode::kError, "insert pre-aggr meta failed"}; @@ -3339,17 +3346,16 @@ hybridse::sdk::Status SQLClusterRouter::HandleLongWindows( return {}; } -bool SQLClusterRouter::CheckPreAggrTableExist(const std::string& base_table, - const std::string& base_db, +bool SQLClusterRouter::CheckPreAggrTableExist(const std::string& base_table, const std::string& base_db, const openmldb::base::LongWindowInfo& lw, ::hybridse::sdk::Status* status) { std::string meta_db = openmldb::nameserver::INTERNAL_DB; std::string meta_table = openmldb::nameserver::PRE_AGG_META_NAME; std::string filter_cond = lw.filter_col_.empty() ? "" : " and filter_col = '" + lw.filter_col_ + "'"; - std::string meta_info = absl::StrCat( - "base_db = '", base_db, "' and base_table = '", base_table, "' and aggr_func = '", lw.aggr_func_, - "' and aggr_col = '", lw.aggr_col_, "' and partition_cols = '", lw.partition_col_, - "' and order_by_col = '", lw.order_col_, "'", filter_cond); + std::string meta_info = + absl::StrCat("base_db = '", base_db, "' and base_table = '", base_table, "' and aggr_func = '", lw.aggr_func_, + "' and aggr_col = '", lw.aggr_col_, "' and partition_cols = '", lw.partition_col_, + "' and order_by_col = '", lw.order_col_, "'", filter_cond); std::string select_sql = absl::StrCat("select bucket_size from ", meta_db, ".", meta_table, " where ", meta_info, ";"); auto rs = ExecuteSQL("", select_sql, status); diff --git a/src/sdk/sql_cluster_router.h b/src/sdk/sql_cluster_router.h index b5283e11e95..c552960b5df 100644 --- a/src/sdk/sql_cluster_router.h +++ b/src/sdk/sql_cluster_router.h @@ -21,29 +21,28 @@ #include #include #include +#include #include #include -#include #include "base/ddl_parser.h" +#include "base/lru_cache.h" #include "base/random.h" #include "base/spinlock.h" -#include "base/lru_cache.h" #include "client/tablet_client.h" +#include "nameserver/system_table.h" #include "sdk/db_sdk.h" #include "sdk/sql_router.h" #include "sdk/table_reader_impl.h" -#include "nameserver/system_table.h" -namespace openmldb { -namespace sdk { +namespace openmldb::sdk { typedef ::google::protobuf::RepeatedPtrField<::openmldb::common::ColumnDesc> PBSchema; constexpr const char* FORMAT_STRING_KEY = "!%$FORMAT_STRING_KEY"; static std::shared_ptr<::hybridse::sdk::Schema> ConvertToSchema( - std::shared_ptr<::openmldb::nameserver::TableInfo> table_info) { + const std::shared_ptr<::openmldb::nameserver::TableInfo>& table_info) { ::hybridse::vm::Schema schema; for (const auto& column_desc : table_info->column_desc()) { ::hybridse::type::ColumnDef* column_def = schema.Add(); @@ -55,32 +54,40 @@ static std::shared_ptr<::hybridse::sdk::Schema> ConvertToSchema( } struct SQLCache { - SQLCache(std::shared_ptr<::openmldb::nameserver::TableInfo> table_info, DefaultValueMap default_map, - uint32_t str_length, uint32_t limit_cnt = 0) - : table_info(table_info), default_map(default_map), column_schema(), - str_length(str_length), limit_cnt(limit_cnt) { + // for insert row + SQLCache(const std::shared_ptr<::openmldb::nameserver::TableInfo>& table_info, DefaultValueMap default_map, + uint32_t str_length, std::vector hole_idx_arr, uint32_t limit_cnt = 0) + : table_info(table_info), + default_map(std::move(default_map)), + column_schema(), + str_length(str_length), + hole_idx_arr(std::move(hole_idx_arr)), + limit_cnt(limit_cnt) { column_schema = openmldb::sdk::ConvertToSchema(table_info); } + SQLCache(std::shared_ptr<::hybridse::sdk::Schema> column_schema, const ::hybridse::vm::Router& input_router, uint32_t limit_cnt = 0) : table_info(), default_map(), - column_schema(column_schema), + column_schema(std::move(column_schema)), parameter_schema(), str_length(0), limit_cnt(limit_cnt), router(input_router) {} + SQLCache(std::shared_ptr<::hybridse::sdk::Schema> column_schema, std::shared_ptr<::hybridse::sdk::Schema> parameter_schema, const ::hybridse::vm::Router& input_router, uint32_t limit_cnt = 0) : table_info(), default_map(), - column_schema(column_schema), - parameter_schema(parameter_schema), + column_schema(std::move(column_schema)), + parameter_schema(std::move(parameter_schema)), str_length(0), limit_cnt(limit_cnt), router(input_router) {} - bool IsCompatibleCache(std::shared_ptr<::hybridse::sdk::Schema> other_parameter_schema) { + + bool IsCompatibleCache(const std::shared_ptr<::hybridse::sdk::Schema>& other_parameter_schema) const { if (!parameter_schema && !other_parameter_schema) { return true; } @@ -103,6 +110,7 @@ struct SQLCache { std::shared_ptr<::hybridse::sdk::Schema> column_schema; std::shared_ptr<::hybridse::sdk::Schema> parameter_schema; uint32_t str_length; + std::vector hole_idx_arr; uint32_t limit_cnt; ::hybridse::vm::Router router; }; @@ -183,7 +191,6 @@ class SQLClusterRouter : public SQLRouter { std::shared_ptr ExecuteShowApiServers(hybridse::sdk::Status* status); - bool RefreshCatalog() override; std::shared_ptr CallProcedure(const std::string& db, const std::string& sp_name, @@ -204,23 +211,25 @@ class SQLClusterRouter : public SQLRouter { std::shared_ptr CallProcedure(const std::string& db, const std::string& sp_name, int64_t timeout_ms, std::shared_ptr row, - hybridse::sdk::Status* status); + hybridse::sdk::Status* status) override; std::shared_ptr CallSQLBatchRequestProcedure( const std::string& db, const std::string& sp_name, int64_t timeout_ms, std::shared_ptr row_batch, hybridse::sdk::Status* status) override; std::shared_ptr<::openmldb::client::TabletClient> GetTabletClient(const std::string& db, const std::string& sql, - const ::hybridse::vm::EngineMode engine_mode, + ::hybridse::vm::EngineMode engine_mode, const std::shared_ptr& row, - hybridse::sdk::Status& status); // NOLINT + hybridse::sdk::Status& status); // NOLINT std::shared_ptr<::openmldb::client::TabletClient> GetTabletClient( - const std::string& db, const std::string& sql, const ::hybridse::vm::EngineMode engine_mode, + const std::string& db, const std::string& sql, ::hybridse::vm::EngineMode engine_mode, const std::shared_ptr& row, const std::shared_ptr& parameter_row, - hybridse::sdk::Status& status); // NOLINT - std::shared_ptr GetSQLCache( - const std::string& db, const std::string& sql, const ::hybridse::vm::EngineMode engine_mode, - const std::shared_ptr& parameter_row, hybridse::sdk::Status& status); // NOLINT + hybridse::sdk::Status& status); // NOLINT + + std::shared_ptr GetSQLCache(const std::string& db, const std::string& sql, + ::hybridse::vm::EngineMode engine_mode, + const std::shared_ptr& parameter_row, + hybridse::sdk::Status& status); // NOLINT std::shared_ptr<::openmldb::client::TabletClient> GetTabletClientForBatchQuery( const std::string& db, const std::string& sql, const std::shared_ptr& parameter_row, @@ -236,7 +245,7 @@ class SQLClusterRouter : public SQLRouter { std::shared_ptr<::openmldb::client::NsClient> ns_ptr); std::shared_ptr HandleSQLCmd(const hybridse::node::CmdPlanNode* cmd_node, - const std::string& db, ::hybridse::sdk::Status* status); + const std::string& db, ::hybridse::sdk::Status* status); std::vector GetTableNames(const std::string& db) override; @@ -244,45 +253,41 @@ class SQLClusterRouter : public SQLRouter { bool UpdateOfflineTableInfo(const ::openmldb::nameserver::TableInfo& info) override; - ::openmldb::base::Status ShowJobs(const bool only_unfinished, + ::openmldb::base::Status ShowJobs(bool only_unfinished, std::vector<::openmldb::taskmanager::JobInfo>& job_infos) override; - ::openmldb::base::Status ShowJob(const int id, ::openmldb::taskmanager::JobInfo& job_info) override; + ::openmldb::base::Status ShowJob(int id, ::openmldb::taskmanager::JobInfo& job_info) override; - ::openmldb::base::Status StopJob(const int id, ::openmldb::taskmanager::JobInfo& job_info) override; + ::openmldb::base::Status StopJob(int id, ::openmldb::taskmanager::JobInfo& job_info) override; ::openmldb::base::Status ExecuteOfflineQuery(const std::string& sql, const std::map& config, const std::string& default_db, bool sync_job, - ::openmldb::taskmanager::JobInfo& job_info) override; // NOLINT + ::openmldb::taskmanager::JobInfo& job_info) override; // NOLINT ::openmldb::base::Status ExecuteOfflineQueryGetOutput(const std::string& sql, const std::map& config, const std::string& default_db, - std::string& output); // NOLINT + std::string& output); // NOLINT - ::openmldb::base::Status ImportOnlineData(const std::string& sql, - const std::map& config, + ::openmldb::base::Status ImportOnlineData(const std::string& sql, const std::map& config, const std::string& default_db, bool sync_job, ::openmldb::taskmanager::JobInfo& job_info) override; - ::openmldb::base::Status ImportOfflineData(const std::string& sql, - const std::map& config, + ::openmldb::base::Status ImportOfflineData(const std::string& sql, const std::map& config, const std::string& default_db, bool sync_job, ::openmldb::taskmanager::JobInfo& job_info) override; - ::openmldb::base::Status ExportOfflineData(const std::string& sql, - const std::map& config, + ::openmldb::base::Status ExportOfflineData(const std::string& sql, const std::map& config, const std::string& default_db, bool sync_job, ::openmldb::taskmanager::JobInfo& job_info) override; - ::openmldb::base::Status CreatePreAggrTable(const std::string& aggr_db, - const std::string& aggr_table, + ::openmldb::base::Status CreatePreAggrTable(const std::string& aggr_db, const std::string& aggr_table, const ::openmldb::base::LongWindowInfo& window_info, const ::openmldb::nameserver::TableInfo& base_table_info, std::shared_ptr<::openmldb::client::NsClient> ns_ptr); - std::string GetJobLog(const int id, hybridse::sdk::Status* status) override; + std::string GetJobLog(int id, hybridse::sdk::Status* status) override; bool NotifyTableChange() override; @@ -305,21 +310,21 @@ class SQLClusterRouter : public SQLRouter { bool IsConstQuery(::hybridse::vm::PhysicalOpNode* node); std::shared_ptr GetCache(const std::string& db, const std::string& sql, - const hybridse::vm::EngineMode engine_mode); + hybridse::vm::EngineMode engine_mode); - void SetCache(const std::string& db, const std::string& sql, - const hybridse::vm::EngineMode engine_mode, const std::shared_ptr& router_cache); + void SetCache(const std::string& db, const std::string& sql, hybridse::vm::EngineMode engine_mode, + const std::shared_ptr& router_cache); bool GetSQLPlan(const std::string& sql, ::hybridse::node::NodeManager* nm, ::hybridse::node::PlanNodeList* plan); bool GetInsertInfo(const std::string& db, const std::string& sql, ::hybridse::sdk::Status* status, std::shared_ptr<::openmldb::nameserver::TableInfo>* table_info, DefaultValueMap* default_map, - uint32_t* str_length); + uint32_t* str_length, std::vector* stmt_column_idx_in_table); bool GetMultiRowInsertInfo(const std::string& db, const std::string& sql, ::hybridse::sdk::Status* status, std::shared_ptr<::openmldb::nameserver::TableInfo>* table_info, std::vector* default_maps, std::vector* str_lengths); - DefaultValueMap GetDefaultMap(std::shared_ptr<::openmldb::nameserver::TableInfo> table_info, + DefaultValueMap GetDefaultMap(const std::shared_ptr<::openmldb::nameserver::TableInfo>& table_info, const std::map& column_map, ::hybridse::node::ExprListNode* row, uint32_t* str_length); @@ -332,25 +337,24 @@ class SQLClusterRouter : public SQLRouter { bool ExtractDBTypes(std::shared_ptr schema, std::vector& parameter_types); // NOLINT - ::hybridse::sdk::Status SetVariable(hybridse::node::SetPlanNode* node); ::hybridse::sdk::Status ParseNamesFromArgs(const std::string& db, const std::vector& args, - std::string* db_name, std::string* sp_name); + std::string* db_name, std::string* sp_name); bool CheckAnswerIfInteractive(const std::string& drop_type, const std::string& name); ::openmldb::base::Status SaveResultSet(const std::string& file_path, - const std::shared_ptr& options_map, - ::hybridse::sdk::ResultSet* result_set); + const std::shared_ptr& options_map, + ::hybridse::sdk::ResultSet* result_set); - hybridse::sdk::Status HandleLoadDataInfile(const std::string& database, - const std::string& table, const std::string& file_path, - const std::shared_ptr& options); + hybridse::sdk::Status HandleLoadDataInfile(const std::string& database, const std::string& table, + const std::string& file_path, + const std::shared_ptr& options); - hybridse::sdk::Status InsertOneRow(const std::string& database, - const std::string& insert_placeholder, const std::vector& str_col_idx, - const std::string& null_value, const std::vector& cols); + hybridse::sdk::Status InsertOneRow(const std::string& database, const std::string& insert_placeholder, + const std::vector& str_col_idx, const std::string& null_value, + const std::vector& cols); hybridse::sdk::Status HandleDeploy(const hybridse::node::DeployPlanNode* deploy_node); @@ -372,7 +376,6 @@ class SQLClusterRouter : public SQLRouter { const std::set>& table_pair, const std::string& select_sql); - bool CheckPreAggrTableExist(const std::string& base_table, const std::string& base_db, const openmldb::base::LongWindowInfo& lw, ::hybridse::sdk::Status* status); @@ -396,13 +399,11 @@ class SQLClusterRouter : public SQLRouter { bool is_cluster_mode_; bool interactive_; DBSDK* cluster_sdk_; - std::map>>> input_lru_cache_; + std::map>>> + input_lru_cache_; ::openmldb::base::SpinMutex mu_; ::openmldb::base::Random rand_; }; -} // namespace sdk -} // namespace openmldb +} // namespace openmldb::sdk #endif // SRC_SDK_SQL_CLUSTER_ROUTER_H_ diff --git a/src/sdk/sql_cluster_test.cc b/src/sdk/sql_cluster_test.cc index c781b52e5f1..4eae5f8ba9d 100644 --- a/src/sdk/sql_cluster_test.cc +++ b/src/sdk/sql_cluster_test.cc @@ -14,7 +14,6 @@ * limitations under the License. */ -#include #include #include @@ -22,10 +21,7 @@ #include #include "absl/strings/str_cat.h" -#include "base/file_util.h" -#include "base/glog_wapper.h" #include "codec/fe_row_codec.h" -#include "common/timer.h" #include "gflags/gflags.h" #include "gtest/gtest.h" #include "sdk/mini_cluster.h" @@ -34,10 +30,9 @@ #include "sdk/sql_sdk_test.h" #include "vm/catalog.h" -namespace openmldb { -namespace sdk { +namespace openmldb::sdk { -static void SetOnlineMode(std::shared_ptr router) { +static void SetOnlineMode(const std::shared_ptr& router) { ::hybridse::sdk::Status status; router->ExecuteSQL("SET @@execute_mode='online';", &status); } @@ -47,22 +42,22 @@ std::shared_ptr router_; class SQLClusterTest : public ::testing::Test { public: - SQLClusterTest() {} - ~SQLClusterTest() {} - void SetUp() {} - void TearDown() {} + SQLClusterTest() = default; + ~SQLClusterTest() override = default; + void SetUp() override {} + void TearDown() override {} }; class MockClosure : public ::google::protobuf::Closure { public: - MockClosure() {} - ~MockClosure() {} - void Run() {} + MockClosure() = default; + ~MockClosure() override = default; + void Run() override {} }; class SQLClusterDDLTest : public SQLClusterTest { public: - void SetUp() { + void SetUp() override { SQLRouterOptions sql_opt; sql_opt.zk_cluster = mc_->GetZkCluster(); sql_opt.zk_path = mc_->GetZkPath(); @@ -74,7 +69,7 @@ class SQLClusterDDLTest : public SQLClusterTest { ASSERT_TRUE(router->CreateDB(db, &status)); } - void TearDown() { + void TearDown() override { ::hybridse::sdk::Status status; ASSERT_TRUE(router->DropDB(db, &status)); router.reset(); @@ -992,23 +987,22 @@ TEST_F(SQLClusterTest, ClusterSelect) { ASSERT_TRUE(ok); } -} // namespace sdk -} // namespace openmldb +} // namespace openmldb::sdk int main(int argc, char** argv) { ::hybridse::vm::Engine::InitializeGlobalLLVM(); FLAGS_zk_session_timeout = 100000; - ::openmldb::sdk::MiniCluster mc(6181); + ::openmldb::sdk::MiniCluster mc(2181); ::openmldb::sdk::mc_ = &mc; FLAGS_enable_distsql = true; int ok = ::openmldb::sdk::mc_->SetUp(3); sleep(5); ::testing::InitGoogleTest(&argc, argv); - srand(time(NULL)); + srand(time(nullptr)); ::google::ParseCommandLineFlags(&argc, &argv, true); ::openmldb::sdk::router_ = ::openmldb::sdk::GetNewSQLRouter(); if (nullptr == ::openmldb::sdk::router_) { - LOG(ERROR) << "Fail Test with NULL SQL router"; + LOG(ERROR) << "Test failed with NULL SQL router"; return -1; } ok = RUN_ALL_TESTS(); diff --git a/src/sdk/sql_insert_row.cc b/src/sdk/sql_insert_row.cc index 53f531e06e9..52c62f91041 100644 --- a/src/sdk/sql_insert_row.cc +++ b/src/sdk/sql_insert_row.cc @@ -19,6 +19,7 @@ #include #include +#include #include "glog/logging.h" @@ -27,15 +28,19 @@ namespace sdk { SQLInsertRows::SQLInsertRows(std::shared_ptr<::openmldb::nameserver::TableInfo> table_info, std::shared_ptr schema, DefaultValueMap default_map, - uint32_t default_str_length) - : table_info_(table_info), schema_(schema), default_map_(default_map), default_str_length_(default_str_length) {} + uint32_t default_str_length, const std::vector& hole_idx_arr) + : table_info_(std::move(table_info)), + schema_(std::move(schema)), + default_map_(std::move(default_map)), + default_str_length_(default_str_length), + hole_idx_arr_(hole_idx_arr) {} std::shared_ptr SQLInsertRows::NewRow() { if (!rows_.empty() && !rows_.back()->IsComplete()) { - return std::shared_ptr(); + return {}; } std::shared_ptr row = - std::make_shared(table_info_, schema_, default_map_, default_str_length_); + std::make_shared(table_info_, schema_, default_map_, default_str_length_, hole_idx_arr_); rows_.push_back(row); return row; } @@ -44,8 +49,8 @@ SQLInsertRow::SQLInsertRow(std::shared_ptr<::openmldb::nameserver::TableInfo> ta std::shared_ptr schema, DefaultValueMap default_map, uint32_t default_string_length) : table_info_(table_info), - schema_(schema), - default_map_(default_map), + schema_(std::move(schema)), + default_map_(std::move(default_map)), default_string_length_(default_string_length), rb_(table_info->column_desc()), val_(), @@ -69,11 +74,18 @@ SQLInsertRow::SQLInsertRow(std::shared_ptr<::openmldb::nameserver::TableInfo> ta } } +SQLInsertRow::SQLInsertRow(std::shared_ptr<::openmldb::nameserver::TableInfo> table_info, + std::shared_ptr schema, DefaultValueMap default_map, + uint32_t default_str_length, std::vector hole_idx_arr) + : SQLInsertRow(std::move(table_info), std::move(schema), std::move(default_map), default_str_length) { + hole_idx_arr_ = std::move(hole_idx_arr); +} + bool SQLInsertRow::Init(int str_length) { str_size_ = str_length + default_string_length_; uint32_t row_size = rb_.CalTotalLength(str_size_); val_.resize(row_size); - int8_t* buf = reinterpret_cast(&(val_[0])); + auto* buf = reinterpret_cast(&(val_[0])); bool ok = rb_.SetBuffer(reinterpret_cast(buf), row_size); if (!ok) { return false; @@ -84,8 +96,6 @@ bool SQLInsertRow::Init(int str_length) { void SQLInsertRow::PackDimension(const std::string& val) { raw_dimensions_[rb_.GetAppendPos()] = val; } - - const std::map>>& SQLInsertRow::GetDimensions() { if (!dimensions_.empty()) { return dimensions_; @@ -101,14 +111,14 @@ const std::map>>& SQLInse key += raw_dimensions_[idx]; } if (pid_num > 0) { - pid = (uint32_t)(::openmldb::base::hash64(key) % pid_num); + pid = static_cast(::openmldb::base::hash64(key) % pid_num); } auto iter = dimensions_.find(pid); if (iter == dimensions_.end()) { auto result = dimensions_.emplace(pid, std::vector>()); iter = result.first; } - iter->second.push_back(std::make_pair(key, kv.first)); + iter->second.emplace_back(key, kv.first); } return dimensions_; } @@ -281,7 +291,7 @@ bool SQLInsertRow::AppendNULL() { bool SQLInsertRow::IsComplete() { return rb_.IsComplete(); } -bool SQLInsertRow::Build() { return str_size_ == 0; } +bool SQLInsertRow::Build() const { return str_size_ == 0; } } // namespace sdk } // namespace openmldb diff --git a/src/sdk/sql_insert_row.h b/src/sdk/sql_insert_row.h index 35768f4fad9..6f7c8fb1329 100644 --- a/src/sdk/sql_insert_row.h +++ b/src/sdk/sql_insert_row.h @@ -31,8 +31,7 @@ #include "proto/name_server.pb.h" #include "sdk/base.h" -namespace openmldb { -namespace sdk { +namespace openmldb::sdk { typedef std::shared_ptr>> DefaultValueMap; @@ -62,9 +61,12 @@ static inline ::hybridse::sdk::DataType ConvertType(::openmldb::type::DataType t class SQLInsertRow { public: - explicit SQLInsertRow(std::shared_ptr<::openmldb::nameserver::TableInfo> table_info, - std::shared_ptr schema, DefaultValueMap default_map, - uint32_t default_str_length); + SQLInsertRow(std::shared_ptr<::openmldb::nameserver::TableInfo> table_info, + std::shared_ptr schema, DefaultValueMap default_map, + uint32_t default_str_length); + SQLInsertRow(std::shared_ptr<::openmldb::nameserver::TableInfo> table_info, + std::shared_ptr schema, DefaultValueMap default_map, + uint32_t default_str_length, std::vector hole_idx_arr); ~SQLInsertRow() = default; bool Init(int str_length); bool AppendBool(bool val); @@ -77,27 +79,51 @@ class SQLInsertRow { bool AppendString(const std::string& val); bool AppendDate(uint32_t year, uint32_t month, uint32_t day); bool AppendDate(int32_t date); + + // If you don't have the value of the column, append null, this method will help you to set: + // 1. if the column can be null, set null + // 2. if not, it will set to default value(from DefaultValueMap) bool AppendNULL(); bool IsComplete(); - bool Build(); + bool Build() const; const std::map>>& GetDimensions(); inline const std::string& GetRow() { return val_; } inline const std::shared_ptr GetSchema() { return schema_; } - const std::vector GetHoleIdx() { - std::vector result; - for (uint32_t i = 0; i < (int64_t)schema_->GetColumnCnt(); ++i) { - if (default_map_->count(i) == 0) { - result.push_back(i); + std::vector GetHoleIdx() { return hole_idx_arr_; } + + bool AppendString(const char* string_buffer_var_name, uint32_t length); + + // If the insert sql has columns names, the hole idx array is consistent with insert sql, not the table schema. + // e.g. table t1(c1,c2,c3) + // insert into t1 (c3,c2,c1) values(?,2,?); + // the hole idx array is {2,0} + // + // If empty(insert stmt has no column names, e.g. insert into t1 values(?,?,?)), the array should be in schema + // order. + static std::vector GetHoleIdxArr(const DefaultValueMap& default_map, + const std::vector& stmt_column_idx_in_table, + const std::shared_ptr<::hybridse::sdk::Schema>& schema) { + std::vector hole_idx_arr; + if (!stmt_column_idx_in_table.empty()) { + // hold idx arr should in stmt column order + for (auto idx : stmt_column_idx_in_table) { + // no default value means a hole, needs to set value + if (default_map->find(idx) == default_map->end()) { + hole_idx_arr.emplace_back(idx); + } + } + } else { + for (int i = 0; i < schema->GetColumnCnt(); ++i) { + if (default_map->find(i) == default_map->end()) { + hole_idx_arr.push_back(i); + } } } - return result; + return hole_idx_arr; } - bool AppendString(const char* string_buffer_var_name, uint32_t length); - private: - bool DateToString(uint32_t year, uint32_t month, uint32_t day, std::string* date); bool MakeDefault(); void PackDimension(const std::string& val); inline bool IsDimension() { return raw_dimensions_.find(rb_.GetAppendPos()) != raw_dimensions_.end(); } @@ -107,6 +133,8 @@ class SQLInsertRow { std::shared_ptr schema_; DefaultValueMap default_map_; uint32_t default_string_length_; + std::vector hole_idx_arr_; + std::map> index_map_; std::set ts_set_; std::map raw_dimensions_; @@ -119,7 +147,8 @@ class SQLInsertRow { class SQLInsertRows { public: SQLInsertRows(std::shared_ptr<::openmldb::nameserver::TableInfo> table_info, - std::shared_ptr schema, DefaultValueMap default_map, uint32_t str_size); + std::shared_ptr schema, DefaultValueMap default_map, uint32_t str_size, + const std::vector& hole_idx_arr); ~SQLInsertRows() = default; std::shared_ptr NewRow(); inline uint32_t GetCnt() { return rows_.size(); } @@ -130,24 +159,17 @@ class SQLInsertRows { return rows_[i]; } inline const std::shared_ptr GetSchema() { return schema_; } - const std::vector GetHoleIdx() { - std::vector result; - for (uint32_t i = 0; i < (int64_t)schema_->GetColumnCnt(); ++i) { - if (default_map_->count(i) == 0) { - result.push_back(i); - } - } - return result; - } + std::vector GetHoleIdx() { return hole_idx_arr_; } private: std::shared_ptr<::openmldb::nameserver::TableInfo> table_info_; std::shared_ptr schema_; DefaultValueMap default_map_; uint32_t default_str_length_; + std::vector hole_idx_arr_; + std::vector> rows_; }; -} // namespace sdk -} // namespace openmldb +} // namespace openmldb::sdk #endif // SRC_SDK_SQL_INSERT_ROW_H_ diff --git a/src/sdk/sql_router_test.cc b/src/sdk/sql_router_test.cc index a55c21725b8..dab176bd540 100644 --- a/src/sdk/sql_router_test.cc +++ b/src/sdk/sql_router_test.cc @@ -16,7 +16,7 @@ #include "sdk/sql_router.h" -#include +#include #include #include @@ -24,18 +24,14 @@ #include #include -#include "base/file_util.h" -#include "base/glog_wapper.h" #include "case/sql_case.h" #include "codec/fe_row_codec.h" -#include "common/timer.h" #include "gflags/gflags.h" #include "gtest/gtest.h" #include "sdk/mini_cluster.h" #include "vm/catalog.h" -namespace openmldb { -namespace sdk { +namespace openmldb::sdk { typedef ::google::protobuf::RepeatedPtrField<::openmldb::common::ColumnDesc> PBSchema; typedef ::google::protobuf::RepeatedPtrField<::openmldb::common::ColumnKey> RtiDBIndex; @@ -48,13 +44,54 @@ inline std::string GenRand() { class SQLRouterTest : public ::testing::Test { public: - SQLRouterTest() {} - ~SQLRouterTest() {} - void SetUp() {} - void TearDown() {} + SQLRouterTest() = default; + ~SQLRouterTest() override = default; + void SetUp() override { + SQLRouterOptions sql_opt; + sql_opt.zk_cluster = mc_->GetZkCluster(); + sql_opt.zk_path = mc_->GetZkPath(); + router_ = NewClusterSQLRouter(sql_opt); + ASSERT_TRUE(router_); + ::hybridse::sdk::Status status; + router_->ExecuteSQL("SET @@execute_mode='online';", &status); + ASSERT_TRUE(status.IsOK()); + } + void TearDown() override {} + + template + void CheckNextRow(std::shared_ptr rs, A get_col1, B get_col2, C v1, D v2) { + ASSERT_TRUE(rs->Next()); + ASSERT_EQ(get_col1(rs), v1) << "expect " << v1 << ", " << v2; + ASSERT_EQ(get_col2(rs), v2); + } + + template + void CheckCurRow(std::shared_ptr rs, A get_col1, B get_col2, C v1, D v2) { + ASSERT_EQ(get_col1(rs), v1) << "expect " << v1 << ", " << v2; + ASSERT_EQ(get_col2(rs), v2); + } + + static void Step(const std::shared_ptr& rs, int step) { + for (int i = 0; i < step; ++i) { + ASSERT_TRUE(rs->Next()); + } + } + template + static void PrintRsAndReset(const std::shared_ptr& rs, A get_col1, B get_col2) { + std::stringstream ss("result set:\n"); + while (rs->Next()) { + ss << get_col1(rs) << " " << get_col2(rs) << "\n"; + } + ASSERT_TRUE(rs->Reset()); + LOG(INFO) << ss.str(); + } + + protected: + std::string db_ = "sql_router_test"; + std::shared_ptr router_; }; -TEST_F(SQLRouterTest, bad_zk) { +TEST_F(SQLRouterTest, badZk) { SQLRouterOptions sql_opt; sql_opt.zk_cluster = "127.0.0.1:1111"; sql_opt.zk_path = "/path"; @@ -64,55 +101,41 @@ TEST_F(SQLRouterTest, bad_zk) { } TEST_F(SQLRouterTest, db_name_test) { - SQLRouterOptions sql_opt; - sql_opt.zk_cluster = mc_->GetZkCluster(); - sql_opt.zk_path = mc_->GetZkPath(); - auto router = NewClusterSQLRouter(sql_opt); - ASSERT_TRUE(router != nullptr); + ASSERT_TRUE(router_ != nullptr); ::hybridse::sdk::Status status; - ASSERT_FALSE(router->CreateDB("", &status)); + ASSERT_FALSE(router_->CreateDB("", &status)); std::string db = "123456"; - ASSERT_TRUE(router->CreateDB(db, &status)) << db << ": " << status.msg; - ASSERT_TRUE(router->DropDB(db, &status)) << db << ": " << status.msg; + ASSERT_TRUE(router_->CreateDB(db, &status)) << db << ": " << status.msg; + ASSERT_TRUE(router_->DropDB(db, &status)) << db << ": " << status.msg; // SQL_IDENTIFIER: [^`/\\.\n]+, so use '/' in name is not allowed db = "1/2"; - ASSERT_FALSE(router->CreateDB(db, &status) && status.code == -2) << db << ": " << status.msg; - ASSERT_FALSE(router->DropDB(db, &status) && status.code == -2) << db << ": " << status.msg; + ASSERT_FALSE(router_->CreateDB(db, &status) && status.code == -2) << db << ": " << status.msg; + ASSERT_FALSE(router_->DropDB(db, &status) && status.code == -2) << db << ": " << status.msg; } TEST_F(SQLRouterTest, db_api_test) { - SQLRouterOptions sql_opt; - sql_opt.zk_cluster = mc_->GetZkCluster(); - sql_opt.zk_path = mc_->GetZkPath(); - auto router = NewClusterSQLRouter(sql_opt); - ASSERT_TRUE(router != nullptr); std::string name = "test" + GenRand(); std::string db = "db" + GenRand(); ::hybridse::sdk::Status status; std::vector dbs; - ASSERT_TRUE(router->ShowDB(&dbs, &status)); + ASSERT_TRUE(router_->ShowDB(&dbs, &status)); uint32_t origin = dbs.size(); - bool ok = router->CreateDB(db, &status); + bool ok = router_->CreateDB(db, &status); ASSERT_TRUE(ok); dbs.clear(); - ASSERT_TRUE(router->ShowDB(&dbs, &status)); + ASSERT_TRUE(router_->ShowDB(&dbs, &status)); ASSERT_EQ(1u, dbs.size() - origin); ASSERT_EQ(db, dbs[0]); - ok = router->DropDB(db, &status); + ok = router_->DropDB(db, &status); ASSERT_TRUE(ok); } TEST_F(SQLRouterTest, create_and_drop_table_test) { - SQLRouterOptions sql_opt; - sql_opt.zk_cluster = mc_->GetZkCluster(); - sql_opt.zk_path = mc_->GetZkPath(); - auto router = NewClusterSQLRouter(sql_opt); - ASSERT_TRUE(router != nullptr); std::string name = "test" + GenRand(); std::string db = "db" + GenRand(); ::hybridse::sdk::Status status; - bool ok = router->CreateDB(db, &status); + bool ok = router_->CreateDB(db, &status); ASSERT_TRUE(ok); std::string ddl = "create table " + name + "(" @@ -120,33 +143,33 @@ TEST_F(SQLRouterTest, create_and_drop_table_test) { "index(key=col1, ts=col2));"; std::string insert = "insert into " + name + " values('hello', 1590);"; std::string select = "select * from " + name + ";"; - ok = router->ExecuteDDL(db, ddl, &status); + ok = router_->ExecuteDDL(db, ddl, &status); ASSERT_TRUE(ok); - ASSERT_TRUE(router->RefreshCatalog()); + ASSERT_TRUE(router_->RefreshCatalog()); - ok = router->ExecuteInsert(db, insert, &status); + ok = router_->ExecuteInsert(db, insert, &status); - auto rs = router->ExecuteSQL(db, select, &status); + auto rs = router_->ExecuteSQL(db, select, &status); ASSERT_TRUE(rs != nullptr); ASSERT_EQ(1, rs->Size()); - ok = router->ExecuteDDL(db, "drop table " + name + ";", &status); + ok = router_->ExecuteDDL(db, "drop table " + name + ";", &status); ASSERT_TRUE(ok); // test stmt with db name prefix ddl = "create table " + db + "." + name + "(col1 string, col2 bigint, index(key=col1, ts=col2));"; insert = "insert into " + db + "." + name + " values('hello', 1590);"; select = "select * from " + db + "." + name + ";"; - ok = router->ExecuteDDL("", ddl, &status); + ok = router_->ExecuteDDL("", ddl, &status); ASSERT_TRUE(ok); - ASSERT_TRUE(router->RefreshCatalog()); + ASSERT_TRUE(router_->RefreshCatalog()); - ok = router->ExecuteInsert("", insert, &status); + ok = router_->ExecuteInsert("", insert, &status); - rs = router->ExecuteSQL(db, select, &status); + rs = router_->ExecuteSQL(db, select, &status); ASSERT_TRUE(rs != nullptr); ASSERT_EQ(1, rs->Size()); - ok = router->ExecuteDDL("", "drop table " + db + "." + name + ";", &status); + ok = router_->ExecuteDDL("", "drop table " + db + "." + name + ";", &status); ASSERT_TRUE(ok); std::string ddl_fake = "create table " + name + @@ -154,91 +177,121 @@ TEST_F(SQLRouterTest, create_and_drop_table_test) { "col1 int, col2 bigint," "index(key=col1, ts=col2));"; - ok = router->ExecuteDDL(db, ddl_fake, &status); + ok = router_->ExecuteDDL(db, ddl_fake, &status); ASSERT_TRUE(ok); - ASSERT_TRUE(router->RefreshCatalog()); + ASSERT_TRUE(router_->RefreshCatalog()); - rs = router->ExecuteSQL(db, select, &status); + rs = router_->ExecuteSQL(db, select, &status); ASSERT_TRUE(rs != nullptr); ASSERT_EQ(0, rs->Size()); // db still has table, drop fail - ok = router->DropDB(db, &status); + ok = router_->DropDB(db, &status); ASSERT_FALSE(ok); - ok = router->ExecuteDDL(db, "drop table " + name + ";", &status); + ok = router_->ExecuteDDL(db, "drop table " + name + ";", &status); ASSERT_TRUE(ok); - ok = router->DropDB(db, &status); + ok = router_->DropDB(db, &status); ASSERT_TRUE(ok); } -TEST_F(SQLRouterTest, test_sql_insert_placeholder) { - SQLRouterOptions sql_opt; - sql_opt.zk_cluster = mc_->GetZkCluster(); - sql_opt.zk_path = mc_->GetZkPath(); - auto router = NewClusterSQLRouter(sql_opt); - ASSERT_TRUE(router != nullptr); +TEST_F(SQLRouterTest, testSqlInsertPlaceholder) { std::string name = "test" + GenRand(); std::string db = "db" + GenRand(); ::hybridse::sdk::Status status; - bool ok = router->CreateDB(db, &status); + bool ok = router_->CreateDB(db, &status); ASSERT_TRUE(ok); std::string ddl = "create table " + name + "(" "col1 string, col2 bigint," "index(key=col1, ts=col2));"; - ok = router->ExecuteDDL(db, ddl, &status); + ok = router_->ExecuteDDL(db, ddl, &status); ASSERT_TRUE(ok); - ASSERT_TRUE(router->RefreshCatalog()); + ASSERT_TRUE(router_->RefreshCatalog()); std::string insert = "insert into " + name + " values('hello', 1590);"; std::string insert_placeholder1 = "insert into " + name + " values(?, ?);"; std::string insert_placeholder2 = "insert into " + name + " values(?, 1592);"; std::string insert_placeholder3 = "insert into " + name + " values('hi', ?);"; - ok = router->ExecuteInsert(db, insert, &status); + ok = router_->ExecuteInsert(db, insert, &status); ASSERT_TRUE(ok); - std::shared_ptr insert_row1 = router->GetInsertRow(db, insert_placeholder1, &status); + std::string sql_select = "select col1, col2 from " + name + ";"; + auto rs = router_->ExecuteSQL(db, sql_select, &status); + ASSERT_EQ(rs->Size(), 1); + + auto get_col1 = [](const std::shared_ptr& rs) { return rs->GetStringUnsafe(0); }; + auto get_col2 = [](const std::shared_ptr& rs) { return rs->GetInt64Unsafe(1); }; + + CheckNextRow(rs, get_col1, get_col2, "hello", 1590); + + std::shared_ptr insert_row1 = router_->GetInsertRow(db, insert_placeholder1, &status); ASSERT_EQ(status.code, 0); ASSERT_TRUE(insert_row1->Init(5)); ASSERT_TRUE(insert_row1->AppendString("world")); ASSERT_TRUE(insert_row1->AppendInt64(1591)); ASSERT_TRUE(insert_row1->Build()); - ok = router->ExecuteInsert(db, insert_placeholder1, insert_row1, &status); + ok = router_->ExecuteInsert(db, insert_placeholder1, insert_row1, &status); ASSERT_TRUE(ok); + rs = router_->ExecuteSQL(db, sql_select, &status); + ASSERT_EQ(rs->Size(), 2); + // world, 1591 - 1st row + // hello, 1590 + CheckNextRow(rs, get_col1, get_col2, "world", 1591); + CheckNextRow(rs, get_col1, get_col2, "hello", 1590); + { - std::shared_ptr insert_row2 = router->GetInsertRow(db, insert_placeholder2, &status); + std::shared_ptr insert_row2 = router_->GetInsertRow(db, insert_placeholder2, &status); ASSERT_EQ(status.code, 0); ASSERT_TRUE(insert_row2->Init(4)); ASSERT_TRUE(insert_row2->AppendString("wrd")); ASSERT_FALSE(insert_row2->Build()); } { - std::shared_ptr insert_row2 = router->GetInsertRow(db, insert_placeholder2, &status); + std::shared_ptr insert_row2 = router_->GetInsertRow(db, insert_placeholder2, &status); ASSERT_EQ(status.code, 0); ASSERT_TRUE(insert_row2->Init(4)); ASSERT_FALSE(insert_row2->AppendString("wordd")); ASSERT_FALSE(insert_row2->Build()); } - std::shared_ptr insert_row2 = router->GetInsertRow(db, insert_placeholder2, &status); + // ?, 1592 + std::shared_ptr insert_row2 = router_->GetInsertRow(db, insert_placeholder2, &status); ASSERT_EQ(status.code, 0); ASSERT_TRUE(insert_row2->Init(4)); ASSERT_TRUE(insert_row2->AppendString("word")); ASSERT_TRUE(insert_row2->Build()); - ok = router->ExecuteInsert(db, insert_placeholder2, insert_row2, &status); - ASSERT_TRUE(ok); - - std::shared_ptr insert_row3 = router->GetInsertRow(db, insert_placeholder3, &status); + ok = router_->ExecuteInsert(db, insert_placeholder2, insert_row2, &status); + ASSERT_TRUE(ok); + rs = router_->ExecuteSQL(db, sql_select, &status); + ASSERT_EQ(rs->Size(), 3); + // world, 1591 - 1st row + // word, 1592 - 2nd row + // hello, 1590 + // check 2nd row + Step(rs, 2); + CheckCurRow(rs, get_col1, get_col2, "word", 1592); + + // hi, ? + std::shared_ptr insert_row3 = router_->GetInsertRow(db, insert_placeholder3, &status); ASSERT_EQ(status.code, 0); ASSERT_TRUE(insert_row3->Init(0)); ASSERT_TRUE(insert_row3->AppendInt64(1593)); ASSERT_TRUE(insert_row3->Build()); - ok = router->ExecuteInsert(db, insert_placeholder3, insert_row3, &status); - ASSERT_TRUE(ok); - - std::shared_ptr insert_rows1 = router->GetInsertRows(db, insert_placeholder1, &status); + ok = router_->ExecuteInsert(db, insert_placeholder3, insert_row3, &status); + ASSERT_TRUE(ok); + rs = router_->ExecuteSQL(db, sql_select, &status); + ASSERT_EQ(rs->Size(), 4); + // world, 1591 - 1st row + // word, 1592 - 2nd row + // hello, 1590 + // hi, 1593 - 4th row + Step(rs, 4); + CheckCurRow(rs, get_col1, get_col2, "hi", 1593); + + // ?,?, insert 2 rows + std::shared_ptr insert_rows1 = router_->GetInsertRows(db, insert_placeholder1, &status); ASSERT_EQ(status.code, 0); std::shared_ptr insert_rows1_1 = insert_rows1->NewRow(); ASSERT_TRUE(insert_rows1_1->Init(2)); @@ -250,10 +303,24 @@ TEST_F(SQLRouterTest, test_sql_insert_placeholder) { ASSERT_TRUE(insert_rows1_2->AppendString("12")); ASSERT_TRUE(insert_rows1_2->AppendInt64(1595)); ASSERT_TRUE(insert_rows1_2->Build()); - ok = router->ExecuteInsert(db, insert_placeholder1, insert_rows1, &status); - ASSERT_TRUE(ok); - - std::shared_ptr insert_rows2 = router->GetInsertRows(db, insert_placeholder2, &status); + ok = router_->ExecuteInsert(db, insert_placeholder1, insert_rows1, &status); + ASSERT_TRUE(ok); + rs = router_->ExecuteSQL(db, sql_select, &status); + ASSERT_EQ(rs->Size(), 6); + PrintRsAndReset(rs, get_col1, get_col2); + // world, 1591 - 1st row + // word 1592 + // hello 1590 + // 11 1594 + // hi 1593 + // 12 1595 + Step(rs, 4); + CheckCurRow(rs, get_col1, get_col2, "11", 1594); + Step(rs, 2); + CheckCurRow(rs, get_col1, get_col2, "12", 1595); + + // ?, 1592 - insert 2 rows + std::shared_ptr insert_rows2 = router_->GetInsertRows(db, insert_placeholder2, &status); ASSERT_EQ(status.code, 0); std::shared_ptr insert_rows2_1 = insert_rows2->NewRow(); ASSERT_TRUE(insert_rows2_1->Init(2)); @@ -263,10 +330,18 @@ TEST_F(SQLRouterTest, test_sql_insert_placeholder) { ASSERT_TRUE(insert_rows2_2->Init(2)); ASSERT_TRUE(insert_rows2_2->AppendString("22")); ASSERT_TRUE(insert_rows2_2->Build()); - ok = router->ExecuteInsert(db, insert_placeholder2, insert_rows2, &status); - ASSERT_TRUE(ok); - - std::shared_ptr insert_rows3 = router->GetInsertRows(db, insert_placeholder3, &status); + ok = router_->ExecuteInsert(db, insert_placeholder2, insert_rows2, &status); + ASSERT_TRUE(ok); + rs = router_->ExecuteSQL(db, sql_select, &status); + ASSERT_EQ(rs->Size(), 8); + PrintRsAndReset(rs, get_col1, get_col2); + Step(rs, 5); + CheckCurRow(rs, get_col1, get_col2, "22", 1592); + Step(rs, 2); + CheckCurRow(rs, get_col1, get_col2, "21", 1592); + + // hi, ? + std::shared_ptr insert_rows3 = router_->GetInsertRows(db, insert_placeholder3, &status); ASSERT_EQ(status.code, 0); std::shared_ptr insert_rows3_1 = insert_rows3->NewRow(); ASSERT_TRUE(insert_rows3_1->Init(0)); @@ -276,103 +351,69 @@ TEST_F(SQLRouterTest, test_sql_insert_placeholder) { ASSERT_TRUE(insert_rows3_2->Init(0)); ASSERT_TRUE(insert_rows3_2->AppendInt64(1597)); ASSERT_TRUE(insert_rows3_2->Build()); - ok = router->ExecuteInsert(db, insert_placeholder3, insert_rows3, &status); + ok = router_->ExecuteInsert(db, insert_placeholder3, insert_rows3, &status); ASSERT_TRUE(ok); - - ASSERT_TRUE(router->RefreshCatalog()); - std::string sql_select = "select col1, col2 from " + name + ";"; - auto rs = router->ExecuteSQL(db, sql_select, &status); - ASSERT_TRUE(rs != nullptr); + ASSERT_TRUE(router_->RefreshCatalog()); + rs = router_->ExecuteSQL(db, sql_select, &status); + ASSERT_TRUE(rs); + PrintRsAndReset(rs, get_col1, get_col2); ASSERT_EQ(10, rs->Size()); - ASSERT_TRUE(rs->Next()); - ASSERT_EQ("hello", rs->GetStringUnsafe(0)); - ASSERT_EQ(1590, rs->GetInt64Unsafe(1)); - ASSERT_TRUE(rs->Next()); - ASSERT_EQ("world", rs->GetStringUnsafe(0)); - ASSERT_EQ(1591, rs->GetInt64Unsafe(1)); - ASSERT_TRUE(rs->Next()); - ASSERT_EQ("22", rs->GetStringUnsafe(0)); - ASSERT_EQ(1592, rs->GetInt64Unsafe(1)); - ASSERT_TRUE(rs->Next()); - ASSERT_EQ("11", rs->GetStringUnsafe(0)); - ASSERT_EQ(1594, rs->GetInt64Unsafe(1)); - ASSERT_TRUE(rs->Next()); - ASSERT_EQ("hi", rs->GetStringUnsafe(0)); - ASSERT_EQ(1597, rs->GetInt64Unsafe(1)); - ASSERT_TRUE(rs->Next()); - ASSERT_EQ("hi", rs->GetStringUnsafe(0)); - ASSERT_EQ(1596, rs->GetInt64Unsafe(1)); - ASSERT_TRUE(rs->Next()); - ASSERT_EQ("hi", rs->GetStringUnsafe(0)); - ASSERT_EQ(1593, rs->GetInt64Unsafe(1)); - ASSERT_TRUE(rs->Next()); - ASSERT_EQ("12", rs->GetStringUnsafe(0)); - ASSERT_EQ(1595, rs->GetInt64Unsafe(1)); - ASSERT_TRUE(rs->Next()); - ASSERT_EQ("21", rs->GetStringUnsafe(0)); - ASSERT_EQ(1592, rs->GetInt64Unsafe(1)); - ASSERT_TRUE(rs->Next()); - ASSERT_EQ("word", rs->GetStringUnsafe(0)); - ASSERT_EQ(1592, rs->GetInt64Unsafe(1)); - ASSERT_FALSE(rs->Next()); + Step(rs, 6); + CheckCurRow(rs, get_col1, get_col2, "hi", 1597); + CheckNextRow(rs, get_col1, get_col2, "hi", 1596); - ok = router->ExecuteDDL(db, "drop table " + name + ";", &status); + ok = router_->ExecuteDDL(db, "drop table " + name + ";", &status); ASSERT_TRUE(ok); - ok = router->DropDB(db, &status); + ok = router_->DropDB(db, &status); ASSERT_TRUE(ok); } TEST_F(SQLRouterTest, test_sql_insert_with_column_list) { - SQLRouterOptions sql_opt; - sql_opt.zk_cluster = mc_->GetZkCluster(); - sql_opt.zk_path = mc_->GetZkPath(); - auto router = NewClusterSQLRouter(sql_opt); - ASSERT_TRUE(router != nullptr); std::string name = "test" + GenRand(); std::string db = "db" + GenRand(); ::hybridse::sdk::Status status; - bool ok = router->CreateDB(db, &status); + bool ok = router_->CreateDB(db, &status); ASSERT_TRUE(ok); std::string ddl = "create table " + name + "(" "col1 int, col2 int, col3 string NOT NULL, col4 " "bigint NOT NULL, index(key=col3, ts=col4));"; - ok = router->ExecuteDDL(db, ddl, &status); + ok = router_->ExecuteDDL(db, ddl, &status); ASSERT_TRUE(ok); - ASSERT_TRUE(router->RefreshCatalog()); + ASSERT_TRUE(router_->RefreshCatalog()); // normal insert std::string insert1 = "insert into " + name + "(col3, col4) values('hello', 1000);"; status = ::hybridse::sdk::Status(); - ok = router->ExecuteInsert(db, insert1, &status); + ok = router_->ExecuteInsert(db, insert1, &status); ASSERT_TRUE(ok); // col3 shouldn't be null std::string insert2 = "insert into " + name + "(col4) values(1000);"; status = ::hybridse::sdk::Status(); - ok = router->ExecuteInsert(db, insert2, &status); + ok = router_->ExecuteInsert(db, insert2, &status); ASSERT_FALSE(ok); // col5 not exist std::string insert3 = "insert into " + name + "(col5) values(1000);"; status = ::hybridse::sdk::Status(); - ok = router->ExecuteInsert(db, insert3, &status); + ok = router_->ExecuteInsert(db, insert3, &status); ASSERT_FALSE(ok); // duplicate col4 std::string insert4 = "insert into " + name + "(col4, col4) values(1000, 1000);"; status = ::hybridse::sdk::Status(); - ok = router->ExecuteInsert(db, insert4, &status); + ok = router_->ExecuteInsert(db, insert4, &status); ASSERT_FALSE(ok); // normal placeholder insert std::string insert5 = "insert into " + name + "(col2, col3, col4) values(?, 'hello', ?);"; - std::shared_ptr r5 = router->GetInsertRow(db, insert5, &status); + std::shared_ptr r5 = router_->GetInsertRow(db, insert5, &status); ASSERT_TRUE(r5->Init(0)); ASSERT_TRUE(r5->AppendInt32(123)); ASSERT_TRUE(r5->AppendInt64(1001)); status = ::hybridse::sdk::Status(); - ok = router->ExecuteInsert(db, insert5, r5, &status); + ok = router_->ExecuteInsert(db, insert5, r5, &status); ASSERT_TRUE(ok); // todo: if placeholders are out of order. eg: insert into [table] (col4, @@ -380,7 +421,7 @@ TEST_F(SQLRouterTest, test_sql_insert_with_column_list) { std::string select = "select * from " + name + ";"; status = ::hybridse::sdk::Status(); - auto rs = router->ExecuteSQL(db, select, &status); + auto rs = router_->ExecuteSQL(db, select, &status); ASSERT_FALSE(rs == nullptr) << status.msg << "\n" << status.trace; ASSERT_EQ(2, rs->Size()); @@ -398,42 +439,37 @@ TEST_F(SQLRouterTest, test_sql_insert_with_column_list) { ASSERT_FALSE(rs->Next()); - ok = router->ExecuteDDL(db, "drop table " + name + ";", &status); + ok = router_->ExecuteDDL(db, "drop table " + name + ";", &status); ASSERT_TRUE(ok); - ok = router->DropDB(db, &status); + ok = router_->DropDB(db, &status); ASSERT_TRUE(ok); } -TEST_F(SQLRouterTest, test_sql_insert_placeholder_with_date_column_key) { - SQLRouterOptions sql_opt; - sql_opt.zk_cluster = mc_->GetZkCluster(); - sql_opt.zk_path = mc_->GetZkPath(); - auto router = NewClusterSQLRouter(sql_opt); - ASSERT_TRUE(router != nullptr); +TEST_F(SQLRouterTest, testSqlInsertPlaceholderWithDateColumnKey) { std::string name = "test" + GenRand(); std::string db = "db" + GenRand(); ::hybridse::sdk::Status status; - bool ok = router->CreateDB(db, &status); + bool ok = router_->CreateDB(db, &status); ASSERT_TRUE(ok); std::string ddl = "create table " + name + "(" "col1 int, col2 date NOT NULL, col3 " "bigint NOT NULL, index(key=col2, ts=col3));"; - ok = router->ExecuteDDL(db, ddl, &status); + ok = router_->ExecuteDDL(db, ddl, &status); ASSERT_TRUE(ok); - ASSERT_TRUE(router->RefreshCatalog()); + ASSERT_TRUE(router_->RefreshCatalog()); std::string insert1 = "insert into " + name + " values(?, ?, ?);"; - std::shared_ptr r1 = router->GetInsertRow(db, insert1, &status); + std::shared_ptr r1 = router_->GetInsertRow(db, insert1, &status); ASSERT_FALSE(r1 == nullptr); ASSERT_TRUE(r1->Init(0)); ASSERT_TRUE(r1->AppendInt32(123)); ASSERT_TRUE(r1->AppendDate(2020, 7, 22)); ASSERT_TRUE(r1->AppendInt64(1000)); - ok = router->ExecuteInsert(db, insert1, r1, &status); + ok = router_->ExecuteInsert(db, insert1, r1, &status); ASSERT_TRUE(ok); std::string select = "select * from " + name + ";"; - auto rs = router->ExecuteSQL(db, select, &status); + auto rs = router_->ExecuteSQL(db, select, &status); ASSERT_FALSE(rs == nullptr); ASSERT_EQ(1, rs->Size()); int32_t year; @@ -450,72 +486,67 @@ TEST_F(SQLRouterTest, test_sql_insert_placeholder_with_date_column_key) { ASSERT_FALSE(rs->Next()); - ok = router->ExecuteDDL(db, "drop table " + name + ";", &status); + ok = router_->ExecuteDDL(db, "drop table " + name + ";", &status); ASSERT_TRUE(ok); - ok = router->DropDB(db, &status); + ok = router_->DropDB(db, &status); ASSERT_TRUE(ok); } -TEST_F(SQLRouterTest, test_sql_insert_placeholder_with_column_key_1) { - SQLRouterOptions sql_opt; - sql_opt.zk_cluster = mc_->GetZkCluster(); - sql_opt.zk_path = mc_->GetZkPath(); - auto router = NewClusterSQLRouter(sql_opt); - ASSERT_TRUE(router != nullptr); +TEST_F(SQLRouterTest, testSqlInsertPlaceholderWithColumnKey1) { std::string name = "test" + GenRand(); std::string db = "db" + GenRand(); ::hybridse::sdk::Status status; - bool ok = router->CreateDB(db, &status); + bool ok = router_->CreateDB(db, &status); ASSERT_TRUE(ok); std::string ddl = "create table " + name + "(" "col1 int, col2 int NOT NULL, col3 string NOT NULL, col4 " "bigint NOT NULL, index(key=(col2, col3), ts=col4));"; - ok = router->ExecuteDDL(db, ddl, &status); + ok = router_->ExecuteDDL(db, ddl, &status); ASSERT_TRUE(ok); - ASSERT_TRUE(router->RefreshCatalog()); + ASSERT_TRUE(router_->RefreshCatalog()); std::string insert1 = "insert into " + name + " values(?, ?, ?, ?);"; - std::shared_ptr r1 = router->GetInsertRow(db, insert1, &status); + std::shared_ptr r1 = router_->GetInsertRow(db, insert1, &status); ASSERT_FALSE(r1 == nullptr); ASSERT_TRUE(r1->Init(5)); ASSERT_TRUE(r1->AppendInt32(123)); ASSERT_TRUE(r1->AppendInt32(321)); ASSERT_TRUE(r1->AppendString("hello")); ASSERT_TRUE(r1->AppendInt64(1000)); - ok = router->ExecuteInsert(db, insert1, r1, &status); + ok = router_->ExecuteInsert(db, insert1, r1, &status); ASSERT_TRUE(ok); std::string insert2 = "insert into " + name + " values(?, ?, 'hello', ?);"; - std::shared_ptr r2 = router->GetInsertRow(db, insert2, &status); + std::shared_ptr r2 = router_->GetInsertRow(db, insert2, &status); ASSERT_FALSE(r2 == nullptr); ASSERT_TRUE(r2->Init(0)); ASSERT_TRUE(r2->AppendInt32(456)); ASSERT_TRUE(r2->AppendInt32(654)); ASSERT_TRUE(r2->AppendInt64(1001)); - ok = router->ExecuteInsert(db, insert2, r2, &status); + ok = router_->ExecuteInsert(db, insert2, r2, &status); ASSERT_TRUE(ok); std::string insert3 = "insert into " + name + " values(?, 987, ?, ?);"; - std::shared_ptr r3 = router->GetInsertRow(db, insert3, &status); + std::shared_ptr r3 = router_->GetInsertRow(db, insert3, &status); ASSERT_FALSE(r3 == nullptr); ASSERT_TRUE(r3->Init(5)); ASSERT_TRUE(r3->AppendInt32(789)); ASSERT_TRUE(r3->AppendString("hello")); ASSERT_TRUE(r3->AppendInt64(1002)); - ok = router->ExecuteInsert(db, insert3, r3, &status); + ok = router_->ExecuteInsert(db, insert3, r3, &status); ASSERT_TRUE(ok); std::string insert4 = "insert into " + name + " values(?, 0,'hello', ?);"; - std::shared_ptr r4 = router->GetInsertRow(db, insert4, &status); + std::shared_ptr r4 = router_->GetInsertRow(db, insert4, &status); ASSERT_FALSE(r4 == nullptr); ASSERT_TRUE(r4->Init(0)); ASSERT_TRUE(r4->AppendInt32(1)); ASSERT_TRUE(r4->AppendInt64(1003)); - ok = router->ExecuteInsert(db, insert4, r4, &status); + ok = router_->ExecuteInsert(db, insert4, r4, &status); ASSERT_TRUE(ok); std::string select = "select * from " + name + ";"; - auto rs = router->ExecuteSQL(db, select, &status); + auto rs = router_->ExecuteSQL(db, select, &status); ASSERT_FALSE(rs == nullptr); ASSERT_EQ(4, rs->Size()); @@ -525,6 +556,12 @@ TEST_F(SQLRouterTest, test_sql_insert_placeholder_with_column_key_1) { ASSERT_EQ("hello", rs->GetStringUnsafe(2)); ASSERT_EQ(rs->GetInt64Unsafe(3), 1003); + ASSERT_TRUE(rs->Next()); + ASSERT_EQ(456, rs->GetInt32Unsafe(0)); + ASSERT_EQ(654, rs->GetInt32Unsafe(1)); + ASSERT_EQ("hello", rs->GetStringUnsafe(2)); + ASSERT_EQ(rs->GetInt64Unsafe(3), 1001); + ASSERT_TRUE(rs->Next()); ASSERT_EQ(123, rs->GetInt32Unsafe(0)); ASSERT_EQ(321, rs->GetInt32Unsafe(1)); @@ -537,92 +574,81 @@ TEST_F(SQLRouterTest, test_sql_insert_placeholder_with_column_key_1) { ASSERT_EQ("hello", rs->GetStringUnsafe(2)); ASSERT_EQ(rs->GetInt64Unsafe(3), 1002); - ASSERT_TRUE(rs->Next()); - ASSERT_EQ(456, rs->GetInt32Unsafe(0)); - ASSERT_EQ(654, rs->GetInt32Unsafe(1)); - ASSERT_EQ("hello", rs->GetStringUnsafe(2)); - ASSERT_EQ(rs->GetInt64Unsafe(3), 1001); - ASSERT_FALSE(rs->Next()); - ok = router->ExecuteDDL(db, "drop table " + name + ";", &status); + ok = router_->ExecuteDDL(db, "drop table " + name + ";", &status); ASSERT_TRUE(ok); - ok = router->DropDB(db, &status); + ok = router_->DropDB(db, &status); ASSERT_TRUE(ok); } -TEST_F(SQLRouterTest, test_sql_insert_placeholder_with_column_key_2) { - SQLRouterOptions sql_opt; - sql_opt.zk_cluster = mc_->GetZkCluster(); - sql_opt.zk_path = mc_->GetZkPath(); - auto router = NewClusterSQLRouter(sql_opt); - ASSERT_TRUE(router != nullptr); +TEST_F(SQLRouterTest, testSqlInsertPlaceholderWithColumnKey2) { std::string name = "test" + GenRand(); std::string db = "db" + GenRand(); ::hybridse::sdk::Status status; - bool ok = router->CreateDB(db, &status); + bool ok = router_->CreateDB(db, &status); ASSERT_TRUE(ok); std::string ddl = "create table " + name + "(" "col1 string NOT NULL, col2 bigint NOT NULL, col3 date NOT NULL, col4 " "int NOT NULL, index(key=(col1, col4), ts=col2));"; status = ::hybridse::sdk::Status(); - ok = router->ExecuteDDL(db, ddl, &status); + ok = router_->ExecuteDDL(db, ddl, &status); ASSERT_TRUE(ok); - ASSERT_TRUE(router->RefreshCatalog()); + ASSERT_TRUE(router_->RefreshCatalog()); std::string insert1 = "insert into " + name + " values(?, ?, ?, ?);"; status = ::hybridse::sdk::Status(); - std::shared_ptr r1 = router->GetInsertRow(db, insert1, &status); + std::shared_ptr r1 = router_->GetInsertRow(db, insert1, &status); ASSERT_TRUE(r1->Init(5)); ASSERT_TRUE(r1->AppendString("hello")); ASSERT_TRUE(r1->AppendInt64(1000)); ASSERT_TRUE(r1->AppendDate(2020, 7, 13)); ASSERT_TRUE(r1->AppendInt32(123)); status = ::hybridse::sdk::Status(); - ok = router->ExecuteInsert(db, insert1, r1, &status); + ok = router_->ExecuteInsert(db, insert1, r1, &status); ASSERT_TRUE(ok); std::string insert2 = "insert into " + name + " values('hello', ?, ?, ?);"; status = ::hybridse::sdk::Status(); - std::shared_ptr r2 = router->GetInsertRow(db, insert2, &status); + std::shared_ptr r2 = router_->GetInsertRow(db, insert2, &status); ASSERT_TRUE(r2->Init(0)); ASSERT_TRUE(r2->AppendInt64(1001)); ASSERT_TRUE(r2->AppendDate(2020, 7, 20)); ASSERT_TRUE(r2->AppendInt32(456)); status = ::hybridse::sdk::Status(); - ok = router->ExecuteInsert(db, insert2, r2, &status); + ok = router_->ExecuteInsert(db, insert2, r2, &status); ASSERT_TRUE(ok); std::string insert3 = "insert into " + name + " values(?, ?, ?, 789);"; status = ::hybridse::sdk::Status(); - std::shared_ptr r3 = router->GetInsertRow(db, insert3, &status); + std::shared_ptr r3 = router_->GetInsertRow(db, insert3, &status); ASSERT_TRUE(r3->Init(5)); ASSERT_TRUE(r3->AppendString("hello")); ASSERT_TRUE(r3->AppendInt64(1002)); ASSERT_TRUE(r3->AppendDate(2020, 7, 22)); status = ::hybridse::sdk::Status(); - ok = router->ExecuteInsert(db, insert3, r3, &status); + ok = router_->ExecuteInsert(db, insert3, r3, &status); ASSERT_TRUE(ok); std::string insert4 = "insert into " + name + " values('hello', ?, ?, 000);"; status = ::hybridse::sdk::Status(); - std::shared_ptr r4 = router->GetInsertRow(db, insert4, &status); + std::shared_ptr r4 = router_->GetInsertRow(db, insert4, &status); ASSERT_TRUE(r4->Init(0)); ASSERT_TRUE(r4->AppendInt64(1003)); ASSERT_TRUE(r4->AppendDate(2020, 7, 22)); status = ::hybridse::sdk::Status(); - ok = router->ExecuteInsert(db, insert4, r4, &status); + ok = router_->ExecuteInsert(db, insert4, r4, &status); ASSERT_TRUE(ok); std::string insert5 = "insert into " + name + " values('hello', 1004, '2020-07-31', 001);"; status = ::hybridse::sdk::Status(); - ok = router->ExecuteInsert(db, insert5, &status); + ok = router_->ExecuteInsert(db, insert5, &status); ASSERT_TRUE(ok); std::string insert6 = "insert into " + name + " values('hello', 1004, '2020-07-31', ?);"; status = ::hybridse::sdk::Status(); - ok = router->ExecuteInsert(db, insert6, &status); + ok = router_->ExecuteInsert(db, insert6, &status); ASSERT_FALSE(ok); int32_t year; @@ -630,7 +656,7 @@ TEST_F(SQLRouterTest, test_sql_insert_placeholder_with_column_key_2) { int32_t day; std::string select = "select * from " + name + ";"; status = ::hybridse::sdk::Status(); - auto rs = router->ExecuteSQL(db, select, &status); + auto rs = router_->ExecuteSQL(db, select, &status); ASSERT_TRUE(nullptr != rs) << status.msg; ASSERT_EQ(5, rs->Size()); @@ -645,74 +671,71 @@ TEST_F(SQLRouterTest, test_sql_insert_placeholder_with_column_key_2) { ASSERT_TRUE(rs->Next()); ASSERT_EQ("hello", rs->GetStringUnsafe(0)); - ASSERT_EQ(rs->GetInt64Unsafe(1), 1003); + ASSERT_EQ(rs->GetInt64Unsafe(1), 1000); ASSERT_TRUE(rs->GetDate(2, &year, &month, &day)); ASSERT_EQ(year, 2020); ASSERT_EQ(month, 7); - ASSERT_EQ(day, 22); - ASSERT_EQ(0, rs->GetInt32Unsafe(3)); + ASSERT_EQ(day, 13); + ASSERT_EQ(123, rs->GetInt32Unsafe(3)); ASSERT_TRUE(rs->Next()); ASSERT_EQ("hello", rs->GetStringUnsafe(0)); - ASSERT_EQ(rs->GetInt64Unsafe(1), 1002); + ASSERT_EQ(rs->GetInt64Unsafe(1), 1004); ASSERT_TRUE(rs->GetDate(2, &year, &month, &day)); ASSERT_EQ(year, 2020); ASSERT_EQ(month, 7); - ASSERT_EQ(day, 22); - ASSERT_EQ(789, rs->GetInt32Unsafe(3)); + ASSERT_EQ(day, 31); + ASSERT_EQ(1, rs->GetInt32Unsafe(3)); ASSERT_TRUE(rs->Next()); ASSERT_EQ("hello", rs->GetStringUnsafe(0)); - ASSERT_EQ(rs->GetInt64Unsafe(1), 1000); + ASSERT_EQ(rs->GetInt64Unsafe(1), 1003); ASSERT_TRUE(rs->GetDate(2, &year, &month, &day)); ASSERT_EQ(year, 2020); ASSERT_EQ(month, 7); - ASSERT_EQ(day, 13); - ASSERT_EQ(123, rs->GetInt32Unsafe(3)); + ASSERT_EQ(day, 22); + ASSERT_EQ(0, rs->GetInt32Unsafe(3)); ASSERT_TRUE(rs->Next()); ASSERT_EQ("hello", rs->GetStringUnsafe(0)); - ASSERT_EQ(rs->GetInt64Unsafe(1), 1004); + ASSERT_EQ(rs->GetInt64Unsafe(1), 1002); ASSERT_TRUE(rs->GetDate(2, &year, &month, &day)); ASSERT_EQ(year, 2020); ASSERT_EQ(month, 7); - ASSERT_EQ(day, 31); - ASSERT_EQ(1, rs->GetInt32Unsafe(3)); + ASSERT_EQ(day, 22); + ASSERT_EQ(789, rs->GetInt32Unsafe(3)); + + ASSERT_FALSE(rs->Next()); status = ::hybridse::sdk::Status(); - ok = router->ExecuteDDL(db, "drop table " + name + ";", &status); + ok = router_->ExecuteDDL(db, "drop table " + name + ";", &status); ASSERT_TRUE(ok); - ok = router->DropDB(db, &status); + ok = router_->DropDB(db, &status); ASSERT_TRUE(ok); } TEST_F(SQLRouterTest, test_sql_insert_placeholder_with_type_check) { - SQLRouterOptions sql_opt; - sql_opt.zk_cluster = mc_->GetZkCluster(); - sql_opt.zk_path = mc_->GetZkPath(); - auto router = NewClusterSQLRouter(sql_opt); - ASSERT_TRUE(router != nullptr); std::string name = "test" + GenRand(); std::string db = "db" + GenRand(); ::hybridse::sdk::Status status; - bool ok = router->CreateDB(db, &status); + bool ok = router_->CreateDB(db, &status); ASSERT_TRUE(ok); std::string ddl = "create table " + name + "(" "col1 string NOT NULL, col2 bigint NOT NULL, col3 date NOT NULL, col4 " "int, col5 smallint, col6 float, col7 double," "index(key=col1, ts=col2));"; - ok = router->ExecuteDDL(db, ddl, &status); + ok = router_->ExecuteDDL(db, ddl, &status); ASSERT_TRUE(ok); - ASSERT_TRUE(router->RefreshCatalog()); + ASSERT_TRUE(router_->RefreshCatalog()); // test null std::string insert1 = "insert into " + name + " values(?, ?, ?, ?, ?, ?, ?);"; status = hybridse::sdk::Status(); - std::shared_ptr r1 = router->GetInsertRow(db, insert1, &status); + std::shared_ptr r1 = router_->GetInsertRow(db, insert1, &status); // test schema std::shared_ptr schema = r1->GetSchema(); @@ -740,35 +763,35 @@ TEST_F(SQLRouterTest, test_sql_insert_placeholder_with_type_check) { ASSERT_TRUE(r1->AppendNULL()); // appendnull automatically status = hybridse::sdk::Status(); - ok = router->ExecuteInsert(db, insert1, r1, &status); + ok = router_->ExecuteInsert(db, insert1, r1, &status); ASSERT_TRUE(ok); // test int convert and float convert std::string insert2 = "insert into " + name + " values('hello', ?, '2020-02-29', NULL, 123, 2.33, NULL);"; status = hybridse::sdk::Status(); - std::shared_ptr r2 = router->GetInsertRow(db, insert2, &status); + std::shared_ptr r2 = router_->GetInsertRow(db, insert2, &status); ASSERT_EQ(status.code, 0); ASSERT_TRUE(r2->Init(0)); ASSERT_TRUE(r2->AppendInt64(1001)); status = hybridse::sdk::Status(); - ok = router->ExecuteInsert(db, insert2, r2, &status); + ok = router_->ExecuteInsert(db, insert2, r2, &status); ASSERT_TRUE(ok); // test int to float std::string insert3 = "insert into " + name + " values('hello', ?, '2020-12-31', NULL, NULL, 123, 123);"; status = hybridse::sdk::Status(); - std::shared_ptr r3 = router->GetInsertRow(db, insert3, &status); + std::shared_ptr r3 = router_->GetInsertRow(db, insert3, &status); ASSERT_EQ(status.code, 0); ASSERT_TRUE(r3->Init(0)); ASSERT_TRUE(r3->AppendInt64(1002)); status = hybridse::sdk::Status(); - ok = router->ExecuteInsert(db, insert3, r3, &status); + ok = router_->ExecuteInsert(db, insert3, r3, &status); ASSERT_TRUE(ok); // test float to int std::string insert4 = "insert into " + name + " values('hello', ?, '2020-02-29', 2.33, 2.33, 123, 123);"; status = hybridse::sdk::Status(); - std::shared_ptr r4 = router->GetInsertRow(db, insert4, &status); + std::shared_ptr r4 = router_->GetInsertRow(db, insert4, &status); ASSERT_EQ(status.code, 1); int32_t year; @@ -776,7 +799,8 @@ TEST_F(SQLRouterTest, test_sql_insert_placeholder_with_type_check) { int32_t day; std::string select = "select * from " + name + ";"; status = hybridse::sdk::Status(); - auto rs = router->ExecuteSQL(db, select, &status); + auto rs = router_->ExecuteSQL(db, select, &status); + ASSERT_TRUE(status.IsOK()) << status.msg; ASSERT_EQ(3, rs->Size()); ASSERT_TRUE(rs->Next()); @@ -818,50 +842,44 @@ TEST_F(SQLRouterTest, test_sql_insert_placeholder_with_type_check) { ASSERT_FALSE(rs->Next()); status = hybridse::sdk::Status(); - ok = router->ExecuteDDL(db, "drop table " + name + ";", &status); + ok = router_->ExecuteDDL(db, "drop table " + name + ";", &status); ASSERT_TRUE(ok); status = hybridse::sdk::Status(); - ok = router->DropDB(db, &status); + ok = router_->DropDB(db, &status); ASSERT_TRUE(ok); } TEST_F(SQLRouterTest, smoketest_on_sql) { - SQLRouterOptions sql_opt; - sql_opt.zk_cluster = mc_->GetZkCluster(); - sql_opt.zk_path = mc_->GetZkPath(); - sql_opt.enable_debug = hybridse::sqlcase::SqlCase::IsDebug(); - auto router = NewClusterSQLRouter(sql_opt); - ASSERT_TRUE(router != nullptr); std::string name = "test" + GenRand(); std::string db = "db" + GenRand(); ::hybridse::sdk::Status status; - bool ok = router->CreateDB(db, &status); + bool ok = router_->CreateDB(db, &status); ASSERT_TRUE(ok); std::string ddl = "create table " + name + "(" "col1 string, col2 bigint," "index(key=col1, ts=col2));"; - ok = router->ExecuteDDL(db, ddl, &status); + ok = router_->ExecuteDDL(db, ddl, &status); ASSERT_TRUE(ok); - ASSERT_TRUE(router->RefreshCatalog()); + ASSERT_TRUE(router_->RefreshCatalog()); std::string insert = "insert into " + name + " values('hello', 1590);"; - ok = router->ExecuteInsert(db, insert, &status); + ok = router_->ExecuteInsert(db, insert, &status); ASSERT_TRUE(ok); - ASSERT_TRUE(router->RefreshCatalog()); + ASSERT_TRUE(router_->RefreshCatalog()); std::string sql_select = "select col1 from " + name + " ;"; - auto rs = router->ExecuteSQL(db, sql_select, &status); + auto rs = router_->ExecuteSQL(db, sql_select, &status); ASSERT_TRUE(rs != nullptr); ASSERT_EQ(1, rs->Size()); ASSERT_TRUE(rs->Next()); ASSERT_EQ("hello", rs->GetStringUnsafe(0)); std::string sql_window_batch = "select sum(col2) over w from " + name + " window w as (partition by " + name + ".col1 order by " + name + ".col2 ROWS BETWEEN 3 PRECEDING AND CURRENT ROW);"; - rs = router->ExecuteSQL(db, sql_window_batch, &status); + rs = router_->ExecuteSQL(db, sql_window_batch, &status); ASSERT_EQ(1, rs->Size()); ASSERT_TRUE(rs->Next()); ASSERT_EQ(1590, rs->GetInt64Unsafe(0)); { - std::shared_ptr row = router->GetRequestRow(db, sql_window_batch, &status); + std::shared_ptr row = router_->GetRequestRow(db, sql_window_batch, &status); ASSERT_TRUE(row != nullptr); ASSERT_EQ(2, row->GetSchema()->GetColumnCnt()); ASSERT_TRUE(row->Init(5)); @@ -873,14 +891,14 @@ TEST_F(SQLRouterTest, smoketest_on_sql) { " window w as (partition by " + name + ".col1 order by " + name + ".col2 ROWS BETWEEN 3 PRECEDING AND CURRENT ROW);"; - rs = router->ExecuteSQLRequest(db, sql_window_request, row, &status); + rs = router_->ExecuteSQLRequest(db, sql_window_request, row, &status); ASSERT_TRUE(rs != nullptr); ASSERT_EQ(1, rs->Size()); ASSERT_TRUE(rs->Next()); ASSERT_EQ(100, rs->GetInt64Unsafe(0)); } { - std::shared_ptr row = router->GetRequestRow(db, sql_window_batch, &status); + std::shared_ptr row = router_->GetRequestRow(db, sql_window_batch, &status); ASSERT_TRUE(row != nullptr); ASSERT_EQ(2, row->GetSchema()->GetColumnCnt()); ASSERT_TRUE(row->Init(5)); @@ -892,104 +910,89 @@ TEST_F(SQLRouterTest, smoketest_on_sql) { " window w as (partition by " + name + ".col1 order by " + name + ".col2 ROWS BETWEEN 3 PRECEDING AND CURRENT ROW);"; - rs = router->ExecuteSQLRequest(db, sql_window_request, row, &status); + rs = router_->ExecuteSQLRequest(db, sql_window_request, row, &status); ASSERT_TRUE(rs != nullptr); ASSERT_EQ(1, rs->Size()); ASSERT_TRUE(rs->Next()); ASSERT_EQ(100, rs->GetInt64Unsafe(0)); } - ok = router->ExecuteDDL(db, "drop table " + name + ";", &status); + ok = router_->ExecuteDDL(db, "drop table " + name + ";", &status); ASSERT_TRUE(ok); - ok = router->DropDB(db, &status); + ok = router_->DropDB(db, &status); ASSERT_TRUE(ok); } TEST_F(SQLRouterTest, smoke_explain_on_sql) { - SQLRouterOptions sql_opt; - sql_opt.zk_cluster = mc_->GetZkCluster(); - sql_opt.zk_path = mc_->GetZkPath(); - auto router = NewClusterSQLRouter(sql_opt); - ASSERT_TRUE(router != nullptr); std::string name = "test" + GenRand(); std::string db = "db" + GenRand(); ::hybridse::sdk::Status status; - bool ok = router->CreateDB(db, &status); + bool ok = router_->CreateDB(db, &status); ASSERT_TRUE(ok); std::string ddl = "create table " + name + "(" "col1 string, col2 timestamp, col3 date," "index(key=col1, ts=col2));"; - ok = router->ExecuteDDL(db, ddl, &status); + ok = router_->ExecuteDDL(db, ddl, &status); ASSERT_TRUE(ok); - ASSERT_TRUE(router->RefreshCatalog()); + ASSERT_TRUE(router_->RefreshCatalog()); std::string insert = "insert into " + name + " values('hello', 1591174600000l, '2020-06-03');"; - ok = router->ExecuteInsert(db, insert, &status); + ok = router_->ExecuteInsert(db, insert, &status); ASSERT_TRUE(ok); - ASSERT_TRUE(router->RefreshCatalog()); + ASSERT_TRUE(router_->RefreshCatalog()); std::string sql_select = "select * from " + name + " ;"; - auto explain = router->Explain(db, sql_select, &status); + auto explain = router_->Explain(db, sql_select, &status); ASSERT_TRUE(explain != nullptr); - ok = router->ExecuteDDL(db, "drop table " + name + ";", &status); + ok = router_->ExecuteDDL(db, "drop table " + name + ";", &status); ASSERT_TRUE(ok); - ok = router->DropDB(db, &status); + ok = router_->DropDB(db, &status); ASSERT_TRUE(ok); } TEST_F(SQLRouterTest, smoke_not_null) { - SQLRouterOptions sql_opt; - sql_opt.zk_cluster = mc_->GetZkCluster(); - sql_opt.zk_path = mc_->GetZkPath(); - auto router = NewClusterSQLRouter(sql_opt); - ASSERT_TRUE(router != nullptr); std::string name = "test" + GenRand(); std::string db = "db" + GenRand(); ::hybridse::sdk::Status status; - bool ok = router->CreateDB(db, &status); + bool ok = router_->CreateDB(db, &status); ASSERT_TRUE(ok); std::string ddl = "create table " + name + "(" "col1 string, col2 timestamp, col3 date not null," "index(key=col1, ts=col2));"; - ok = router->ExecuteDDL(db, ddl, &status); + ok = router_->ExecuteDDL(db, ddl, &status); ASSERT_TRUE(ok); - ASSERT_TRUE(router->RefreshCatalog()); + ASSERT_TRUE(router_->RefreshCatalog()); std::string insert = "insert into " + name + " values('hello', 1591174600000l, null);"; - ok = router->ExecuteInsert(db, insert, &status); + ok = router_->ExecuteInsert(db, insert, &status); ASSERT_FALSE(ok); - ok = router->ExecuteDDL(db, "drop table " + name + ";", &status); + ok = router_->ExecuteDDL(db, "drop table " + name + ";", &status); ASSERT_TRUE(ok); - ok = router->DropDB(db, &status); + ok = router_->DropDB(db, &status); ASSERT_TRUE(ok); } TEST_F(SQLRouterTest, smoketimestamptest_on_sql) { - SQLRouterOptions sql_opt; - sql_opt.zk_cluster = mc_->GetZkCluster(); - sql_opt.zk_path = mc_->GetZkPath(); - auto router = NewClusterSQLRouter(sql_opt); - ASSERT_TRUE(router != nullptr); std::string name = "test" + GenRand(); std::string db = "db" + GenRand(); ::hybridse::sdk::Status status; - bool ok = router->CreateDB(db, &status); + bool ok = router_->CreateDB(db, &status); ASSERT_TRUE(ok); std::string ddl = "create table " + name + "(" "col1 string, col2 timestamp, col3 date," "index(key=col1, ts=col2));"; - ok = router->ExecuteDDL(db, ddl, &status); + ok = router_->ExecuteDDL(db, ddl, &status); ASSERT_TRUE(ok); - ASSERT_TRUE(router->RefreshCatalog()); + ASSERT_TRUE(router_->RefreshCatalog()); std::string insert = "insert into " + name + " values('hello', 1591174600000l, '2020-06-03');"; - ok = router->ExecuteInsert(db, insert, &status); + ok = router_->ExecuteInsert(db, insert, &status); ASSERT_TRUE(ok); - ASSERT_TRUE(router->RefreshCatalog()); + ASSERT_TRUE(router_->RefreshCatalog()); std::string sql_select = "select * from " + name + " ;"; - auto rs = router->ExecuteSQL(db, sql_select, &status); + auto rs = router_->ExecuteSQL(db, sql_select, &status); ASSERT_TRUE(rs != nullptr); ASSERT_EQ(1, rs->Size()); ASSERT_EQ(3, rs->GetSchema()->GetColumnCnt()); @@ -1005,53 +1008,73 @@ TEST_F(SQLRouterTest, smoketimestamptest_on_sql) { ASSERT_EQ(3, day); ASSERT_FALSE(rs->Next()); - ok = router->ExecuteDDL(db, "drop table " + name + ";", &status); + ok = router_->ExecuteDDL(db, "drop table " + name + ";", &status); ASSERT_TRUE(ok); - ok = router->DropDB(db, &status); + ok = router_->DropDB(db, &status); ASSERT_TRUE(ok); } TEST_F(SQLRouterTest, smoketest_on_muti_partitions) { - SQLRouterOptions sql_opt; - sql_opt.zk_cluster = mc_->GetZkCluster(); - sql_opt.zk_path = mc_->GetZkPath(); - auto router = NewClusterSQLRouter(sql_opt); - ASSERT_TRUE(router != nullptr); std::string name = "test" + GenRand(); std::string db = "db" + GenRand(); ::hybridse::sdk::Status status; - bool ok = router->CreateDB(db, &status); + bool ok = router_->CreateDB(db, &status); ASSERT_TRUE(ok); auto endpoints = mc_->GetTbEndpoint(); std::string ddl = "create table " + name + "(" "col1 string, col2 bigint," "index(key=col1, ts=col2)) options(partitionnum=8);"; - ok = router->ExecuteDDL(db, ddl, &status); + ok = router_->ExecuteDDL(db, ddl, &status); ASSERT_TRUE(ok); - ASSERT_TRUE(router->RefreshCatalog()); + ASSERT_TRUE(router_->RefreshCatalog()); for (int i = 0; i < 100; i++) { std::string key = "'hello" + std::to_string(i) + "'"; std::string insert = "insert into " + name + " values(" + key + ", 1590);"; - ok = router->ExecuteInsert(db, insert, &status); + ok = router_->ExecuteInsert(db, insert, &status); ASSERT_TRUE(ok); } - ASSERT_TRUE(router->RefreshCatalog()); + ASSERT_TRUE(router_->RefreshCatalog()); std::string sql_select = "select col1 from " + name + " ;"; - auto rs = router->ExecuteSQL(db, sql_select, &status); + auto rs = router_->ExecuteSQL(db, sql_select, &status); ASSERT_TRUE(rs != nullptr); ASSERT_EQ(100, rs->Size()); ASSERT_TRUE(rs->Next()); - ok = router->ExecuteDDL(db, "drop table " + name + ";", &status); + ok = router_->ExecuteDDL(db, "drop table " + name + ";", &status); ASSERT_TRUE(ok); - ok = router->DropDB(db, &status); + ok = router_->DropDB(db, &status); ASSERT_TRUE(ok); } +TEST_F(SQLRouterTest, testGetHoleIdx) { + ::hybridse::sdk::Status status; + router_->ExecuteSQL("create database if not exists " + db_, &status); + ASSERT_TRUE(status.IsOK()); + std::string name("get_hold_idx_test"); + router_->ExecuteSQL(db_, "create table if not exists " + name + "(c1 int, c2 string, c3 timestamp)", &status); + ASSERT_TRUE(status.IsOK()); + ASSERT_TRUE(router_->RefreshCatalog()); + + // test + std::string insert1 = "insert into " + name + "(c3,c2,c1) values(?, 'abc', ?);"; + auto r1 = router_->GetInsertRow(db_, insert1, &status); + ASSERT_TRUE(r1); + auto hole_vec = r1->GetHoleIdx(); + ASSERT_EQ(hole_vec.size(), 2); + ASSERT_THAT(hole_vec, ::testing::ElementsAre(2, 0)); + + std::string insert2 = "insert into " + name + "(c3,c2,c1) values(?, 'abc', 123456);"; + auto r2 = router_->GetInsertRow(db_, insert2, &status); + ASSERT_TRUE(r2); + hole_vec = r2->GetHoleIdx(); + ASSERT_EQ(hole_vec.size(), 1); + ASSERT_THAT(hole_vec, ::testing::ElementsAre(2)); +} + class TableSchemaBuilder { public: - explicit TableSchemaBuilder(std::string table_name) : table_name_(table_name) {} - TableSchemaBuilder& AddCol(std::string name, hybridse::sdk::DataType type) { + explicit TableSchemaBuilder(std::string table_name) : table_name_(std::move(table_name)) {} + TableSchemaBuilder& AddCol(const std::string& name, hybridse::sdk::DataType type) { cols_.emplace_back(name, type); return *this; } @@ -1131,20 +1154,20 @@ TEST_F(SQLRouterTest, DDLParseMethods) { TEST_F(SQLRouterTest, DDLParseMethodsCombineIndex) { std::string sql = - "select reqId as reqId_75, \n" - "max(`fWatchedTimeLen`) over bo_hislabel_zUserId_uUserId_ingestionTime_1s_172801s_100 " - "as bo_hislabel_fWatchedTimeLen_multi_max_74, \n" - "avg(`fWatchedTimeLen`) over bo_hislabel_zUserId_uUserId_ingestionTime_1s_172801s_100 " - "as bo_hislabel_fWatchedTimeLen_multi_avg_75 \n" - "from \n" - "(select `eventTime` as `ingestionTime`, `zUserId` as `zUserId`, `uUserId` as `uUserId`, " - "timestamp('2019-07-18 09:20:20') as `nRequestTime`, double(0) as `fWatchedTimeLen`, " - "reqId from `flattenRequest`) \n" - "window bo_hislabel_zUserId_uUserId_ingestionTime_1s_172801s_100 as ( \n" - "UNION (select `ingestionTime`, `zUserId`, `uUserId`, `nRequestTime`, `fWatchedTimeLen`, " - "'' as reqId from `bo_hislabel`) \n" - "partition by `zUserId`,`uUserId` order by `ingestionTime` " - "rows_range between 172800999 preceding and 1s preceding MAXSIZE 100 INSTANCE_NOT_IN_WINDOW);"; + "select reqId as reqId_75, \n" + "max(`fWatchedTimeLen`) over bo_hislabel_zUserId_uUserId_ingestionTime_1s_172801s_100 " + "as bo_hislabel_fWatchedTimeLen_multi_max_74, \n" + "avg(`fWatchedTimeLen`) over bo_hislabel_zUserId_uUserId_ingestionTime_1s_172801s_100 " + "as bo_hislabel_fWatchedTimeLen_multi_avg_75 \n" + "from \n" + "(select `eventTime` as `ingestionTime`, `zUserId` as `zUserId`, `uUserId` as `uUserId`, " + "timestamp('2019-07-18 09:20:20') as `nRequestTime`, double(0) as `fWatchedTimeLen`, " + "reqId from `flattenRequest`) \n" + "window bo_hislabel_zUserId_uUserId_ingestionTime_1s_172801s_100 as ( \n" + "UNION (select `ingestionTime`, `zUserId`, `uUserId`, `nRequestTime`, `fWatchedTimeLen`, " + "'' as reqId from `bo_hislabel`) \n" + "partition by `zUserId`,`uUserId` order by `ingestionTime` " + "rows_range between 172800999 preceding and 1s preceding MAXSIZE 100 INSTANCE_NOT_IN_WINDOW);"; std::vector>>> table_map; { TableSchemaBuilder builder("flattenRequest"); @@ -1174,19 +1197,18 @@ TEST_F(SQLRouterTest, DDLParseMethodsCombineIndex) { ddl_list.at(0)); } -} // namespace sdk -} // namespace openmldb +} // namespace openmldb::sdk int main(int argc, char** argv) { ::hybridse::vm::Engine::InitializeGlobalLLVM(); ::testing::InitGoogleTest(&argc, argv); ::google::ParseCommandLineFlags(&argc, &argv, true); FLAGS_zk_session_timeout = 100000; - ::openmldb::sdk::MiniCluster mc(6181); + ::openmldb::sdk::MiniCluster mc(2181); ::openmldb::sdk::mc_ = &mc; int ok = ::openmldb::sdk::mc_->SetUp(1); sleep(1); - srand(time(NULL)); + srand(time(nullptr)); ok = RUN_ALL_TESTS(); ::openmldb::sdk::mc_->Close(); return ok;