Skip to content

Commit

Permalink
Add coordinator scheduling test (#203)
Browse files Browse the repository at this point in the history
* add coordinator scheduling test
  • Loading branch information
sdojjy authored Aug 13, 2024
1 parent 43f6faa commit fed9530
Show file tree
Hide file tree
Showing 4 changed files with 291 additions and 13 deletions.
7 changes: 3 additions & 4 deletions coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package coordinator

import (
"context"
"fmt"
"math"
"sync"
"time"
Expand Down Expand Up @@ -263,6 +262,9 @@ func (c *coordinator) saveChangefeedStatus() {
return true
}
cf := value.(*changefeed)
if cf.State == nil {
return true
}
if !shouldRunChangefeed(model.FeedState(cf.State.FeedState)) {
cfState.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
info.State = model.FeedState(cf.State.FeedState)
Expand Down Expand Up @@ -426,7 +428,6 @@ func (c *coordinator) printStatus() {
absentTask := 0
commitTask := 0
removingTask := 0
var taskDistribution string
c.supervisor.StateMachines.Ascend(func(key scheduler.InferiorID, value *scheduler.StateMachine) bool {
switch value.State {
case scheduler.SchedulerStatusAbsent:
Expand All @@ -440,11 +441,9 @@ func (c *coordinator) printStatus() {
case scheduler.SchedulerStatusRemoving:
removingTask++
}
taskDistribution = fmt.Sprintf("%s, %s==>%s", taskDistribution, value.ID.String(), value.Primary)
return true
})
log.Info("changefeed status",
zap.String("distribution", taskDistribution),
zap.Int("absent", absentTask),
zap.Int("prepare", prepareTask),
zap.Int("commit", commitTask),
Expand Down
289 changes: 284 additions & 5 deletions coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,296 @@
package coordinator

import (
"context"
"encoding/json"
"flag"
"fmt"
"net/http"
"net/http/pprof"
"strconv"
"sync"
"testing"
"time"

"github.com/flowbehappy/tigate/heartbeatpb"
"github.com/flowbehappy/tigate/pkg/common"
appcontext "github.com/flowbehappy/tigate/pkg/common/context"
"github.com/flowbehappy/tigate/pkg/config"
"github.com/flowbehappy/tigate/pkg/messaging"
"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
config2 "github.com/pingcap/tiflow/pkg/config"
mock_etcd "github.com/pingcap/tiflow/pkg/etcd/mock"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/pdutil"
pd "github.com/tikv/pd/client"
"go.uber.org/atomic"
"go.uber.org/zap"
)

type CFID model.ChangeFeedID
// mockPdClient is a test double for the PD client. It embeds pd.Client and
// overrides UpdateServiceGCSafePoint.
// NOTE(review): any other pd.Client method is served by the embedded value,
// which the test leaves unset — presumably calling one would panic; confirm.
type mockPdClient struct {
pd.Client
}

// UpdateServiceGCSafePoint is a stub: it contacts nothing and simply reports
// the requested safePoint back to the caller together with a nil error.
// ctx, serviceID and ttl are ignored.
func (m *mockPdClient) UpdateServiceGCSafePoint(
	ctx context.Context,
	serviceID string,
	ttl int64,
	safePoint uint64,
) (uint64, error) {
	return safePoint, nil
}

// mockMaintainerManager is a test stand-in for the maintainer manager. It
// receives coordinator messages through the message center, tracks the
// maintainers it was asked to add or remove, and reports their status back
// to the coordinator via heartbeats.
type mockMaintainerManager struct {
// mc is the message center used to register the receive handler and to
// send heartbeats to the coordinator.
mc messaging.MessageCenter
// msgCh queues messages accepted by recvMessages until Run handles them.
msgCh chan *messaging.TargetMessage
// coordinatorVersion is the version taken from the last accepted bootstrap
// request; heartbeats are only sent while it is greater than zero.
coordinatorVersion int64
// coordinatorID is the sender of the last accepted bootstrap request and
// the target of every message sent to the coordinator.
coordinatorID messaging.ServerId
// maintainers maps model.ChangeFeedID to *Maintainer.
maintainers sync.Map
}

// NewMaintainerManager builds a mockMaintainerManager on top of the
// MessageCenter registered in the application context and installs
// recvMessages as the handler for messaging.MaintainerManagerTopic.
func NewMaintainerManager() *mockMaintainerManager {
	center := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter)

	// maintainers is left at its zero value, which is an empty sync.Map.
	manager := &mockMaintainerManager{
		mc:    center,
		msgCh: make(chan *messaging.TargetMessage, 1024),
	}
	center.RegisterHandler(messaging.MaintainerManagerTopic, manager.recvMessages)
	return manager
}

// Run is the event loop of the mock maintainer manager.
//
// It handles every message queued on msgCh and, every 500ms, calls
// sendHeartbeat. The loop only exits when ctx is done, in which case
// ctx.Err() is returned.
func (m *mockMaintainerManager) Run(ctx context.Context) error {
	tick := time.NewTicker(time.Millisecond * 500)
	// Release the ticker's resources once the loop exits; without this the
	// ticker keeps running after Run has returned.
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case msg := <-m.msgCh:
			m.handleMessage(msg)
		case <-tick.C:
			// 1. try to send heartbeat to coordinator
			m.sendHeartbeat()
		}
	}
}

// handleMessage processes one queued coordinator message.
//
// A bootstrap request is forwarded to onCoordinatorBootstrapRequest. A
// dispatch request is forwarded to onDispatchMaintainerRequest; if a
// coordinator has been accepted (coordinatorVersion > 0), every changefeed ID
// it returns is immediately reported to the coordinator as Absent. Any other
// message type is ignored.
func (m *mockMaintainerManager) handleMessage(msg *messaging.TargetMessage) {
	if msg.Type == messaging.TypeCoordinatorBootstrapRequest {
		m.onCoordinatorBootstrapRequest(msg)
		return
	}
	if msg.Type != messaging.TypeDispatchMaintainerRequest {
		return
	}

	// The dispatch request is always applied, even before bootstrap.
	missing := m.onDispatchMaintainerRequest(msg)
	if m.coordinatorVersion <= 0 || len(missing) == 0 {
		return
	}

	statuses := make([]*heartbeatpb.MaintainerStatus, 0, len(missing))
	for _, changefeedID := range missing {
		statuses = append(statuses, &heartbeatpb.MaintainerStatus{
			ChangefeedID: changefeedID,
			State:        heartbeatpb.ComponentState_Absent,
		})
	}
	m.sendMessages(&heartbeatpb.MaintainerHeartbeat{Statuses: statuses})
}
// sendMessages delivers a heartbeat to the current coordinator on
// messaging.CoordinatorTopic. A send failure is logged as a warning and
// otherwise ignored.
func (m *mockMaintainerManager) sendMessages(msg *heartbeatpb.MaintainerHeartbeat) {
	toCoordinator := messaging.NewTargetMessage(m.coordinatorID, messaging.CoordinatorTopic, msg)
	if sendErr := m.mc.SendCommand(toCoordinator); sendErr != nil {
		log.Warn("send command failed", zap.Error(sendErr))
	}
}
// recvMessages is the message-center handler for the maintainer manager
// topic. Dispatch and bootstrap requests from the coordinator are queued on
// msgCh for Run to handle; the call blocks until the message is queued or ctx
// is done, returning ctx.Err() in the latter case. Any other message type
// triggers log.Panic.
func (m *mockMaintainerManager) recvMessages(ctx context.Context, msg *messaging.TargetMessage) error {
	switch msg.Type {
	// receive message from coordinator
	case messaging.TypeDispatchMaintainerRequest,
		messaging.TypeCoordinatorBootstrapRequest:
		select {
		case m.msgCh <- msg:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	default:
		log.Panic("unknown message type", zap.Any("message", msg.Message))
	}
	return nil
}
// onCoordinatorBootstrapRequest accepts a coordinator bootstrap request.
//
// A request whose version is lower than the currently recorded one is logged
// and dropped. Otherwise the sender and version are recorded as the current
// coordinator and an empty CoordinatorBootstrapResponse is sent back to it;
// a send failure is only logged.
func (m *mockMaintainerManager) onCoordinatorBootstrapRequest(msg *messaging.TargetMessage) {
	bootstrap := msg.Message.(*heartbeatpb.CoordinatorBootstrapRequest)
	if bootstrap.Version < m.coordinatorVersion {
		log.Warn("ignore invalid coordinator version",
			zap.Int64("version", bootstrap.Version))
		return
	}
	m.coordinatorID, m.coordinatorVersion = msg.From, bootstrap.Version

	// The message center is looked up from the application context here
	// rather than taken from m.mc.
	center := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter)
	reply := messaging.NewTargetMessage(
		m.coordinatorID,
		messaging.CoordinatorTopic,
		&heartbeatpb.CoordinatorBootstrapResponse{},
	)
	if err := center.SendCommand(reply); err != nil {
		log.Warn("send command failed", zap.Error(err))
	}
	log.Info("New coordinator online",
		zap.Int64("version", m.coordinatorVersion))
}
// onDispatchMaintainerRequest applies a dispatch request from the coordinator.
//
// Requests from a sender other than the recorded coordinator are logged and
// ignored (nil is returned). For every entry in AddMaintainers a Maintainer is
// created on first sight (its config decoded from the request's JSON payload;
// a decode failure triggers log.Panic) and its isSecondary flag is updated.
// For every entry in RemoveMaintainers the known maintainer is flagged as
// removing and its cascade flag is recorded.
//
// The returned slice holds the IDs of remove requests whose maintainer is
// unknown; it is empty but non-nil when there are none.
func (m *mockMaintainerManager) onDispatchMaintainerRequest(
	msg *messaging.TargetMessage,
) []string {
	request := msg.Message.(*heartbeatpb.DispatchMaintainerRequest)
	if msg.From != m.coordinatorID {
		log.Warn("ignore invalid coordinator id",
			zap.Any("request", request),
			zap.Any("coordinator", msg.From))
		return nil
	}

	absent := make([]string, 0)

	for _, add := range request.AddMaintainers {
		id := model.DefaultChangeFeedID(add.GetId())
		var maintainer *Maintainer
		if loaded, found := m.maintainers.Load(id); found {
			maintainer = loaded.(*Maintainer)
		} else {
			info := &model.ChangeFeedInfo{}
			if err := json.Unmarshal(add.Config, info); err != nil {
				log.Panic("decode changefeed fail", zap.Error(err))
			}
			maintainer = &Maintainer{config: info}
			m.maintainers.Store(id, maintainer)
		}
		maintainer.isSecondary.Store(add.IsSecondary)
	}

	for _, remove := range request.RemoveMaintainers {
		id := model.DefaultChangeFeedID(remove.GetId())
		loaded, found := m.maintainers.Load(id)
		if found {
			maintainer := loaded.(*Maintainer)
			maintainer.removing.Store(true)
			maintainer.cascadeRemoving.Store(remove.Cascade)
			continue
		}
		log.Warn("ignore remove maintainer request, "+
			"since the maintainer not found",
			zap.String("changefeed", id.String()),
			zap.Any("request", remove))
		absent = append(absent, remove.GetId())
	}
	return absent
}
// sendHeartbeat reports maintainer statuses to the coordinator.
//
// Nothing is sent until a coordinator has been accepted (coordinatorVersion
// > 0). A maintainer is included when its statusChanged flag is set or when
// more than two seconds have passed since it was last reported; including it
// clears the flag and refreshes lastReportTime. No message is sent when no
// maintainer is due.
func (m *mockMaintainerManager) sendHeartbeat() {
	if m.coordinatorVersion <= 0 {
		return
	}

	var due []*heartbeatpb.MaintainerStatus
	m.maintainers.Range(func(_, v any) bool {
		maintainer := v.(*Maintainer)
		if !maintainer.statusChanged.Load() && time.Since(maintainer.lastReportTime) <= 2*time.Second {
			// Neither changed nor stale: skip, keep iterating.
			return true
		}
		due = append(due, maintainer.GetMaintainerStatus())
		maintainer.statusChanged.Store(false)
		maintainer.lastReportTime = time.Now()
		return true
	})

	if len(due) == 0 {
		return
	}
	m.sendMessages(&heartbeatpb.MaintainerHeartbeat{Statuses: due})
}

// Maintainer is the mock per-changefeed maintainer tracked by
// mockMaintainerManager. It only records what the coordinator asked for and
// when its status was last reported.
type Maintainer struct {
// statusChanged forces the status into the next heartbeat; sendHeartbeat
// clears it after reporting.
statusChanged atomic.Bool
// lastReportTime is when the status was last included in a heartbeat.
lastReportTime time.Time
// removing is set once a remove request for this maintainer arrives.
removing atomic.Bool
// cascadeRemoving holds the Cascade flag of the last remove request.
cascadeRemoving atomic.Bool
// isSecondary holds the IsSecondary flag of the last add request.
isSecondary atomic.Bool

// config is the changefeed info decoded from the add request that created
// this maintainer.
config *model.ChangeFeedInfo
}

func (c CFID) Less(t any) bool {
cf := t.(CFID)
return c.ID < cf.ID
// GetMaintainerStatus returns a freshly allocated status for this maintainer:
// the changefeed ID comes from its config and the state is always Working.
func (m *Maintainer) GetMaintainerStatus() *heartbeatpb.MaintainerStatus {
	status := new(heartbeatpb.MaintainerStatus)
	status.ChangefeedID = m.config.ID
	status.State = heartbeatpb.ComponentState_Working
	return status
}

func TestCoordinatorRun(t *testing.T) {
// TestCoordinatorScheduling drives a coordinator against a mock maintainer
// manager on a single node and ticks it every 50ms with a fixed set of
// changefeeds in the normal state.
//
// The number of changefeeds defaults to 100 and can be overridden with a
// single positional command-line argument. A pprof endpoint is served on
// :8300 while the test runs.
//
// The context is context.Background(), which is never cancelled, so the tick
// loop only ends when cr.Tick returns an error or the test process is stopped
// (for example by the test timeout).
func TestCoordinatorScheduling(t *testing.T) {
	mux := http.NewServeMux()
	mux.HandleFunc("/debug/pprof/", pprof.Index)
	mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
	mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
	mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
	mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
	go func() {
		// t.Fatal must only be called from the goroutine running the test
		// function; from here, mark the test as failed with t.Errorf instead.
		if err := http.ListenAndServe(":8300", mux); err != nil {
			t.Errorf("pprof server stopped: %v", err)
		}
	}()

	ctx := context.Background()
	node := &common.NodeInfo{ID: uuid.New().String()}
	etcdClient := mock_etcd.NewMockCDCEtcdClient(gomock.NewController(t))
	etcdClient.EXPECT().GetGCServiceID().Return("default").AnyTimes()
	appcontext.SetService(appcontext.MessageCenter, messaging.NewMessageCenter(ctx,
		messaging.ServerId(node.ID), 100, config.NewDefaultMessageCenterConfig()))
	m := NewMaintainerManager()
	go m.Run(ctx)

	cr := NewCoordinator(node, &mockPdClient{}, pdutil.NewClock4Test(), etcdClient, 100)
	var metadata orchestrator.ReactorState

	cfs := map[model.ChangeFeedID]*orchestrator.ChangefeedReactorState{}

	if !flag.Parsed() {
		flag.Parse()
	}

	argList := flag.Args()
	if len(argList) > 1 {
		t.Fatal("unexpected args", argList)
	}
	cfSize := 100
	if len(argList) == 1 {
		// Reject a malformed count instead of silently running with zero
		// changefeeds.
		parsed, err := strconv.Atoi(argList[0])
		if err != nil {
			t.Fatalf("invalid changefeed count %q: %v", argList[0], err)
		}
		cfSize = parsed
	}

	for i := 0; i < cfSize; i++ {
		cfID := model.DefaultChangeFeedID(fmt.Sprintf("%d", i))
		cfs[cfID] = &orchestrator.ChangefeedReactorState{
			ID: cfID,
			Info: &model.ChangeFeedInfo{
				ID:        cfID.ID,
				Namespace: cfID.Namespace,
				Config:    config2.GetDefaultReplicaConfig(),
				State:     model.StateNormal,
			},
			Status: &model.ChangeFeedStatus{CheckpointTs: 10, MinTableBarrierTs: 10},
		}
	}
	metadata = &orchestrator.GlobalReactorState{
		Captures: map[model.CaptureID]*model.CaptureInfo{
			node.ID: {
				ID:            node.ID,
				AdvertiseAddr: "127.0.0.1:8300",
			},
		},
		Changefeeds: cfs,
	}

	tick := time.NewTicker(time.Millisecond * 50)
	// Stop the ticker on every exit path, including t.Fatal below.
	defer tick.Stop()
	var err error
	for {
		select {
		case <-tick.C:
			metadata, err = cr.Tick(ctx, metadata)
			if err != nil {
				t.Fatal(err)
			}
		case <-ctx.Done():
			return
		}
	}
}
5 changes: 2 additions & 3 deletions downstream_performance.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
appcontext "github.com/flowbehappy/tigate/pkg/common/context"
"github.com/flowbehappy/tigate/pkg/config"
"github.com/flowbehappy/tigate/pkg/messaging"
"github.com/flowbehappy/tigate/server/watcher"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"go.uber.org/zap"
Expand All @@ -27,7 +26,7 @@ const dispatcherCount = 1000
const databaseCount = 20

// initContext registers the services this demo program relies on in the
// application context for the given server: the message center, the event
// collector and the heartbeat collector.
// NOTE(review): this view is a rendered diff, so both the old
// (watcher.TempEpoch) and the new (100) MessageCenter registration lines
// appear below; presumably only the second one exists in the committed
// file — confirm against the repository.
func initContext(serverId messaging.ServerId) {
appcontext.SetService(appcontext.MessageCenter, messaging.NewMessageCenter(context.Background(), serverId, watcher.TempEpoch, config.NewDefaultMessageCenterConfig()))
appcontext.SetService(appcontext.MessageCenter, messaging.NewMessageCenter(context.Background(), serverId, 100, config.NewDefaultMessageCenterConfig()))
appcontext.SetService(appcontext.EventCollector, eventcollector.NewEventCollector(100*1024*1024*1024, serverId)) // 100GB for demo
appcontext.SetService(appcontext.HeartbeatCollector, dispatchermanager.NewHeartBeatCollector(serverId))
}
Expand Down Expand Up @@ -88,7 +87,7 @@ func main() {
managerMap := make(map[int]*dispatchermanager.EventDispatcherManager)

for db_index := 0; db_index < databaseCount; db_index++ {
changefeedConfig := model.ChangefeedConfig{
changefeedConfig := config.ChangefeedConfig{
SinkURI: "tidb://root:@127.0.0.1:4000",
}
changefeedID := model.DefaultChangeFeedID("test" + strconv.Itoa(db_index))
Expand Down
3 changes: 2 additions & 1 deletion scheduler/state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ func NewStateMachine(
sm.State = SchedulerStatusRemoving
}
log.Info("initialize state machine",
zap.Any("statemachine", sm))
zap.Any("id", sm.ID),
zap.Any("state", sm.State))

return sm, nil
}
Expand Down

0 comments on commit fed9530

Please sign in to comment.