Skip to content

Commit

Permalink
executor: reuse chunk in hash join v2 during restoring (#56936) (#58018)
Browse files Browse the repository at this point in the history
close #56828
  • Loading branch information
ti-chi-bot authored Dec 24, 2024
1 parent 4fc3123 commit ef0a693
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 42 deletions.
15 changes: 15 additions & 0 deletions pkg/executor/join/base_join_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func (j *baseJoinProbe) SetChunkForProbe(chk *chunk.Chunk) (err error) {
return errors.New("Previous chunk is not probed yet")
}
}

j.currentChunk = chk
logicalRows := chk.NumRows()
// if chk.sel != nil, then physicalRows is different from logicalRows
Expand Down Expand Up @@ -312,6 +313,20 @@ func (j *baseJoinProbe) SetChunkForProbe(chk *chunk.Chunk) (err error) {
}

func (j *baseJoinProbe) SetRestoredChunkForProbe(chk *chunk.Chunk) error {
defer func() {
if j.ctx.spillHelper.areAllPartitionsSpilled() {
// We will not call `Probe` function when all partitions are spilled.
// So it's necessary to manually set `currentProbeRow` to avoid check fail.
j.currentProbeRow = j.chunkRows
}
}()

if j.currentChunk != nil {
if j.currentProbeRow < j.chunkRows {
return errors.New("Previous chunk is not probed yet")
}
}

hashValueCol := chk.Column(0)
serializedKeysCol := chk.Column(1)
colNum := chk.NumCols()
Expand Down
17 changes: 17 additions & 0 deletions pkg/executor/join/hash_join_spill_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type hashJoinSpillHelper struct {
spillTriggedInBuildingStageForTest bool
spillTriggeredBeforeBuildingHashTableForTest bool
allPartitionsSpilledForTest bool
skipProbeInRestoreForTest bool
}

func newHashJoinSpillHelper(hashJoinExec *HashJoinV2Exec, partitionNum int, probeFieldTypes []*types.FieldType) *hashJoinSpillHelper {
Expand Down Expand Up @@ -373,6 +374,18 @@ func (h *hashJoinSpillHelper) init() {

h.buildRowsInDisk = make([][]*chunk.DataInDiskByChunks, h.hashJoinExec.Concurrency)
h.probeRowsInDisk = make([][]*chunk.DataInDiskByChunks, h.hashJoinExec.Concurrency)

for _, worker := range h.hashJoinExec.BuildWorkers {
if worker.restoredChkBuf == nil {
worker.restoredChkBuf = chunk.NewEmptyChunk(h.buildSpillChkFieldTypes)
}
}

for _, worker := range h.hashJoinExec.ProbeWorkers {
if worker.restoredChkBuf == nil {
worker.restoredChkBuf = chunk.NewEmptyChunk(h.probeSpillFieldTypes)
}
}
}
}

Expand Down Expand Up @@ -540,6 +553,10 @@ func (h *hashJoinSpillHelper) initTmpSpillBuildSideChunks() {
}
}

func (h *hashJoinSpillHelper) isProbeSkippedInRestoreForTest() bool {
return h.skipProbeInRestoreForTest
}

func (h *hashJoinSpillHelper) isRespillTriggeredForTest() bool {
return h.spillRoundForTest > 1
}
Expand Down
27 changes: 15 additions & 12 deletions pkg/executor/join/hash_join_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/disk"
"github.com/pingcap/tidb/pkg/util/execdetails"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/memory"
)

Expand Down Expand Up @@ -359,6 +360,8 @@ type ProbeWorkerV2 struct {
// We build individual joinProbe for each join worker when use chunk-based
// execution, to avoid the concurrency of joiner.chk and joiner.selected.
JoinProbe ProbeV2

restoredChkBuf *chunk.Chunk
}

