Skip to content

Commit

Permalink
feat(sql): support left join
Browse files Browse the repository at this point in the history
  • Loading branch information
aceforeverd committed Oct 31, 2023
1 parent d5bb465 commit 1437ce9
Show file tree
Hide file tree
Showing 17 changed files with 573 additions and 34 deletions.
64 changes: 63 additions & 1 deletion cases/plan/join_query.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,69 @@ cases:
- id: 2
desc: 简单SELECT LEFT JOIN
sql: SELECT t1.COL1, t1.COL2, t2.COL1, t2.COL2 FROM t1 left join t2 on t1.col1 = t2.col2;
mode: physical-plan-unsupport
expect:
node_tree_str: |
+-node[kQuery]: kQuerySelect
+-distinct_opt: false
+-where_expr: null
+-group_expr_list: null
+-having_expr: null
+-order_expr_list: null
+-limit: null
+-select_list[list]:
| +-0:
| | +-node[kResTarget]
| | +-val:
| | | +-expr[column ref]
| | | +-relation_name: t1
| | | +-column_name: COL1
| | +-name: <nil>
| +-1:
| | +-node[kResTarget]
| | +-val:
| | | +-expr[column ref]
| | | +-relation_name: t1
| | | +-column_name: COL2
| | +-name: <nil>
| +-2:
| | +-node[kResTarget]
| | +-val:
| | | +-expr[column ref]
| | | +-relation_name: t2
| | | +-column_name: COL1
| | +-name: <nil>
| +-3:
| +-node[kResTarget]
| +-val:
| | +-expr[column ref]
| | +-relation_name: t2
| | +-column_name: COL2
| +-name: <nil>
+-tableref_list[list]:
| +-0:
| +-node[kTableRef]: kJoin
| +-join_type: LeftJoin
| +-left:
| | +-node[kTableRef]: kTable
| | +-table: t1
| | +-alias: <nil>
| +-right:
| +-node[kTableRef]: kTable
| +-table: t2
| +-alias: <nil>
| +-order_expressions: null
| +-on:
| +-expr[binary]
| +-=[list]:
| +-0:
| | +-expr[column ref]
| | +-relation_name: t1
| | +-column_name: col1
| +-1:
| +-expr[column ref]
| +-relation_name: t2
| +-column_name: col2
+-window_list: []
- id: 3
desc: 简单SELECT LAST JOIN
sql: SELECT t1.COL1, t1.COL2, t2.COL1, t2.COL2 FROM t1 last join t2 order by t2.col5 on t1.col1 = t2.col2;
Expand Down
68 changes: 68 additions & 0 deletions cases/query/left_join.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
cases:
- id: 0
desc: last join to a left join subquery
inputs:
- name: t1
columns: ["c1 string","c2 int","c4 timestamp"]
indexs: ["index1:c1:c4"]
rows:
- ["aa",20,1000]
- ["bb",30,1000]
- ["cc",40,1000]
- ["dd",50,1000]
- name: t2
columns: ["c1 string","c4 timestamp"]
indexs: ["index1:c1:c4"]
rows:
- ["aa",2000]
- ["bb",2000]
- ["cc",3000]
- name: t3
columns: ["c1 string","c2 int","c3 bigint","c4 timestamp"]
indexs: ["index1:c1:c4"]
rows:
- ["aa",19,13,3000]
- ["aa",21,13,3000]
- ["bb",34,131,3000]
- ["bb",21,131,3000]
sql: |
select
t1.c1,
tx.c1 as c1l,
tx.c1r,
tx.c2r
from t1 last join
(
select t2.c1 as c1,
t3.c1 as c1r,
t3.c2 as c2r
from t2 left join t3
on t2.c1 = t3.c1
) tx
on t1.c1 = tx.c1 and t1.c2 > tx.c2r
batch_plan: |
SIMPLE_PROJECT(sources=(t1.c1, tx.c1 -> c1l, tx.c1r, tx.c2r))
JOIN(type=LastJoin, condition=t1.c2 > tx.c2r, left_keys=(), right_keys=(), index_keys=(t1.c1))
DATA_PROVIDER(table=t1)
RENAME(name=tx)
SIMPLE_PROJECT(sources=(t2.c1, t3.c1 -> c1r, t3.c2 -> c2r))
JOIN(type=LeftJoin, condition=, left_keys=(), right_keys=(), index_keys=(t2.c1))
DATA_PROVIDER(type=Partition, table=t2, index=index1)
DATA_PROVIDER(type=Partition, table=t3, index=index1)
request_plan: |
SIMPLE_PROJECT(sources=(t1.c1, tx.c1 -> c1l, tx.c1r, tx.c2r))
REQUEST_JOIN(type=LastJoin, condition=t1.c2 > tx.c2r, left_keys=(), right_keys=(), index_keys=(t1.c1))
DATA_PROVIDER(request=t1)
RENAME(name=tx)
SIMPLE_PROJECT(sources=(t2.c1, t3.c1 -> c1r, t3.c2 -> c2r))
REQUEST_JOIN(type=LeftJoin, condition=, left_keys=(), right_keys=(), index_keys=(t2.c1))
DATA_PROVIDER(type=Partition, table=t2, index=index1)
DATA_PROVIDER(type=Partition, table=t3, index=index1)
expect:
order: c1
columns: ["c1 string", "c1l string", "c1r string", "c2r int"]
data: |
aa, aa, aa, 19
bb, bb, bb, 21
cc, NULL, NULL, NULL
dd, NULL, NULL, NULL
3 changes: 3 additions & 0 deletions hybridse/include/codec/fe_row_codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ class RowView {
const Schema* GetSchema() const { return &schema_; }

inline bool IsNULL(const int8_t* row, uint32_t idx) const {
if (row == nullptr) {
return true;
}
const int8_t* ptr = row + HEADER_LENGTH + (idx >> 3);
return *(reinterpret_cast<const uint8_t*>(ptr)) & (1 << (idx & 0x07));
}
Expand Down
2 changes: 1 addition & 1 deletion hybridse/include/node/node_enum.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ enum JoinType {
kJoinTypeRight,
kJoinTypeInner,
kJoinTypeConcat,
kJoinTypeComma
kJoinTypeCross, // AKA commma join
};

enum UnionType { kUnionTypeDistinct, kUnionTypeAll };
Expand Down
89 changes: 67 additions & 22 deletions hybridse/include/vm/physical_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,7 @@ class PhysicalConstProjectNode : public PhysicalOpNode {
public:
explicit PhysicalConstProjectNode(const ColumnProjects &project)
: PhysicalOpNode(kPhysicalOpConstProject, true), project_(project) {
output_type_ = kSchemaTypeRow;
fn_infos_.push_back(&project_.fn_info());
}
virtual ~PhysicalConstProjectNode() {}
Expand Down Expand Up @@ -1183,23 +1184,25 @@ class PhysicalWindowAggrerationNode : public PhysicalProjectNode {

class PhysicalJoinNode : public PhysicalBinaryNode {
public:
static constexpr PhysicalOpType kConcreteNodeKind = kPhysicalOpJoin;

PhysicalJoinNode(PhysicalOpNode *left, PhysicalOpNode *right,
const node::JoinType join_type)
: PhysicalBinaryNode(left, right, kPhysicalOpJoin, false),
: PhysicalBinaryNode(left, right, kConcreteNodeKind, false),
join_(join_type),
joined_schemas_ctx_(this),
output_right_only_(false) {
output_type_ = left->GetOutputType();
InitOuptput();
}
PhysicalJoinNode(PhysicalOpNode *left, PhysicalOpNode *right,
const node::JoinType join_type,
const node::OrderByNode *orders,
const node::ExprNode *condition)
: PhysicalBinaryNode(left, right, kPhysicalOpJoin, false),
: PhysicalBinaryNode(left, right, kConcreteNodeKind, false),
join_(join_type, orders, condition),
joined_schemas_ctx_(this),
output_right_only_(false) {
output_type_ = left->GetOutputType();
InitOuptput();

RegisterFunctionInfo();
}
Expand All @@ -1208,11 +1211,11 @@ class PhysicalJoinNode : public PhysicalBinaryNode {
const node::ExprNode *condition,
const node::ExprListNode *left_keys,
const node::ExprListNode *right_keys)
: PhysicalBinaryNode(left, right, kPhysicalOpJoin, false),
: PhysicalBinaryNode(left, right, kConcreteNodeKind, false),
join_(join_type, condition, left_keys, right_keys),
joined_schemas_ctx_(this),
output_right_only_(false) {
output_type_ = left->GetOutputType();
InitOuptput();

RegisterFunctionInfo();
}
Expand All @@ -1222,31 +1225,31 @@ class PhysicalJoinNode : public PhysicalBinaryNode {
const node::ExprNode *condition,
const node::ExprListNode *left_keys,
const node::ExprListNode *right_keys)
: PhysicalBinaryNode(left, right, kPhysicalOpJoin, false),
: PhysicalBinaryNode(left, right, kConcreteNodeKind, false),
join_(join_type, orders, condition, left_keys, right_keys),
joined_schemas_ctx_(this),
output_right_only_(false) {
output_type_ = left->GetOutputType();
InitOuptput();

RegisterFunctionInfo();
}
PhysicalJoinNode(PhysicalOpNode *left, PhysicalOpNode *right,
const Join &join)
: PhysicalBinaryNode(left, right, kPhysicalOpJoin, false),
: PhysicalBinaryNode(left, right, kConcreteNodeKind, false),
join_(join),
joined_schemas_ctx_(this),
output_right_only_(false) {
output_type_ = left->GetOutputType();
InitOuptput();

RegisterFunctionInfo();
}
PhysicalJoinNode(PhysicalOpNode *left, PhysicalOpNode *right,
const Join &join, const bool output_right_only)
: PhysicalBinaryNode(left, right, kPhysicalOpJoin, false),
: PhysicalBinaryNode(left, right, kConcreteNodeKind, false),
join_(join),
joined_schemas_ctx_(this),
output_right_only_(output_right_only) {
output_type_ = left->GetOutputType();
InitOuptput();

RegisterFunctionInfo();
}
Expand Down Expand Up @@ -1275,37 +1278,59 @@ class PhysicalJoinNode : public PhysicalBinaryNode {
Join join_;
SchemasContext joined_schemas_ctx_;
const bool output_right_only_;

private:
void InitOuptput() {
switch (join_.join_type_) {
case node::kJoinTypeLast:
case node::kJoinTypeConcat: {
output_type_ = GetProducer(0)->GetOutputType();
break;
}
default: {
// standard SQL JOINs, always treat as a table output
if (GetProducer(0)->GetOutputType() == kSchemaTypeGroup) {
output_type_ = kSchemaTypeGroup;
} else {
output_type_ = kSchemaTypeTable;
}
break;
}
}
}
};

class PhysicalRequestJoinNode : public PhysicalBinaryNode {
public:
static constexpr PhysicalOpType kConcreteNodeKind = kPhysicalOpRequestJoin;

PhysicalRequestJoinNode(PhysicalOpNode *left, PhysicalOpNode *right,
const node::JoinType join_type)
: PhysicalBinaryNode(left, right, kPhysicalOpRequestJoin, false),
: PhysicalBinaryNode(left, right, kConcreteNodeKind, false),
join_(join_type),
joined_schemas_ctx_(this),
output_right_only_(false) {
output_type_ = left->GetOutputType();
InitOuptput();
RegisterFunctionInfo();
}
PhysicalRequestJoinNode(PhysicalOpNode *left, PhysicalOpNode *right,
const node::JoinType join_type,
const node::OrderByNode *orders,
const node::ExprNode *condition)
: PhysicalBinaryNode(left, right, kPhysicalOpRequestJoin, false),
: PhysicalBinaryNode(left, right, kConcreteNodeKind, false),
join_(join_type, orders, condition),
joined_schemas_ctx_(this),
output_right_only_(false) {
output_type_ = left->GetOutputType();
InitOuptput();
RegisterFunctionInfo();
}
PhysicalRequestJoinNode(PhysicalOpNode *left, PhysicalOpNode *right,
const Join &join, const bool output_right_only)
: PhysicalBinaryNode(left, right, kPhysicalOpRequestJoin, false),
: PhysicalBinaryNode(left, right, kConcreteNodeKind, false),
join_(join),
joined_schemas_ctx_(this),
output_right_only_(output_right_only) {
output_type_ = left->GetOutputType();
InitOuptput();
RegisterFunctionInfo();
}

Expand All @@ -1315,11 +1340,11 @@ class PhysicalRequestJoinNode : public PhysicalBinaryNode {
const node::ExprNode *condition,
const node::ExprListNode *left_keys,
const node::ExprListNode *right_keys)
: PhysicalBinaryNode(left, right, kPhysicalOpRequestJoin, false),
: PhysicalBinaryNode(left, right, kConcreteNodeKind, false),
join_(join_type, condition, left_keys, right_keys),
joined_schemas_ctx_(this),
output_right_only_(false) {
output_type_ = left->GetOutputType();
InitOuptput();
RegisterFunctionInfo();
}
PhysicalRequestJoinNode(PhysicalOpNode *left, PhysicalOpNode *right,
Expand All @@ -1328,11 +1353,11 @@ class PhysicalRequestJoinNode : public PhysicalBinaryNode {
const node::ExprNode *condition,
const node::ExprListNode *left_keys,
const node::ExprListNode *right_keys)
: PhysicalBinaryNode(left, right, kPhysicalOpRequestJoin, false),
: PhysicalBinaryNode(left, right, kConcreteNodeKind, false),
join_(join_type, orders, condition, left_keys, right_keys),
joined_schemas_ctx_(this),
output_right_only_(false) {
output_type_ = left->GetOutputType();
InitOuptput();
RegisterFunctionInfo();
}

Expand Down Expand Up @@ -1363,6 +1388,26 @@ class PhysicalRequestJoinNode : public PhysicalBinaryNode {
Join join_;
SchemasContext joined_schemas_ctx_;
const bool output_right_only_;

private:
void InitOuptput() {
switch (join_.join_type_) {
case node::kJoinTypeLast:
case node::kJoinTypeConcat: {
output_type_ = GetProducer(0)->GetOutputType();
break;
}
default: {
// standard SQL JOINs, always treat as a table output
if (GetProducer(0)->GetOutputType() == kSchemaTypeGroup) {
output_type_ = kSchemaTypeGroup;
} else {
output_type_ = kSchemaTypeTable;
}
break;
}
}
}
};

class PhysicalUnionNode : public PhysicalBinaryNode {
Expand Down
2 changes: 1 addition & 1 deletion hybridse/src/base/fe_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ void RefCountedSlice::Release() {
if (this->ref_cnt_ != nullptr) {
auto& cnt = *this->ref_cnt_;
cnt -= 1;
if (cnt == 0) {
if (cnt == 0 && buf() != nullptr) {
// memset in case the buf is still used after free
memset(buf(), 0, size());
free(buf());
Expand Down
3 changes: 3 additions & 0 deletions hybridse/src/passes/physical/batch_request_optimize_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ INSTANTIATE_TEST_SUITE_P(
INSTANTIATE_TEST_SUITE_P(
BatchRequestLastJoinQuery, BatchRequestOptimizeTest,
testing::ValuesIn(sqlcase::InitCases("cases/query/last_join_query.yaml")));
INSTANTIATE_TEST_SUITE_P(
BatchRequestLeftJoin, BatchRequestOptimizeTest,
testing::ValuesIn(sqlcase::InitCases("cases/query/left_join.yml")));
INSTANTIATE_TEST_SUITE_P(
BatchRequestLastJoinWindowQuery, BatchRequestOptimizeTest,
testing::ValuesIn(sqlcase::InitCases("cases/query/last_join_window_query.yaml")));
Expand Down
Loading

0 comments on commit 1437ce9

Please sign in to comment.