Skip to content

Commit

Permalink
add ut for event service (#271)
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 authored Sep 9, 2024
1 parent 08e122b commit f6ad359
Show file tree
Hide file tree
Showing 11 changed files with 367 additions and 176 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/google/btree v1.1.2
github.com/google/uuid v1.6.0
github.com/imdario/mergo v0.3.16
github.com/json-iterator/go v1.1.12
github.com/linkedin/goavro/v2 v2.11.1
github.com/mailru/easyjson v0.7.7
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
Expand All @@ -38,7 +39,6 @@ require (
github.com/spf13/cobra v1.8.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.9.0
github.com/thanhpk/randstr v1.0.6
github.com/tikv/client-go/v2 v2.0.8-0.20240703095801-d73cc1ed6503
github.com/tikv/pd v1.1.0-beta.0.20240407022249-7179657d129b
github.com/tikv/pd/client v0.0.0-20240717053728-5ec6af403019
Expand All @@ -50,6 +50,7 @@ require (
go.etcd.io/etcd/client/v3 v3.5.12
go.etcd.io/etcd/server/v3 v3.5.12
go.uber.org/atomic v1.11.0
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
golang.org/x/net v0.27.0
golang.org/x/sync v0.7.0
Expand Down Expand Up @@ -193,7 +194,6 @@ require (
github.com/joho/sqltocsv v0.0.0-20210428211105-a6d6801d59df // indirect
github.com/jonboulle/clockwork v0.4.0 // indirect
github.com/joomcode/errorx v1.0.1 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/klauspost/cpuid v1.3.1 // indirect
Expand Down
10 changes: 10 additions & 0 deletions pkg/common/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package common

import (
"encoding/binary"
"encoding/json"
"fmt"

"github.com/pingcap/tidb/pkg/parser/model"
Expand Down Expand Up @@ -400,6 +401,15 @@ func (e *DDLEvent) GetDDLType() model.ActionType {
return e.Job.Type
}

func (t DDLEvent) Marshal() ([]byte, error) {
// TODO: optimize it
return json.Marshal(t)
}

func (t *DDLEvent) Unmarshal(data []byte) error {
return json.Unmarshal(data, t)
}

type InfluenceType int

const (
Expand Down
40 changes: 18 additions & 22 deletions pkg/eventservice/event_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/flowbehappy/tigate/logservice/eventstore"
"github.com/flowbehappy/tigate/logservice/schemastore"
"github.com/flowbehappy/tigate/pkg/common"
appcontext "github.com/flowbehappy/tigate/pkg/common/context"
"github.com/flowbehappy/tigate/pkg/filter"
"github.com/flowbehappy/tigate/pkg/messaging"
"github.com/flowbehappy/tigate/pkg/metrics"
Expand Down Expand Up @@ -86,7 +85,7 @@ func newEventBroker(
tidbClusterID: id,
eventStore: eventStore,
mounter: mounter.NewMounter(tz),
schemaStore: appcontext.GetService[schemastore.SchemaStore](appcontext.SchemaStore),
schemaStore: schemaStore,
dispatchers: sync.Map{},
tableTriggerDispatchers: sync.Map{},
spans: make(map[common.TableID]*spanSubscription),
Expand Down Expand Up @@ -179,7 +178,7 @@ func (c *eventBroker) tickTableTriggerDispatchers(ctx context.Context) {
}

func (c *eventBroker) sendDDL(remoteID messaging.ServerId, e common.DDLEvent, d *dispatcherStat) {
c.messageCh <- newWrapDDLEvent(remoteID, e)
c.messageCh <- newWrapDDLEvent(remoteID, &e)
d.metricEventServiceSendDDLCount.Inc()
}

Expand Down Expand Up @@ -231,7 +230,7 @@ func (c *eventBroker) doScan(task *scanTask) {
// 3. Get the events from the iterator and send them to the dispatcher.
sendTxn := func(t *common.DMLEvent) {
if t != nil {
if len(ddlEvents) > 0 && t.CommitTs > ddlEvents[0].CommitTS {
for len(ddlEvents) > 0 && t.CommitTs > ddlEvents[0].CommitTS {
c.sendDDL(remoteID, ddlEvents[0], task.dispatcherStat)
ddlEvents = ddlEvents[1:]
}
Expand Down Expand Up @@ -260,7 +259,7 @@ func (c *eventBroker) doScan(task *scanTask) {
if isNewTxn {
sendTxn(txnEvent)
tableID := task.dispatcherStat.info.GetTableSpan().TableID
tableInfo, err := c.schemaStore.GetTableInfo(int64(tableID), e.CRTs-1)
tableInfo, err := c.schemaStore.GetTableInfo(tableID, e.CRTs-1)
if err != nil {
// FIXME handle the error
log.Panic("get table info failed", zap.Error(err))
Expand Down Expand Up @@ -292,7 +291,7 @@ func (c *eventBroker) runSendMessageWorker(ctx context.Context) {
tMsg := messaging.NewSingleTargetMessage(
m.serverID,
messaging.EventCollectorTopic,
m.txnEvent)
m.e)
c.flushResolvedTs(ctx, m.serverID)
c.sendMsg(ctx, tMsg)
case <-flushResolvedTsTicker.C:
Expand All @@ -304,15 +303,15 @@ func (c *eventBroker) runSendMessageWorker(ctx context.Context) {
}()
}

func (c *eventBroker) handleResolvedTs(ctx context.Context, e wrapEvent) {
cache, ok := c.resolvedTsCaches[e.serverID]
func (c *eventBroker) handleResolvedTs(ctx context.Context, m wrapEvent) {
cache, ok := c.resolvedTsCaches[m.serverID]
if !ok {
cache = newResolvedTsCache(resolvedTsCacheSize)
c.resolvedTsCaches[e.serverID] = cache
c.resolvedTsCaches[m.serverID] = cache
}
cache.add(e.resolvedEvent)
cache.add(*m.e.(*common.ResolvedEvent))
if cache.isFull() {
c.flushResolvedTs(ctx, e.serverID)
c.flushResolvedTs(ctx, m.serverID)
}
}

Expand Down Expand Up @@ -643,33 +642,30 @@ func (p *scanTaskQueue) popTask(chanIndex int) <-chan *scanTask {

type wrapEvent struct {
serverID messaging.ServerId
// TODO: change the type of the txnEvent to common.TEvent
txnEvent *common.DMLEvent
resolvedEvent common.ResolvedEvent
ddlEvent *common.DDLEvent
msgType int
e messaging.IOTypeT
msgType int
}

func newWrapTxnEvent(serverID messaging.ServerId, e *common.DMLEvent) wrapEvent {
return wrapEvent{
serverID: serverID,
txnEvent: e,
e: e,
msgType: common.TypeDMLEvent,
}
}

func newWrapResolvedEvent(serverID messaging.ServerId, e common.ResolvedEvent) wrapEvent {
return wrapEvent{
serverID: serverID,
resolvedEvent: e,
msgType: common.TypeResolvedEvent,
serverID: serverID,
e: &e,
msgType: common.TypeResolvedEvent,
}
}

func newWrapDDLEvent(serverID messaging.ServerId, e common.DDLEvent) wrapEvent {
func newWrapDDLEvent(serverID messaging.ServerId, e *common.DDLEvent) wrapEvent {
return wrapEvent{
serverID: serverID,
ddlEvent: &e,
e: e,
msgType: common.TypeDDLEvent,
}
}
Expand Down
118 changes: 112 additions & 6 deletions pkg/eventservice/event_broker_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package eventservice

import (
"context"
"fmt"
"math/rand"
"sync"
"testing"
"time"

"github.com/flowbehappy/tigate/heartbeatpb"
"github.com/flowbehappy/tigate/pkg/common"
"github.com/flowbehappy/tigate/pkg/messaging"
"github.com/flowbehappy/tigate/pkg/mounter"
"github.com/pingcap/log"
tconfig "github.com/pingcap/tiflow/pkg/config"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -181,6 +185,109 @@ func TestResolvedTsCache(t *testing.T) {
require.False(t, rc.isFull())
}

func genEvents(helper *mounter.EventTestHelper, t *testing.T, ddl string, dmls ...string) (common.DDLEvent, []*common.RawKVEntry) {
job := helper.DDL2Job(ddl)
schema := job.SchemaName
table := job.TableName
kvEvents1 := helper.DML2RawKv(schema, table, dmls...)
for _, e := range kvEvents1 {
require.Equal(t, job.BinlogInfo.TableInfo.UpdateTS-1, e.StartTs)
require.Equal(t, job.BinlogInfo.TableInfo.UpdateTS+1, e.CRTs)
}
return common.DDLEvent{
CommitTS: job.BinlogInfo.TableInfo.UpdateTS,
Job: job,
}, kvEvents1
}

func TestSendEvents(t *testing.T) {
helper := mounter.NewEventTestHelper(t)
defer helper.Close()

helper.Tk().MustExec("use test")
ddlEvent, kvEvents := genEvents(helper, t, `create table test.t(id int primary key, c char(50))`, []string{
`insert into test.t(id,c) values (0, "c0")`,
`insert into test.t(id,c) values (1, "c1")`,
`insert into test.t(id,c) values (2, "c2")`,
}...)
require.NotNil(t, kvEvents)

ddlEvent1, _ := genEvents(helper, t, `alter table test.t add column dummy int default 0`)
require.Less(t, ddlEvent.CommitTS, ddlEvent1.CommitTS)

ddlEvent2, kvEvents2 := genEvents(helper, t, `alter table test.t add column d int`, []string{
`insert into test.t(id,c,d) values (10, "c10", 10)`,
`insert into test.t(id,c,d) values (11, "c11", 11)`,
`insert into test.t(id,c,d) values (12, "c12", 12)`,
}...)
require.NotNil(t, kvEvents2)
require.Less(t, kvEvents[0].CRTs, kvEvents2[1].CRTs)
require.Less(t, ddlEvent.CommitTS, ddlEvent2.CommitTS)

wg := &sync.WaitGroup{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

eventStore := newMockEventStore()
schemaStore := newMockSchemaStore()
msgCh := make(chan *messaging.TargetMessage, 1024)
mc := &mockMessageCenter{messageCh: msgCh}

wg.Add(1)
go func() {
defer wg.Done()
expected := []struct {
t int // type
commitTs common.Ts
}{
{t: common.TypeDDLEvent, commitTs: ddlEvent.CommitTS},
{t: common.TypeDMLEvent, commitTs: kvEvents[0].CRTs},
{t: common.TypeDDLEvent, commitTs: ddlEvent1.CommitTS},
{t: common.TypeDDLEvent, commitTs: ddlEvent2.CommitTS},
{t: common.TypeDMLEvent, commitTs: kvEvents2[0].CRTs},
{t: common.TypeBatchResolvedEvent, commitTs: common.Ts(0)},
}
cnt := 0
for {
select {
case <-ctx.Done():
return
case msgs := <-msgCh:
for _, msg := range msgs.Message {
event, ok := msg.(common.Event)
require.True(t, ok)
fmt.Printf("cnt: %d -> %+v\n", cnt, event)
require.Equal(t, expected[cnt].t, event.GetType(), "cnt: %d, e: %+v", cnt, event)
require.Equal(t, expected[cnt].commitTs, event.GetCommitTs(), "cnt: %d, e: %+v", cnt, event)
cnt++

if cnt == len(expected) {
return
}
}
}
}
}()

s := newEventBroker(ctx, 1, eventStore, schemaStore, mc, time.Local)
defer s.close()

// Register the dispatcher
tableID := ddlEvent.Job.TableID
info := newMockAcceptorInfo(common.NewDispatcherID(), tableID)
s.addDispatcher(info)
_, ok := s.spans[tableID]
require.True(t, ok)

schemaStore.AppendDDLEvent(tableID, ddlEvent, ddlEvent1, ddlEvent2)

span, ok := eventStore.spans[tableID]
require.True(t, ok)
span.update(ddlEvent2.CommitTS+1, append(kvEvents, kvEvents2...)...)

wg.Wait()
}

// mockDispatcherInfo is a mock implementation of the AcceptorInfo interface
type mockDispatcherInfo struct {
clusterID uint64
Expand Down Expand Up @@ -249,17 +356,16 @@ func (m *mockDispatcherInfo) GetFilterConfig() *tconfig.FilterConfig {
type mockSpanStats struct {
startTs uint64
watermark uint64
pendingEvents []*common.DMLEvent
pendingEvents []*common.RawKVEntry
onUpdate func(watermark uint64)
onEvent func(event *common.RawKVEntry)
}

func (m *mockSpanStats) update(event []*common.DMLEvent, watermark uint64) {
m.pendingEvents = append(m.pendingEvents, event...)
func (m *mockSpanStats) update(watermark uint64, events ...*common.RawKVEntry) {
m.pendingEvents = append(m.pendingEvents, events...)
m.watermark = watermark
for _, e := range event {
_ = e
m.onEvent(&common.RawKVEntry{})
for _, e := range events {
m.onEvent(e)
}
m.onUpdate(watermark)
}
2 changes: 1 addition & 1 deletion pkg/eventservice/event_service_performance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestEventServiceOneMillionTable(t *testing.T) {
if !ok {
continue
}
sub.update(nil, sub.watermark+1)
sub.update(sub.watermark + 1)
}
log.Info("send resolvedTs events for 1 million tables", zap.Duration("cost", time.Since(sendStart)), zap.Any("round", round))
round++
Expand Down
Loading

0 comments on commit f6ad359

Please sign in to comment.