func (w *ProbeWorkerV2) updateProbeStatistic(start time.Time, probeTime int64) {
Expand Down Expand Up @@ -392,8 +395,7 @@ func (w *ProbeWorkerV2) restoreAndProbe(inDisk *chunk.DataInDiskByChunks) {
}
failpoint.Inject("ConsumeRandomPanic", nil)

// TODO reuse chunk
chk, err := inDisk.GetChunk(i)
err := inDisk.FillChunk(i, w.restoredChkBuf)
if err != nil {
joinResult.err = err
break
Expand All @@ -407,7 +409,7 @@ func (w *ProbeWorkerV2) restoreAndProbe(inDisk *chunk.DataInDiskByChunks) {

start := time.Now()
waitTime := int64(0)
ok, waitTime, joinResult = w.processOneRestoredProbeChunk(chk, joinResult)
ok, waitTime, joinResult = w.processOneRestoredProbeChunk(joinResult)
probeTime += int64(time.Since(start)) - waitTime
if !ok {
break
Expand All @@ -434,6 +436,7 @@ type BuildWorkerV2 struct {
HasNullableKey bool
WorkerID uint
builder *rowTableBuilder
restoredChkBuf *chunk.Chunk
}

func (b *BuildWorkerV2) getSegmentsInRowTable(partID int) []*rowTableSegment {
Expand All @@ -449,9 +452,9 @@ func (b *BuildWorkerV2) updatePartitionData(cost int64) {
setMaxValue(&b.HashJoinCtx.stats.maxPartitionData, cost)
}

func (b *BuildWorkerV2) processOneRestoredChunk(chk *chunk.Chunk, cost *int64) error {
func (b *BuildWorkerV2) processOneRestoredChunk(cost *int64) error {
start := time.Now()
err := b.builder.processOneRestoredChunk(chk, b.HashJoinCtx, int(b.WorkerID), int(b.HashJoinCtx.partitionNumber))
err := b.builder.processOneRestoredChunk(b.restoredChkBuf, b.HashJoinCtx, int(b.WorkerID), int(b.HashJoinCtx.partitionNumber))
if err != nil {
return err
}
Expand All @@ -476,10 +479,7 @@ func (b *BuildWorkerV2) splitPartitionAndAppendToRowTableForRestoreImpl(i int, i
return nil
}

var chk *chunk.Chunk

// TODO reuse chunk
chk, err = inDisk.GetChunk(i)
err = inDisk.FillChunk(i, b.restoredChkBuf)
if err != nil {
return err
}
Expand All @@ -489,7 +489,7 @@ func (b *BuildWorkerV2) splitPartitionAndAppendToRowTableForRestoreImpl(i int, i
return err
}

err = b.processOneRestoredChunk(chk, cost)
err = b.processOneRestoredChunk(cost)
if err != nil {
return err
}
Expand Down Expand Up @@ -914,8 +914,8 @@ func (w *ProbeWorkerV2) scanRowTableAfterProbeDone() {
}
}

func (w *ProbeWorkerV2) processOneRestoredProbeChunk(probeChunk *chunk.Chunk, joinResult *hashjoinWorkerResult) (ok bool, waitTime int64, _ *hashjoinWorkerResult) {
joinResult.err = w.JoinProbe.SetRestoredChunkForProbe(probeChunk)
func (w *ProbeWorkerV2) processOneRestoredProbeChunk(joinResult *hashjoinWorkerResult) (ok bool, waitTime int64, _ *hashjoinWorkerResult) {
joinResult.err = w.JoinProbe.SetRestoredChunkForProbe(w.restoredChkBuf)
if joinResult.err != nil {
return false, 0, joinResult
}
Expand All @@ -932,6 +932,9 @@ func (w *ProbeWorkerV2) processOneProbeChunk(probeChunk *chunk.Chunk, joinResult

func (w *ProbeWorkerV2) probeAndSendResult(joinResult *hashjoinWorkerResult) (bool, int64, *hashjoinWorkerResult) {
if w.HashJoinCtx.spillHelper.areAllPartitionsSpilled() {
if intest.InTest && w.HashJoinCtx.spillHelper.hashJoinExec.inRestore {
w.HashJoinCtx.spillHelper.skipProbeInRestoreForTest = true
}
return true, 0, joinResult
}

Expand Down
17 changes: 9 additions & 8 deletions pkg/executor/join/inner_join_spill_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func testInnerJoinSpillCase4(t *testing.T, ctx *mock.Context, expectedResult []c
require.True(t, hashJoinExec.spillHelper.isSpillTriggedInBuildingStageForTest())
require.True(t, hashJoinExec.spillHelper.areAllPartitionsSpilledForTest())
require.True(t, hashJoinExec.spillHelper.isRespillTriggeredForTest())
require.True(t, hashJoinExec.spillHelper.isProbeSkippedInRestoreForTest())
checkResults(t, retTypes, result, expectedResult)
}

Expand Down Expand Up @@ -264,14 +265,14 @@ func TestInnerJoinSpillBasic(t *testing.T) {

params := []spillTestParam{
// Normal case
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
// rightUsed is empty
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 750000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 250000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
// leftUsed is empty
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 750000, 10000}},
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 250000, 10000}},
}

err := failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`)
Expand Down Expand Up @@ -319,8 +320,8 @@ func TestInnerJoinSpillWithOtherCondition(t *testing.T) {
otherCondition = append(otherCondition, sf)

params := []spillTestParam{
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 500000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 500000, 10000}},
}

err = failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`)
Expand Down
20 changes: 10 additions & 10 deletions pkg/executor/join/outer_join_spill_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,14 @@ func TestOuterJoinSpillBasic(t *testing.T) {

params := []spillTestParam{
// Normal case
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
// rightUsed is empty
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 750000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 250000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
// leftUsed is empty
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 750000, 10000}},
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 250000, 10000}},
}

err := failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`)
Expand Down Expand Up @@ -146,8 +146,8 @@ func TestOuterJoinSpillWithSel(t *testing.T) {

params := []spillTestParam{
// Normal case
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{2000000, 1000000, 3000000, 500000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{2000000, 1000000, 3000000, 500000, 10000}},
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{2000000, 1000000, 3000000, 200000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{2000000, 1000000, 3000000, 200000, 10000}},
}

err := failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`)
Expand Down Expand Up @@ -201,8 +201,8 @@ func TestOuterJoinSpillWithOtherCondition(t *testing.T) {
otherCondition = append(otherCondition, sf)

params := []spillTestParam{
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 500000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 500000, 10000}},
}

err = failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`)
Expand Down
59 changes: 49 additions & 10 deletions pkg/util/chunk/chunk_in_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,9 @@ func (d *DataInDiskByChunks) getChunkSize(chkIdx int) int64 {
return d.offsetOfEachChunk[chkIdx+1] - d.offsetOfEachChunk[chkIdx]
}

// GetChunk gets a Chunk from the DataInDiskByChunks by chkIdx.
func (d *DataInDiskByChunks) GetChunk(chkIdx int) (*Chunk, error) {
func (d *DataInDiskByChunks) readFromFisk(chkIdx int) error {
if err := injectChunkInDiskRandomError(); err != nil {
return nil, err
return err
}

reader := d.dataFile.getSectionReader(d.offsetOfEachChunk[chkIdx])
Expand All @@ -145,19 +144,39 @@ func (d *DataInDiskByChunks) GetChunk(chkIdx int) (*Chunk, error) {

readByteNum, err := io.ReadFull(reader, d.buf)
if err != nil {
return nil, err
return err
}

if int64(readByteNum) != chkSize {
return nil, errors2.New("Fail to restore the spilled chunk")
return errors2.New("Fail to restore the spilled chunk")
}

return nil
}

// GetChunk gets a Chunk from the DataInDiskByChunks by chkIdx.
func (d *DataInDiskByChunks) GetChunk(chkIdx int) (*Chunk, error) {
err := d.readFromFisk(chkIdx)
if err != nil {
return nil, err
}

chk := NewEmptyChunk(d.fieldTypes)
d.deserializeDataToChunk(chk)

return chk, nil
}

// FillChunk fills a Chunk from the DataInDiskByChunks by chkIdx.
func (d *DataInDiskByChunks) FillChunk(srcChkIdx int, destChk *Chunk) error {
err := d.readFromFisk(srcChkIdx)
if err != nil {
return err
}

d.deserializeDataToChunk(destChk)
return nil
}

// Close releases the disk resource.
func (d *DataInDiskByChunks) Close() {
if d.dataFile.file != nil {
Expand Down Expand Up @@ -264,7 +283,11 @@ func (d *DataInDiskByChunks) deserializeColMeta(pos *int64) (length int64, nullM

func (d *DataInDiskByChunks) deserializeSel(chk *Chunk, pos *int64, selSize int) {
selLen := int64(selSize) / intLen
chk.sel = make([]int, selLen)
if int64(cap(chk.sel)) < selLen {
chk.sel = make([]int, selLen)
} else {
chk.sel = chk.sel[:selLen]
}
for i := range selLen {
chk.sel[i] = *(*int)(unsafe.Pointer(&d.buf[*pos]))
*pos += intLen
Expand Down Expand Up @@ -299,9 +322,25 @@ func (d *DataInDiskByChunks) deserializeOffsets(dst []int64, pos *int64) {
func (d *DataInDiskByChunks) deserializeColumns(chk *Chunk, pos *int64) {
for _, col := range chk.columns {
length, nullMapSize, dataSize, offsetSize := d.deserializeColMeta(pos)
col.nullBitmap = make([]byte, nullMapSize)
col.data = make([]byte, dataSize)
col.offsets = make([]int64, offsetSize/int64Len)

if int64(cap(col.nullBitmap)) < nullMapSize {
col.nullBitmap = make([]byte, nullMapSize)
} else {
col.nullBitmap = col.nullBitmap[:nullMapSize]
}

if int64(cap(col.data)) < dataSize {
col.data = make([]byte, dataSize)
} else {
col.data = col.data[:dataSize]
}

offsetsLen := offsetSize / int64Len
if int64(cap(col.offsets)) < offsetsLen {
col.offsets = make([]int64, offsetsLen)
} else {
col.offsets = col.offsets[:offsetsLen]
}

col.length = int(length)
copy(col.nullBitmap, d.buf[*pos:*pos+nullMapSize])
Expand Down
24 changes: 22 additions & 2 deletions pkg/util/chunk/chunk_in_disk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func checkChunk(t *testing.T, chk1, chk2 *Chunk) {
}
}

func TestDataInDiskByChunks(t *testing.T) {
func testImpl(t *testing.T, isNewChunk bool) {
numChk, numRow := 100, 1000
chks, fields := initChunks(numChk, numRow)
addAuxDataForChunks(chks)
Expand All @@ -78,9 +78,29 @@ func TestDataInDiskByChunks(t *testing.T) {
require.NoError(t, err)
}

chk := NewEmptyChunk(fields)
var err error
for i := range numChk {
chk, err := dataInDiskByChunks.GetChunk(i)
if isNewChunk {
chk, err = dataInDiskByChunks.GetChunk(i)
} else {
chk.Reset()
err = dataInDiskByChunks.FillChunk(i, chk)
}
require.NoError(t, err)
checkChunk(t, chk, chks[i])
}
}

func testGetChunk(t *testing.T) {
testImpl(t, true)
}

func testFillChunk(t *testing.T) {
testImpl(t, false)
}

func TestDataInDiskByChunks(t *testing.T) {
testGetChunk(t)
testFillChunk(t)
}

0 comments on commit ef0a693

Please sign in to comment.