Skip to content

Commit

Permalink
executor: rewrite merge join function
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass authored and BohuTANG committed May 17, 2019
1 parent 138118c commit e730dd0
Showing 1 changed file with 25 additions and 30 deletions.
55 changes: 25 additions & 30 deletions src/executor/merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"sync"

"github.com/xelabs/go-mysqlstack/sqlparser"
"github.com/xelabs/go-mysqlstack/sqlparser/depends/common"
"github.com/xelabs/go-mysqlstack/sqlparser/depends/sqltypes"
)

Expand Down Expand Up @@ -45,8 +44,8 @@ func sortMergeJoin(lres, rres, res *sqltypes.Result, node *planner.JoinNode) {

// mergeJoin used to join the sorted results.
func mergeJoin(lres, rres, res *sqltypes.Result, node *planner.JoinNode) {
lrows, lidx, lstr := fetchSameKeyRows(lres.Rows, node.LeftKeys, 0, "")
rrows, ridx, rstr := fetchSameKeyRows(rres.Rows, node.RightKeys, 0, "")
lrows, lidx := fetchSameKeyRows(lres.Rows, node.LeftKeys, 0)
rrows, ridx := fetchSameKeyRows(rres.Rows, node.RightKeys, 0)
for lrows != nil {
if rrows == nil {
concatLeftAndNil(lres.Rows[lidx-len(lrows):], node, res)
Expand All @@ -72,55 +71,51 @@ func mergeJoin(lres, rres, res *sqltypes.Result, node *planner.JoinNode) {
} else {
concatLeftAndRight(lrows, rrows, node, res)
}
lrows, lidx, lstr = fetchSameKeyRows(lres.Rows, node.LeftKeys, lidx, lstr)
rrows, ridx, rstr = fetchSameKeyRows(rres.Rows, node.RightKeys, ridx, rstr)
lrows, lidx = fetchSameKeyRows(lres.Rows, node.LeftKeys, lidx)
rrows, ridx = fetchSameKeyRows(rres.Rows, node.RightKeys, ridx)
} else if cmp > 0 {
rrows, ridx, rstr = fetchSameKeyRows(rres.Rows, node.RightKeys, ridx, rstr)
rrows, ridx = fetchSameKeyRows(rres.Rows, node.RightKeys, ridx)
} else {
concatLeftAndNil(lrows, node, res)
lrows, lidx, lstr = fetchSameKeyRows(lres.Rows, node.LeftKeys, lidx, lstr)
lrows, lidx = fetchSameKeyRows(lres.Rows, node.LeftKeys, lidx)
}
}
}

// fetchSameKeyRows used to fetch the same joinkey values' rows.
func fetchSameKeyRows(rows [][]sqltypes.Value, joins []planner.JoinKey, index int, str string) ([][]sqltypes.Value, int, string) {
func fetchSameKeyRows(rows [][]sqltypes.Value, joins []planner.JoinKey, index int) ([][]sqltypes.Value, int) {
var chunk [][]sqltypes.Value
var key string
if index >= len(rows) {
return nil, index, ""
return nil, index
}

if len(joins) == 0 {
return rows, len(rows), ""
return rows, len(rows)
}

if str == "" {
keySlice := []byte{0x01}
for _, join := range joins {
keySlice = append(keySlice, rows[index][join.Index].Raw()...)
keySlice = append(keySlice, 0x02)
}
str = common.BytesToString(keySlice)
}
chunk = append(chunk, rows[index])
current := rows[index]
chunk = append(chunk, current)
index++

for index < len(rows) {
keySlice := []byte{0x01}
for _, join := range joins {
keySlice = append(keySlice, rows[index][join.Index].Raw()...)
keySlice = append(keySlice, 0x02)
}
key = common.BytesToString(keySlice)

if key != str {
equal := keysEqual(current, rows[index], joins)
if !equal {
break
}

chunk = append(chunk, rows[index])
index++
}
return chunk, index, key
return chunk, index
}

func keysEqual(row1, row2 []sqltypes.Value, joins []planner.JoinKey) bool {
for _, join := range joins {
cmp := sqltypes.NullsafeCompare(row1[join.Index], row2[join.Index])
if cmp != 0 {
return false
}
}
return true
}

// concatLeftAndRight used to concat thle left and right results, handle otherJoinOn|rightNull|OtherFilter.
Expand Down

0 comments on commit e730dd0

Please sign in to comment.