Skip to content

Commit

Permalink
diff: fix send on closed channel panic (#374)
Browse files Browse the repository at this point in the history
  • Loading branch information
WangXiangUSTC authored Jul 15, 2020
1 parent 04f1c03 commit eae8a56
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 15 deletions.
24 changes: 21 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,27 @@ PACKAGE_LIST := go list ./...
PACKAGES := $$($(PACKAGE_LIST))
FAIL_ON_STDOUT := awk '{ print } END { if (NR > 0) { exit 1 } }'

FAILPOINT_DIR := $$(for p in $(PACKAGES); do echo $${p\#"github.com/pingcap/tidb-tools/"}; done)
FAILPOINT := retool do failpoint-ctl
FAILPOINT_ENABLE := $$(echo $(FAILPOINT_DIR) | xargs $(FAILPOINT) enable >/dev/null)
FAILPOINT_DISABLE := $$(find $(FAILPOINT_DIR) | xargs $(FAILPOINT) disable >/dev/null)

define run_unit_test
@echo "running unit test for packages:" $(1)
$(FAILPOINT_ENABLE)
@export log_level=error; \
$(GOTEST) -cover $(1) \
|| { $(FAILPOINT_DISABLE); exit 1; }
$(FAILPOINT_DISABLE)
endef

build: prepare version check importer sync_diff_inspector ddl_checker finish

retool_setup:
@echo "setup retool"
go get github.com/twitchtv/retool
GO111MODULE=off retool sync

version:
$(GO) version

Expand All @@ -47,9 +66,8 @@ sync_diff_inspector:
ddl_checker:
$(GO) build -ldflags '$(LDFLAGS)' -o bin/ddl_checker ./ddl_checker

test: version
@export log_level=error; \
$(GOTEST) -cover $(PACKAGES)
test: version retool_setup
$(call run_unit_test,$(PACKAGES))

integration_test: build
@which bin/tidb-server
Expand Down
1 change: 1 addition & 0 deletions go.mod1
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errcode v0.3.0 // indirect
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011
github.com/pingcap/failpoint v0.0.0-20200210140405-f8f9fb234798
github.com/pingcap/kvproto v0.0.0-20200409034505-a5af800ca2ef
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd
github.com/pingcap/parser v0.0.0-20200410065024-81f3db8e6095
Expand Down
53 changes: 41 additions & 12 deletions pkg/diff/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,19 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb-tools/pkg/utils"
"go.uber.org/zap"
)

var (
// cancel the context for `Equal`, only used in test
cancelEqualFunc context.CancelFunc
)

// TableInstance record a table instance
type TableInstance struct {
Conn *sql.DB `json:"-"`
Expand Down Expand Up @@ -151,7 +157,7 @@ func (t *TableDiff) Equal(ctx context.Context, writeFixSQL func(string) error) (

dataEqual, err = t.CheckTableData(ctx)
if err != nil {
return false, false, errors.Trace(err)
return structEqual, false, errors.Trace(err)
}

select {
Expand Down Expand Up @@ -258,10 +264,15 @@ func (t *TableDiff) CheckTableData(ctx context.Context) (equal bool, err error)
checkResultCh := make(chan bool, t.CheckThreadCount)
defer close(checkResultCh)

var checkWg sync.WaitGroup
checkWorkerCh := make([]chan *ChunkRange, 0, t.CheckThreadCount)
for i := 0; i < t.CheckThreadCount; i++ {
checkWorkerCh = append(checkWorkerCh, make(chan *ChunkRange, 10))
go t.checkChunksDataEqual(ctx, t.Sample < 100 && !fromCheckpoint, checkWorkerCh[i], checkResultCh)
checkWg.Add(1)
go func(j int) {
defer checkWg.Done()
t.checkChunksDataEqual(ctx, t.Sample < 100 && !fromCheckpoint, checkWorkerCh[j], checkResultCh)
}(i)
}

go func() {
Expand Down Expand Up @@ -295,9 +306,12 @@ CheckResult:
break CheckResult
}
case <-ctx.Done():
return equal, nil
equal = false
break CheckResult
}
}
checkWg.Wait()

return equal, nil
}

Expand Down Expand Up @@ -365,25 +379,30 @@ func (t *TableDiff) getSourceTableChecksum(ctx context.Context, chunk *ChunkRang
}

func (t *TableDiff) checkChunksDataEqual(ctx context.Context, filterByRand bool, chunks chan *ChunkRange, resultCh chan bool) {
var err error
for {
select {
case chunk, ok := <-chunks:
if !ok {
return
}
eq := false
if chunk.State == successState || chunk.State == ignoreState {
resultCh <- true
continue
}
eq, err := t.checkChunkDataEqual(ctx, filterByRand, chunk)
if err != nil {
log.Error("check chunk data equal failed", zap.String("chunk", chunk.String()), zap.Error(err))
resultCh <- false
eq = true
} else {
if !eq {
eq, err = t.checkChunkDataEqual(ctx, filterByRand, chunk)
if err != nil {
log.Error("check chunk data equal failed", zap.String("chunk", chunk.String()), zap.Error(err))
eq = false
} else if !eq {
log.Warn("check chunk data not equal", zap.String("chunk", chunk.String()))
}
resultCh <- eq
}

select {
case resultCh <- eq:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
Expand All @@ -392,6 +411,16 @@ func (t *TableDiff) checkChunksDataEqual(ctx context.Context, filterByRand bool,
}

func (t *TableDiff) checkChunkDataEqual(ctx context.Context, filterByRand bool, chunk *ChunkRange) (equal bool, err error) {
failpoint.Inject("CancelCheckChunkDataEqual", func(val failpoint.Value) {
chunkID := val.(int)
if chunkID != chunk.ID {
return
}

log.Info("check chunk data equal failed", zap.String("failpoint", "CancelCheckChunkDataEqual"))
cancelEqualFunc()
})

update := func() {
ctx1, cancel1 := context.WithTimeout(ctx, dbutil.DefaultTimeout)
defer cancel1()
Expand Down
11 changes: 11 additions & 0 deletions pkg/diff/diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

_ "github.com/go-sql-driver/mysql"
. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb-tools/pkg/importer"
)
Expand Down Expand Up @@ -249,6 +250,16 @@ func testDataEqual(ctx context.Context, conn *sql.DB, schema string, sourceTable
c.Assert(err, IsNil)
c.Assert(structEqual, Equals, true)
c.Assert(dataEqual, Equals, true)

// cancel `Equal`, dataEqual will be false, and will not panic
ctx1, cancel1 := context.WithCancel(ctx)
cancelEqualFunc = cancel1
c.Assert(failpoint.Enable("github.com/pingcap/tidb-tools/pkg/diff/CancelCheckChunkDataEqual", `return(2)`), IsNil)
defer failpoint.Disable("github.com/pingcap/tidb-tools/pkg/diff/CancelCheckChunkDataEqual")
structEqual, dataEqual, err = tableDiff.Equal(ctx1, writeSqls)
c.Assert(err, IsNil)
c.Assert(structEqual, Equals, true)
c.Assert(dataEqual, Equals, false)
}

func createTableDiff(conn *sql.DB, schema string, sourceTableNames []string, targetTableName string) *TableDiff {
Expand Down
9 changes: 9 additions & 0 deletions tools.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"Tools": [
{
"Repository": "github.com/pingcap/failpoint/failpoint-ctl",
"Commit": "e7b1061e6e81d44326c449cf518992e0e2d47e98"
}
],
"RetoolVersion": "1.3.7"
}

0 comments on commit eae8a56

Please sign in to comment.