diff --git a/cases/query/left_join.yml b/cases/query/left_join.yml index feb6c7d28b1..87e1c387ea6 100644 --- a/cases/query/left_join.yml +++ b/cases/query/left_join.yml @@ -380,3 +380,196 @@ cases: aa, 20, 3000 aa, 20, 2000 bb, NULL, NULL + - id: 6 + desc: lastjoin(leftjoin(filter, table)) + inputs: + - name: t1 + columns: ["c1 string","c2 int","c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",20,1000] + - ["bb",30,1000] + - ["cc",40,1000] + - ["dd",50,1000] + - name: t2 + columns: ["c1 string", "c2 int", "c4 timestamp"] + indexs: ["index1:c1:c4", "index2:c2:c4"] + rows: + - ["bb",20, 1000] + - ["aa",30, 2000] + - ["bb",30, 3000] + - ["cc",40, 4000] + - ["dd",50, 5000] + - name: t3 + columns: ["c1 string","c2 int","c3 bigint","c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",19,13,3000] + - ["bb",34,131,3000] + sql: | + select + t1.c1, + t1.c2, + tx.* + from t1 last join + ( + select t2.c1 as tx_0_c1, + t2.c2 as tx_0_c2, + t2.c4 as tx_0_c4, + t3.c2 as tx_1_c2, + t3.c3 as tx_1_c3 + from (select * from t2 where c1 != 'dd') t2 left join t3 + on t2.c1 = t3.c1 + ) tx + order by tx.tx_0_c4 + on t1.c2 = tx.tx_0_c2 + request_plan: | + SIMPLE_PROJECT(sources=(t1.c1, t1.c2, tx.tx_0_c1, tx.tx_0_c2, tx.tx_0_c4, tx.tx_1_c2, tx.tx_1_c3)) + REQUEST_JOIN(type=LastJoin, right_sort=(ASC), condition=, left_keys=(), right_keys=(), index_keys=(t1.c2)) + DATA_PROVIDER(request=t1) + RENAME(name=tx) + SIMPLE_PROJECT(sources=(t2.c1 -> tx_0_c1, t2.c2 -> tx_0_c2, t2.c4 -> tx_0_c4, t3.c2 -> tx_1_c2, t3.c3 -> tx_1_c3)) + REQUEST_JOIN(type=LeftJoin, condition=, left_keys=(), right_keys=(), index_keys=(t2.c1)) + RENAME(name=t2) + FILTER_BY(condition=c1 != dd, left_keys=, right_keys=, index_keys=) + DATA_PROVIDER(type=Partition, table=t2, index=index2) + DATA_PROVIDER(type=Partition, table=t3, index=index1) + expect: + order: c1 + columns: ["c1 string", "c2 int", "tx_0_c1 string", "tx_0_c2 int", "tx_0_c4 timestamp", "tx_1_c2 int", "tx_1_c3 int64"] + data: | + aa, 20, bb, 20, 1000, 34, 131 + bb, 30, bb, 30, 3000, 34, 131 + cc, 40, cc, 40, 4000, NULL, NULL + dd, 50, NULL, NULL, NULL, NULL, NULL + - id: 7 + desc: lastjoin(leftjoin(filter, filter)) + inputs: + - name: t1 + columns: ["c1 string","c2 int","c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",20,1000] + - ["bb",30,1000] + - ["cc",40,1000] + - ["dd",50,1000] + - name: t2 + columns: ["c1 string", "c2 int", "c4 timestamp"] + indexs: ["index1:c1:c4", "index2:c2:c4"] + rows: + - ["bb",20, 1000] + - ["aa",30, 2000] + - ["bb",30, 3000] + - ["cc",40, 4000] + - ["dd",50, 5000] + - name: t3 + columns: ["c1 string","c2 int","c3 bigint","c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",19,13,3000] + - ["bb",34,131,3000] + cluster_request_plan: | + SIMPLE_PROJECT(sources=(t1.c1, t1.c2, tx.tx_0_c1, tx.tx_0_c2, tx.tx_0_c4, tx.tx_1_c2, tx.tx_1_c3)) + REQUEST_JOIN(type=kJoinTypeConcat) + DATA_PROVIDER(request=t1) + REQUEST_JOIN(OUTPUT_RIGHT_ONLY, type=LastJoin, right_sort=(ASC), condition=, left_keys=(#5), right_keys=(#8), index_keys=) + SIMPLE_PROJECT(sources=(#5 -> t1.c2)) + DATA_PROVIDER(request=t1) + RENAME(name=tx) + SIMPLE_PROJECT(sources=(t2.c1 -> tx_0_c1, t2.c2 -> tx_0_c2, t2.c4 -> tx_0_c4, t3.c2 -> tx_1_c2, t3.c3 -> tx_1_c3)) + REQUEST_JOIN(type=LeftJoin, condition=, left_keys=(), right_keys=(), index_keys=(t2.c1)) + RENAME(name=t2) + FILTER_BY(condition=, left_keys=(), right_keys=(), index_keys=(30)) + DATA_PROVIDER(type=Partition, table=t2, index=index2) + RENAME(name=t3) + FILTER_BY(condition=c2 > 20, left_keys=, right_keys=, index_keys=) + DATA_PROVIDER(type=Partition, table=t3, index=index1) + sql: | + select + t1.c1, + t1.c2, + tx.* + from t1 last join + ( + select t2.c1 as tx_0_c1, + t2.c2 as tx_0_c2, + t2.c4 as tx_0_c4, + t3.c2 as tx_1_c2, + t3.c3 as tx_1_c3 + from (select * from t2 where c2 = 30) t2 left join (select * from t3 where c2 > 20) t3 + on t2.c1 = t3.c1 + ) tx + order by tx.tx_0_c4 + on t1.c2 = tx.tx_0_c2 + request_plan: | + expect: + order: c1 + columns: ["c1 string", "c2 int", "tx_0_c1 string", "tx_0_c2 int", "tx_0_c4 timestamp", "tx_1_c2 int", "tx_1_c3 int64"] + data: | + aa, 20, NULL, NULL, NULL, NULL, NULL + bb, 30, bb, 30, 3000, 34, 131 + cc, 40, NULL, NULL, NULL, NULL, NULL + dd, 50, NULL, NULL, NULL, NULL, NULL + - id: 8 + desc: lastjoin(leftjoin(filter, filter)) + inputs: + - name: t1 + columns: ["c1 string","c2 int","c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",20,1000] + - ["bb",30,1000] + - ["cc",40,1000] + - name: t2 + columns: ["c1 string", "c2 int", "c4 timestamp"] + indexs: ["index1:c1:c4", "index2:c2:c4"] + rows: + - ["bb",20, 1000] + - ["aa",20, 2000] + - ["bb",30, 3000] + - ["cc",40, 4000] + - name: t3 + columns: ["c1 string","c2 int","c3 bigint","c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",19,13,3000] + - ["bb",34,131,3000] + sql: | + select + t1.c1, + t1.c2, + tx.* + from t1 last join + ( + select t2.c1 as tx_0_c1, + t2.c2 as tx_0_c2, + t2.c4 as tx_0_c4, + t3.c2 as tx_1_c2, + t3.c3 as tx_1_c3 + from (select * from t2 where c2 = 20) t2 left join (select * from t3 where c1 = 'bb') t3 + on t2.c1 = t3.c1 + ) tx + on t1.c2 = tx.tx_0_c2 and not isnull(tx.tx_1_c2) + cluster_request_plan: | + SIMPLE_PROJECT(sources=(t1.c1, t1.c2, tx.tx_0_c1, tx.tx_0_c2, tx.tx_0_c4, tx.tx_1_c2, tx.tx_1_c3)) + REQUEST_JOIN(type=kJoinTypeConcat) + DATA_PROVIDER(request=t1) + REQUEST_JOIN(OUTPUT_RIGHT_ONLY, type=LastJoin, condition=NOT isnull(#89), left_keys=(#5), right_keys=(#8), index_keys=) + SIMPLE_PROJECT(sources=(#5 -> t1.c2)) + DATA_PROVIDER(request=t1) + RENAME(name=tx) + SIMPLE_PROJECT(sources=(t2.c1 -> tx_0_c1, t2.c2 -> tx_0_c2, t2.c4 -> tx_0_c4, t3.c2 -> tx_1_c2, t3.c3 -> tx_1_c3)) + REQUEST_JOIN(type=LeftJoin, condition=, left_keys=(t2.c1), right_keys=(t3.c1), index_keys=) + RENAME(name=t2) + FILTER_BY(condition=, left_keys=(), right_keys=(), index_keys=(20)) + DATA_PROVIDER(type=Partition, table=t2, index=index2) + RENAME(name=t3) + FILTER_BY(condition=, left_keys=(), right_keys=(), index_keys=(bb)) + DATA_PROVIDER(type=Partition, table=t3, index=index1) + expect: + order: c1 + columns: ["c1 string", "c2 int", "tx_0_c1 string", "tx_0_c2 int", "tx_0_c4 timestamp", "tx_1_c2 int", "tx_1_c3 int64"] + data: | + aa, 20, bb, 20, 1000, 34, 131 + bb, 30, NULL, NULL, NULL, NULL, NULL + cc, 40, NULL, NULL, NULL, NULL, NULL diff --git a/hybridse/include/vm/catalog.h b/hybridse/include/vm/catalog.h index 68139d1d930..4bd007645bd 100644 --- a/hybridse/include/vm/catalog.h +++ b/hybridse/include/vm/catalog.h @@ -333,7 +333,7 @@ class PartitionHandler : public TableHandler { // Return the iterator of row iterator // Return null by default - RowIterator* GetRawIterator() { return nullptr; } + RowIterator* GetRawIterator() override { return nullptr; } std::unique_ptr GetWindowIterator(const std::string& idx_name) override { return std::unique_ptr(); diff --git a/hybridse/include/vm/physical_op.h b/hybridse/include/vm/physical_op.h index 43c3d3d3af8..dd51c73bfd1 100644 --- a/hybridse/include/vm/physical_op.h +++ b/hybridse/include/vm/physical_op.h @@ -1678,14 +1678,22 @@ class PhysicalFilterNode : public PhysicalUnaryNode { public: PhysicalFilterNode(PhysicalOpNode *node, const node::ExprNode *condition) : PhysicalUnaryNode(node, kPhysicalOpFilter, true), filter_(condition) { - output_type_ = node->GetOutputType(); + if (node->GetOutputType() == kSchemaTypeGroup && filter_.index_key_.ValidKey()) { + output_type_ = kSchemaTypeTable; + } else { + output_type_ = node->GetOutputType(); + } fn_infos_.push_back(&filter_.condition_.fn_info()); fn_infos_.push_back(&filter_.index_key_.fn_info()); } PhysicalFilterNode(PhysicalOpNode *node, Filter filter) : PhysicalUnaryNode(node, kPhysicalOpFilter, true), filter_(filter) { - output_type_ = node->GetOutputType(); + if (node->GetOutputType() == kSchemaTypeGroup && filter_.index_key_.ValidKey()) { + output_type_ = kSchemaTypeTable; + } else { + output_type_ = node->GetOutputType(); + } fn_infos_.push_back(&filter_.condition_.fn_info()); fn_infos_.push_back(&filter_.index_key_.fn_info()); diff --git a/hybridse/src/vm/runner.cc b/hybridse/src/vm/runner.cc index bd28ec7032e..eb284e6e945 100644 --- a/hybridse/src/vm/runner.cc +++ b/hybridse/src/vm/runner.cc @@ -731,10 +731,9 @@ std::shared_ptr RequestJoinRunner::Run( auto left_part = std::dynamic_pointer_cast(left); auto right_part = std::dynamic_pointer_cast(right); return join_gen_->LazyJoinOptimized(left_part, right_part, ctx.GetParameterRow()); + } else { + return join_gen_->LazyJoin(left, right, ctx.GetParameterRow()); } - - LOG(WARNING) << "skip due to performance: left source of request join is table handler (unoptimized)"; - return std::shared_ptr(); } std::shared_ptr JoinRunner::Run(RunnerContext& ctx, diff --git a/hybridse/src/vm/runner_builder.cc b/hybridse/src/vm/runner_builder.cc index 63a346bf962..5d595ba9785 100644 --- a/hybridse/src/vm/runner_builder.cc +++ b/hybridse/src/vm/runner_builder.cc @@ -15,23 +15,11 @@ */ #include "vm/runner_builder.h" +#include "vm/physical_op.h" namespace hybridse { namespace vm { -static bool IsPartitionProvider(vm::PhysicalOpNode* n) { - switch (n->GetOpType()) { - case kPhysicalOpSimpleProject: - case kPhysicalOpRename: - case kPhysicalOpRequestJoin: - return IsPartitionProvider(n->GetProducer(0)); - case kPhysicalOpDataProvider: - return dynamic_cast(n)->provider_type_ == kProviderTypePartition; - default: - return false; - } -} - static vm::PhysicalDataProviderNode* request_node(vm::PhysicalOpNode* n) { switch (n->GetOpType()) { case kPhysicalOpDataProvider: @@ -297,13 +285,12 @@ ClusterTask RunnerBuilder::Build(PhysicalOpNode* node, Status& status) { } } if (support_cluster_optimized_) { - if (IsPartitionProvider(node->GetProducer(0))) { + if (node->GetOutputType() == kSchemaTypeGroup) { // route by index of the left source, and it should uncompleted auto& route_info = left_task.GetRouteInfo(); runner->AddProducer(left_task.GetRoot()); runner->AddProducer(right_task.GetRoot()); - return RegisterTask(node, - UnCompletedClusterTask(runner, route_info.table_handler_, route_info.index_)); + return RegisterTask(node, ClusterTask(runner, {}, route_info)); } } return RegisterTask(node, BinaryInherit(left_task, right_task, runner, index_key, kRightBias)); @@ -338,22 +325,25 @@ ClusterTask RunnerBuilder::Build(PhysicalOpNode* node, Status& status) { op->output_right_only()); if (support_cluster_optimized_) { - if (IsPartitionProvider(node->GetProducer(0))) { - // Partion left join partition, route by index of the left source, and it should uncompleted - auto& route_info = left_task.GetRouteInfo(); - runner->AddProducer(left_task.GetRoot()); - runner->AddProducer(right_task.GetRoot()); - return RegisterTask( - node, UnCompletedClusterTask(runner, route_info.table_handler_, route_info.index_)); - } - - if (right_task.IsCompletedClusterTask() && right_task.GetRouteInfo().lazy_route_ && - !op->join_.index_key_.ValidKey()) { - // join (.., filter) - auto& route_info = right_task.GetRouteInfo(); + if (node->GetOutputType() == kSchemaTypeRow) { + // complete cluster task from right + if (op->join().index_key().ValidKey()) { + // optimize key in this node + return RegisterTask(node, BinaryInherit(left_task, right_task, runner, + op->join().index_key(), kLeftBias)); + } else { + // optimize happens before, in left node + auto right_route_info = right_task.GetRouteInfo(); + runner->AddProducer(left_task.GetRoot()); + runner->AddProducer(right_task.GetRoot()); + return RegisterTask(node, ClusterTask(runner, {}, right_route_info)); + } + } else { + // uncomplete/lazify cluster task from left + auto left_route_info = left_task.GetRouteInfo(); runner->AddProducer(left_task.GetRoot()); runner->AddProducer(right_task.GetRoot()); - return RegisterTask(node, ClusterTask(runner, {}, route_info)); + return RegisterTask(node, ClusterTask(runner, {}, left_route_info)); } } @@ -372,9 +362,7 @@ ClusterTask RunnerBuilder::Build(PhysicalOpNode* node, Status& status) { } // concat join (any(tx), any(tx)), tx is not request table - auto left = request_node(node->GetProducer(0)); - // auto right = request_node(node->GetProducer(1)); - if (left->provider_type_ == kProviderTypePartition) { + if (node->GetOutputType() != kSchemaTypeRow) { runner->AddProducer(left_task.GetRoot()); runner->AddProducer(right_task.GetRoot()); return RegisterTask(node, ClusterTask(runner, {}, left_task.GetRouteInfo()));