From 69c5a8b5a319932f3dc07d336a35480bdefc13c0 Mon Sep 17 00:00:00 2001 From: dl239 Date: Fri, 27 Oct 2023 15:17:36 +0800 Subject: [PATCH 01/12] feat: support compress --- hybridse/include/node/node_enum.h | 6 +++++ hybridse/include/node/node_manager.h | 2 -- hybridse/include/node/sql_node.h | 33 ++++++++++++++++++++--- hybridse/src/node/node_manager.cc | 5 ---- hybridse/src/node/sql_node.cc | 12 +++++++++ hybridse/src/planv2/ast_node_converter.cc | 14 ++++++++-- src/sdk/node_adapter.cc | 6 +++++ 7 files changed, 66 insertions(+), 12 deletions(-) diff --git a/hybridse/include/node/node_enum.h b/hybridse/include/node/node_enum.h index 16e18291478..b903eaafdd5 100644 --- a/hybridse/include/node/node_enum.h +++ b/hybridse/include/node/node_enum.h @@ -97,6 +97,7 @@ enum SqlNodeType { kWithClauseEntry, kAlterTableStmt, kShowStmt, + kCompressType, kSqlNodeTypeLast, // debug type }; @@ -342,6 +343,11 @@ enum StorageMode { kHDD = 3, }; +enum CompressType { + kNoCompress = 0, + kSnappy = 1, +}; + // batch plan node type enum BatchPlanNodeType { kBatchDataset, kBatchPartition, kBatchMap }; diff --git a/hybridse/include/node/node_manager.h b/hybridse/include/node/node_manager.h index ab87e588a53..e70f0a59564 100644 --- a/hybridse/include/node/node_manager.h +++ b/hybridse/include/node/node_manager.h @@ -399,8 +399,6 @@ class NodeManager { SqlNode *MakeReplicaNumNode(int num); - SqlNode *MakeStorageModeNode(StorageMode storage_mode); - SqlNode *MakePartitionNumNode(int num); SqlNode *MakeDistributionsNode(const NodePointVector& distribution_list); diff --git a/hybridse/include/node/sql_node.h b/hybridse/include/node/sql_node.h index bbdfc83313f..507f8215fe9 100644 --- a/hybridse/include/node/sql_node.h +++ b/hybridse/include/node/sql_node.h @@ -25,6 +25,7 @@ #include #include "absl/status/statusor.h" +#include "absl/strings/match.h" #include "absl/strings/str_cat.h" #include "absl/strings/string_view.h" #include "boost/algorithm/string.hpp" @@ -309,17 +310,26 @@ inline const std::string StorageModeName(StorageMode mode) { } inline const StorageMode NameToStorageMode(const std::string& name) { - if (boost::iequals(name, "memory")) { + if (absl::EqualsIgnoreCase(name, "memory")) { return kMemory; - } else if (boost::iequals(name, "hdd")) { + } else if (absl::EqualsIgnoreCase(name, "hdd")) { return kHDD; - } else if (boost::iequals(name, "ssd")) { + } else if (absl::EqualsIgnoreCase(name, "ssd")) { return kSSD; } else { return kUnknown; } } +inline absl::StatusOr NameToCompressType(const std::string& name) { + if (absl::EqualsIgnoreCase(name, "snappy")) { + return CompressType::kSnappy; + } else if (absl::EqualsIgnoreCase(name, "nocompress")) { + return CompressType::kNoCompress; + } + return absl::Status(absl::StatusCode::kInvalidArgument, absl::StrCat("invalid compress type: ", name)); +} + inline const std::string RoleTypeName(RoleType type) { switch (type) { case kLeader: @@ -1881,6 +1891,23 @@ class StorageModeNode : public SqlNode { StorageMode storage_mode_; }; +class CompressTypeNode : public SqlNode { + public: + CompressTypeNode() : SqlNode(kCompressType, 0, 0), compress_type_(kNoCompress) {} + + explicit CompressTypeNode(CompressType compress_type) + : SqlNode(kStorageMode, 0, 0), compress_type_(compress_type) {} + + ~CompressTypeNode() {} + + CompressType GetCompressType() const { return compress_type_; } + + void Print(std::ostream &output, const std::string &org_tab) const; + + private: + CompressType compress_type_; +}; + class CreateTableLikeClause { public: CreateTableLikeClause() = default; diff --git a/hybridse/src/node/node_manager.cc b/hybridse/src/node/node_manager.cc index 8f6f80d7517..f60ba20d6b2 100644 --- a/hybridse/src/node/node_manager.cc +++ b/hybridse/src/node/node_manager.cc @@ -1031,11 +1031,6 @@ SqlNode *NodeManager::MakeReplicaNumNode(int num) { return RegisterNode(node_ptr); } -SqlNode *NodeManager::MakeStorageModeNode(StorageMode storage_mode) { - SqlNode *node_ptr = new StorageModeNode(storage_mode); - return RegisterNode(node_ptr); -} - SqlNode *NodeManager::MakePartitionNumNode(int num) { SqlNode *node_ptr = new PartitionNumNode(num); return RegisterNode(node_ptr); diff --git a/hybridse/src/node/sql_node.cc b/hybridse/src/node/sql_node.cc index 16b88cd51ba..935923a8177 100644 --- a/hybridse/src/node/sql_node.cc +++ b/hybridse/src/node/sql_node.cc @@ -1168,6 +1168,7 @@ static absl::flat_hash_map CreateSqlNodeTypeToNa {kReplicaNum, "kReplicaNum"}, {kPartitionNum, "kPartitionNum"}, {kStorageMode, "kStorageMode"}, + {kCompressType, "kCompressType"}, {kFn, "kFn"}, {kFnParaList, "kFnParaList"}, {kCreateSpStmt, "kCreateSpStmt"}, @@ -2598,6 +2599,17 @@ void StorageModeNode::Print(std::ostream &output, const std::string &org_tab) co PrintValue(output, tab, StorageModeName(storage_mode_), "storage_mode", true); } +void CompressTypeNode::Print(std::ostream &output, const std::string &org_tab) const { + SqlNode::Print(output, org_tab); + const std::string tab = org_tab + INDENT + SPACE_ED; + output << "\n"; + if (compress_type_ == CompressType::kSnappy) { + PrintValue(output, tab, "snappy", "compress_type", true); + } else { + PrintValue(output, tab, "nocompress", "compress_type", true); + } +} + void PartitionNumNode::Print(std::ostream &output, const std::string &org_tab) const { SqlNode::Print(output, org_tab); const std::string tab = org_tab + INDENT + SPACE_ED; diff --git a/hybridse/src/planv2/ast_node_converter.cc b/hybridse/src/planv2/ast_node_converter.cc index c0c3864716b..affb85f91bc 100644 --- a/hybridse/src/planv2/ast_node_converter.cc +++ b/hybridse/src/planv2/ast_node_converter.cc @@ -1761,8 +1761,18 @@ base::Status ConvertTableOption(const zetasql::ASTOptionsEntry* entry, node::Nod } else if (absl::EqualsIgnoreCase("storage_mode", identifier_v)) { std::string storage_mode; CHECK_STATUS(AstStringLiteralToString(entry->value(), &storage_mode)); - boost::to_lower(storage_mode); - *output = node_manager->MakeStorageModeNode(node::NameToStorageMode(storage_mode)); + absl::AsciiStrToLower(&storage_mode); + *output = node_manager->MakeNode(node::NameToStorageMode(storage_mode)); + } else if (absl::EqualsIgnoreCase("compress_type", identifier_v)) { + std::string compress_type; + CHECK_STATUS(AstStringLiteralToString(entry->value(), &compress_type)); + absl::AsciiStrToLower(&compress_type); + auto ret = node::NameToCompressType(compress_type); + if (ret.ok()) { + *output = node_manager->MakeNode(*ret); + } else { + return base::Status(common::kSqlAstError, ret.status().ToString()); + } } else { return base::Status(common::kSqlAstError, absl::StrCat("invalid option ", identifier)); } diff --git a/src/sdk/node_adapter.cc b/src/sdk/node_adapter.cc index b148c8a4ca9..ef9de07a774 100644 --- a/src/sdk/node_adapter.cc +++ b/src/sdk/node_adapter.cc @@ -225,6 +225,7 @@ bool NodeAdapter::TransformToTableDef(::hybridse::node::CreatePlanNode* create_n hybridse::node::NodePointVector distribution_list; hybridse::node::StorageMode storage_mode = hybridse::node::kMemory; + hybridse::node::CompressType compress_type = hybridse::node::kNoCompress; // different default value for cluster and standalone mode int replica_num = 1; int partition_num = 1; @@ -253,6 +254,10 @@ bool NodeAdapter::TransformToTableDef(::hybridse::node::CreatePlanNode* create_n storage_mode = dynamic_cast(table_option)->GetStorageMode(); break; } + case hybridse::node::kCompressType: { + compress_type = dynamic_cast(table_option)->GetCompressType(); + break; + } case hybridse::node::kDistributions: { distribution_list = dynamic_cast(table_option)->GetDistributionList(); @@ -293,6 +298,7 @@ bool NodeAdapter::TransformToTableDef(::hybridse::node::CreatePlanNode* create_n table->set_replica_num(replica_num); table->set_partition_num(partition_num); table->set_storage_mode(static_cast(storage_mode)); + table->set_compress_type(static_cast(compress_type)); bool has_generate_index = false; std::set index_names; std::map column_names; From ded9ccea4cc684d42dea2381ab43e65b7e88b8be Mon Sep 17 00:00:00 2001 From: dl239 Date: Fri, 27 Oct 2023 17:54:13 +0800 Subject: [PATCH 02/12] feat: update desc --- cases/plan/create.yaml | 37 +++++++++++++++++++++++++++++ hybridse/include/node/sql_node.h | 2 +- hybridse/src/node/plan_node_test.cc | 3 ++- hybridse/src/node/sql_node_test.cc | 2 +- src/sdk/sdk_util.cc | 5 ++++ src/sdk/sql_cluster_router.cc | 6 +++-- src/sdk/sql_cluster_test.cc | 2 +- 7 files changed, 51 insertions(+), 6 deletions(-) diff --git a/cases/plan/create.yaml b/cases/plan/create.yaml index 315ec30a305..f1076934391 100644 --- a/cases/plan/create.yaml +++ b/cases/plan/create.yaml @@ -1035,3 +1035,40 @@ cases: +-kind: HIVE +-path: hdfs://path +-table_option_list: [] + + - id: 34 + desc: Create 指定压缩 + sql: | + create table t1( + column1 int, + column2 timestamp, + index(key=column1, ts=column2)) OPTIONS (compress_type="snappy"); + expect: + node_tree_str: | + +-node[CREATE] + +-table: t1 + +-IF NOT EXIST: 0 + +-column_desc_list[list]: + | +-0: + | | +-node[kColumnDesc] + | | +-column_name: column1 + | | +-column_type: int32 + | | +-NOT NULL: 0 + | +-1: + | | +-node[kColumnDesc] + | | +-column_name: column2 + | | +-column_type: timestamp + | | +-NOT NULL: 0 + | +-2: + | +-node[kColumnIndex] + | +-keys: [column1] + | +-ts_col: column2 + | +-abs_ttl: -2 + | +-lat_ttl: -2 + | +-ttl_type: + | +-version_column: + | +-version_count: 0 + +-table_option_list[list]: + +-0: + +-node[kCompressType] + +-compress_type: snappy diff --git a/hybridse/include/node/sql_node.h b/hybridse/include/node/sql_node.h index 507f8215fe9..9cc8c005afa 100644 --- a/hybridse/include/node/sql_node.h +++ b/hybridse/include/node/sql_node.h @@ -1896,7 +1896,7 @@ class CompressTypeNode : public SqlNode { CompressTypeNode() : SqlNode(kCompressType, 0, 0), compress_type_(kNoCompress) {} explicit CompressTypeNode(CompressType compress_type) - : SqlNode(kStorageMode, 0, 0), compress_type_(compress_type) {} + : SqlNode(kCompressType, 0, 0), compress_type_(compress_type) {} ~CompressTypeNode() {} diff --git a/hybridse/src/node/plan_node_test.cc b/hybridse/src/node/plan_node_test.cc index 4f0d55d0166..5ffb76142a7 100644 --- a/hybridse/src/node/plan_node_test.cc +++ b/hybridse/src/node/plan_node_test.cc @@ -239,7 +239,8 @@ TEST_F(PlanNodeTest, ExtractColumnsAndIndexsTest) { manager_->MakeColumnDescNode("col3", node::kFloat, true), manager_->MakeColumnDescNode("col4", node::kVarchar, true), manager_->MakeColumnDescNode("col5", node::kTimestamp, true), index_node}, - {manager_->MakeReplicaNumNode(3), manager_->MakePartitionNumNode(8), manager_->MakeStorageModeNode(kMemory)}, + {manager_->MakeReplicaNumNode(3), manager_->MakePartitionNumNode(8), + manager_->MakeNode(kMemory)}, false); ASSERT_TRUE(nullptr != node); std::vector columns; diff --git a/hybridse/src/node/sql_node_test.cc b/hybridse/src/node/sql_node_test.cc index 545d9b647fd..227cb80dcea 100644 --- a/hybridse/src/node/sql_node_test.cc +++ b/hybridse/src/node/sql_node_test.cc @@ -676,7 +676,7 @@ TEST_F(SqlNodeTest, CreateIndexNodeTest) { node_manager_->MakeColumnDescNode("col4", node::kVarchar, true), node_manager_->MakeColumnDescNode("col5", node::kTimestamp, true), index_node}, {node_manager_->MakeReplicaNumNode(3), node_manager_->MakePartitionNumNode(8), - node_manager_->MakeStorageModeNode(kMemory)}, + node_manager_->MakeNode(kMemory)}, false); ASSERT_TRUE(nullptr != node); std::vector columns; diff --git a/src/sdk/sdk_util.cc b/src/sdk/sdk_util.cc index f6027f7c08b..1df87969040 100644 --- a/src/sdk/sdk_util.cc +++ b/src/sdk/sdk_util.cc @@ -88,6 +88,11 @@ std::string SDKUtil::GenCreateTableSQL(const ::openmldb::nameserver::TableInfo& } else { ss << ", STORAGE_MODE='Memory'"; } + if (table_info.compress_type() == type::CompressType::kSnappy) { + ss << ", COMPRESS_TYPE='Snappy'"; + } else { + ss << ", COMPRESS_TYPE='NoCompress'"; + } ss << ");"; return ss.str(); } diff --git a/src/sdk/sql_cluster_router.cc b/src/sdk/sql_cluster_router.cc index 90054f277aa..e77e0b12a03 100644 --- a/src/sdk/sql_cluster_router.cc +++ b/src/sdk/sql_cluster_router.cc @@ -1684,9 +1684,11 @@ std::shared_ptr SQLClusterRouter::HandleSQLCmd(const h } ss.str(""); std::unordered_map options; - options["storage_mode"] = StorageMode_Name(table->storage_mode()); + std::string storage_mode = StorageMode_Name(table->storage_mode()); // remove the prefix 'k', i.e., change kMemory to Memory - options["storage_mode"] = options["storage_mode"].substr(1, options["storage_mode"].size() - 1); + options["storage_mode"] = storage_mode.substr(1, storage_mode.size() - 1); + std::string compress_type = CompressType_Name(table->compress_type()); + options["compress_type"] = compress_type.substr(1, compress_type.size() -1); ::openmldb::cmd::PrintTableOptions(options, ss); result.emplace_back(std::vector{ss.str()}); return ResultSetSQL::MakeResultSet({FORMAT_STRING_KEY}, result, status); diff --git a/src/sdk/sql_cluster_test.cc b/src/sdk/sql_cluster_test.cc index 70b6f7a20f2..46a41a39b4e 100644 --- a/src/sdk/sql_cluster_test.cc +++ b/src/sdk/sql_cluster_test.cc @@ -265,7 +265,7 @@ TEST_F(SQLClusterDDLTest, ShowCreateTable) { "`col2` int,\n" "`col3` bigInt NOT NULL,\n" "INDEX (KEY=`col1`, TTL_TYPE=ABSOLUTE, TTL=100m)\n" - ") OPTIONS (PARTITIONNUM=1, REPLICANUM=1, STORAGE_MODE='Memory');"; + ") OPTIONS (PARTITIONNUM=1, REPLICANUM=1, STORAGE_MODE='Memory', COMPRESS_TYPE='NoCompress');"; ASSERT_TRUE(router->ExecuteDDL(db, ddl, &status)) << "ddl: " << ddl; ASSERT_TRUE(router->RefreshCatalog()); auto rs = router->ExecuteSQL(db, "show create table t1;", &status); From a364b71655f93e4bfd8e7950fa937478aa271f10 Mon Sep 17 00:00:00 2001 From: dl239 Date: Mon, 30 Oct 2023 11:28:20 +0800 Subject: [PATCH 03/12] docs: add doc --- .../reference/sql/ddl/CREATE_TABLE_STATEMENT.md | 16 +++++++++++----- docs/en/reference/sql/ddl/DESC_STATEMENT.md | 10 +++++----- .../sql/ddl/SHOW_CREATE_TABLE_STATEMENT.md | 2 +- .../openmldb_sql/ddl/CREATE_TABLE_STATEMENT.md | 16 +++++++++++----- docs/zh/openmldb_sql/ddl/DESC_STATEMENT.md | 10 +++++----- .../ddl/SHOW_CREATE_TABLE_STATEMENT.md | 2 +- 6 files changed, 34 insertions(+), 22 deletions(-) diff --git a/docs/en/reference/sql/ddl/CREATE_TABLE_STATEMENT.md b/docs/en/reference/sql/ddl/CREATE_TABLE_STATEMENT.md index a0d11d90657..ba62cf55231 100644 --- a/docs/en/reference/sql/ddl/CREATE_TABLE_STATEMENT.md +++ b/docs/en/reference/sql/ddl/CREATE_TABLE_STATEMENT.md @@ -473,6 +473,11 @@ StorageMode ::= 'Memory' | 'HDD' | 'SSD' +CompressTypeOption + ::= 'COMPRESS_TYPE' '=' CompressType +CompressType + ::= 'NoCompress' + | 'Snappy ``` @@ -484,6 +489,7 @@ StorageMode | `REPLICANUM` | It defines the number of replicas for the table. Note that the number of replicas is only configurable in Cluster version. | `OPTIONS (REPLICANUM=3)` | | `DISTRIBUTION` | It defines the distributed node endpoint configuration. Generally, it contains a Leader node and several followers. `(leader, [follower1, follower2, ..])`. Without explicit configuration, OpenMLDB will automatically configure `DISTRIBUTION` according to the environment and nodes. | `DISTRIBUTION = [ ('127.0.0.1:6527', [ '127.0.0.1:6528','127.0.0.1:6529' ])]` | | `STORAGE_MODE` | It defines the storage mode of the table. The supported modes are `Memory`, `HDD` and `SSD`. When not explicitly configured, it defaults to `Memory`.
If you need to support a storage mode other than `Memory` mode, `tablet` requires additional configuration options. For details, please refer to [tablet configuration file **conf/tablet.flags**](../../../deploy/conf.md#the-configuration-file-for-apiserver:-conf/tablet.flags). | `OPTIONS (STORAGE_MODE='HDD')` | +| `COMPRESS_TYPE` | It defines the compress types of the table. The supported compress type are `NoCompress` and `Snappy`. The default value is `NoCompress` | `OPTIONS (COMPRESS_TYPE='Snappy')` #### The Difference between Disk Table and Memory Table @@ -515,11 +521,11 @@ DESC t1; --- -------------------- ------ ---------- ------ --------------- 1 INDEX_0_1651143735 col1 std_time 0min kAbsoluteTime --- -------------------- ------ ---------- ------ --------------- - -------------- - storage_mode - -------------- - HDD - -------------- + --------------- -------------- + compress_type storage_mode + --------------- -------------- + NoCompress HDD + --------------- -------------- ``` The following sql command create a table with specified distribution. ```sql diff --git a/docs/en/reference/sql/ddl/DESC_STATEMENT.md b/docs/en/reference/sql/ddl/DESC_STATEMENT.md index 8179c952c56..a7d288064bb 100644 --- a/docs/en/reference/sql/ddl/DESC_STATEMENT.md +++ b/docs/en/reference/sql/ddl/DESC_STATEMENT.md @@ -56,11 +56,11 @@ desc t1; --- -------------------- ------ ---------- ---------- --------------- 1 INDEX_0_1658136511 col1 std_time 43200min kAbsoluteTime --- -------------------- ------ ---------- ---------- --------------- - -------------- - storage_mode - -------------- - Memory - -------------- + --------------- -------------- + compress_type storage_mode + --------------- -------------- + NoCompress Memory + --------------- -------------- ``` diff --git a/docs/en/reference/sql/ddl/SHOW_CREATE_TABLE_STATEMENT.md b/docs/en/reference/sql/ddl/SHOW_CREATE_TABLE_STATEMENT.md index dd411410e65..967ebce316a 100644 --- a/docs/en/reference/sql/ddl/SHOW_CREATE_TABLE_STATEMENT.md +++ b/docs/en/reference/sql/ddl/SHOW_CREATE_TABLE_STATEMENT.md @@ -21,7 +21,7 @@ show create table t1; `c3` bigInt, `c4` timestamp, INDEX (KEY=`c1`, TS=`c4`, TTL_TYPE=ABSOLUTE, TTL=0m) - ) OPTIONS (PARTITIONNUM=8, REPLICANUM=2, STORAGE_MODE='HDD'); + ) OPTIONS (PARTITIONNUM=8, REPLICANUM=2, STORAGE_MODE='HDD', COMPRESS_TYPE='NoCompress'); ------- --------------------------------------------------------------- 1 rows in set diff --git a/docs/zh/openmldb_sql/ddl/CREATE_TABLE_STATEMENT.md b/docs/zh/openmldb_sql/ddl/CREATE_TABLE_STATEMENT.md index 1dffc9d4cae..a44f699eed3 100644 --- a/docs/zh/openmldb_sql/ddl/CREATE_TABLE_STATEMENT.md +++ b/docs/zh/openmldb_sql/ddl/CREATE_TABLE_STATEMENT.md @@ -450,6 +450,11 @@ StorageMode ::= 'Memory' | 'HDD' | 'SSD' +CompressTypeOption + ::= 'COMPRESS_TYPE' '=' CompressType +CompressType + ::= 'NoCompress' + | 'Snappy' ``` @@ -460,6 +465,7 @@ StorageMode | `REPLICANUM` | 配置表的副本数。请注意,副本数只有在集群版中才可以配置。 | `OPTIONS (REPLICANUM=3)` | | `DISTRIBUTION` | 配置分布式的节点endpoint。一般包含一个Leader节点和若干Follower节点。`(leader, [follower1, follower2, ..])`。不显式配置时,OpenMLDB会自动根据环境和节点来配置`DISTRIBUTION`。 | `DISTRIBUTION = [ ('127.0.0.1:6527', [ '127.0.0.1:6528','127.0.0.1:6529' ])]` | | `STORAGE_MODE` | 表的存储模式,支持的模式有`Memory`、`HDD`或`SSD`。不显式配置时,默认为`Memory`。
如果需要支持非`Memory`模式的存储模式,`tablet`需要额外的配置选项,具体可参考[tablet配置文件 conf/tablet.flags](../../../deploy/conf.md)。 | `OPTIONS (STORAGE_MODE='HDD')` | +| `COMPRESS_TYPE` | 指定表的压缩类型。目前只支持Snappy压缩, 。默认为 `NoCompress` 即不压缩。 | `OPTIONS (COMPRESS_TYPE='Snappy')` #### 磁盘表与内存表区别 - 磁盘表对应`STORAGE_MODE`的取值为`HDD`或`SSD`。内存表对应的`STORAGE_MODE`取值为`Memory`。 @@ -488,11 +494,11 @@ DESC t1; --- -------------------- ------ ---------- ------ --------------- 1 INDEX_0_1651143735 col1 std_time 0min kAbsoluteTime --- -------------------- ------ ---------- ------ --------------- - -------------- - storage_mode - -------------- - HDD - -------------- + --------------- -------------- + compress_type storage_mode + --------------- -------------- + NoCompress HDD + --------------- -------------- ``` 创建一张表,指定分片的分布状态 ```sql diff --git a/docs/zh/openmldb_sql/ddl/DESC_STATEMENT.md b/docs/zh/openmldb_sql/ddl/DESC_STATEMENT.md index 1088411dc03..ca0d0de87bf 100644 --- a/docs/zh/openmldb_sql/ddl/DESC_STATEMENT.md +++ b/docs/zh/openmldb_sql/ddl/DESC_STATEMENT.md @@ -56,11 +56,11 @@ desc t1; --- -------------------- ------ ---------- ---------- --------------- 1 INDEX_0_1658136511 col1 std_time 43200min kAbsoluteTime --- -------------------- ------ ---------- ---------- --------------- - -------------- - storage_mode - -------------- - Memory - -------------- + --------------- -------------- + compress_type storage_mode + --------------- -------------- + NoCompress Memory + --------------- -------------- ``` diff --git a/docs/zh/openmldb_sql/ddl/SHOW_CREATE_TABLE_STATEMENT.md b/docs/zh/openmldb_sql/ddl/SHOW_CREATE_TABLE_STATEMENT.md index e697f687846..22c08fb754e 100644 --- a/docs/zh/openmldb_sql/ddl/SHOW_CREATE_TABLE_STATEMENT.md +++ b/docs/zh/openmldb_sql/ddl/SHOW_CREATE_TABLE_STATEMENT.md @@ -21,7 +21,7 @@ show create table t1; `c3` bigInt, `c4` timestamp, INDEX (KEY=`c1`, TS=`c4`, TTL_TYPE=ABSOLUTE, TTL=0m) - ) OPTIONS (PARTITIONNUM=8, REPLICANUM=2, STORAGE_MODE='HDD'); + ) OPTIONS (PARTITIONNUM=8, REPLICANUM=2, STORAGE_MODE='HDD', COMPRESS_TYPE='NoCompress'); ------- --------------------------------------------------------------- 1 rows in set From 6baa94057bc2b77254bfc67bbc7de50d9fce9713 Mon Sep 17 00:00:00 2001 From: dl239 Date: Mon, 30 Oct 2023 16:24:38 +0800 Subject: [PATCH 04/12] fix: fix test case --- src/cmd/sql_cmd_test.cc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/cmd/sql_cmd_test.cc b/src/cmd/sql_cmd_test.cc index 1896ac7c674..c5a78551a3d 100644 --- a/src/cmd/sql_cmd_test.cc +++ b/src/cmd/sql_cmd_test.cc @@ -461,11 +461,11 @@ TEST_P(DBSDKTest, Desc) { " --- ------- ----------- ------ --------- \n"; std::string expect_options = - " -------------- \n" - " storage_mode \n" - " -------------- \n" - " Memory \n" - " -------------- \n\n"; + " --------------- -------------- \n" + " compress_type storage_mode \n" + " --------------- -------------- \n" + " NoCompress Memory \n" + " --------------- -------------- \n\n"; // index name is dynamically assigned. do not check here std::vector expect = {expect_schema, "", expect_options}; From 539c6fc02104272e5f095b53a197057bc585de62 Mon Sep 17 00:00:00 2001 From: dl239 Date: Tue, 31 Oct 2023 14:56:51 +0800 Subject: [PATCH 05/12] fix: fix select --- src/cmd/sql_cmd_test.cc | 31 +++++++++++++ src/storage/disk_table.cc | 13 +++--- src/storage/disk_table_iterator.cc | 72 +++++++++++++++++++++--------- src/storage/disk_table_iterator.h | 27 +++++++---- src/storage/mem_table.cc | 10 +++-- src/storage/mem_table_iterator.cc | 34 ++++++++++---- src/storage/mem_table_iterator.h | 16 +++++-- src/tablet/tablet_impl.cc | 28 +++++++----- 8 files changed, 167 insertions(+), 64 deletions(-) diff --git a/src/cmd/sql_cmd_test.cc b/src/cmd/sql_cmd_test.cc index c5a78551a3d..f2fa7ae1f45 100644 --- a/src/cmd/sql_cmd_test.cc +++ b/src/cmd/sql_cmd_test.cc @@ -331,6 +331,37 @@ TEST_P(DBSDKTest, Select) { ASSERT_TRUE(status.IsOK()); } +TEST_P(DBSDKTest, SelectSnappy) { + auto cli = GetParam(); + cs = cli->cs; + sr = cli->sr; + hybridse::sdk::Status status; + if (cs->IsClusterMode()) { + sr->ExecuteSQL("SET @@execute_mode='online';", &status); + ASSERT_TRUE(status.IsOK()) << "error msg: " + status.msg; + } + std::string db = "db" + GenRand(); + sr->ExecuteSQL("create database " + db + ";", &status); + ASSERT_TRUE(status.IsOK()); + sr->ExecuteSQL("use " + db + ";", &status); + ASSERT_TRUE(status.IsOK()); + std::string create_sql = + "create table trans (c1 string, c2 bigint, c3 date," + "index(key=c1, ts=c2, abs_ttl=0, ttl_type=absolute)) options (compress_type='snappy');"; + sr->ExecuteSQL(create_sql, &status); + ASSERT_TRUE(status.IsOK()); + std::string insert_sql = "insert into trans values ('aaa', 1635247427000, \"2021-05-20\");"; + sr->ExecuteSQL(insert_sql, &status); + ASSERT_TRUE(status.IsOK()); + auto rs = sr->ExecuteSQL("select * from trans", &status); + ASSERT_TRUE(status.IsOK()); + ASSERT_EQ(1, rs->Size()); + sr->ExecuteSQL("drop table trans;", &status); + ASSERT_TRUE(status.IsOK()); + sr->ExecuteSQL("drop database " + db + ";", &status); + ASSERT_TRUE(status.IsOK()); +} + TEST_F(SqlCmdTest, SelectMultiPartition) { auto sr = cluster_cli.sr; std::string db_name = "test" + GenRand(); diff --git a/src/storage/disk_table.cc b/src/storage/disk_table.cc index 8f508bac6c5..beb1c2fc9e3 100644 --- a/src/storage/disk_table.cc +++ b/src/storage/disk_table.cc @@ -543,10 +543,10 @@ TableIterator* DiskTable::NewIterator(uint32_t idx, const std::string& pk, Ticke if (inner_index && inner_index->GetIndex().size() > 1) { auto ts_col = index_def->GetTsColumn(); if (ts_col) { - return new DiskTableIterator(db_, it, snapshot, pk, ts_col->GetId()); + return new DiskTableIterator(db_, it, snapshot, pk, ts_col->GetId(), GetCompressType()); } } - return new DiskTableIterator(db_, it, snapshot, pk); + return new DiskTableIterator(db_, it, snapshot, pk, GetCompressType()); } TraverseIterator* DiskTable::NewTraverseIterator(uint32_t index) { @@ -569,10 +569,10 @@ TraverseIterator* DiskTable::NewTraverseIterator(uint32_t index) { auto ts_col = index_def->GetTsColumn(); if (ts_col) { return new DiskTableTraverseIterator(db_, it, snapshot, ttl->ttl_type, expire_time, expire_cnt, - ts_col->GetId()); + ts_col->GetId(), GetCompressType()); } } - return new DiskTableTraverseIterator(db_, it, snapshot, ttl->ttl_type, expire_time, expire_cnt); + return new DiskTableTraverseIterator(db_, it, snapshot, ttl->ttl_type, expire_time, expire_cnt, GetCompressType()); } ::hybridse::vm::WindowIterator* DiskTable::NewWindowIterator(uint32_t idx) { @@ -595,10 +595,11 @@ ::hybridse::vm::WindowIterator* DiskTable::NewWindowIterator(uint32_t idx) { auto ts_col = index_def->GetTsColumn(); if (ts_col) { return new DiskTableKeyIterator(db_, it, snapshot, ttl->ttl_type, expire_time, expire_cnt, - ts_col->GetId(), cf_hs_[inner_pos + 1]); + ts_col->GetId(), cf_hs_[inner_pos + 1], GetCompressType()); } } - return new DiskTableKeyIterator(db_, it, snapshot, ttl->ttl_type, expire_time, expire_cnt, cf_hs_[inner_pos + 1]); + return new DiskTableKeyIterator(db_, it, snapshot, ttl->ttl_type, expire_time, expire_cnt, + cf_hs_[inner_pos + 1], GetCompressType()); } bool DiskTable::DeleteIndex(const std::string& idx_name) { diff --git a/src/storage/disk_table_iterator.cc b/src/storage/disk_table_iterator.cc index 7b78bec4f3e..d934715e880 100644 --- a/src/storage/disk_table_iterator.cc +++ b/src/storage/disk_table_iterator.cc @@ -15,7 +15,7 @@ */ #include "storage/disk_table_iterator.h" - +#include #include #include "gflags/gflags.h" #include "storage/key_transform.h" @@ -26,12 +26,12 @@ namespace openmldb { namespace storage { DiskTableIterator::DiskTableIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, - const std::string& pk) - : db_(db), it_(it), snapshot_(snapshot), pk_(pk), ts_(0) {} + const std::string& pk, type::CompressType compress_type) + : db_(db), it_(it), snapshot_(snapshot), pk_(pk), ts_(0), compress_type_(compress_type) {} DiskTableIterator::DiskTableIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, - const std::string& pk, uint32_t ts_idx) - : db_(db), it_(it), snapshot_(snapshot), pk_(pk), ts_(0), ts_idx_(ts_idx) { + const std::string& pk, uint32_t ts_idx, type::CompressType compress_type) + : db_(db), it_(it), snapshot_(snapshot), pk_(pk), ts_(0), ts_idx_(ts_idx), compress_type_(compress_type) { has_ts_idx_ = true; } @@ -55,7 +55,13 @@ void DiskTableIterator::Next() { return it_->Next(); } openmldb::base::Slice DiskTableIterator::GetValue() const { rocksdb::Slice value = it_->value(); - return openmldb::base::Slice(value.data(), value.size()); + if (compress_type_ == type::CompressType::kSnappy) { + tmp_buf_.clear(); + snappy::Uncompress(value.data(), value.size(), &tmp_buf_); + return openmldb::base::Slice(tmp_buf_); + } else { + return openmldb::base::Slice(value.data(), value.size()); + } } std::string DiskTableIterator::GetPK() const { return pk_; } @@ -85,7 +91,8 @@ void DiskTableIterator::Seek(const uint64_t ts) { DiskTableTraverseIterator::DiskTableTraverseIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, ::openmldb::storage::TTLType ttl_type, const uint64_t& expire_time, - const uint64_t& expire_cnt) + const uint64_t& expire_cnt, + type::CompressType compress_type) : db_(db), it_(it), snapshot_(snapshot), @@ -93,12 +100,14 @@ DiskTableTraverseIterator::DiskTableTraverseIterator(rocksdb::DB* db, rocksdb::I expire_value_(expire_time, expire_cnt, ttl_type), has_ts_idx_(false), ts_idx_(0), - traverse_cnt_(0) {} + traverse_cnt_(0), + compress_type_(compress_type) {} DiskTableTraverseIterator::DiskTableTraverseIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, ::openmldb::storage::TTLType ttl_type, const uint64_t& expire_time, - const uint64_t& expire_cnt, int32_t ts_idx) + const uint64_t& expire_cnt, int32_t ts_idx, + type::CompressType compress_type) : db_(db), it_(it), snapshot_(snapshot), @@ -106,7 +115,8 @@ DiskTableTraverseIterator::DiskTableTraverseIterator(rocksdb::DB* db, rocksdb::I expire_value_(expire_time, expire_cnt, ttl_type), has_ts_idx_(true), ts_idx_(ts_idx), - traverse_cnt_(0) {} + traverse_cnt_(0), + compress_type_(compress_type) {} DiskTableTraverseIterator::~DiskTableTraverseIterator() { delete it_; @@ -154,6 +164,11 @@ void DiskTableTraverseIterator::Next() { openmldb::base::Slice DiskTableTraverseIterator::GetValue() const { rocksdb::Slice value = it_->value(); + if (compress_type_ == type::CompressType::kSnappy) { + tmp_buf_.clear(); + snappy::Uncompress(value.data(), value.size(), &tmp_buf_); + return openmldb::base::Slice(tmp_buf_); + } return openmldb::base::Slice(value.data(), value.size()); } @@ -297,7 +312,8 @@ void DiskTableTraverseIterator::NextPK() { DiskTableKeyIterator::DiskTableKeyIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, ::openmldb::storage::TTLType ttl_type, const uint64_t& expire_time, const uint64_t& expire_cnt, - rocksdb::ColumnFamilyHandle* column_handle) + rocksdb::ColumnFamilyHandle* column_handle, + type::CompressType compress_type) : db_(db), it_(it), snapshot_(snapshot), @@ -306,12 +322,14 @@ DiskTableKeyIterator::DiskTableKeyIterator(rocksdb::DB* db, rocksdb::Iterator* i expire_cnt_(expire_cnt), has_ts_idx_(false), ts_idx_(0), - column_handle_(column_handle) {} + column_handle_(column_handle), + compress_type_(compress_type) {} DiskTableKeyIterator::DiskTableKeyIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, ::openmldb::storage::TTLType ttl_type, const uint64_t& expire_time, const uint64_t& expire_cnt, int32_t ts_idx, - rocksdb::ColumnFamilyHandle* column_handle) + rocksdb::ColumnFamilyHandle* column_handle, + type::CompressType compress_type) : db_(db), it_(it), snapshot_(snapshot), @@ -320,7 +338,8 @@ DiskTableKeyIterator::DiskTableKeyIterator(rocksdb::DB* db, rocksdb::Iterator* i expire_cnt_(expire_cnt), has_ts_idx_(true), ts_idx_(ts_idx), - column_handle_(column_handle) {} + column_handle_(column_handle), + compress_type_(compress_type) {} DiskTableKeyIterator::~DiskTableKeyIterator() { delete it_; @@ -398,7 +417,7 @@ std::unique_ptr<::hybridse::vm::RowIterator> DiskTableKeyIterator::GetValue() { ro.pin_data = true; rocksdb::Iterator* it = db_->NewIterator(ro, column_handle_); return std::make_unique(db_, it, snapshot, ttl_type_, expire_time_, - expire_cnt_, pk_, ts_, has_ts_idx_, ts_idx_); + expire_cnt_, pk_, ts_, has_ts_idx_, ts_idx_, compress_type_); } ::hybridse::vm::RowIterator* DiskTableKeyIterator::GetRawValue() { @@ -408,14 +427,14 @@ ::hybridse::vm::RowIterator* DiskTableKeyIterator::GetRawValue() { // ro.prefix_same_as_start = true; ro.pin_data = true; rocksdb::Iterator* it = db_->NewIterator(ro, column_handle_); - return new DiskTableRowIterator(db_, it, snapshot, ttl_type_, expire_time_, expire_cnt_, pk_, ts_, has_ts_idx_, - ts_idx_); + return new DiskTableRowIterator(db_, it, snapshot, ttl_type_, expire_time_, + expire_cnt_, pk_, ts_, has_ts_idx_, ts_idx_, compress_type_); } DiskTableRowIterator::DiskTableRowIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, ::openmldb::storage::TTLType ttl_type, uint64_t expire_time, uint64_t expire_cnt, std::string pk, uint64_t ts, bool has_ts_idx, - uint32_t ts_idx) + uint32_t ts_idx, type::CompressType compress_type) : db_(db), it_(it), snapshot_(snapshot), @@ -426,7 +445,8 @@ DiskTableRowIterator::DiskTableRowIterator(rocksdb::DB* db, rocksdb::Iterator* i ts_(ts), has_ts_idx_(has_ts_idx), ts_idx_(ts_idx), - row_() {} + row_(), + compress_type_(compress_type) {} DiskTableRowIterator::~DiskTableRowIterator() { delete it_; @@ -470,9 +490,17 @@ const ::hybridse::codec::Row& DiskTableRowIterator::GetValue() { } valid_value_ = true; size_t size = it_->value().size(); - int8_t* copyed_row_data = reinterpret_cast(malloc(size)); - memcpy(copyed_row_data, it_->value().data(), size); - row_.Reset(::hybridse::base::RefCountedSlice::CreateManaged(copyed_row_data, size)); + if (compress_type_ == type::CompressType::kSnappy) { + tmp_buf_.clear(); + snappy::Uncompress(it_->value().data(), size, &tmp_buf_); + int8_t* copyed_row_data = reinterpret_cast(malloc(tmp_buf_.size())); + memcpy(copyed_row_data, tmp_buf_.data(), tmp_buf_.size()); + row_.Reset(::hybridse::base::RefCountedSlice::CreateManaged(copyed_row_data, tmp_buf_.size())); + } else { + int8_t* copyed_row_data = reinterpret_cast(malloc(size)); + memcpy(copyed_row_data, it_->value().data(), size); + row_.Reset(::hybridse::base::RefCountedSlice::CreateManaged(copyed_row_data, size)); + } return row_; } diff --git a/src/storage/disk_table_iterator.h b/src/storage/disk_table_iterator.h index 88f7225c5a9..df9b98fca9c 100644 --- a/src/storage/disk_table_iterator.h +++ b/src/storage/disk_table_iterator.h @@ -29,9 +29,10 @@ namespace storage { class DiskTableIterator : public TableIterator { public: - DiskTableIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, const std::string& pk); - DiskTableIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, const std::string& pk, - uint32_t ts_idx); + DiskTableIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, + const std::string& pk, type::CompressType compress_type); + DiskTableIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, + const std::string& pk, uint32_t ts_idx, type::CompressType compress_type); virtual ~DiskTableIterator(); bool Valid() override; void Next() override; @@ -49,16 +50,18 @@ class DiskTableIterator : public TableIterator { uint64_t ts_; uint32_t ts_idx_; bool has_ts_idx_ = false; + type::CompressType compress_type_; + mutable std::string tmp_buf_; }; class DiskTableTraverseIterator : public TraverseIterator { public: DiskTableTraverseIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, ::openmldb::storage::TTLType ttl_type, const uint64_t& expire_time, - const uint64_t& expire_cnt); + const uint64_t& expire_cnt, type::CompressType compress_type); DiskTableTraverseIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, ::openmldb::storage::TTLType ttl_type, const uint64_t& expire_time, - const uint64_t& expire_cnt, int32_t ts_idx); + const uint64_t& expire_cnt, int32_t ts_idx, type::CompressType compress_type); virtual ~DiskTableTraverseIterator(); bool Valid() override; void Next() override; @@ -84,13 +87,16 @@ class DiskTableTraverseIterator : public TraverseIterator { bool has_ts_idx_; uint32_t ts_idx_; uint64_t traverse_cnt_; + type::CompressType compress_type_; + mutable std::string tmp_buf_; }; class DiskTableRowIterator : public ::hybridse::vm::RowIterator { public: DiskTableRowIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, ::openmldb::storage::TTLType ttl_type, uint64_t expire_time, uint64_t expire_cnt, - std::string pk, uint64_t ts, bool has_ts_idx, uint32_t ts_idx); + std::string pk, uint64_t ts, bool has_ts_idx, uint32_t ts_idx, + type::CompressType compress_type); ~DiskTableRowIterator(); @@ -129,17 +135,21 @@ class DiskTableRowIterator : public ::hybridse::vm::RowIterator { ::hybridse::codec::Row row_; bool pk_valid_; bool valid_value_ = false; + type::CompressType compress_type_; + std::string tmp_buf_; }; class DiskTableKeyIterator : public ::hybridse::vm::WindowIterator { public: DiskTableKeyIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, ::openmldb::storage::TTLType ttl_type, const uint64_t& expire_time, const uint64_t& expire_cnt, - int32_t ts_idx, rocksdb::ColumnFamilyHandle* column_handle); + int32_t ts_idx, rocksdb::ColumnFamilyHandle* column_handle, + type::CompressType compress_type); DiskTableKeyIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, ::openmldb::storage::TTLType ttl_type, const uint64_t& expire_time, const uint64_t& expire_cnt, - rocksdb::ColumnFamilyHandle* column_handle); + rocksdb::ColumnFamilyHandle* column_handle, + type::CompressType compress_type); ~DiskTableKeyIterator() override; @@ -171,6 +181,7 @@ class DiskTableKeyIterator : public ::hybridse::vm::WindowIterator { uint64_t ts_; uint32_t ts_idx_; rocksdb::ColumnFamilyHandle* column_handle_; + type::CompressType compress_type_; }; } // namespace storage diff --git a/src/storage/mem_table.cc b/src/storage/mem_table.cc index 8cbb145e323..02290f709f5 100644 --- a/src/storage/mem_table.cc +++ b/src/storage/mem_table.cc @@ -749,7 +749,8 @@ ::hybridse::vm::WindowIterator* MemTable::NewWindowIterator(uint32_t index) { if (ts_col) { ts_idx = ts_col->GetId(); } - return new MemTableKeyIterator(segments_[real_idx], seg_cnt_, ttl->ttl_type, expire_time, expire_cnt, ts_idx); + return new MemTableKeyIterator(segments_[real_idx], seg_cnt_, ttl->ttl_type, + expire_time, expire_cnt, ts_idx, GetCompressType()); } TraverseIterator* MemTable::NewTraverseIterator(uint32_t index) { @@ -768,10 +769,11 @@ TraverseIterator* MemTable::NewTraverseIterator(uint32_t index) { uint32_t real_idx = index_def->GetInnerPos(); auto ts_col = index_def->GetTsColumn(); if (ts_col) { - return new MemTableTraverseIterator(segments_[real_idx], seg_cnt_, ttl->ttl_type, expire_time, expire_cnt, - ts_col->GetId()); + return new MemTableTraverseIterator(segments_[real_idx], seg_cnt_, ttl->ttl_type, + expire_time, expire_cnt, ts_col->GetId(), GetCompressType()); } - return new MemTableTraverseIterator(segments_[real_idx], seg_cnt_, ttl->ttl_type, expire_time, expire_cnt, 0); + return new MemTableTraverseIterator(segments_[real_idx], seg_cnt_, ttl->ttl_type, + expire_time, expire_cnt, 0, GetCompressType()); } bool MemTable::GetBulkLoadInfo(::openmldb::api::BulkLoadInfoResponse* response) { diff --git a/src/storage/mem_table_iterator.cc b/src/storage/mem_table_iterator.cc index 8b0f074427a..22cd7964640 100644 --- a/src/storage/mem_table_iterator.cc +++ b/src/storage/mem_table_iterator.cc @@ -15,7 +15,7 @@ */ #include "storage/mem_table_iterator.h" - +#include #include #include "base/hash.h" #include "gflags/gflags.h" @@ -48,7 +48,13 @@ const uint64_t& MemTableWindowIterator::GetKey() const { } const ::hybridse::codec::Row& MemTableWindowIterator::GetValue() { - row_.Reset(reinterpret_cast(it_->GetValue()->data), it_->GetValue()->size); + if (compress_type_ == type::CompressType::kSnappy) { + tmp_buf_.clear(); + snappy::Uncompress(it_->GetValue()->data, it_->GetValue()->size, &tmp_buf_); + row_.Reset(reinterpret_cast(tmp_buf_.data()), tmp_buf_.size()); + } else { + row_.Reset(reinterpret_cast(it_->GetValue()->data), it_->GetValue()->size); + } return row_; } @@ -69,7 +75,8 @@ void MemTableWindowIterator::SeekToFirst() { } MemTableKeyIterator::MemTableKeyIterator(Segment** segments, uint32_t seg_cnt, ::openmldb::storage::TTLType ttl_type, - uint64_t expire_time, uint64_t expire_cnt, uint32_t ts_index) + uint64_t expire_time, uint64_t expire_cnt, uint32_t ts_index, + type::CompressType compress_type) : segments_(segments), seg_cnt_(seg_cnt), seg_idx_(0), @@ -79,7 +86,8 @@ MemTableKeyIterator::MemTableKeyIterator(Segment** segments, uint32_t seg_cnt, : expire_time_(expire_time), expire_cnt_(expire_cnt), ticket_(), - ts_idx_(0) { + ts_idx_(0), + compress_type_(compress_type) { uint32_t idx = 0; if (segments_[0]->GetTsIdx(ts_index, idx) == 0) { ts_idx_ = idx; @@ -142,7 +150,7 @@ ::hybridse::vm::RowIterator* MemTableKeyIterator::GetRawValue() { ticket_.Push((KeyEntry*)pk_it_->GetValue()); // NOLINT } it->SeekToFirst(); - return new MemTableWindowIterator(it, ttl_type_, expire_time_, expire_cnt_); + return new MemTableWindowIterator(it, ttl_type_, expire_time_, expire_cnt_, compress_type_); } std::unique_ptr<::hybridse::vm::RowIterator> MemTableKeyIterator::GetValue() { @@ -177,8 +185,9 @@ void MemTableKeyIterator::NextPK() { } MemTableTraverseIterator::MemTableTraverseIterator(Segment** segments, uint32_t seg_cnt, - ::openmldb::storage::TTLType ttl_type, uint64_t expire_time, - uint64_t expire_cnt, uint32_t ts_index) + ::openmldb::storage::TTLType ttl_type, uint64_t expire_time, + uint64_t expire_cnt, uint32_t ts_index, + type::CompressType compress_type) : segments_(segments), seg_cnt_(seg_cnt), seg_idx_(0), @@ -188,7 +197,8 @@ MemTableTraverseIterator::MemTableTraverseIterator(Segment** segments, uint32_t ts_idx_(0), expire_value_(expire_time, expire_cnt, ttl_type), ticket_(), - traverse_cnt_(0) { + traverse_cnt_(0), + compress_type_(compress_type) { uint32_t idx = 0; if (segments_[0]->GetTsIdx(ts_index, idx) == 0) { ts_idx_ = idx; @@ -320,7 +330,13 @@ void MemTableTraverseIterator::Seek(const std::string& key, uint64_t ts) { } openmldb::base::Slice MemTableTraverseIterator::GetValue() const { - return openmldb::base::Slice(it_->GetValue()->data, it_->GetValue()->size); + if (compress_type_ == type::CompressType::kSnappy) { + tmp_buf_.clear(); + snappy::Uncompress(it_->GetValue()->data, it_->GetValue()->size, &tmp_buf_); + return openmldb::base::Slice(tmp_buf_); + } else { + return openmldb::base::Slice(it_->GetValue()->data, it_->GetValue()->size); + } } uint64_t MemTableTraverseIterator::GetKey() const { diff --git a/src/storage/mem_table_iterator.h b/src/storage/mem_table_iterator.h index 967345fc2a9..5e5ba461181 100644 --- a/src/storage/mem_table_iterator.h +++ b/src/storage/mem_table_iterator.h @@ -27,8 +27,9 @@ namespace storage { class MemTableWindowIterator : public ::hybridse::vm::RowIterator { public: MemTableWindowIterator(TimeEntries::Iterator* it, ::openmldb::storage::TTLType ttl_type, uint64_t expire_time, - uint64_t expire_cnt) - : it_(it), record_idx_(1), expire_value_(expire_time, expire_cnt, ttl_type), row_() {} + uint64_t expire_cnt, type::CompressType compress_type) + : it_(it), record_idx_(1), expire_value_(expire_time, expire_cnt, ttl_type), + row_(), compress_type_(compress_type) {} ~MemTableWindowIterator(); @@ -51,12 +52,15 @@ class MemTableWindowIterator : public ::hybridse::vm::RowIterator { uint32_t record_idx_; TTLSt expire_value_; ::hybridse::codec::Row row_; + type::CompressType compress_type_; + std::string tmp_buf_; }; class MemTableKeyIterator : public ::hybridse::vm::WindowIterator { public: MemTableKeyIterator(Segment** segments, uint32_t seg_cnt, ::openmldb::storage::TTLType ttl_type, - uint64_t expire_time, uint64_t expire_cnt, uint32_t ts_index); + uint64_t expire_time, uint64_t expire_cnt, uint32_t ts_index, + type::CompressType compress_type); ~MemTableKeyIterator() override; @@ -87,12 +91,14 @@ class MemTableKeyIterator : public ::hybridse::vm::WindowIterator { uint64_t expire_cnt_; Ticket ticket_; uint32_t ts_idx_; + type::CompressType compress_type_; }; class MemTableTraverseIterator : public TraverseIterator { public: MemTableTraverseIterator(Segment** segments, uint32_t seg_cnt, ::openmldb::storage::TTLType ttl_type, - uint64_t expire_time, uint64_t expire_cnt, uint32_t ts_index); + uint64_t expire_time, uint64_t expire_cnt, uint32_t ts_index, + type::CompressType compress_type); ~MemTableTraverseIterator() override; inline bool Valid() override; void Next() override; @@ -115,6 +121,8 @@ class MemTableTraverseIterator : public TraverseIterator { TTLSt expire_value_; Ticket ticket_; uint64_t traverse_cnt_; + type::CompressType compress_type_; + mutable std::string tmp_buf_; }; } // namespace storage diff --git a/src/tablet/tablet_impl.cc b/src/tablet/tablet_impl.cc index f30f1f8b74b..e6924b64ce9 100644 --- a/src/tablet/tablet_impl.cc +++ b/src/tablet/tablet_impl.cc @@ -724,6 +724,22 @@ void TabletImpl::Put(RpcController* controller, const ::openmldb::api::PutReques response->set_msg("exceed max memory"); return; } + ::openmldb::api::LogEntry entry; + entry.set_pk(request->pk()); + entry.set_ts(request->time()); + if (table->GetCompressType() == openmldb::type::CompressType::kSnappy) { + const auto& raw_val = request->value(); + std::string* val = entry.mutable_value(); + ::snappy::Compress(raw_val.c_str(), raw_val.length(), val); + } else { + entry.set_value(request->value()); + } + if (request->dimensions_size() > 0) { + entry.mutable_dimensions()->CopyFrom(request->dimensions()); + } + if (request->ts_dimensions_size() > 0) { + entry.mutable_ts_dimensions()->CopyFrom(request->ts_dimensions()); + } bool ok = false; if (request->dimensions_size() > 0) { int32_t ret_code = CheckDimessionPut(request, table->GetIdxCnt()); @@ -733,7 +749,7 @@ void TabletImpl::Put(RpcController* controller, const ::openmldb::api::PutReques return; } DLOG(INFO) << "put data to tid " << tid << " pid " << pid << " with key " << request->dimensions(0).key(); - ok = table->Put(request->time(), request->value(), request->dimensions()); + ok = table->Put(entry.ts(), entry.value(), entry.dimensions()); } if (!ok) { response->set_code(::openmldb::base::ReturnCode::kPutFailed); @@ -743,23 +759,13 @@ void TabletImpl::Put(RpcController* controller, const ::openmldb::api::PutReques response->set_code(::openmldb::base::ReturnCode::kOk); std::shared_ptr replicator; - ::openmldb::api::LogEntry entry; do { replicator = GetReplicator(request->tid(), request->pid()); if (!replicator) { PDLOG(WARNING, "fail to find table tid %u pid %u leader's log replicator", tid, pid); break; } - entry.set_pk(request->pk()); - entry.set_ts(request->time()); - entry.set_value(request->value()); entry.set_term(replicator->GetLeaderTerm()); - if (request->dimensions_size() > 0) { - entry.mutable_dimensions()->CopyFrom(request->dimensions()); - } - if (request->ts_dimensions_size() > 0) { - entry.mutable_ts_dimensions()->CopyFrom(request->ts_dimensions()); - } // Aggregator update assumes that binlog_offset is strictly increasing // so the update should be protected within the replicator lock From 3e3f0d0fb149bfab0361e27ce7c71a87bc756d7b Mon Sep 17 00:00:00 2001 From: dl239 Date: Tue, 31 Oct 2023 16:34:00 +0800 Subject: [PATCH 06/12] fix: fix test case --- src/tablet/tablet_impl_test.cc | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/src/tablet/tablet_impl_test.cc b/src/tablet/tablet_impl_test.cc index da5cc626bf0..ca6baa32178 100644 --- a/src/tablet/tablet_impl_test.cc +++ b/src/tablet/tablet_impl_test.cc @@ -128,17 +128,12 @@ bool RollWLogFile(::openmldb::storage::WriteHandle** wh, ::openmldb::storage::Lo return true; } -void PrepareLatestTableData(TabletImpl& tablet, int32_t tid, int32_t pid, bool compress = false) { // NOLINT +void PrepareLatestTableData(TabletImpl& tablet, int32_t tid, int32_t pid) { // NOLINT for (int32_t i = 0; i < 100; i++) { ::openmldb::api::PutRequest prequest; ::openmldb::test::SetDimension(0, std::to_string(i % 10), prequest.add_dimensions()); prequest.set_time(i + 1); std::string value = ::openmldb::test::EncodeKV(std::to_string(i % 10), std::to_string(i)); - if (compress) { - std::string compressed; - ::snappy::Compress(value.c_str(), value.length(), &compressed); - value.swap(compressed); - } prequest.set_value(value); prequest.set_tid(tid); prequest.set_pid(pid); @@ -153,11 +148,6 @@ void PrepareLatestTableData(TabletImpl& tablet, int32_t tid, int32_t pid, bool c ::openmldb::test::SetDimension(0, "10", prequest.add_dimensions()); prequest.set_time(i % 10 + 1); std::string value = ::openmldb::test::EncodeKV("10", std::to_string(i)); - if (compress) { - std::string compressed; - ::snappy::Compress(value.c_str(), value.length(), &compressed); - value.swap(compressed); - } prequest.set_value(value); prequest.set_tid(tid); prequest.set_pid(pid); @@ -5314,7 +5304,7 @@ TEST_P(TabletImplTest, PutCompress) { MockClosure closure; tablet.CreateTable(NULL, &request, &response, &closure); ASSERT_EQ(0, response.code()); - PrepareLatestTableData(tablet, id, 0, true); + PrepareLatestTableData(tablet, id, 0); } { From 3ed80f9031484fe0702449db0170fb89cdc8e805 Mon Sep 17 00:00:00 2001 From: dl239 Date: Tue, 31 Oct 2023 20:33:57 +0800 Subject: [PATCH 07/12] test: add case --- .../jdbc/RequestPreparedStatementTest.java | 39 +++++++++++++------ 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/RequestPreparedStatementTest.java b/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/RequestPreparedStatementTest.java index dc520b74221..2df84d78ae7 100644 --- a/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/RequestPreparedStatementTest.java +++ b/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/RequestPreparedStatementTest.java @@ -23,6 +23,7 @@ import java.sql.*; import org.testng.Assert; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import java.sql.PreparedStatement; @@ -49,20 +50,30 @@ public class RequestPreparedStatementTest { } } - @Test - public void testRequest() { + @DataProvider(name = "createOption") + Object[][] getCreateParm() { + return new Object[][] { {"NoCompress", "Memory"}, + {"NoCompress", "HDD"}, + {"Snappy", "Memory"}, + {"Snappy", "HDD"} }; + } + + @Test(dataProvider = "createOption") + public void testRequest(String compressType, String storageMode) { String dbname = "db" + random.nextInt(100000); executor.dropDB(dbname); boolean ok = executor.createDB(dbname); Assert.assertTrue(ok); - String createTableSql = "create table trans(c1 string,\n" + + String baseSql = "create table trans(c1 string,\n" + " c3 int,\n" + " c4 bigint,\n" + " c5 float,\n" + " c6 double,\n" + " c7 timestamp,\n" + " c8 date,\n" + - " index(key=c1, ts=c7));"; + " index(key=c1, ts=c7))\n "; + String createTableSql = String.format("%s OPTIONS (compress_type='%s', storage_mode='%s');", + baseSql, compressType, storageMode); executor.executeDDL(dbname, createTableSql); String insertSql = "insert into trans values(\"aa\",23,33,1.4,2.4,1590738993000,\"2020-05-04\");"; PreparedStatement pstmt = null; @@ -127,8 +138,8 @@ public void testRequest() { } } - @Test - public void testDeploymentRequest() { + @Test(dataProvider = "createOption") + public void testDeploymentRequest(String compressType, String storageMode) { java.sql.Statement state = executor.getStatement(); String dbname = "db" + random.nextInt(100000); String deploymentName = "dp_test1"; @@ -136,14 +147,16 @@ public void testDeploymentRequest() { state.execute("drop database if exists " + dbname + ";"); state.execute("create database " + dbname + ";"); state.execute("use " + dbname + ";"); - String createTableSql = "create table trans(c1 string,\n" + + String baseSql = "create table trans(c1 string,\n" + " c3 int,\n" + " c4 bigint,\n" + " c5 float,\n" + " c6 double,\n" + " c7 timestamp,\n" + " c8 date,\n" + - " index(key=c1, ts=c7));"; + " index(key=c1, ts=c7))"; + String createTableSql = String.format(" %s OPTIONS (compress_type='%s', storage_mode='%s');", + baseSql, compressType, storageMode); state.execute(createTableSql); String selectSql = "SELECT c1, c3, sum(c4) OVER w1 as w1_c4_sum FROM trans WINDOW w1 AS " + "(PARTITION BY trans.c1 ORDER BY trans.c7 ROWS BETWEEN 2 PRECEDING AND CURRENT ROW);"; @@ -217,20 +230,22 @@ public void testDeploymentRequest() { } } - @Test - public void testBatchRequest() { + @Test(dataProvider = "createOption") + public void testBatchRequest(String compressType, String storageMode) { String dbname = "db" + random.nextInt(100000); executor.dropDB(dbname); boolean ok = executor.createDB(dbname); Assert.assertTrue(ok); - String createTableSql = "create table trans(c1 string,\n" + + String baseSql = "create table trans(c1 string,\n" + " c3 int,\n" + " c4 bigint,\n" + " c5 float,\n" + " c6 double,\n" + " c7 timestamp,\n" + " c8 date,\n" + - " index(key=c1, ts=c7));"; + " index(key=c1, ts=c7))"; + String createTableSql = String.format(" %s OPTIONS (compress_type='%s', storage_mode='%s');", + baseSql, compressType, storageMode); executor.executeDDL(dbname, createTableSql); String insertSql = "insert into trans values(\"aa\",23,33,1.4,2.4,1590738993000,\"2020-05-04\");"; PreparedStatement pstmt = null; From 9f8a91598d7f29be5317ed266431727de1235b6a Mon Sep 17 00:00:00 2001 From: dl239 Date: Wed, 1 Nov 2023 11:00:13 +0800 Subject: [PATCH 08/12] fix: fix isExpire --- src/storage/mem_table.cc | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/storage/mem_table.cc b/src/storage/mem_table.cc index 02290f709f5..7c7005272dd 100644 --- a/src/storage/mem_table.cc +++ b/src/storage/mem_table.cc @@ -433,6 +433,11 @@ bool MemTable::IsExpire(const LogEntry& entry) { } } const int8_t* data = reinterpret_cast(entry.value().data()); + std::string uncompress_data; + if (GetCompressType() == openmldb::type::kSnappy) { + snappy::Uncompress(entry.value().data(), entry.value().size(), &uncompress_data); + data = reinterpret_cast(uncompress_data.data()); + } uint8_t version = codec::RowView::GetSchemaVersion(data); auto decoder = GetVersionDecoder(version); if (decoder == nullptr) { From bafddd52f3151189db41999955ba6c42a05a0adf Mon Sep 17 00:00:00 2001 From: dl239 Date: Wed, 1 Nov 2023 16:27:05 +0800 Subject: [PATCH 09/12] fix: fix select --- src/catalog/distribute_iterator.cc | 23 +++++++++---------- src/catalog/tablet_catalog.cc | 2 +- src/cmd/openmldb.cc | 31 ++----------------------- src/cmd/sql_cmd_test.cc | 16 +++++++++---- src/codec/row_codec.cc | 14 ++++++++++++ src/codec/row_codec.h | 2 ++ src/tablet/tablet_impl.cc | 36 ++++-------------------------- 7 files changed, 46 insertions(+), 78 deletions(-) diff --git a/src/catalog/distribute_iterator.cc b/src/catalog/distribute_iterator.cc index e99431728d5..b82afbb81fd 100644 --- a/src/catalog/distribute_iterator.cc +++ b/src/catalog/distribute_iterator.cc @@ -175,20 +175,19 @@ const ::hybridse::codec::Row& FullTableIterator::GetValue() { } valid_value_ = true; + base::Slice slice_row; if (it_ && it_->Valid()) { - value_ = ::hybridse::codec::Row( - ::hybridse::base::RefCountedSlice::Create(it_->GetValue().data(), it_->GetValue().size())); - return value_; + slice_row = it_->GetValue(); } else { - auto slice_row = kv_it_->GetValue(); - size_t sz = slice_row.size(); - int8_t* copyed_row_data = reinterpret_cast(malloc(sz)); - memcpy(copyed_row_data, slice_row.data(), sz); - auto shared_slice = ::hybridse::base::RefCountedSlice::CreateManaged(copyed_row_data, sz); - buffered_slices_.push_back(shared_slice); - value_.Reset(shared_slice); - return value_; + slice_row = kv_it_->GetValue(); } + size_t sz = slice_row.size(); + int8_t* copyed_row_data = reinterpret_cast(malloc(sz)); + memcpy(copyed_row_data, slice_row.data(), sz); + auto shared_slice = ::hybridse::base::RefCountedSlice::CreateManaged(copyed_row_data, sz); + buffered_slices_.push_back(shared_slice); + value_.Reset(shared_slice); + return value_; } DistributeWindowIterator::DistributeWindowIterator(uint32_t tid, uint32_t pid_num, std::shared_ptr tables, @@ -424,7 +423,7 @@ const ::hybridse::codec::Row& RemoteWindowIterator::GetValue() { memcpy(copyed_row_data, slice_row.data(), sz); auto shared_slice = ::hybridse::base::RefCountedSlice::CreateManaged(copyed_row_data, sz); row_.Reset(shared_slice); - DLOG(INFO) << "get value pk " << pk_ << " ts_key " << kv_it_->GetKey() << " ts " << ts_; + LOG(INFO) << "get value pk " << pk_ << " ts_key " << kv_it_->GetKey() << " ts " << ts_; valid_value_ = true; return row_; } diff --git a/src/catalog/tablet_catalog.cc b/src/catalog/tablet_catalog.cc index a9e74ff7061..cdf979167fc 100644 --- a/src/catalog/tablet_catalog.cc +++ b/src/catalog/tablet_catalog.cc @@ -503,7 +503,7 @@ bool TabletCatalog::UpdateTableInfo(const ::openmldb::nameserver::TableInfo& tab return false; } db_it->second.emplace(table_name, handler); - LOG(INFO) << "add table " << table_name << "to db " << db_name << " tid " << table_info.tid(); + LOG(INFO) << "add table " << table_name << " to db " << db_name << " tid " << table_info.tid(); } if (bool updated = false; !handler->Update(table_info, client_manager_, &updated)) { return false; diff --git a/src/cmd/openmldb.cc b/src/cmd/openmldb.cc index 3cf22b2df6d..ccfda4ff5ed 100644 --- a/src/cmd/openmldb.cc +++ b/src/cmd/openmldb.cc @@ -18,7 +18,6 @@ #include #include #include -#include #include #include @@ -341,11 +340,6 @@ ::openmldb::base::Status PutSchemaData(const ::openmldb::nameserver::TableInfo& return ::openmldb::base::Status(-1, "Encode data error"); } - if (table_info.compress_type() == ::openmldb::type::CompressType::kSnappy) { - std::string compressed; - ::snappy::Compress(value.c_str(), value.length(), &compressed); - value = compressed; - } const int tid = table_info.tid(); PutData(tid, dimensions, ts, value, table_info.table_partition()); @@ -1396,11 +1390,6 @@ void HandleNSGet(const std::vector& parts, ::openmldb::client::NsCl std::string msg; bool ok = tb_client->Get(tid, pid, key, timestamp, value, ts, msg); if (ok) { - if (tables[0].compress_type() == ::openmldb::type::CompressType::kSnappy) { - std::string uncompressed; - ::snappy::Uncompress(value.c_str(), value.length(), &uncompressed); - value = uncompressed; - } std::cout << "value :" << value << std::endl; } else { std::cout << "Get failed. error msg: " << msg << std::endl; @@ -1447,11 +1436,6 @@ void HandleNSGet(const std::vector& parts, ::openmldb::client::NsCl return; } } - if (tables[0].compress_type() == ::openmldb::type::CompressType::kSnappy) { - std::string uncompressed; - ::snappy::Uncompress(value.c_str(), value.length(), &uncompressed); - value.swap(uncompressed); - } row.clear(); codec.DecodeRow(value, &row); ::openmldb::cmd::TransferString(&row); @@ -1848,25 +1832,14 @@ void HandleNSPreview(const std::vector& parts, ::openmldb::client:: row.push_back(std::to_string(index)); if (no_schema) { - std::string value = it->GetValue().ToString(); - if (tables[0].compress_type() == ::openmldb::type::CompressType::kSnappy) { - std::string uncompressed; - ::snappy::Uncompress(value.c_str(), value.length(), &uncompressed); - value = uncompressed; - } row.push_back(it->GetPK()); row.push_back(std::to_string(it->GetKey())); - row.push_back(value); + row.push_back(it->GetValue().ToString()); } else { if (!has_ts_col) { row.push_back(std::to_string(it->GetKey())); } - std::string value; - if (tables[0].compress_type() == ::openmldb::type::CompressType::kSnappy) { - ::snappy::Uncompress(it->GetValue().data(), it->GetValue().size(), &value); - } else { - value.assign(it->GetValue().data(), it->GetValue().size()); - } + std::string value(it->GetValue().data(), it->GetValue().size()); codec.DecodeRow(value, &row); ::openmldb::cmd::TransferString(&row); uint64_t row_size = row.size(); diff --git a/src/cmd/sql_cmd_test.cc b/src/cmd/sql_cmd_test.cc index f2fa7ae1f45..8f17d276be6 100644 --- a/src/cmd/sql_cmd_test.cc +++ b/src/cmd/sql_cmd_test.cc @@ -350,12 +350,20 @@ TEST_P(DBSDKTest, SelectSnappy) { "index(key=c1, ts=c2, abs_ttl=0, ttl_type=absolute)) options (compress_type='snappy');"; sr->ExecuteSQL(create_sql, &status); ASSERT_TRUE(status.IsOK()); - std::string insert_sql = "insert into trans values ('aaa', 1635247427000, \"2021-05-20\");"; - sr->ExecuteSQL(insert_sql, &status); - ASSERT_TRUE(status.IsOK()); + int insert_num = 100; + for (int i = 0; i < insert_num; i++) { + auto insert_sql = absl::StrCat("insert into trans values ('aaa", i, "', 1635247427000, \"2021-05-20\");"); + sr->ExecuteSQL(insert_sql, &status); + ASSERT_TRUE(status.IsOK()); + } auto rs = sr->ExecuteSQL("select * from trans", &status); ASSERT_TRUE(status.IsOK()); - ASSERT_EQ(1, rs->Size()); + ASSERT_EQ(insert_num, rs->Size()); + int count = 0; + while (rs->Next()) { + count++; + } + EXPECT_EQ(count, insert_num); sr->ExecuteSQL("drop table trans;", &status); ASSERT_TRUE(status.IsOK()); sr->ExecuteSQL("drop database " + db + ";", &status); diff --git a/src/codec/row_codec.cc b/src/codec/row_codec.cc index 64641d4f14c..9131259a03b 100644 --- a/src/codec/row_codec.cc +++ b/src/codec/row_codec.cc @@ -300,6 +300,20 @@ int32_t EncodeRows(const boost::container::dequeappend(&total_size, 4); + memrev32ifbe(&pk_size); + buf->append(&pk_size, 4); + memrev64ifbe(&time); + buf->append(&time, 8); + buf->append(pk); + buf->append(data, size); +} + void EncodeFull(const std::string& pk, uint64_t time, const char* data, const size_t size, char* buffer, uint32_t offset) { buffer += offset; diff --git a/src/codec/row_codec.h b/src/codec/row_codec.h index 5f4f01b9690..06220bbb591 100644 --- a/src/codec/row_codec.h +++ b/src/codec/row_codec.h @@ -88,6 +88,8 @@ void EncodeFull(const std::string& pk, uint64_t time, const char* data, const si void EncodeFull(const std::string& pk, uint64_t time, const DataBlock* data, char* buffer, uint32_t offset); +void EncodeFull(const std::string& pk, uint64_t time, const char* data, const size_t size, butil::IOBuf* buf); + void Decode(const std::string* str, std::vector>& pairs); // NOLINT void DecodeFull(const std::string* str, diff --git a/src/tablet/tablet_impl.cc b/src/tablet/tablet_impl.cc index e6924b64ce9..d8680b73ca8 100644 --- a/src/tablet/tablet_impl.cc +++ b/src/tablet/tablet_impl.cc @@ -1446,14 +1446,12 @@ void TabletImpl::Traverse(RpcController* controller, const ::openmldb::api::Trav DEBUGLOG("tid %u, pid %u seek to first", tid, pid); it->SeekToFirst(); } - std::map>> value_map; - std::vector key_seq; - uint32_t total_block_size = 0; bool remove_duplicated_record = false; if (request->has_enable_remove_duplicated_record()) { remove_duplicated_record = request->enable_remove_duplicated_record(); } uint32_t scount = 0; + butil::IOBuf buf; for (; it->Valid(); it->Next()) { if (request->limit() > 0 && scount > request->limit() - 1) { DEBUGLOG("reache the limit %u ", request->limit()); @@ -1475,16 +1473,9 @@ void TabletImpl::Traverse(RpcController* controller, const ::openmldb::api::Trav continue; } } - auto map_it = value_map.find(last_pk); - if (map_it == value_map.end()) { - auto pair = value_map.emplace(last_pk, std::vector>()); - map_it = pair.first; - map_it->second.reserve(request->limit()); - key_seq.emplace_back(map_it->first); - } openmldb::base::Slice value = it->GetValue(); - map_it->second.emplace_back(it->GetKey(), value); - total_block_size += last_pk.length() + value.size(); + DLOG(INFO) << "encode pk " << it->GetPK() << " ts " << it->GetKey() << " size " << value.size(); + ::openmldb::codec::EncodeFull(it->GetPK(), it->GetKey(), value.data(), value.size(), &buf); scount++; if (FLAGS_max_traverse_cnt > 0 && it->GetCount() >= FLAGS_max_traverse_cnt) { DEBUGLOG("traverse cnt %lu max %lu, key %s ts %lu", it->GetCount(), FLAGS_max_traverse_cnt, last_pk.c_str(), @@ -1504,26 +1495,7 @@ void TabletImpl::Traverse(RpcController* controller, const ::openmldb::api::Trav } else if (scount < request->limit()) { is_finish = true; } - uint32_t total_size = scount * (8 + 4 + 4) + total_block_size; - std::string* pairs = response->mutable_pairs(); - if (scount <= 0) { - pairs->resize(0); - } else { - pairs->resize(total_size); - } - char* rbuffer = reinterpret_cast(&((*pairs)[0])); - uint32_t offset = 0; - for (const auto& key : key_seq) { - auto iter = value_map.find(key); - if (iter == value_map.end()) { - continue; - } - for (const auto& pair : iter->second) { - DLOG(INFO) << "encode pk " << key << " ts " << pair.first << " size " << pair.second.size(); - ::openmldb::codec::EncodeFull(key, pair.first, pair.second.data(), pair.second.size(), rbuffer, offset); - offset += (4 + 4 + 8 + key.length() + pair.second.size()); - } - } + buf.copy_to(response->mutable_pairs()); delete it; DLOG(INFO) << "tid " << tid << " pid " << pid << " traverse count " << scount << " last_pk " << last_pk << " last_time " << last_time << " ts_pos " << ts_pos; From 629cefcc3b07dd6494948c4e6179e1fb03e9ddca Mon Sep 17 00:00:00 2001 From: dl239 Date: Wed, 1 Nov 2023 16:58:04 +0800 Subject: [PATCH 10/12] fix: fix compile --- src/codec/row_codec.h | 1 + 1 file changed, 1 insertion(+) diff --git a/src/codec/row_codec.h b/src/codec/row_codec.h index 06220bbb591..79cb365dceb 100644 --- a/src/codec/row_codec.h +++ b/src/codec/row_codec.h @@ -24,6 +24,7 @@ #include "base/status.h" #include "boost/container/deque.hpp" +#include "butil/iobuf.h" #include "codec/codec.h" #include "storage/segment.h" From acd8bc27e948b13703be049d499f6c58e747104b Mon Sep 17 00:00:00 2001 From: dl239 Date: Wed, 1 Nov 2023 17:45:09 +0800 Subject: [PATCH 11/12] fix: fix java case --- .../openmldb/jdbc/RequestPreparedStatementTest.java | 10 ++++++---- onebox/start_onebox.sh | 2 ++ 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/RequestPreparedStatementTest.java b/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/RequestPreparedStatementTest.java index 2df84d78ae7..8f621f862e9 100644 --- a/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/RequestPreparedStatementTest.java +++ b/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/RequestPreparedStatementTest.java @@ -317,8 +317,8 @@ public void testBatchRequest(String compressType, String storageMode) { } } - @Test - public void testDeploymentBatchRequest() { + @Test(dataProvider = "createOption") + public void testDeploymentBatchRequest(String compressType, String storageMode) { java.sql.Statement state = executor.getStatement(); String dbname = "db" + random.nextInt(100000); String deploymentName = "dp_test1"; @@ -326,14 +326,16 @@ public void testDeploymentBatchRequest() { state.execute("drop database if exists " + dbname + ";"); state.execute("create database " + dbname + ";"); state.execute("use " + dbname + ";"); - String createTableSql = "create table trans(c1 string,\n" + + String baseSql = "create table trans(c1 string,\n" + " c3 int,\n" + " c4 bigint,\n" + " c5 float,\n" + " c6 double,\n" + " c7 timestamp,\n" + " c8 date,\n" + - " index(key=c1, ts=c7));"; + " index(key=c1, ts=c7))"; + String createTableSql = String.format(" %s OPTIONS (compress_type='%s', storage_mode='%s');", + baseSql, compressType, storageMode); state.execute(createTableSql); String selectSql = "SELECT c1, c3, sum(c4) OVER w1 as w1_c4_sum FROM trans WINDOW w1 AS " + "(PARTITION BY trans.c1 ORDER BY trans.c7 ROWS BETWEEN 2 PRECEDING AND CURRENT ROW);"; diff --git a/onebox/start_onebox.sh b/onebox/start_onebox.sh index 639e409b37c..1d92dc7cb62 100755 --- a/onebox/start_onebox.sh +++ b/onebox/start_onebox.sh @@ -75,6 +75,8 @@ cluster_start_component() { --zk_keep_alive_check_interval=60000 --db_root_path="$binlog_dir" --recycle_bin_root_path="$recycle_bin_dir" + --hdd_root_path="$binlog_dir" + --recycle_bin_hdd_root_path="$recycle_bin_dir" ) elif [[ $role = 'nameserver' ]]; then extra_opts+=( From ed4a63339e3fdfb17d82c5bf8d7bdfe7c9b6bf71 Mon Sep 17 00:00:00 2001 From: dl239 Date: Thu, 2 Nov 2023 17:03:12 +0800 Subject: [PATCH 12/12] fix: fix scan --- src/base/kv_iterator_test.cc | 16 ++--- src/cmd/display.h | 23 ++----- src/cmd/openmldb.cc | 2 +- src/codec/codec_bench_test.cc | 12 ++-- src/codec/codec_test.cc | 22 ++---- src/codec/row_codec.cc | 75 +++------------------ src/codec/row_codec.h | 14 +--- src/storage/mem_table.cc | 4 +- src/storage/segment.cc | 29 +++++--- src/storage/segment.h | 9 ++- src/storage/segment_test.cc | 12 ++-- src/tablet/tablet_impl.cc | 123 +++++----------------------------- src/tablet/tablet_impl.h | 9 +-- 13 files changed, 88 insertions(+), 262 deletions(-) diff --git a/src/base/kv_iterator_test.cc b/src/base/kv_iterator_test.cc index 3c35d6ba472..11e4228c5b3 100644 --- a/src/base/kv_iterator_test.cc +++ b/src/base/kv_iterator_test.cc @@ -77,13 +77,12 @@ TEST_F(KvIteratorTest, Iterator) { TEST_F(KvIteratorTest, HasPK) { auto response = std::make_shared<::openmldb::api::TraverseResponse>(); - std::string* pairs = response->mutable_pairs(); - pairs->resize(52); - char* data = reinterpret_cast(&((*pairs)[0])); ::openmldb::storage::DataBlock* db1 = new ::openmldb::storage::DataBlock(1, "hello", 5); ::openmldb::storage::DataBlock* db2 = new ::openmldb::storage::DataBlock(1, "hell1", 5); - ::openmldb::codec::EncodeFull("test1", 9527, db1, data, 0); - ::openmldb::codec::EncodeFull("test2", 9528, db2, data, 26); + butil::IOBuf buf; + ::openmldb::codec::EncodeFull("test1", 9527, db1->data, db1->size, &buf); + ::openmldb::codec::EncodeFull("test2", 9528, db2->data, db2->size, &buf); + buf.copy_to(response->mutable_pairs()); TraverseKvIterator kv_it(response); ASSERT_TRUE(kv_it.Valid()); ASSERT_STREQ("test1", kv_it.GetPK().c_str()); @@ -100,19 +99,18 @@ TEST_F(KvIteratorTest, HasPK) { TEST_F(KvIteratorTest, NextPK) { auto response = std::make_shared<::openmldb::api::TraverseResponse>(); - std::string* pairs = response->mutable_pairs(); - pairs->resize(16*9 + 90); std::string value("hello"); - char* data = reinterpret_cast(&((*pairs)[0])); uint32_t offset = 0; + butil::IOBuf buf; for (int i = 0; i < 3; i++) { std::string pk = "test" + std::to_string(i); uint64_t ts = 9500; for (int j = 0; j < 3; j++) { - ::openmldb::codec::EncodeFull(pk, ts - j, value.data(), value.size(), data, offset); + ::openmldb::codec::EncodeFull(pk, ts - j, value.data(), value.size(), &buf); offset += 16 + 10; } } + buf.copy_to(response->mutable_pairs()); TraverseKvIterator kv_it(response); int count = 0; while (kv_it.Valid()) { diff --git a/src/cmd/display.h b/src/cmd/display.h index 6f01549bada..024de5f2ed8 100644 --- a/src/cmd/display.h +++ b/src/cmd/display.h @@ -146,7 +146,7 @@ __attribute__((unused)) static void PrintColumnKey( stream << t; } -__attribute__((unused)) static void ShowTableRows(bool is_compress, ::openmldb::codec::SDKCodec* codec, +__attribute__((unused)) static void ShowTableRows(::openmldb::codec::SDKCodec* codec, ::openmldb::cmd::SDKIterator* it) { std::vector row = codec->GetColNames(); if (!codec->HasTSCol()) { @@ -160,12 +160,7 @@ __attribute__((unused)) static void ShowTableRows(bool is_compress, ::openmldb:: while (it->Valid()) { std::vector vrow; openmldb::base::Slice data = it->GetValue(); - std::string value; - if (is_compress) { - ::snappy::Uncompress(data.data(), data.size(), &value); - } else { - value.assign(data.data(), data.size()); - } + std::string value(data.data(), data.size()); codec->DecodeRow(value, &vrow); if (!codec->HasTSCol()) { vrow.insert(vrow.begin(), std::to_string(it->GetKey())); @@ -186,19 +181,16 @@ __attribute__((unused)) static void ShowTableRows(bool is_compress, ::openmldb:: __attribute__((unused)) static void ShowTableRows(const ::openmldb::api::TableMeta& table_info, ::openmldb::cmd::SDKIterator* it) { ::openmldb::codec::SDKCodec codec(table_info); - bool is_compress = table_info.compress_type() == ::openmldb::type::CompressType::kSnappy ? true : false; - ShowTableRows(is_compress, &codec, it); + ShowTableRows(&codec, it); } __attribute__((unused)) static void ShowTableRows(const ::openmldb::nameserver::TableInfo& table_info, ::openmldb::cmd::SDKIterator* it) { ::openmldb::codec::SDKCodec codec(table_info); - bool is_compress = table_info.compress_type() == ::openmldb::type::CompressType::kSnappy ? true : false; - ShowTableRows(is_compress, &codec, it); + ShowTableRows(&codec, it); } -__attribute__((unused)) static void ShowTableRows(const std::string& key, ::openmldb::cmd::SDKIterator* it, - const ::openmldb::type::CompressType compress_type) { +__attribute__((unused)) static void ShowTableRows(const std::string& key, ::openmldb::cmd::SDKIterator* it) { ::baidu::common::TPrinter tp(4, FLAGS_max_col_display_length); std::vector row; row.push_back("#"); @@ -209,11 +201,6 @@ __attribute__((unused)) static void ShowTableRows(const std::string& key, ::open uint32_t index = 1; while (it->Valid()) { std::string value = it->GetValue().ToString(); - if (compress_type == ::openmldb::type::CompressType::kSnappy) { - std::string uncompressed; - ::snappy::Uncompress(value.c_str(), value.length(), &uncompressed); - value = uncompressed; - } row.clear(); row.push_back(std::to_string(index)); row.push_back(key); diff --git a/src/cmd/openmldb.cc b/src/cmd/openmldb.cc index ccfda4ff5ed..d132190f588 100644 --- a/src/cmd/openmldb.cc +++ b/src/cmd/openmldb.cc @@ -1572,7 +1572,7 @@ void HandleNSScan(const std::vector& parts, ::openmldb::client::NsC std::vector> iter_vec; iter_vec.push_back(std::move(it)); ::openmldb::cmd::SDKIterator sdk_it(iter_vec, limit); - ::openmldb::cmd::ShowTableRows(key, &sdk_it, tables[0].compress_type()); + ::openmldb::cmd::ShowTableRows(key, &sdk_it); } } else { if (parts.size() < 6) { diff --git a/src/codec/codec_bench_test.cc b/src/codec/codec_bench_test.cc index 3b90515d55f..aaf314782f4 100644 --- a/src/codec/codec_bench_test.cc +++ b/src/codec/codec_bench_test.cc @@ -41,8 +41,10 @@ void RunHasTs(::openmldb::storage::DataBlock* db) { datas.emplace_back(1000, std::move(::openmldb::base::Slice(db->data, db->size))); total_block_size += db->size; } - std::string pairs; - ::openmldb::codec::EncodeRows(datas, total_block_size, &pairs); + butil::IOBuf buf; + for (const auto& pair : datas) { + Encode(pair.first, pair.second.data(), pair.second.size(), &buf); + } } void RunNoneTs(::openmldb::storage::DataBlock* db) { @@ -53,8 +55,10 @@ void RunNoneTs(::openmldb::storage::DataBlock* db) { datas.push_back(::openmldb::base::Slice(db->data, db->size)); total_block_size += db->size; } - std::string pairs; - ::openmldb::codec::EncodeRows(datas, total_block_size, &pairs); + butil::IOBuf buf; + for (const auto& v : datas) { + Encode(0, v.data(), v.size(), &buf); + } } TEST_F(CodecBenchmarkTest, ProjectTest) { diff --git a/src/codec/codec_test.cc b/src/codec/codec_test.cc index 68a9c2d7552..6c6ae99f804 100644 --- a/src/codec/codec_test.cc +++ b/src/codec/codec_test.cc @@ -34,31 +34,21 @@ class CodecTest : public ::testing::Test { ~CodecTest() {} }; -TEST_F(CodecTest, EncodeRows_empty) { - boost::container::deque> data; - std::string pairs; - int32_t size = ::openmldb::codec::EncodeRows(data, 0, &pairs); - ASSERT_EQ(size, 0); -} - -TEST_F(CodecTest, EncodeRows_invalid) { - boost::container::deque> data; - int32_t size = ::openmldb::codec::EncodeRows(data, 0, NULL); - ASSERT_EQ(size, -1); -} - TEST_F(CodecTest, EncodeRows) { boost::container::deque> data; std::string test1 = "value1"; std::string test2 = "value2"; std::string empty; - uint32_t total_block_size = test1.length() + test2.length() + empty.length(); data.emplace_back(1, std::move(::openmldb::base::Slice(test1.c_str(), test1.length()))); data.emplace_back(2, std::move(::openmldb::base::Slice(test2.c_str(), test2.length()))); data.emplace_back(3, std::move(::openmldb::base::Slice(empty.c_str(), empty.length()))); + butil::IOBuf buf; + for (const auto& pair : data) { + Encode(pair.first, pair.second.data(), pair.second.size(), &buf); + } std::string pairs; - int32_t size = ::openmldb::codec::EncodeRows(data, total_block_size, &pairs); - ASSERT_EQ(size, 3 * 12 + 6 + 6); + buf.copy_to(&pairs); + ASSERT_EQ(pairs.size(), 3 * 12 + 6 + 6); std::vector> new_data; ::openmldb::codec::Decode(&pairs, new_data); ASSERT_EQ(data.size(), new_data.size()); diff --git a/src/codec/row_codec.cc b/src/codec/row_codec.cc index 9131259a03b..f59e45b9d1e 100644 --- a/src/codec/row_codec.cc +++ b/src/codec/row_codec.cc @@ -243,6 +243,15 @@ void Encode(uint64_t time, const char* data, const size_t size, char* buffer, ui memcpy(buffer, static_cast(data), size); } +void Encode(uint64_t time, const char* data, const size_t size, butil::IOBuf* buf) { + uint32_t total_size = 8 + size; + memrev32ifbe(&total_size); + buf->append(&total_size, 4); + memrev64ifbe(&time); + buf->append(&time, 8); + buf->append(data, size); +} + void Encode(uint64_t time, const DataBlock* data, char* buffer, uint32_t offset) { return Encode(time, data->data, data->size, buffer, offset); } @@ -259,47 +268,6 @@ void Encode(const DataBlock* data, char* buffer, uint32_t offset) { return Encode(data->data, data->size, buffer, offset); } -int32_t EncodeRows(const std::vector<::openmldb::base::Slice>& rows, uint32_t total_block_size, - std::string* body) { - if (body == NULL) { - PDLOG(WARNING, "invalid output body"); - return -1; - } - - uint32_t total_size = rows.size() * 4 + total_block_size; - if (rows.size() > 0) { - body->resize(total_size); - } - uint32_t offset = 0; - char* rbuffer = reinterpret_cast(&((*body)[0])); - for (auto lit = rows.begin(); lit != rows.end(); ++lit) { - ::openmldb::codec::Encode(lit->data(), lit->size(), rbuffer, offset); - offset += (4 + lit->size()); - } - return total_size; -} - -int32_t EncodeRows(const boost::container::deque>& rows, - uint32_t total_block_size, std::string* pairs) { - if (pairs == NULL) { - PDLOG(WARNING, "invalid output pairs"); - return -1; - } - - uint32_t total_size = rows.size() * (8 + 4) + total_block_size; - if (rows.size() > 0) { - pairs->resize(total_size); - } - - char* rbuffer = reinterpret_cast(&((*pairs)[0])); - uint32_t offset = 0; - for (auto lit = rows.begin(); lit != rows.end(); ++lit) { - ::openmldb::codec::Encode(lit->first, lit->second.data(), lit->second.size(), rbuffer, offset); - offset += (4 + 8 + lit->second.size()); - } - return total_size; -} - void EncodeFull(const std::string& pk, uint64_t time, const char* data, const size_t size, butil::IOBuf* buf) { uint32_t pk_size = pk.length(); uint32_t total_size = 8 + pk_size + size; @@ -314,31 +282,6 @@ void EncodeFull(const std::string& pk, uint64_t time, const char* data, const si buf->append(data, size); } -void EncodeFull(const std::string& pk, uint64_t time, const char* data, const size_t size, char* buffer, - uint32_t offset) { - buffer += offset; - uint32_t pk_size = pk.length(); - uint32_t total_size = 8 + pk_size + size; - DEBUGLOG("encode total size %u pk size %u", total_size, pk_size); - memcpy(buffer, static_cast(&total_size), 4); - memrev32ifbe(buffer); - buffer += 4; - memcpy(buffer, static_cast(&pk_size), 4); - memrev32ifbe(buffer); - buffer += 4; - memcpy(buffer, static_cast(&time), 8); - memrev64ifbe(buffer); - buffer += 8; - memcpy(buffer, static_cast(pk.c_str()), pk_size); - buffer += pk_size; - memcpy(buffer, static_cast(data), size); -} - -void EncodeFull(const std::string& pk, uint64_t time, const DataBlock* data, char* buffer, - uint32_t offset) { - return EncodeFull(pk, time, data->data, data->size, buffer, offset); -} - void Decode(const std::string* str, std::vector>& pairs) { // NOLINT const char* buffer = str->c_str(); uint32_t total_size = str->length(); diff --git a/src/codec/row_codec.h b/src/codec/row_codec.h index 79cb365dceb..f2ac1f69ea7 100644 --- a/src/codec/row_codec.h +++ b/src/codec/row_codec.h @@ -71,24 +71,14 @@ bool DecodeRows(const std::string& data, uint32_t count, const Schema& schema, void Encode(uint64_t time, const char* data, const size_t size, char* buffer, uint32_t offset); +void Encode(uint64_t time, const char* data, const size_t size, butil::IOBuf* buf); + void Encode(uint64_t time, const DataBlock* data, char* buffer, uint32_t offset); void Encode(const char* data, const size_t size, char* buffer, uint32_t offset); void Encode(const DataBlock* data, char* buffer, uint32_t offset); -int32_t EncodeRows(const std::vector<::openmldb::base::Slice>& rows, uint32_t total_block_size, - std::string* body); - -int32_t EncodeRows(const boost::container::deque>& rows, - uint32_t total_block_size, std::string* pairs); -// encode pk, ts and value -void EncodeFull(const std::string& pk, uint64_t time, const char* data, const size_t size, char* buffer, - uint32_t offset); - -void EncodeFull(const std::string& pk, uint64_t time, const DataBlock* data, char* buffer, - uint32_t offset); - void EncodeFull(const std::string& pk, uint64_t time, const char* data, const size_t size, butil::IOBuf* buf); void Decode(const std::string* str, std::vector>& pairs); // NOLINT diff --git a/src/storage/mem_table.cc b/src/storage/mem_table.cc index 7c7005272dd..087f0a50aa7 100644 --- a/src/storage/mem_table.cc +++ b/src/storage/mem_table.cc @@ -528,9 +528,9 @@ TableIterator* MemTable::NewIterator(uint32_t index, const std::string& pk, Tick Segment* segment = segments_[real_idx][seg_idx]; auto ts_col = index_def->GetTsColumn(); if (ts_col) { - return segment->NewIterator(spk, ts_col->GetId(), ticket); + return segment->NewIterator(spk, ts_col->GetId(), ticket, GetCompressType()); } - return segment->NewIterator(spk, ticket); + return segment->NewIterator(spk, ticket, GetCompressType()); } uint64_t MemTable::GetRecordIdxByteSize() { diff --git a/src/storage/segment.cc b/src/storage/segment.cc index aec7f083b36..d79b6e85681 100644 --- a/src/storage/segment.cc +++ b/src/storage/segment.cc @@ -15,7 +15,7 @@ */ #include "storage/segment.h" - +#include #include #include "base/glog_wrapper.h" @@ -742,36 +742,38 @@ int Segment::GetCount(const Slice& key, uint32_t idx, uint64_t& count) { return 0; } -MemTableIterator* Segment::NewIterator(const Slice& key, Ticket& ticket) { +MemTableIterator* Segment::NewIterator(const Slice& key, Ticket& ticket, type::CompressType compress_type) { if (entries_ == nullptr || ts_cnt_ > 1) { - return new MemTableIterator(nullptr); + return new MemTableIterator(nullptr, compress_type); } void* entry = nullptr; if (entries_->Get(key, entry) < 0 || entry == nullptr) { - return new MemTableIterator(nullptr); + return new MemTableIterator(nullptr, compress_type); } ticket.Push(reinterpret_cast(entry)); - return new MemTableIterator(reinterpret_cast(entry)->entries.NewIterator()); + return new MemTableIterator(reinterpret_cast(entry)->entries.NewIterator(), compress_type); } -MemTableIterator* Segment::NewIterator(const Slice& key, uint32_t idx, Ticket& ticket) { +MemTableIterator* Segment::NewIterator(const Slice& key, uint32_t idx, + Ticket& ticket, type::CompressType compress_type) { auto pos = ts_idx_map_.find(idx); if (pos == ts_idx_map_.end()) { - return new MemTableIterator(nullptr); + return new MemTableIterator(nullptr, compress_type); } if (ts_cnt_ == 1) { - return NewIterator(key, ticket); + return NewIterator(key, ticket, compress_type); } void* entry_arr = nullptr; if (entries_->Get(key, entry_arr) < 0 || entry_arr == nullptr) { - return new MemTableIterator(nullptr); + return new MemTableIterator(nullptr, compress_type); } auto entry = reinterpret_cast(entry_arr)[pos->second]; ticket.Push(entry); - return new MemTableIterator(entry->entries.NewIterator()); + return new MemTableIterator(entry->entries.NewIterator(), compress_type); } -MemTableIterator::MemTableIterator(TimeEntries::Iterator* it) : it_(it) {} +MemTableIterator::MemTableIterator(TimeEntries::Iterator* it, type::CompressType compress_type) + : it_(it), compress_type_(compress_type) {} MemTableIterator::~MemTableIterator() { if (it_ != nullptr) { @@ -797,6 +799,11 @@ void MemTableIterator::Next() { } ::openmldb::base::Slice MemTableIterator::GetValue() const { + if (compress_type_ == type::CompressType::kSnappy) { + tmp_buf_.clear(); + snappy::Uncompress(it_->GetValue()->data, it_->GetValue()->size, &tmp_buf_); + return openmldb::base::Slice(tmp_buf_); + } return ::openmldb::base::Slice(it_->GetValue()->data, it_->GetValue()->size); } diff --git a/src/storage/segment.h b/src/storage/segment.h index 8e320400e39..fe58dd893a0 100644 --- a/src/storage/segment.h +++ b/src/storage/segment.h @@ -22,6 +22,7 @@ #include #include // NOLINT #include +#include #include #include "base/skiplist.h" @@ -40,7 +41,7 @@ using ::openmldb::base::Slice; class MemTableIterator : public TableIterator { public: - explicit MemTableIterator(TimeEntries::Iterator* it); + explicit MemTableIterator(TimeEntries::Iterator* it, type::CompressType compress_type); virtual ~MemTableIterator(); void Seek(const uint64_t time) override; bool Valid() override; @@ -52,6 +53,8 @@ class MemTableIterator : public TableIterator { private: TimeEntries::Iterator* it_; + type::CompressType compress_type_; + mutable std::string tmp_buf_; }; struct SliceComparator { @@ -93,9 +96,9 @@ class Segment { void Gc4TTLOrHead(const uint64_t time, const uint64_t keep_cnt, StatisticsInfo* statistics_info); void GcAllType(const std::map& ttl_st_map, StatisticsInfo* statistics_info); - MemTableIterator* NewIterator(const Slice& key, Ticket& ticket); // NOLINT + MemTableIterator* NewIterator(const Slice& key, Ticket& ticket, type::CompressType compress_type); // NOLINT MemTableIterator* NewIterator(const Slice& key, uint32_t idx, - Ticket& ticket); // NOLINT + Ticket& ticket, type::CompressType compress_type); // NOLINT uint64_t GetIdxCnt() const { return idx_cnt_vec_[0]->load(std::memory_order_relaxed); diff --git a/src/storage/segment_test.cc b/src/storage/segment_test.cc index 8b4728a9150..c51c0984473 100644 --- a/src/storage/segment_test.cc +++ b/src/storage/segment_test.cc @@ -61,7 +61,7 @@ TEST_F(SegmentTest, PutAndScan) { segment.Put(pk, 9529, value.c_str(), value.size()); ASSERT_EQ(1, (int64_t)segment.GetPkCnt()); Ticket ticket; - std::unique_ptr it(segment.NewIterator("test1", ticket)); + std::unique_ptr it(segment.NewIterator("test1", ticket, type::CompressType::kNoCompress)); it->Seek(9530); ASSERT_TRUE(it->Valid()); ASSERT_EQ(9529, (int64_t)it->GetKey()); @@ -103,7 +103,7 @@ TEST_F(SegmentTest, Delete) { segment.Put(pk, 9529, value.c_str(), value.size()); ASSERT_EQ(1, (int64_t)segment.GetPkCnt()); Ticket ticket; - std::unique_ptr it(segment.NewIterator("test1", ticket)); + std::unique_ptr it(segment.NewIterator("test1", ticket, type::CompressType::kNoCompress)); int size = 0; it->SeekToFirst(); while (it->Valid()) { @@ -112,7 +112,7 @@ TEST_F(SegmentTest, Delete) { } ASSERT_EQ(4, size); ASSERT_TRUE(segment.Delete(std::nullopt, pk)); - it.reset(segment.NewIterator("test1", ticket)); + it.reset(segment.NewIterator("test1", ticket, type::CompressType::kNoCompress)); ASSERT_FALSE(it->Valid()); segment.IncrGcVersion(); segment.IncrGcVersion(); @@ -178,7 +178,7 @@ TEST_F(SegmentTest, Iterator) { segment.Put(pk, 9769, "test2", 5); ASSERT_EQ(1, (int64_t)segment.GetPkCnt()); Ticket ticket; - std::unique_ptr it(segment.NewIterator("test1", ticket)); + std::unique_ptr it(segment.NewIterator("test1", ticket, type::CompressType::kNoCompress)); it->SeekToFirst(); int size = 0; while (it->Valid()) { @@ -208,7 +208,7 @@ TEST_F(SegmentTest, TestGc4Head) { segment.Gc4Head(1, &gc_info); CheckStatisticsInfo(CreateStatisticsInfo(1, 0, GetRecordSize(5)), gc_info); Ticket ticket; - std::unique_ptr it(segment.NewIterator(pk, ticket)); + std::unique_ptr it(segment.NewIterator(pk, ticket, type::CompressType::kNoCompress)); it->Seek(9769); ASSERT_TRUE(it->Valid()); ASSERT_EQ(9769, (int64_t)it->GetKey()); @@ -401,7 +401,7 @@ TEST_F(SegmentTest, TestDeleteRange) { ASSERT_EQ(100, GetCount(&segment, 0)); std::string pk = "key2"; Ticket ticket; - std::unique_ptr it(segment.NewIterator(pk, ticket)); + std::unique_ptr it(segment.NewIterator(pk, ticket, type::CompressType::kNoCompress)); it->Seek(1005); ASSERT_TRUE(it->Valid() && it->GetKey() == 1005); ASSERT_TRUE(segment.Delete(std::nullopt, pk, 1005, 1004)); diff --git a/src/tablet/tablet_impl.cc b/src/tablet/tablet_impl.cc index d8680b73ca8..edf009f3b61 100644 --- a/src/tablet/tablet_impl.cc +++ b/src/tablet/tablet_impl.cc @@ -463,9 +463,6 @@ int32_t TabletImpl::GetIndex(const ::openmldb::api::GetRequest* request, const : bool enable_project = false; openmldb::codec::RowProject row_project(vers_schema, request->projection()); if (request->projection().size() > 0) { - if (meta.compress_type() == ::openmldb::type::kSnappy) { - return -1; - } bool ok = row_project.Init(); if (!ok) { PDLOG(WARNING, "invalid project list"); @@ -891,7 +888,7 @@ int TabletImpl::CheckTableMeta(const openmldb::api::TableMeta* table_meta, std:: } int32_t TabletImpl::ScanIndex(const ::openmldb::api::ScanRequest* request, const ::openmldb::api::TableMeta& meta, - const std::map>& vers_schema, + const std::map>& vers_schema, bool use_attachment, CombineIterator* combine_it, butil::IOBuf* io_buf, uint32_t* count, bool* is_finish) { uint32_t limit = request->limit(); if (combine_it == nullptr || io_buf == nullptr || count == nullptr || is_finish == nullptr) { @@ -915,12 +912,7 @@ int32_t TabletImpl::ScanIndex(const ::openmldb::api::ScanRequest* request, const bool enable_project = false; ::openmldb::codec::RowProject row_project(vers_schema, request->projection()); if (request->projection().size() > 0) { - if (meta.compress_type() == ::openmldb::type::kSnappy) { - LOG(WARNING) << "project on compress row data do not eing supported"; - return -1; - } - bool ok = row_project.Init(); - if (!ok) { + if (!row_project.Init()) { PDLOG(WARNING, "invalid project list"); return -1; } @@ -961,11 +953,19 @@ int32_t TabletImpl::ScanIndex(const ::openmldb::api::ScanRequest* request, const PDLOG(WARNING, "fail to make a projection"); return -4; } - io_buf->append(reinterpret_cast(ptr), size); + if (use_attachment) { + io_buf->append(reinterpret_cast(ptr), size); + } else { + ::openmldb::codec::Encode(ts, reinterpret_cast(ptr), size, io_buf); + } total_block_size += size; } else { openmldb::base::Slice data = combine_it->GetValue(); - io_buf->append(reinterpret_cast(data.data()), data.size()); + if (use_attachment) { + io_buf->append(reinterpret_cast(data.data()), data.size()); + } else { + ::openmldb::codec::Encode(ts, data.data(), data.size(), io_buf); + } total_block_size += data.size(); } record_count++; @@ -978,98 +978,6 @@ int32_t TabletImpl::ScanIndex(const ::openmldb::api::ScanRequest* request, const *count = record_count; return 0; } -int32_t TabletImpl::ScanIndex(const ::openmldb::api::ScanRequest* request, const ::openmldb::api::TableMeta& meta, - const std::map>& vers_schema, - CombineIterator* combine_it, std::string* pairs, uint32_t* count, bool* is_finish) { - uint32_t limit = request->limit(); - if (combine_it == nullptr || pairs == nullptr || count == nullptr || is_finish == nullptr) { - PDLOG(WARNING, "invalid args"); - return -1; - } - uint64_t st = request->st(); - uint64_t et = request->et(); - uint64_t expire_time = combine_it->GetExpireTime(); - ::openmldb::storage::TTLType ttl_type = combine_it->GetTTLType(); - if (ttl_type == ::openmldb::storage::TTLType::kAbsoluteTime || - ttl_type == ::openmldb::storage::TTLType::kAbsOrLat) { - et = std::max(et, expire_time); - } - if (st > 0 && st < et) { - PDLOG(WARNING, "invalid args for st %lu less than et %lu or expire time %lu", st, et, expire_time); - return -1; - } - - bool enable_project = false; - ::openmldb::codec::RowProject row_project(vers_schema, request->projection()); - if (!request->projection().empty()) { - if (meta.compress_type() == ::openmldb::type::kSnappy) { - LOG(WARNING) << "project on compress row data, not supported"; - return -1; - } - bool ok = row_project.Init(); - if (!ok) { - PDLOG(WARNING, "invalid project list"); - return -1; - } - enable_project = true; - } - bool remove_duplicated_record = request->enable_remove_duplicated_record(); - uint64_t last_time = 0; - boost::container::deque> tmp; - uint32_t total_block_size = 0; - combine_it->SeekToFirst(); - uint32_t skip_record_num = request->skip_record_num(); - while (combine_it->Valid()) { - if (limit > 0 && tmp.size() >= limit) { - *is_finish = false; - break; - } - if (remove_duplicated_record && !tmp.empty() && last_time == combine_it->GetTs()) { - combine_it->Next(); - continue; - } - if (combine_it->GetTs() == st && skip_record_num > 0) { - skip_record_num--; - combine_it->Next(); - continue; - } - uint64_t ts = combine_it->GetTs(); - if (ts <= et) { - break; - } - last_time = ts; - if (enable_project) { - int8_t* ptr = nullptr; - uint32_t size = 0; - openmldb::base::Slice data = combine_it->GetValue(); - const auto* row_ptr = reinterpret_cast(data.data()); - bool ok = row_project.Project(row_ptr, data.size(), &ptr, &size); - if (!ok) { - PDLOG(WARNING, "fail to make a projection"); - return -4; - } - tmp.emplace_back(ts, Slice(reinterpret_cast(ptr), size, true)); - total_block_size += size; - } else { - openmldb::base::Slice data = combine_it->GetValue(); - total_block_size += data.size(); - tmp.emplace_back(ts, data); - } - if (total_block_size > FLAGS_scan_max_bytes_size) { - LOG(WARNING) << "reach the max byte size " << FLAGS_scan_max_bytes_size << " cur is " << total_block_size; - *is_finish = false; - break; - } - combine_it->Next(); - } - int32_t ok = ::openmldb::codec::EncodeRows(tmp, total_block_size, pairs); - if (ok == -1) { - PDLOG(WARNING, "fail to encode rows"); - return -4; - } - *count = tmp.size(); - return 0; -} int32_t TabletImpl::CountIndex(uint64_t expire_time, uint64_t expire_cnt, ::openmldb::storage::TTLType ttl_type, ::openmldb::storage::TableIterator* it, const ::openmldb::api::CountRequest* request, @@ -1258,12 +1166,13 @@ void TabletImpl::Scan(RpcController* controller, const ::openmldb::api::ScanRequ int32_t code = 0; bool is_finish = true; if (!request->has_use_attachment() || !request->use_attachment()) { - std::string* pairs = response->mutable_pairs(); - code = ScanIndex(request, *table_meta, vers_schema, &combine_it, pairs, &count, &is_finish); + butil::IOBuf buf; + code = ScanIndex(request, *table_meta, vers_schema, false, &combine_it, &buf, &count, &is_finish); + buf.copy_to(response->mutable_pairs()); } else { auto* cntl = dynamic_cast(controller); butil::IOBuf& buf = cntl->response_attachment(); - code = ScanIndex(request, *table_meta, vers_schema, &combine_it, &buf, &count, &is_finish); + code = ScanIndex(request, *table_meta, vers_schema, true, &combine_it, &buf, &count, &is_finish); response->set_buf_size(buf.size()); DLOG(INFO) << " scan " << request->pk() << " with buf size " << buf.size(); } diff --git a/src/tablet/tablet_impl.h b/src/tablet/tablet_impl.h index d48f192ae26..7207b3ab8bd 100644 --- a/src/tablet/tablet_impl.h +++ b/src/tablet/tablet_impl.h @@ -239,14 +239,9 @@ class TabletImpl : public ::openmldb::api::TabletServer { const std::map>& vers_schema, CombineIterator* combine_it, std::string* value, uint64_t* ts); - // scan specified ttl type index int32_t ScanIndex(const ::openmldb::api::ScanRequest* request, const ::openmldb::api::TableMeta& meta, - const std::map>& vers_schema, CombineIterator* combine_it, - std::string* pairs, uint32_t* count, bool* is_finish); - - int32_t ScanIndex(const ::openmldb::api::ScanRequest* request, const ::openmldb::api::TableMeta& meta, - const std::map>& vers_schema, CombineIterator* combine_it, - butil::IOBuf* buf, uint32_t* count, bool* is_finish); + const std::map>& vers_schema, bool use_attachment, + CombineIterator* combine_it, butil::IOBuf* buf, uint32_t* count, bool* is_finish); int32_t CountIndex(uint64_t expire_time, uint64_t expire_cnt, ::openmldb::storage::TTLType ttl_type, ::openmldb::storage::TableIterator* it, const ::openmldb::api::CountRequest* request,