Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

[WIP] Async Checkpoint with Causality Optimization #2262

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 91 additions & 16 deletions syncer/causality.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package syncer

import (
"math"
"time"

"go.uber.org/zap"
Expand All @@ -27,7 +28,7 @@ import (
// if some conflicts exist in more than one groups, causality generate a conflict job and reset.
// this mechanism meets quiescent consistency to ensure correctness.
type causality struct {
relations map[string]string
relations *rollingMap
outCh chan *job
inCh chan *job
logger log.Logger
Expand All @@ -40,7 +41,7 @@ type causality struct {
// causalityWrap creates and runs a causality instance.
func causalityWrap(inCh chan *job, syncer *Syncer) chan *job {
causality := &causality{
relations: make(map[string]string),
relations: newRollingMap(),
task: syncer.cfg.Name,
source: syncer.cfg.SourceID,
logger: syncer.tctx.Logger.WithFields(zap.String("component", "causality")),
Expand All @@ -63,17 +64,20 @@ func (c *causality) run() {
metrics.QueueSizeGauge.WithLabelValues(c.task, "causality_input", c.source).Set(float64(len(c.inCh)))

startTime := time.Now()
if j.tp == flush {
c.reset()
} else {
switch j.tp {
case flush:
c.relations.rotate()
case gc:
c.relations.gc(j.seq)
default:
keys := j.dml.identifyKeys()
// detectConflict before add
if c.detectConflict(keys) {
c.logger.Debug("meet causality key, will generate a conflict job to flush all sqls", zap.Strings("keys", keys))
c.outCh <- newConflictJob()
c.reset()
c.relations.clear()
}
j.dml.key = c.add(keys)
j.dml.key = c.add(keys, j.seq)
c.logger.Debug("key for keys", zap.String("key", j.dml.key), zap.Strings("keys", keys))
}
metrics.ConflictDetectDurationHistogram.WithLabelValues(c.task, c.source).Observe(time.Since(startTime).Seconds())
Expand All @@ -88,7 +92,7 @@ func (c *causality) close() {
}

// add adds keys relation and return the relation. The keys must `detectConflict` first to ensure correctness.
func (c *causality) add(keys []string) string {
func (c *causality) add(keys []string, version int64) string {
if len(keys) == 0 {
return ""
}
Expand All @@ -97,25 +101,20 @@ func (c *causality) add(keys []string) string {
selectedRelation := keys[0]
var nonExistKeys []string
for _, key := range keys {
if val, ok := c.relations[key]; ok {
if val, ok := c.relations.get(key); ok {
selectedRelation = val
} else {
nonExistKeys = append(nonExistKeys, key)
}
}
// set causal relations for those non-exist keys
for _, key := range nonExistKeys {
c.relations[key] = selectedRelation
c.relations.set(key, selectedRelation, version)
}

return selectedRelation
}

// reset resets relations.
func (c *causality) reset() {
c.relations = make(map[string]string)
}

// detectConflict detects whether there is a conflict.
func (c *causality) detectConflict(keys []string) bool {
if len(keys) == 0 {
Expand All @@ -124,7 +123,7 @@ func (c *causality) detectConflict(keys []string) bool {

var existedRelation string
for _, key := range keys {
if val, ok := c.relations[key]; ok {
if val, ok := c.relations.get(key); ok {
if existedRelation != "" && val != existedRelation {
return true
}
Expand All @@ -134,3 +133,79 @@ func (c *causality) detectConflict(keys []string) bool {

return false
}

type versionedMap struct {
data map[string]string
maxVer int64
}

// rollingMap is a map this contains multi map instances
type rollingMap struct {
maps []*versionedMap
// current map fro write
cur *versionedMap
}

func newRollingMap() *rollingMap {
m := &rollingMap{
maps: make([]*versionedMap, 0),
}
m.rotate()
return m
}

func (m *rollingMap) get(key string) (string, bool) {
for i := len(m.maps) - 1; i >= 0; i-- {
if v, ok := m.maps[i].data[key]; ok {
return v, true
}
}
return "", false
}

func (m *rollingMap) set(key string, val string, version int64) {
m.cur.data[key] = val
if m.cur.maxVer < version {
m.cur.maxVer = version
}
}

func (m *rollingMap) len() int {
cnt := 0
for _, d := range m.maps {
cnt += len(d.data)
}
return cnt
}

func (m *rollingMap) rotate() {
if len(m.maps) == 0 || len(m.maps[len(m.maps)-1].data) > 0 {
m.maps = append(m.maps, &versionedMap{
data: make(map[string]string),
})
m.cur = m.maps[len(m.maps)-1]
}
}

func (m *rollingMap) clear() {
m.gc(math.MaxInt64)
}

func (m *rollingMap) gc(version int64) {
idx := 0
for i, d := range m.maps {
if d.maxVer > 0 && d.maxVer <= version {
// set nil value to trigger go gc
m.maps[i] = nil
} else {
idx = i
break
}
}
if idx == len(m.maps)-1 {
m.maps = m.maps[:0]
m.rotate()
} else if idx > 0 {
m.maps = m.maps[idx:]
}
}
10 changes: 5 additions & 5 deletions syncer/causality_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

func (s *testSyncerSuite) TestDetectConflict(c *C) {
ca := &causality{
relations: make(map[string]string),
relations: newRollingMap(),
}
caseData := []string{"test_1", "test_2", "test_3"}
excepted := map[string]string{
Expand All @@ -39,16 +39,16 @@ func (s *testSyncerSuite) TestDetectConflict(c *C) {
"test_3": "test_1",
}
c.Assert(ca.detectConflict(caseData), IsFalse)
ca.add(caseData)
ca.add(caseData, 1)
c.Assert(ca.relations, DeepEquals, excepted)
c.Assert(ca.detectConflict([]string{"test_4"}), IsFalse)
ca.add([]string{"test_4"})
ca.add([]string{"test_4"}, 2)
excepted["test_4"] = "test_4"
c.Assert(ca.relations, DeepEquals, excepted)
conflictData := []string{"test_4", "test_3"}
c.Assert(ca.detectConflict(conflictData), IsTrue)
ca.reset()
c.Assert(ca.relations, HasLen, 0)
ca.relations.clear()
c.Assert(ca.relations.len(), Equals, 0)
}

func (s *testSyncerSuite) TestCasuality(c *C) {
Expand Down
Loading