Skip to content

Commit

Permalink
fix(runner): build cluster request join runner
Browse files Browse the repository at this point in the history
For REQUESTJOIN(ANY1(T1), ANY2(T2)),
ANY1 may optimize T1, REQUESTJOIN, ANY2 may optimize T2, building
cluster task correctly
  • Loading branch information
aceforeverd committed Nov 9, 2023
1 parent 2d3302e commit 927744e
Show file tree
Hide file tree
Showing 5 changed files with 227 additions and 39 deletions.
193 changes: 193 additions & 0 deletions cases/query/left_join.yml
Original file line number Diff line number Diff line change
Expand Up @@ -380,3 +380,196 @@ cases:
aa, 20, 3000
aa, 20, 2000
bb, NULL, NULL
- id: 6
desc: lastjoin(leftjoin(filter<not optimized>, table))
inputs:
- name: t1
columns: ["c1 string","c2 int","c4 timestamp"]
indexs: ["index1:c1:c4"]
rows:
- ["aa",20,1000]
- ["bb",30,1000]
- ["cc",40,1000]
- ["dd",50,1000]
- name: t2
columns: ["c1 string", "c2 int", "c4 timestamp"]
indexs: ["index1:c1:c4", "index2:c2:c4"]
rows:
- ["bb",20, 1000]
- ["aa",30, 2000]
- ["bb",30, 3000]
- ["cc",40, 4000]
- ["dd",50, 5000]
- name: t3
columns: ["c1 string","c2 int","c3 bigint","c4 timestamp"]
indexs: ["index1:c1:c4"]
rows:
- ["aa",19,13,3000]
- ["bb",34,131,3000]
sql: |
select
t1.c1,
t1.c2,
tx.*
from t1 last join
(
select t2.c1 as tx_0_c1,
t2.c2 as tx_0_c2,
t2.c4 as tx_0_c4,
t3.c2 as tx_1_c2,
t3.c3 as tx_1_c3
from (select * from t2 where c1 != 'dd') t2 left join t3
on t2.c1 = t3.c1
) tx
order by tx.tx_0_c4
on t1.c2 = tx.tx_0_c2
request_plan: |
SIMPLE_PROJECT(sources=(t1.c1, t1.c2, tx.tx_0_c1, tx.tx_0_c2, tx.tx_0_c4, tx.tx_1_c2, tx.tx_1_c3))
REQUEST_JOIN(type=LastJoin, right_sort=(ASC), condition=, left_keys=(), right_keys=(), index_keys=(t1.c2))
DATA_PROVIDER(request=t1)
RENAME(name=tx)
SIMPLE_PROJECT(sources=(t2.c1 -> tx_0_c1, t2.c2 -> tx_0_c2, t2.c4 -> tx_0_c4, t3.c2 -> tx_1_c2, t3.c3 -> tx_1_c3))
REQUEST_JOIN(type=LeftJoin, condition=, left_keys=(), right_keys=(), index_keys=(t2.c1))
RENAME(name=t2)
FILTER_BY(condition=c1 != dd, left_keys=, right_keys=, index_keys=)
DATA_PROVIDER(type=Partition, table=t2, index=index2)
DATA_PROVIDER(type=Partition, table=t3, index=index1)
expect:
order: c1
columns: ["c1 string", "c2 int", "tx_0_c1 string", "tx_0_c2 int", "tx_0_c4 timestamp", "tx_1_c2 int", "tx_1_c3 int64"]
data: |
aa, 20, bb, 20, 1000, 34, 131
bb, 30, bb, 30, 3000, 34, 131
cc, 40, cc, 40, 4000, NULL, NULL
dd, 50, NULL, NULL, NULL, NULL, NULL
- id: 7
desc: lastjoin(leftjoin(filter<optimized>, filter<not optimized>))
inputs:
- name: t1
columns: ["c1 string","c2 int","c4 timestamp"]
indexs: ["index1:c1:c4"]
rows:
- ["aa",20,1000]
- ["bb",30,1000]
- ["cc",40,1000]
- ["dd",50,1000]
- name: t2
columns: ["c1 string", "c2 int", "c4 timestamp"]
indexs: ["index1:c1:c4", "index2:c2:c4"]
rows:
- ["bb",20, 1000]
- ["aa",30, 2000]
- ["bb",30, 3000]
- ["cc",40, 4000]
- ["dd",50, 5000]
- name: t3
columns: ["c1 string","c2 int","c3 bigint","c4 timestamp"]
indexs: ["index1:c1:c4"]
rows:
- ["aa",19,13,3000]
- ["bb",34,131,3000]
cluster_request_plan: |
SIMPLE_PROJECT(sources=(t1.c1, t1.c2, tx.tx_0_c1, tx.tx_0_c2, tx.tx_0_c4, tx.tx_1_c2, tx.tx_1_c3))
REQUEST_JOIN(type=kJoinTypeConcat)
DATA_PROVIDER(request=t1)
REQUEST_JOIN(OUTPUT_RIGHT_ONLY, type=LastJoin, right_sort=(ASC), condition=, left_keys=(#5), right_keys=(#8), index_keys=)
SIMPLE_PROJECT(sources=(#5 -> t1.c2))
DATA_PROVIDER(request=t1)
RENAME(name=tx)
SIMPLE_PROJECT(sources=(t2.c1 -> tx_0_c1, t2.c2 -> tx_0_c2, t2.c4 -> tx_0_c4, t3.c2 -> tx_1_c2, t3.c3 -> tx_1_c3))
REQUEST_JOIN(type=LeftJoin, condition=, left_keys=(), right_keys=(), index_keys=(t2.c1))
RENAME(name=t2)
FILTER_BY(condition=, left_keys=(), right_keys=(), index_keys=(30))
DATA_PROVIDER(type=Partition, table=t2, index=index2)
RENAME(name=t3)
FILTER_BY(condition=c2 > 20, left_keys=, right_keys=, index_keys=)
DATA_PROVIDER(type=Partition, table=t3, index=index1)
sql: |
select
t1.c1,
t1.c2,
tx.*
from t1 last join
(
select t2.c1 as tx_0_c1,
t2.c2 as tx_0_c2,
t2.c4 as tx_0_c4,
t3.c2 as tx_1_c2,
t3.c3 as tx_1_c3
from (select * from t2 where c2 = 30) t2 left join (select * from t3 where c2 > 20) t3
on t2.c1 = t3.c1
) tx
order by tx.tx_0_c4
on t1.c2 = tx.tx_0_c2
request_plan: |
expect:
order: c1
columns: ["c1 string", "c2 int", "tx_0_c1 string", "tx_0_c2 int", "tx_0_c4 timestamp", "tx_1_c2 int", "tx_1_c3 int64"]
data: |
aa, 20, NULL, NULL, NULL, NULL, NULL
bb, 30, bb, 30, 3000, 34, 131
cc, 40, NULL, NULL, NULL, NULL, NULL
dd, 50, NULL, NULL, NULL, NULL, NULL
- id: 8
desc: lastjoin(leftjoin(filter<optimized>, filter<optimized>))
inputs:
- name: t1
columns: ["c1 string","c2 int","c4 timestamp"]
indexs: ["index1:c1:c4"]
rows:
- ["aa",20,1000]
- ["bb",30,1000]
- ["cc",40,1000]
- name: t2
columns: ["c1 string", "c2 int", "c4 timestamp"]
indexs: ["index1:c1:c4", "index2:c2:c4"]
rows:
- ["bb",20, 1000]
- ["aa",20, 2000]
- ["bb",30, 3000]
- ["cc",40, 4000]
- name: t3
columns: ["c1 string","c2 int","c3 bigint","c4 timestamp"]
indexs: ["index1:c1:c4"]
rows:
- ["aa",19,13,3000]
- ["bb",34,131,3000]
sql: |
select
t1.c1,
t1.c2,
tx.*
from t1 last join
(
select t2.c1 as tx_0_c1,
t2.c2 as tx_0_c2,
t2.c4 as tx_0_c4,
t3.c2 as tx_1_c2,
t3.c3 as tx_1_c3
from (select * from t2 where c2 = 20) t2 left join (select * from t3 where c1 = 'bb') t3
on t2.c1 = t3.c1
) tx
on t1.c2 = tx.tx_0_c2 and not isnull(tx.tx_1_c2)
cluster_request_plan: |
SIMPLE_PROJECT(sources=(t1.c1, t1.c2, tx.tx_0_c1, tx.tx_0_c2, tx.tx_0_c4, tx.tx_1_c2, tx.tx_1_c3))
REQUEST_JOIN(type=kJoinTypeConcat)
DATA_PROVIDER(request=t1)
REQUEST_JOIN(OUTPUT_RIGHT_ONLY, type=LastJoin, condition=NOT isnull(#89), left_keys=(#5), right_keys=(#8), index_keys=)
SIMPLE_PROJECT(sources=(#5 -> t1.c2))
DATA_PROVIDER(request=t1)
RENAME(name=tx)
SIMPLE_PROJECT(sources=(t2.c1 -> tx_0_c1, t2.c2 -> tx_0_c2, t2.c4 -> tx_0_c4, t3.c2 -> tx_1_c2, t3.c3 -> tx_1_c3))
REQUEST_JOIN(type=LeftJoin, condition=, left_keys=(t2.c1), right_keys=(t3.c1), index_keys=)
RENAME(name=t2)
FILTER_BY(condition=, left_keys=(), right_keys=(), index_keys=(20))
DATA_PROVIDER(type=Partition, table=t2, index=index2)
RENAME(name=t3)
FILTER_BY(condition=, left_keys=(), right_keys=(), index_keys=(bb))
DATA_PROVIDER(type=Partition, table=t3, index=index1)
expect:
order: c1
columns: ["c1 string", "c2 int", "tx_0_c1 string", "tx_0_c2 int", "tx_0_c4 timestamp", "tx_1_c2 int", "tx_1_c3 int64"]
data: |
aa, 20, bb, 20, 1000, 34, 131
bb, 30, NULL, NULL, NULL, NULL, NULL
cc, 40, NULL, NULL, NULL, NULL, NULL
2 changes: 1 addition & 1 deletion hybridse/include/vm/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ class PartitionHandler : public TableHandler {

// Return the iterator of row iterator
// Return null by default
RowIterator* GetRawIterator() { return nullptr; }
RowIterator* GetRawIterator() override { return nullptr; }

Check warning on line 336 in hybridse/include/vm/catalog.h

View check run for this annotation

Codecov / codecov/patch

hybridse/include/vm/catalog.h#L336

Added line #L336 was not covered by tests

std::unique_ptr<WindowIterator> GetWindowIterator(const std::string& idx_name) override {

Check warning on line 338 in hybridse/include/vm/catalog.h

View check run for this annotation

Codecov / codecov/patch

hybridse/include/vm/catalog.h#L338

Added line #L338 was not covered by tests
return std::unique_ptr<WindowIterator>();
Expand Down
12 changes: 10 additions & 2 deletions hybridse/include/vm/physical_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -1678,14 +1678,22 @@ class PhysicalFilterNode : public PhysicalUnaryNode {
public:
PhysicalFilterNode(PhysicalOpNode *node, const node::ExprNode *condition)
: PhysicalUnaryNode(node, kPhysicalOpFilter, true), filter_(condition) {
output_type_ = node->GetOutputType();
if (node->GetOutputType() == kSchemaTypeGroup && filter_.index_key_.ValidKey()) {
output_type_ = kSchemaTypeTable;

Check warning on line 1682 in hybridse/include/vm/physical_op.h

View check run for this annotation

Codecov / codecov/patch

hybridse/include/vm/physical_op.h#L1682

Added line #L1682 was not covered by tests
} else {
output_type_ = node->GetOutputType();
}

fn_infos_.push_back(&filter_.condition_.fn_info());
fn_infos_.push_back(&filter_.index_key_.fn_info());
}
PhysicalFilterNode(PhysicalOpNode *node, Filter filter)
: PhysicalUnaryNode(node, kPhysicalOpFilter, true), filter_(filter) {
output_type_ = node->GetOutputType();
if (node->GetOutputType() == kSchemaTypeGroup && filter_.index_key_.ValidKey()) {
output_type_ = kSchemaTypeTable;
} else {
output_type_ = node->GetOutputType();
}

fn_infos_.push_back(&filter_.condition_.fn_info());
fn_infos_.push_back(&filter_.index_key_.fn_info());
Expand Down
5 changes: 2 additions & 3 deletions hybridse/src/vm/runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -731,10 +731,9 @@ std::shared_ptr<DataHandler> RequestJoinRunner::Run(
auto left_part = std::dynamic_pointer_cast<PartitionHandler>(left);
auto right_part = std::dynamic_pointer_cast<PartitionHandler>(right);
return join_gen_->LazyJoinOptimized(left_part, right_part, ctx.GetParameterRow());
} else {
return join_gen_->LazyJoin(left, right, ctx.GetParameterRow());
}

LOG(WARNING) << "skip due to performance: left source of request join is table handler (unoptimized)";
return std::shared_ptr<DataHandler>();
}

std::shared_ptr<DataHandler> JoinRunner::Run(RunnerContext& ctx,
Expand Down
54 changes: 21 additions & 33 deletions hybridse/src/vm/runner_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,11 @@
*/

#include "vm/runner_builder.h"
#include "vm/physical_op.h"

namespace hybridse {
namespace vm {

static bool IsPartitionProvider(vm::PhysicalOpNode* n) {
switch (n->GetOpType()) {
case kPhysicalOpSimpleProject:
case kPhysicalOpRename:
case kPhysicalOpRequestJoin:
return IsPartitionProvider(n->GetProducer(0));
case kPhysicalOpDataProvider:
return dynamic_cast<vm::PhysicalDataProviderNode*>(n)->provider_type_ == kProviderTypePartition;
default:
return false;
}
}

static vm::PhysicalDataProviderNode* request_node(vm::PhysicalOpNode* n) {
switch (n->GetOpType()) {
case kPhysicalOpDataProvider:
Expand Down Expand Up @@ -297,13 +285,12 @@ ClusterTask RunnerBuilder::Build(PhysicalOpNode* node, Status& status) {
}
}
if (support_cluster_optimized_) {
if (IsPartitionProvider(node->GetProducer(0))) {
if (node->GetOutputType() == kSchemaTypeGroup) {
// route by index of the left source, and it should uncompleted
auto& route_info = left_task.GetRouteInfo();
runner->AddProducer(left_task.GetRoot());
runner->AddProducer(right_task.GetRoot());
return RegisterTask(node,
UnCompletedClusterTask(runner, route_info.table_handler_, route_info.index_));
return RegisterTask(node, ClusterTask(runner, {}, route_info));
}
}
return RegisterTask(node, BinaryInherit(left_task, right_task, runner, index_key, kRightBias));
Expand Down Expand Up @@ -338,22 +325,25 @@ ClusterTask RunnerBuilder::Build(PhysicalOpNode* node, Status& status) {
op->output_right_only());

if (support_cluster_optimized_) {
if (IsPartitionProvider(node->GetProducer(0))) {
// Partion left join partition, route by index of the left source, and it should uncompleted
auto& route_info = left_task.GetRouteInfo();
runner->AddProducer(left_task.GetRoot());
runner->AddProducer(right_task.GetRoot());
return RegisterTask(
node, UnCompletedClusterTask(runner, route_info.table_handler_, route_info.index_));
}

if (right_task.IsCompletedClusterTask() && right_task.GetRouteInfo().lazy_route_ &&
!op->join_.index_key_.ValidKey()) {
// join (.., filter<optimized>)
auto& route_info = right_task.GetRouteInfo();
if (node->GetOutputType() == kSchemaTypeRow) {
// complete cluster task from right
if (op->join().index_key().ValidKey()) {
// optimize key in this node
return RegisterTask(node, BinaryInherit(left_task, right_task, runner,
op->join().index_key(), kLeftBias));
} else {
// optimize happens before, in left node
auto right_route_info = right_task.GetRouteInfo();
runner->AddProducer(left_task.GetRoot());
runner->AddProducer(right_task.GetRoot());
return RegisterTask(node, ClusterTask(runner, {}, right_route_info));
}
} else {
// uncomplete/lazify cluster task from left
auto left_route_info = left_task.GetRouteInfo();
runner->AddProducer(left_task.GetRoot());
runner->AddProducer(right_task.GetRoot());
return RegisterTask(node, ClusterTask(runner, {}, route_info));
return RegisterTask(node, ClusterTask(runner, {}, left_route_info));
}
}

Expand All @@ -372,9 +362,7 @@ ClusterTask RunnerBuilder::Build(PhysicalOpNode* node, Status& status) {
}

// concat join (any(tx), any(tx)), tx is not request table
auto left = request_node(node->GetProducer(0));
// auto right = request_node(node->GetProducer(1));
if (left->provider_type_ == kProviderTypePartition) {
if (node->GetOutputType() != kSchemaTypeRow) {
runner->AddProducer(left_task.GetRoot());
runner->AddProducer(right_task.GetRoot());
return RegisterTask(node, ClusterTask(runner, {}, left_task.GetRouteInfo()));
Expand Down

0 comments on commit 927744e

Please sign in to comment.