From 1849b1941c79febc8f945098c9aa955d5491e92f Mon Sep 17 00:00:00 2001 From: wayblink Date: Mon, 27 May 2024 21:02:24 +0800 Subject: [PATCH] fix collection schema --- .../datacoord/clustering_compaction_task.go | 1 + internal/datacoord/compaction_task.go | 319 +----------------- internal/datacoord/compaction_task_l0.go | 2 + internal/datacoord/compaction_task_mix.go | 1 + internal/datacoord/compaction_trigger.go | 7 +- internal/datacoord/compaction_trigger_v2.go | 33 +- internal/datacoord/mock_channel_store.go | 88 ++--- internal/datacoord/mock_channelmanager.go | 52 --- internal/datacoord/mock_cluster.go | 2 +- internal/datacoord/mock_compaction_meta.go | 301 +++++++++++++++++ .../datacoord/mock_compaction_plan_context.go | 77 +++-- internal/datacoord/mock_scheduler.go | 47 ++- internal/datacoord/mock_session_manager.go | 2 +- internal/datacoord/mock_trigger_manager.go | 119 ++++++- internal/proto/data_coord.proto | 12 +- 15 files changed, 542 insertions(+), 521 deletions(-) diff --git a/internal/datacoord/clustering_compaction_task.go b/internal/datacoord/clustering_compaction_task.go index 92a3fb2a05e12..7e8209f7dbbc0 100644 --- a/internal/datacoord/clustering_compaction_task.go +++ b/internal/datacoord/clustering_compaction_task.go @@ -324,6 +324,7 @@ func (task *clusteringCompactionTask) BuildCompactionRequest(handler *compaction Channel: task.GetChannel(), CollectionTtl: task.GetCollectionTtl(), TotalRows: task.GetTotalRows(), + Schema: task.GetSchema(), ClusteringKeyField: task.GetClusteringKeyField().GetFieldID(), MaxSegmentRows: task.GetMaxSegmentRows(), PreferSegmentRows: task.GetPreferSegmentRows(), diff --git a/internal/datacoord/compaction_task.go b/internal/datacoord/compaction_task.go index 0a62cd76b53b5..b1dde8faf4ba0 100644 --- a/internal/datacoord/compaction_task.go +++ b/internal/datacoord/compaction_task.go @@ -17,17 +17,9 @@ package datacoord import ( - "github.com/cockroachdb/errors" - "github.com/samber/lo" - "go.opentelemetry.io/otel/trace" - "go.uber.org/zap" - - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/pkg/log" - "github.com/milvus-io/milvus/pkg/util/merr" - "github.com/milvus-io/milvus/pkg/util/tsoutil" + "go.opentelemetry.io/otel/trace" ) type CompactionTask interface { @@ -113,312 +105,3 @@ func cleanLogPath() compactionTaskOpt { task.CleanLogPath() } } - -var _ CompactionTask = (*defaultCompactionTask)(nil) - -type defaultCompactionTask struct { - *datapb.CompactionTask - // deprecated - triggerInfo *compactionSignal - plan *datapb.CompactionPlan - dataNodeID int64 - result *datapb.CompactionPlanResult - span trace.Span -} - -func (task *defaultCompactionTask) ProcessTask(handler *compactionPlanHandler) error { - switch task.GetState() { - case datapb.CompactionTaskState_failed: - return nil - case datapb.CompactionTaskState_completed: - return nil - case datapb.CompactionTaskState_pipelining: - return task.processPipeliningTask(handler) - case datapb.CompactionTaskState_executing: - return task.processExecutingTask(handler) - case datapb.CompactionTaskState_timeout: - return task.processTimeoutTask(handler) - default: - return errors.New("not supported state") - } - return nil -} - -func (task *defaultCompactionTask) processPipeliningTask(handler *compactionPlanHandler) error { - handler.scheduler.Submit(task) - handler.plans[task.GetPlanID()] = task.ShadowClone(setState(datapb.CompactionTaskState_executing)) - log.Info("Compaction plan submited") - return nil -} - -func (task *defaultCompactionTask) processExecutingTask(handler *compactionPlanHandler) error { - nodePlan, exist := handler.compactionResults[task.GetPlanID()] - if !exist { - // compaction task in DC but not found in DN means the compaction plan has failed - log.Info("compaction failed") - handler.plans[task.GetPlanID()] = task.ShadowClone(setState(datapb.CompactionTaskState_failed), endSpan()) - handler.setSegmentsCompacting(task, false) - handler.scheduler.Finish(task.GetPlanID(), task) - return nil - } - planResult := nodePlan.B - switch planResult.GetState() { - case commonpb.CompactionState_Completed: - // channels are balanced to other nodes, yet the old datanode still have the compaction results - // task.dataNodeID == planState.A, but - // task.dataNodeID not match with channel - // Mark this compaction as failure and skip processing the meta - if !handler.chManager.Match(task.GetNodeID(), task.GetChannel()) { - // Sync segments without CompactionFrom segmentsIDs to make sure DN clear the task - // without changing the meta - log.Warn("compaction failed for channel nodeID not match") - err := handler.sessions.SyncSegments(task.GetNodeID(), &datapb.SyncSegmentsRequest{PlanID: task.GetPlanID()}) - if err != nil { - log.Warn("compaction failed to sync segments with node", zap.Error(err)) - return err - } - handler.setSegmentsCompacting(task, false) - handler.plans[task.GetPlanID()] = task.ShadowClone(setState(datapb.CompactionTaskState_failed), cleanLogPath(), endSpan()) - handler.scheduler.Finish(task.GetNodeID(), task) - return nil - } - switch task.GetType() { - case datapb.CompactionType_MergeCompaction, datapb.CompactionType_MixCompaction: - if err := handler.handleMergeCompactionResult(task.GetPlan(), planResult); err != nil { - return err - } - case datapb.CompactionType_Level0DeleteCompaction: - if err := handler.handleL0CompactionResult(task.GetPlan(), planResult); err != nil { - return err - } - } - UpdateCompactionSegmentSizeMetrics(planResult.GetSegments()) - handler.plans[task.GetPlanID()] = task.ShadowClone(setState(datapb.CompactionTaskState_completed), setResult(planResult), cleanLogPath(), endSpan()) - handler.scheduler.Finish(task.GetNodeID(), task) - case commonpb.CompactionState_Executing: - ts := tsoutil.GetCurrentTime() - if isTimeout(ts, task.GetStartTime(), task.GetTimeoutInSeconds()) { - log.Warn("compaction timeout", - zap.Int32("timeout in seconds", task.GetTimeoutInSeconds()), - zap.Uint64("startTime", task.GetStartTime()), - zap.Uint64("now", ts), - ) - handler.plans[task.GetPlanID()] = task.ShadowClone(setState(datapb.CompactionTaskState_timeout), endSpan()) - } - } - return nil -} - -func (task *defaultCompactionTask) processTimeoutTask(handler *compactionPlanHandler) error { - log := log.With( - zap.Int64("planID", task.GetPlanID()), - zap.Int64("nodeID", task.GetNodeID()), - zap.String("channel", task.GetChannel()), - ) - planID := task.GetPlanID() - if nodePlan, ok := handler.compactionResults[task.GetPlanID()]; ok { - if nodePlan.B.GetState() == commonpb.CompactionState_Executing { - log.RatedInfo(1, "compaction timeout in DataCoord yet DataNode is still running") - } - } else { - // compaction task in DC but not found in DN means the compaction plan has failed - log.Info("compaction failed for timeout") - handler.plans[planID] = task.ShadowClone(setState(datapb.CompactionTaskState_failed), endSpan()) - handler.setSegmentsCompacting(task, false) - handler.scheduler.Finish(task.GetNodeID(), task) - } - return nil -} - -func (task *defaultCompactionTask) GetCollectionID() int64 { - return task.CompactionTask.GetCollectionID() -} - -func (task *defaultCompactionTask) GetPartitionID() int64 { - return task.CompactionTask.GetPartitionID() -} - -func (task *defaultCompactionTask) GetTriggerID() int64 { - return task.CompactionTask.GetTriggerID() -} - -func (task *defaultCompactionTask) GetSpan() trace.Span { - return task.span -} - -func (task *defaultCompactionTask) GetType() datapb.CompactionType { - return task.CompactionTask.GetType() -} - -func (task *defaultCompactionTask) GetChannel() string { - return task.CompactionTask.GetChannel() -} - -func (task *defaultCompactionTask) GetPlanID() int64 { - return task.CompactionTask.GetPlanID() -} - -func (task *defaultCompactionTask) GetResult() *datapb.CompactionPlanResult { - return task.result -} - -func (task *defaultCompactionTask) GetNodeID() int64 { - return task.dataNodeID -} - -func (task *defaultCompactionTask) GetState() datapb.CompactionTaskState { - return task.CompactionTask.GetState() -} - -func (task *defaultCompactionTask) GetPlan() *datapb.CompactionPlan { - return task.plan -} - -func (task *defaultCompactionTask) ShadowClone(opts ...compactionTaskOpt) CompactionTask { - taskClone := &defaultCompactionTask{ - CompactionTask: task.CompactionTask, - plan: task.plan, - dataNodeID: task.dataNodeID, - span: task.span, - result: task.result, - } - for _, opt := range opts { - opt(taskClone) - } - return taskClone -} - -func (task *defaultCompactionTask) EndSpan() { - if task.span != nil { - task.span.End() - } -} - -func (task *defaultCompactionTask) SetNodeID(nodeID int64) { - task.dataNodeID = nodeID -} - -func (task *defaultCompactionTask) SetState(state datapb.CompactionTaskState) { - task.State = state -} - -func (task *defaultCompactionTask) SetTask(ct *datapb.CompactionTask) { - task.CompactionTask = ct -} - -func (task *defaultCompactionTask) SetStartTime(startTime uint64) { - task.StartTime = startTime -} - -func (task *defaultCompactionTask) SetResult(result *datapb.CompactionPlanResult) { - task.result = result -} - -func (task *defaultCompactionTask) SetSpan(span trace.Span) { - task.span = span -} - -func (task *defaultCompactionTask) SetPlan(plan *datapb.CompactionPlan) { - task.plan = plan -} - -func (task *defaultCompactionTask) CleanLogPath() { - if task.plan.GetSegmentBinlogs() != nil { - for _, binlogs := range task.plan.GetSegmentBinlogs() { - binlogs.FieldBinlogs = nil - binlogs.Field2StatslogPaths = nil - binlogs.Deltalogs = nil - } - } - if task.result.GetSegments() != nil { - for _, segment := range task.result.GetSegments() { - segment.InsertLogs = nil - segment.Deltalogs = nil - segment.Field2StatslogPaths = nil - } - } -} - -func (task *defaultCompactionTask) BuildCompactionRequest(handler *compactionPlanHandler) (*datapb.CompactionPlan, error) { - plan := &datapb.CompactionPlan{ - PlanID: task.GetPlanID(), - StartTime: task.GetStartTime(), - TimeoutInSeconds: task.GetTimeoutInSeconds(), - Type: task.GetType(), - Channel: task.GetChannel(), - CollectionTtl: task.GetCollectionTtl(), - TotalRows: task.GetTotalRows(), - } - log := log.With(zap.Int64("taskID", task.GetTriggerID()), zap.Int64("planID", plan.GetPlanID())) - if task.GetType() == datapb.CompactionType_Level0DeleteCompaction { - for _, segID := range task.GetInputSegments() { - segInfo := handler.meta.GetHealthySegment(segID) - if segInfo == nil { - return nil, merr.WrapErrSegmentNotFound(segID) - } - plan.SegmentBinlogs = append(plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{ - SegmentID: segID, - CollectionID: segInfo.GetCollectionID(), - PartitionID: segInfo.GetPartitionID(), - Level: segInfo.GetLevel(), - InsertChannel: segInfo.GetInsertChannel(), - Deltalogs: segInfo.GetDeltalogs(), - }) - } - - // Select sealed L1 segments for LevelZero compaction that meets the condition: - // dmlPos < triggerInfo.pos - sealedSegments := handler.meta.SelectSegments(WithCollection(task.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool { - return (task.GetPartitionID() == -1 || info.GetPartitionID() == task.GetPartitionID()) && - info.GetInsertChannel() == plan.GetChannel() && - isFlushState(info.GetState()) && - !info.isCompacting && - !info.GetIsImporting() && - info.GetLevel() != datapb.SegmentLevel_L0 && - info.GetDmlPosition().GetTimestamp() < task.GetPos().GetTimestamp() - })) - if len(sealedSegments) == 0 { - return nil, errors.Errorf("Selected zero L1/L2 segments for the position=%v", task.GetPos()) - } - - sealedSegBinlogs := lo.Map(sealedSegments, func(segInfo *SegmentInfo, _ int) *datapb.CompactionSegmentBinlogs { - return &datapb.CompactionSegmentBinlogs{ - SegmentID: segInfo.GetID(), - Level: segInfo.GetLevel(), - CollectionID: segInfo.GetCollectionID(), - PartitionID: segInfo.GetPartitionID(), - // no need for binlogs info, as L0Compaction only append deltalogs on existing segments - } - }) - - plan.SegmentBinlogs = append(plan.SegmentBinlogs, sealedSegBinlogs...) - log.Info("Compaction handler refreshed level zero compaction plan", - zap.Any("target position", task.GetPos()), - zap.Any("target segments count", len(sealedSegBinlogs))) - return nil, nil - } - - if task.GetType() == datapb.CompactionType_MixCompaction { - segIDMap := make(map[int64][]*datapb.FieldBinlog, len(plan.SegmentBinlogs)) - for _, segID := range task.GetInputSegments() { - segInfo := handler.meta.GetHealthySegment(segID) - if segInfo == nil { - return nil, merr.WrapErrSegmentNotFound(segID) - } - plan.SegmentBinlogs = append(plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{ - SegmentID: segID, - CollectionID: segInfo.GetCollectionID(), - PartitionID: segInfo.GetPartitionID(), - Level: segInfo.GetLevel(), - InsertChannel: segInfo.GetInsertChannel(), - FieldBinlogs: segInfo.GetBinlogs(), - Field2StatslogPaths: segInfo.GetStatslogs(), - Deltalogs: segInfo.GetDeltalogs(), - }) - segIDMap[segID] = segInfo.GetDeltalogs() - } - log.Info("Compaction handler refreshed mix compaction plan", zap.Any("segID2DeltaLogs", segIDMap)) - } - handler.plans[task.GetPlanID()] = task.ShadowClone(setPlan(plan)) - return plan, nil -} diff --git a/internal/datacoord/compaction_task_l0.go b/internal/datacoord/compaction_task_l0.go index 438109b7998af..ef95dd565394d 100644 --- a/internal/datacoord/compaction_task_l0.go +++ b/internal/datacoord/compaction_task_l0.go @@ -237,7 +237,9 @@ func (task *l0CompactionTask) BuildCompactionRequest(handler *compactionPlanHand Channel: task.GetChannel(), CollectionTtl: task.GetCollectionTtl(), TotalRows: task.GetTotalRows(), + Schema: task.GetSchema(), } + log := log.With(zap.Int64("taskID", task.GetTriggerID()), zap.Int64("planID", plan.GetPlanID())) for _, segID := range task.GetInputSegments() { segInfo := handler.meta.GetHealthySegment(segID) diff --git a/internal/datacoord/compaction_task_mix.go b/internal/datacoord/compaction_task_mix.go index 2479728602b90..36108136af183 100644 --- a/internal/datacoord/compaction_task_mix.go +++ b/internal/datacoord/compaction_task_mix.go @@ -238,6 +238,7 @@ func (task *mixCompactionTask) BuildCompactionRequest(handler *compactionPlanHan Channel: task.GetChannel(), CollectionTtl: task.GetCollectionTtl(), TotalRows: task.GetTotalRows(), + Schema: task.GetSchema(), } log := log.With(zap.Int64("taskID", task.GetTriggerID()), zap.Int64("planID", plan.GetPlanID())) diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 84b2f22405ee5..7ddf07a836d50 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -446,7 +446,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error { start := time.Now() planID := currentID currentID++ - task := &defaultCompactionTask{ + task := &mixCompactionTask{ CompactionTask: &datapb.CompactionTask{ PlanID: planID, TriggerID: signal.id, @@ -460,6 +460,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error { Channel: group.channelName, InputSegments: segIDs, TotalRows: totalRows, + Schema: coll.Schema, }, } err := t.compactionHandler.enqueueCompaction(task) @@ -564,7 +565,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) { start := time.Now() planID := currentID currentID++ - if err := t.compactionHandler.enqueueCompaction(&defaultCompactionTask{ + if err := t.compactionHandler.enqueueCompaction(&mixCompactionTask{ CompactionTask: &datapb.CompactionTask{ PlanID: planID, TriggerID: signal.id, @@ -578,7 +579,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) { Channel: channel, InputSegments: segmentIDS, TotalRows: totalRows, - // collectionSchema + Schema: coll.Schema, }, }); err != nil { log.Warn("failed to execute compaction task", diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go index b70eeaa706a9f..0d9748464354f 100644 --- a/internal/datacoord/compaction_trigger_v2.go +++ b/internal/datacoord/compaction_trigger_v2.go @@ -2,13 +2,10 @@ package datacoord import ( "context" - "sync" - "time" - "github.com/samber/lo" "go.uber.org/zap" + "sync" - "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/lock" @@ -189,6 +186,12 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context, return segView.ID }) + collection, err := m.handler.GetCollection(ctx, view.GetGroupLabel().CollectionID) + if err != nil { + log.Warn("fail to submit compaction view to scheduler because get collection fail", zap.String("view", view.String())) + return + } + task := &datapb.CompactionTask{ TriggerID: taskID, // inner trigger, use task id as trigger id PlanID: taskID, @@ -199,7 +202,7 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context, PartitionID: view.GetGroupLabel().PartitionID, Pos: view.(*LevelZeroSegmentsView).earliestGrowingSegmentPos, TimeoutInSeconds: Params.DataCoordCfg.CompactionTimeoutInSeconds.GetAsInt32(), - // collectionSchema todo wayblink + Schema: collection.Schema, } err = m.compactionHandler.enqueueCompaction(&l0CompactionTask{ @@ -225,6 +228,11 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C return } view.GetSegmentsView() + collection, err := m.handler.GetCollection(ctx, view.GetGroupLabel().CollectionID) + if err != nil { + log.Warn("fail to submit compaction view to scheduler because get collection fail", zap.String("view", view.String())) + return + } _, totalRows, maxSegmentRows, preferSegmentRows := calculateClusteringCompactionConfig(view) task := &datapb.CompactionTask{ PlanID: taskID, @@ -237,6 +245,7 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C CollectionID: view.GetGroupLabel().CollectionID, PartitionID: view.GetGroupLabel().PartitionID, Channel: view.GetGroupLabel().Channel, + Schema: collection.Schema, ClusteringKeyField: view.(*ClusteringSegmentsView).clusteringKeyField, InputSegments: lo.Map(view.GetSegmentsView(), func(segmentView *SegmentView, _ int) int64 { return segmentView.ID }), MaxSegmentRows: maxSegmentRows, @@ -280,17 +289,3 @@ type chanPartSegments struct { channelName string segments []*SegmentInfo } - -func fillOriginPlan(schema *schemapb.CollectionSchema, alloc allocator, plan *datapb.CompactionPlan) error { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - id, err := alloc.allocID(ctx) - if err != nil { - return err - } - - plan.PlanID = id - plan.TimeoutInSeconds = Params.DataCoordCfg.CompactionTimeoutInSeconds.GetAsInt32() - plan.Schema = schema - return nil -} diff --git a/internal/datacoord/mock_channel_store.go b/internal/datacoord/mock_channel_store.go index fc7cb51ef3e92..bbcd2921123f4 100644 --- a/internal/datacoord/mock_channel_store.go +++ b/internal/datacoord/mock_channel_store.go @@ -179,50 +179,6 @@ func (_c *MockRWChannelStore_GetNodeChannelCount_Call) RunAndReturn(run func(int return _c } -// GetNodeChannelsByCollectionID provides a mock function with given fields: collectionID -func (_m *MockRWChannelStore) GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string { - ret := _m.Called(collectionID) - - var r0 map[int64][]string - if rf, ok := ret.Get(0).(func(int64) map[int64][]string); ok { - r0 = rf(collectionID) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(map[int64][]string) - } - } - - return r0 -} - -// MockRWChannelStore_GetNodeChannelsByCollectionID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodeChannelsByCollectionID' -type MockRWChannelStore_GetNodeChannelsByCollectionID_Call struct { - *mock.Call -} - -// GetNodeChannelsByCollectionID is a helper method to define mock.On call -// - collectionID int64 -func (_e *MockRWChannelStore_Expecter) GetNodeChannelsByCollectionID(collectionID interface{}) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call { - return &MockRWChannelStore_GetNodeChannelsByCollectionID_Call{Call: _e.mock.On("GetNodeChannelsByCollectionID", collectionID)} -} - -func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) Run(run func(collectionID int64)) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(int64)) - }) - return _c -} - -func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) Return(_a0 map[int64][]string) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) RunAndReturn(run func(int64) map[int64][]string) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call { - _c.Call.Return(run) - return _c -} - // GetNodeChannelsBy provides a mock function with given fields: nodeSelector, channelSelectors func (_m *MockRWChannelStore) GetNodeChannelsBy(nodeSelector NodeSelector, channelSelectors ...ChannelSelector) []*NodeChannelInfo { _va := make([]interface{}, len(channelSelectors)) @@ -282,6 +238,50 @@ func (_c *MockRWChannelStore_GetNodeChannelsBy_Call) RunAndReturn(run func(NodeS return _c } +// GetNodeChannelsByCollectionID provides a mock function with given fields: collectionID +func (_m *MockRWChannelStore) GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string { + ret := _m.Called(collectionID) + + var r0 map[int64][]string + if rf, ok := ret.Get(0).(func(int64) map[int64][]string); ok { + r0 = rf(collectionID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[int64][]string) + } + } + + return r0 +} + +// MockRWChannelStore_GetNodeChannelsByCollectionID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodeChannelsByCollectionID' +type MockRWChannelStore_GetNodeChannelsByCollectionID_Call struct { + *mock.Call +} + +// GetNodeChannelsByCollectionID is a helper method to define mock.On call +// - collectionID int64 +func (_e *MockRWChannelStore_Expecter) GetNodeChannelsByCollectionID(collectionID interface{}) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call { + return &MockRWChannelStore_GetNodeChannelsByCollectionID_Call{Call: _e.mock.On("GetNodeChannelsByCollectionID", collectionID)} +} + +func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) Run(run func(collectionID int64)) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64)) + }) + return _c +} + +func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) Return(_a0 map[int64][]string) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) RunAndReturn(run func(int64) map[int64][]string) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call { + _c.Call.Return(run) + return _c +} + // GetNodes provides a mock function with given fields: func (_m *MockRWChannelStore) GetNodes() []int64 { ret := _m.Called() diff --git a/internal/datacoord/mock_channelmanager.go b/internal/datacoord/mock_channelmanager.go index e8b4ebe897a41..5239ab6e910be 100644 --- a/internal/datacoord/mock_channelmanager.go +++ b/internal/datacoord/mock_channelmanager.go @@ -376,58 +376,6 @@ func (_c *MockChannelManager_GetNodeChannelsByCollectionID_Call) RunAndReturn(ru return _c } -// GetNodeIDByChannelName provides a mock function with given fields: channel -func (_m *MockChannelManager) GetNodeIDByChannelName(channel string) (int64, bool) { - ret := _m.Called(channel) - - var r0 int64 - var r1 bool - if rf, ok := ret.Get(0).(func(string) (int64, bool)); ok { - return rf(channel) - } - if rf, ok := ret.Get(0).(func(string) int64); ok { - r0 = rf(channel) - } else { - r0 = ret.Get(0).(int64) - } - - if rf, ok := ret.Get(1).(func(string) bool); ok { - r1 = rf(channel) - } else { - r1 = ret.Get(1).(bool) - } - - return r0, r1 -} - -// MockChannelManager_GetNodeIDByChannelName_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodeIDByChannelName' -type MockChannelManager_GetNodeIDByChannelName_Call struct { - *mock.Call -} - -// GetNodeIDByChannelName is a helper method to define mock.On call -// - channel string -func (_e *MockChannelManager_Expecter) GetNodeIDByChannelName(channel interface{}) *MockChannelManager_GetNodeIDByChannelName_Call { - return &MockChannelManager_GetNodeIDByChannelName_Call{Call: _e.mock.On("GetNodeIDByChannelName", channel)} -} - -func (_c *MockChannelManager_GetNodeIDByChannelName_Call) Run(run func(channel string)) *MockChannelManager_GetNodeIDByChannelName_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(string)) - }) - return _c -} - -func (_c *MockChannelManager_GetNodeIDByChannelName_Call) Return(_a0 int64, _a1 bool) *MockChannelManager_GetNodeIDByChannelName_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *MockChannelManager_GetNodeIDByChannelName_Call) RunAndReturn(run func(string) (int64, bool)) *MockChannelManager_GetNodeIDByChannelName_Call { - _c.Call.Return(run) - return _c -} - // Match provides a mock function with given fields: nodeID, channel func (_m *MockChannelManager) Match(nodeID int64, channel string) bool { ret := _m.Called(nodeID, channel) diff --git a/internal/datacoord/mock_cluster.go b/internal/datacoord/mock_cluster.go index e92ae8ecb3c28..886de279abf8f 100644 --- a/internal/datacoord/mock_cluster.go +++ b/internal/datacoord/mock_cluster.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.1. DO NOT EDIT. +// Code generated by mockery v2.32.4. DO NOT EDIT. package datacoord diff --git a/internal/datacoord/mock_compaction_meta.go b/internal/datacoord/mock_compaction_meta.go index b905124d7d4cc..7f5a868d9d45d 100644 --- a/internal/datacoord/mock_compaction_meta.go +++ b/internal/datacoord/mock_compaction_meta.go @@ -20,6 +20,48 @@ func (_m *MockCompactionMeta) EXPECT() *MockCompactionMeta_Expecter { return &MockCompactionMeta_Expecter{mock: &_m.Mock} } +// CheckAndSetSegmentsCompacting provides a mock function with given fields: segmentIDs +func (_m *MockCompactionMeta) CheckAndSetSegmentsCompacting(segmentIDs []int64) bool { + ret := _m.Called(segmentIDs) + + var r0 bool + if rf, ok := ret.Get(0).(func([]int64) bool); ok { + r0 = rf(segmentIDs) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// MockCompactionMeta_CheckAndSetSegmentsCompacting_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckAndSetSegmentsCompacting' +type MockCompactionMeta_CheckAndSetSegmentsCompacting_Call struct { + *mock.Call +} + +// CheckAndSetSegmentsCompacting is a helper method to define mock.On call +// - segmentIDs []int64 +func (_e *MockCompactionMeta_Expecter) CheckAndSetSegmentsCompacting(segmentIDs interface{}) *MockCompactionMeta_CheckAndSetSegmentsCompacting_Call { + return &MockCompactionMeta_CheckAndSetSegmentsCompacting_Call{Call: _e.mock.On("CheckAndSetSegmentsCompacting", segmentIDs)} +} + +func (_c *MockCompactionMeta_CheckAndSetSegmentsCompacting_Call) Run(run func(segmentIDs []int64)) *MockCompactionMeta_CheckAndSetSegmentsCompacting_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].([]int64)) + }) + return _c +} + +func (_c *MockCompactionMeta_CheckAndSetSegmentsCompacting_Call) Return(_a0 bool) *MockCompactionMeta_CheckAndSetSegmentsCompacting_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockCompactionMeta_CheckAndSetSegmentsCompacting_Call) RunAndReturn(run func([]int64) bool) *MockCompactionMeta_CheckAndSetSegmentsCompacting_Call { + _c.Call.Return(run) + return _c +} + // CompleteCompactionMutation provides a mock function with given fields: plan, result func (_m *MockCompactionMeta) CompleteCompactionMutation(plan *datapb.CompactionPlan, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) { ret := _m.Called(plan, result) @@ -84,6 +126,179 @@ func (_c *MockCompactionMeta_CompleteCompactionMutation_Call) RunAndReturn(run f return _c } +// DropClusteringCompactionTask provides a mock function with given fields: task +func (_m *MockCompactionMeta) DropClusteringCompactionTask(task *datapb.CompactionTask) error { + ret := _m.Called(task) + + var r0 error + if rf, ok := ret.Get(0).(func(*datapb.CompactionTask) error); ok { + r0 = rf(task) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockCompactionMeta_DropClusteringCompactionTask_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropClusteringCompactionTask' +type MockCompactionMeta_DropClusteringCompactionTask_Call struct { + *mock.Call +} + +// DropClusteringCompactionTask is a helper method to define mock.On call +// - task *datapb.CompactionTask +func (_e *MockCompactionMeta_Expecter) DropClusteringCompactionTask(task interface{}) *MockCompactionMeta_DropClusteringCompactionTask_Call { + return &MockCompactionMeta_DropClusteringCompactionTask_Call{Call: _e.mock.On("DropClusteringCompactionTask", task)} +} + +func (_c *MockCompactionMeta_DropClusteringCompactionTask_Call) Run(run func(task *datapb.CompactionTask)) *MockCompactionMeta_DropClusteringCompactionTask_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*datapb.CompactionTask)) + }) + return _c +} + +func (_c *MockCompactionMeta_DropClusteringCompactionTask_Call) Return(_a0 error) *MockCompactionMeta_DropClusteringCompactionTask_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockCompactionMeta_DropClusteringCompactionTask_Call) RunAndReturn(run func(*datapb.CompactionTask) error) *MockCompactionMeta_DropClusteringCompactionTask_Call { + _c.Call.Return(run) + return _c +} + +// GetClusteringCompactionTasks provides a mock function with given fields: +func (_m *MockCompactionMeta) GetClusteringCompactionTasks() map[int64][]*datapb.CompactionTask { + ret := _m.Called() + + var r0 map[int64][]*datapb.CompactionTask + if rf, ok := ret.Get(0).(func() map[int64][]*datapb.CompactionTask); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[int64][]*datapb.CompactionTask) + } + } + + return r0 +} + +// MockCompactionMeta_GetClusteringCompactionTasks_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetClusteringCompactionTasks' +type MockCompactionMeta_GetClusteringCompactionTasks_Call struct { + *mock.Call +} + +// GetClusteringCompactionTasks is a helper method to define mock.On call +func (_e *MockCompactionMeta_Expecter) GetClusteringCompactionTasks() *MockCompactionMeta_GetClusteringCompactionTasks_Call { + return &MockCompactionMeta_GetClusteringCompactionTasks_Call{Call: _e.mock.On("GetClusteringCompactionTasks")} +} + +func (_c *MockCompactionMeta_GetClusteringCompactionTasks_Call) Run(run func()) *MockCompactionMeta_GetClusteringCompactionTasks_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockCompactionMeta_GetClusteringCompactionTasks_Call) Return(_a0 map[int64][]*datapb.CompactionTask) *MockCompactionMeta_GetClusteringCompactionTasks_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockCompactionMeta_GetClusteringCompactionTasks_Call) RunAndReturn(run func() map[int64][]*datapb.CompactionTask) *MockCompactionMeta_GetClusteringCompactionTasks_Call { + _c.Call.Return(run) + return _c +} + +// GetClusteringCompactionTasksByCollection provides a mock function with given fields: collectionID +func (_m *MockCompactionMeta) GetClusteringCompactionTasksByCollection(collectionID int64) map[int64][]*datapb.CompactionTask { + ret := _m.Called(collectionID) + + var r0 map[int64][]*datapb.CompactionTask + if rf, ok := ret.Get(0).(func(int64) map[int64][]*datapb.CompactionTask); ok { + r0 = rf(collectionID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[int64][]*datapb.CompactionTask) + } + } + + return r0 +} + +// MockCompactionMeta_GetClusteringCompactionTasksByCollection_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetClusteringCompactionTasksByCollection' +type MockCompactionMeta_GetClusteringCompactionTasksByCollection_Call struct { + *mock.Call +} + +// GetClusteringCompactionTasksByCollection is a helper method to define mock.On call +// - collectionID int64 +func (_e *MockCompactionMeta_Expecter) GetClusteringCompactionTasksByCollection(collectionID interface{}) *MockCompactionMeta_GetClusteringCompactionTasksByCollection_Call { + return &MockCompactionMeta_GetClusteringCompactionTasksByCollection_Call{Call: _e.mock.On("GetClusteringCompactionTasksByCollection", collectionID)} +} + +func (_c *MockCompactionMeta_GetClusteringCompactionTasksByCollection_Call) Run(run func(collectionID int64)) *MockCompactionMeta_GetClusteringCompactionTasksByCollection_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64)) + }) + return _c +} + +func (_c *MockCompactionMeta_GetClusteringCompactionTasksByCollection_Call) Return(_a0 map[int64][]*datapb.CompactionTask) *MockCompactionMeta_GetClusteringCompactionTasksByCollection_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockCompactionMeta_GetClusteringCompactionTasksByCollection_Call) RunAndReturn(run func(int64) map[int64][]*datapb.CompactionTask) *MockCompactionMeta_GetClusteringCompactionTasksByCollection_Call { + _c.Call.Return(run) + return _c +} + +// GetClusteringCompactionTasksByTriggerID provides a mock function with given fields: triggerID +func (_m *MockCompactionMeta) GetClusteringCompactionTasksByTriggerID(triggerID int64) []*datapb.CompactionTask { + ret := _m.Called(triggerID) + + var r0 []*datapb.CompactionTask + if rf, ok := ret.Get(0).(func(int64) []*datapb.CompactionTask); ok { + r0 = rf(triggerID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*datapb.CompactionTask) + } + } + + return r0 +} + +// MockCompactionMeta_GetClusteringCompactionTasksByTriggerID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetClusteringCompactionTasksByTriggerID' +type MockCompactionMeta_GetClusteringCompactionTasksByTriggerID_Call struct { + *mock.Call +} + +// GetClusteringCompactionTasksByTriggerID is a helper method to define mock.On call +// - triggerID int64 +func (_e *MockCompactionMeta_Expecter) GetClusteringCompactionTasksByTriggerID(triggerID interface{}) *MockCompactionMeta_GetClusteringCompactionTasksByTriggerID_Call { + return &MockCompactionMeta_GetClusteringCompactionTasksByTriggerID_Call{Call: _e.mock.On("GetClusteringCompactionTasksByTriggerID", triggerID)} +} + +func (_c *MockCompactionMeta_GetClusteringCompactionTasksByTriggerID_Call) Run(run func(triggerID int64)) *MockCompactionMeta_GetClusteringCompactionTasksByTriggerID_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64)) + }) + return _c +} + +func (_c *MockCompactionMeta_GetClusteringCompactionTasksByTriggerID_Call) Return(_a0 []*datapb.CompactionTask) *MockCompactionMeta_GetClusteringCompactionTasksByTriggerID_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockCompactionMeta_GetClusteringCompactionTasksByTriggerID_Call) RunAndReturn(run func(int64) []*datapb.CompactionTask) *MockCompactionMeta_GetClusteringCompactionTasksByTriggerID_Call { + _c.Call.Return(run) + return _c +} + // GetHealthySegment provides a mock function with given fields: segID func (_m *MockCompactionMeta) GetHealthySegment(segID int64) *SegmentInfo { ret := _m.Called(segID) @@ -128,6 +343,92 @@ func (_c *MockCompactionMeta_GetHealthySegment_Call) RunAndReturn(run func(int64 return _c } +// GetSegment provides a mock function with given fields: segID +func (_m *MockCompactionMeta) GetSegment(segID int64) *SegmentInfo { + ret := _m.Called(segID) + + var r0 *SegmentInfo + if rf, ok := ret.Get(0).(func(int64) *SegmentInfo); ok { + r0 = rf(segID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*SegmentInfo) + } + } + + return r0 +} + +// MockCompactionMeta_GetSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSegment' +type MockCompactionMeta_GetSegment_Call struct { + *mock.Call +} + +// GetSegment is a helper method to define mock.On call +// - segID int64 +func (_e *MockCompactionMeta_Expecter) GetSegment(segID interface{}) *MockCompactionMeta_GetSegment_Call { + return &MockCompactionMeta_GetSegment_Call{Call: _e.mock.On("GetSegment", segID)} +} + +func (_c *MockCompactionMeta_GetSegment_Call) Run(run func(segID int64)) *MockCompactionMeta_GetSegment_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64)) + }) + return _c +} + +func (_c *MockCompactionMeta_GetSegment_Call) Return(_a0 *SegmentInfo) *MockCompactionMeta_GetSegment_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockCompactionMeta_GetSegment_Call) RunAndReturn(run func(int64) *SegmentInfo) *MockCompactionMeta_GetSegment_Call { + _c.Call.Return(run) + return _c +} + +// SaveClusteringCompactionTask provides a mock function with given fields: task +func (_m *MockCompactionMeta) SaveClusteringCompactionTask(task *datapb.CompactionTask) error { + ret := _m.Called(task) + + var r0 error + if rf, ok := ret.Get(0).(func(*datapb.CompactionTask) error); ok { + r0 = rf(task) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockCompactionMeta_SaveClusteringCompactionTask_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SaveClusteringCompactionTask' +type MockCompactionMeta_SaveClusteringCompactionTask_Call struct { + *mock.Call +} + +// SaveClusteringCompactionTask is a helper method to define mock.On call +// - task *datapb.CompactionTask +func (_e *MockCompactionMeta_Expecter) SaveClusteringCompactionTask(task interface{}) *MockCompactionMeta_SaveClusteringCompactionTask_Call { + return &MockCompactionMeta_SaveClusteringCompactionTask_Call{Call: _e.mock.On("SaveClusteringCompactionTask", task)} +} + +func (_c *MockCompactionMeta_SaveClusteringCompactionTask_Call) Run(run func(task *datapb.CompactionTask)) *MockCompactionMeta_SaveClusteringCompactionTask_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*datapb.CompactionTask)) + }) + return _c +} + +func (_c *MockCompactionMeta_SaveClusteringCompactionTask_Call) Return(_a0 error) *MockCompactionMeta_SaveClusteringCompactionTask_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockCompactionMeta_SaveClusteringCompactionTask_Call) RunAndReturn(run func(*datapb.CompactionTask) error) *MockCompactionMeta_SaveClusteringCompactionTask_Call { + _c.Call.Return(run) + return _c +} + // SelectSegments provides a mock function with given fields: filters func (_m *MockCompactionMeta) SelectSegments(filters ...SegmentFilter) []*SegmentInfo { _va := make([]interface{}, len(filters)) diff --git a/internal/datacoord/mock_compaction_plan_context.go b/internal/datacoord/mock_compaction_plan_context.go index 6af810078bd53..d6eed38ea98ee 100644 --- a/internal/datacoord/mock_compaction_plan_context.go +++ b/internal/datacoord/mock_compaction_plan_context.go @@ -1,11 +1,8 @@ -// Code generated by mockery v2.30.1. DO NOT EDIT. +// Code generated by mockery v2.32.4. DO NOT EDIT. package datacoord -import ( - datapb "github.com/milvus-io/milvus/internal/proto/datapb" - mock "github.com/stretchr/testify/mock" -) +import mock "github.com/stretchr/testify/mock" // MockCompactionPlanContext is an autogenerated mock type for the compactionPlanContext type type MockCompactionPlanContext struct { @@ -20,50 +17,58 @@ func (_m *MockCompactionPlanContext) EXPECT() *MockCompactionPlanContext_Expecte return &MockCompactionPlanContext_Expecter{mock: &_m.Mock} } -// execCompactionPlan provides a mock function with given fields: signal, plan -func (_m *MockCompactionPlanContext) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) { - _m.Called(signal, plan) +// enqueueCompaction provides a mock function with given fields: task +func (_m *MockCompactionPlanContext) enqueueCompaction(task CompactionTask) error { + ret := _m.Called(task) + + var r0 error + if rf, ok := ret.Get(0).(func(CompactionTask) error); ok { + r0 = rf(task) + } else { + r0 = ret.Error(0) + } + + return r0 } -// MockCompactionPlanContext_execCompactionPlan_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'execCompactionPlan' -type MockCompactionPlanContext_execCompactionPlan_Call struct { +// MockCompactionPlanContext_enqueueCompaction_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'enqueueCompaction' +type MockCompactionPlanContext_enqueueCompaction_Call struct { *mock.Call } -// execCompactionPlan is a helper method to define mock.On call -// - signal *compactionSignal -// - plan *datapb.CompactionPlan -func (_e *MockCompactionPlanContext_Expecter) execCompactionPlan(signal interface{}, plan interface{}) *MockCompactionPlanContext_execCompactionPlan_Call { - return &MockCompactionPlanContext_execCompactionPlan_Call{Call: _e.mock.On("execCompactionPlan", signal, plan)} +// enqueueCompaction is a helper method to define mock.On call +// - task CompactionTask +func (_e *MockCompactionPlanContext_Expecter) enqueueCompaction(task interface{}) *MockCompactionPlanContext_enqueueCompaction_Call { + return &MockCompactionPlanContext_enqueueCompaction_Call{Call: _e.mock.On("enqueueCompaction", task)} } -func (_c *MockCompactionPlanContext_execCompactionPlan_Call) Run(run func(signal *compactionSignal, plan *datapb.CompactionPlan)) *MockCompactionPlanContext_execCompactionPlan_Call { +func (_c *MockCompactionPlanContext_enqueueCompaction_Call) Run(run func(task CompactionTask)) *MockCompactionPlanContext_enqueueCompaction_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*compactionSignal), args[1].(*datapb.CompactionPlan)) + run(args[0].(CompactionTask)) }) return _c } -func (_c *MockCompactionPlanContext_execCompactionPlan_Call) Return() *MockCompactionPlanContext_execCompactionPlan_Call { - _c.Call.Return() +func (_c *MockCompactionPlanContext_enqueueCompaction_Call) Return(_a0 error) *MockCompactionPlanContext_enqueueCompaction_Call { + _c.Call.Return(_a0) return _c } -func (_c *MockCompactionPlanContext_execCompactionPlan_Call) RunAndReturn(run func(*compactionSignal, *datapb.CompactionPlan)) *MockCompactionPlanContext_execCompactionPlan_Call { +func (_c *MockCompactionPlanContext_enqueueCompaction_Call) RunAndReturn(run func(CompactionTask) error) *MockCompactionPlanContext_enqueueCompaction_Call { _c.Call.Return(run) return _c } // getCompaction provides a mock function with given fields: planID -func (_m *MockCompactionPlanContext) getCompaction(planID int64) *defaultCompactionTask { +func (_m *MockCompactionPlanContext) getCompaction(planID int64) CompactionTask { ret := _m.Called(planID) - var r0 *defaultCompactionTask - if rf, ok := ret.Get(0).(func(int64) *defaultCompactionTask); ok { + var r0 CompactionTask + if rf, ok := ret.Get(0).(func(int64) CompactionTask); ok { r0 = rf(planID) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*defaultCompactionTask) + r0 = ret.Get(0).(CompactionTask) } } @@ -76,7 +81,7 @@ type MockCompactionPlanContext_getCompaction_Call struct { } // getCompaction is a helper method to define mock.On call -// - planID int64 +// - planID int64 func (_e *MockCompactionPlanContext_Expecter) getCompaction(planID interface{}) *MockCompactionPlanContext_getCompaction_Call { return &MockCompactionPlanContext_getCompaction_Call{Call: _e.mock.On("getCompaction", planID)} } @@ -88,26 +93,26 @@ func (_c *MockCompactionPlanContext_getCompaction_Call) Run(run func(planID int6 return _c } -func (_c *MockCompactionPlanContext_getCompaction_Call) Return(_a0 *defaultCompactionTask) *MockCompactionPlanContext_getCompaction_Call { +func (_c *MockCompactionPlanContext_getCompaction_Call) Return(_a0 CompactionTask) *MockCompactionPlanContext_getCompaction_Call { _c.Call.Return(_a0) return _c } -func (_c *MockCompactionPlanContext_getCompaction_Call) RunAndReturn(run func(int64) *defaultCompactionTask) *MockCompactionPlanContext_getCompaction_Call { +func (_c *MockCompactionPlanContext_getCompaction_Call) RunAndReturn(run func(int64) CompactionTask) *MockCompactionPlanContext_getCompaction_Call { _c.Call.Return(run) return _c } // getCompactionTasksBySignalID provides a mock function with given fields: signalID -func (_m *MockCompactionPlanContext) getCompactionTasksBySignalID(signalID int64) []*defaultCompactionTask { +func (_m *MockCompactionPlanContext) getCompactionTasksBySignalID(signalID int64) []CompactionTask { ret := _m.Called(signalID) - var r0 []*defaultCompactionTask - if rf, ok := ret.Get(0).(func(int64) []*defaultCompactionTask); ok { + var r0 []CompactionTask + if rf, ok := ret.Get(0).(func(int64) []CompactionTask); ok { r0 = rf(signalID) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).([]*defaultCompactionTask) + r0 = ret.Get(0).([]CompactionTask) } } @@ -120,7 +125,7 @@ type MockCompactionPlanContext_getCompactionTasksBySignalID_Call struct { } // getCompactionTasksBySignalID is a helper method to define mock.On call -// - signalID int64 +// - signalID int64 func (_e *MockCompactionPlanContext_Expecter) getCompactionTasksBySignalID(signalID interface{}) *MockCompactionPlanContext_getCompactionTasksBySignalID_Call { return &MockCompactionPlanContext_getCompactionTasksBySignalID_Call{Call: _e.mock.On("getCompactionTasksBySignalID", signalID)} } @@ -132,12 +137,12 @@ func (_c *MockCompactionPlanContext_getCompactionTasksBySignalID_Call) Run(run f return _c } -func (_c *MockCompactionPlanContext_getCompactionTasksBySignalID_Call) Return(_a0 []*defaultCompactionTask) *MockCompactionPlanContext_getCompactionTasksBySignalID_Call { +func (_c *MockCompactionPlanContext_getCompactionTasksBySignalID_Call) Return(_a0 []CompactionTask) *MockCompactionPlanContext_getCompactionTasksBySignalID_Call { _c.Call.Return(_a0) return _c } -func (_c *MockCompactionPlanContext_getCompactionTasksBySignalID_Call) RunAndReturn(run func(int64) []*defaultCompactionTask) *MockCompactionPlanContext_getCompactionTasksBySignalID_Call { +func (_c *MockCompactionPlanContext_getCompactionTasksBySignalID_Call) RunAndReturn(run func(int64) []CompactionTask) *MockCompactionPlanContext_getCompactionTasksBySignalID_Call { _c.Call.Return(run) return _c } @@ -194,7 +199,7 @@ type MockCompactionPlanContext_removeTasksByChannel_Call struct { } // removeTasksByChannel is a helper method to define mock.On call -// - channel string +// - channel string func (_e *MockCompactionPlanContext_Expecter) removeTasksByChannel(channel interface{}) *MockCompactionPlanContext_removeTasksByChannel_Call { return &MockCompactionPlanContext_removeTasksByChannel_Call{Call: _e.mock.On("removeTasksByChannel", channel)} } @@ -300,7 +305,7 @@ type MockCompactionPlanContext_updateCompaction_Call struct { } // updateCompaction is a helper method to define mock.On call -// - ts uint64 +// - ts uint64 func (_e *MockCompactionPlanContext_Expecter) updateCompaction(ts interface{}) *MockCompactionPlanContext_updateCompaction_Call { return &MockCompactionPlanContext_updateCompaction_Call{Call: _e.mock.On("updateCompaction", ts)} } diff --git a/internal/datacoord/mock_scheduler.go b/internal/datacoord/mock_scheduler.go index 44c64b0545f5d..2dd1efa432987 100644 --- a/internal/datacoord/mock_scheduler.go +++ b/internal/datacoord/mock_scheduler.go @@ -2,10 +2,7 @@ package datacoord -import ( - datapb "github.com/milvus-io/milvus/internal/proto/datapb" - mock "github.com/stretchr/testify/mock" -) +import mock "github.com/stretchr/testify/mock" // MockScheduler is an autogenerated mock type for the Scheduler type type MockScheduler struct { @@ -20,9 +17,9 @@ func (_m *MockScheduler) EXPECT() *MockScheduler_Expecter { return &MockScheduler_Expecter{mock: &_m.Mock} } -// Finish provides a mock function with given fields: nodeID, plan -func (_m *MockScheduler) Finish(nodeID int64, plan *datapb.CompactionPlan) { - _m.Called(nodeID, plan) +// Finish provides a mock function with given fields: nodeID, task +func (_m *MockScheduler) Finish(nodeID int64, task CompactionTask) { + _m.Called(nodeID, task) } // MockScheduler_Finish_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Finish' @@ -32,14 +29,14 @@ type MockScheduler_Finish_Call struct { // Finish is a helper method to define mock.On call // - nodeID int64 -// - plan *datapb.CompactionPlan -func (_e *MockScheduler_Expecter) Finish(nodeID interface{}, plan interface{}) *MockScheduler_Finish_Call { - return &MockScheduler_Finish_Call{Call: _e.mock.On("Finish", nodeID, plan)} +// - task CompactionTask +func (_e *MockScheduler_Expecter) Finish(nodeID interface{}, task interface{}) *MockScheduler_Finish_Call { + return &MockScheduler_Finish_Call{Call: _e.mock.On("Finish", nodeID, task)} } -func (_c *MockScheduler_Finish_Call) Run(run func(nodeID int64, plan *datapb.CompactionPlan)) *MockScheduler_Finish_Call { +func (_c *MockScheduler_Finish_Call) Run(run func(nodeID int64, task CompactionTask)) *MockScheduler_Finish_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(int64), args[1].(*datapb.CompactionPlan)) + run(args[0].(int64), args[1].(CompactionTask)) }) return _c } @@ -49,7 +46,7 @@ func (_c *MockScheduler_Finish_Call) Return() *MockScheduler_Finish_Call { return _c } -func (_c *MockScheduler_Finish_Call) RunAndReturn(run func(int64, *datapb.CompactionPlan)) *MockScheduler_Finish_Call { +func (_c *MockScheduler_Finish_Call) RunAndReturn(run func(int64, CompactionTask)) *MockScheduler_Finish_Call { _c.Call.Return(run) return _c } @@ -128,15 +125,15 @@ func (_c *MockScheduler_LogStatus_Call) RunAndReturn(run func()) *MockScheduler_ } // Schedule provides a mock function with given fields: -func (_m *MockScheduler) Schedule() []*defaultCompactionTask { +func (_m *MockScheduler) Schedule() []CompactionTask { ret := _m.Called() - var r0 []*defaultCompactionTask - if rf, ok := ret.Get(0).(func() []*defaultCompactionTask); ok { + var r0 []CompactionTask + if rf, ok := ret.Get(0).(func() []CompactionTask); ok { r0 = rf() } else { if ret.Get(0) != nil { - r0 = ret.Get(0).([]*defaultCompactionTask) + r0 = ret.Get(0).([]CompactionTask) } } @@ -160,18 +157,18 @@ func (_c *MockScheduler_Schedule_Call) Run(run func()) *MockScheduler_Schedule_C return _c } -func (_c *MockScheduler_Schedule_Call) Return(_a0 []*defaultCompactionTask) *MockScheduler_Schedule_Call { +func (_c *MockScheduler_Schedule_Call) Return(_a0 []CompactionTask) *MockScheduler_Schedule_Call { _c.Call.Return(_a0) return _c } -func (_c *MockScheduler_Schedule_Call) RunAndReturn(run func() []*defaultCompactionTask) *MockScheduler_Schedule_Call { +func (_c *MockScheduler_Schedule_Call) RunAndReturn(run func() []CompactionTask) *MockScheduler_Schedule_Call { _c.Call.Return(run) return _c } // Submit provides a mock function with given fields: t -func (_m *MockScheduler) Submit(t ...*defaultCompactionTask) { +func (_m *MockScheduler) Submit(t ...CompactionTask) { _va := make([]interface{}, len(t)) for _i := range t { _va[_i] = t[_i] @@ -187,18 +184,18 @@ type MockScheduler_Submit_Call struct { } // Submit is a helper method to define mock.On call -// - t ...*defaultCompactionTask +// - t ...CompactionTask func (_e *MockScheduler_Expecter) Submit(t ...interface{}) *MockScheduler_Submit_Call { return &MockScheduler_Submit_Call{Call: _e.mock.On("Submit", append([]interface{}{}, t...)...)} } -func (_c *MockScheduler_Submit_Call) Run(run func(t ...*defaultCompactionTask)) *MockScheduler_Submit_Call { +func (_c *MockScheduler_Submit_Call) Run(run func(t ...CompactionTask)) *MockScheduler_Submit_Call { _c.Call.Run(func(args mock.Arguments) { - variadicArgs := make([]*defaultCompactionTask, len(args)-0) + variadicArgs := make([]CompactionTask, len(args)-0) for i, a := range args[0:] { if a != nil { - variadicArgs[i] = a.(*defaultCompactionTask) + variadicArgs[i] = a.(CompactionTask) } } run(variadicArgs...) @@ -211,7 +208,7 @@ func (_c *MockScheduler_Submit_Call) Return() *MockScheduler_Submit_Call { return _c } -func (_c *MockScheduler_Submit_Call) RunAndReturn(run func(...*defaultCompactionTask)) *MockScheduler_Submit_Call { +func (_c *MockScheduler_Submit_Call) RunAndReturn(run func(...CompactionTask)) *MockScheduler_Submit_Call { _c.Call.Return(run) return _c } diff --git a/internal/datacoord/mock_session_manager.go b/internal/datacoord/mock_session_manager.go index aea14b219ce03..75821392e4747 100644 --- a/internal/datacoord/mock_session_manager.go +++ b/internal/datacoord/mock_session_manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.1. DO NOT EDIT. +// Code generated by mockery v2.32.4. DO NOT EDIT. package datacoord diff --git a/internal/datacoord/mock_trigger_manager.go b/internal/datacoord/mock_trigger_manager.go index 362dfe0807f28..6342dc66aab7d 100644 --- a/internal/datacoord/mock_trigger_manager.go +++ b/internal/datacoord/mock_trigger_manager.go @@ -2,7 +2,11 @@ package datacoord -import mock "github.com/stretchr/testify/mock" +import ( + context "context" + + mock "github.com/stretchr/testify/mock" +) // MockTriggerManager is an autogenerated mock type for the TriggerManager type type MockTriggerManager struct { @@ -17,37 +21,120 @@ func (_m *MockTriggerManager) EXPECT() *MockTriggerManager_Expecter { return &MockTriggerManager_Expecter{mock: &_m.Mock} } -// Notify provides a mock function with given fields: _a0, _a1, _a2 -func (_m *MockTriggerManager) Notify(_a0 int64, _a1 CompactionTriggerType, _a2 []CompactionView) { - _m.Called(_a0, _a1, _a2) +// ManualTrigger provides a mock function with given fields: ctx, collectionID, clusteringCompaction +func (_m *MockTriggerManager) ManualTrigger(ctx context.Context, collectionID int64, clusteringCompaction bool) (int64, error) { + ret := _m.Called(ctx, collectionID, clusteringCompaction) + + var r0 int64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int64, bool) (int64, error)); ok { + return rf(ctx, collectionID, clusteringCompaction) + } + if rf, ok := ret.Get(0).(func(context.Context, int64, bool) int64); ok { + r0 = rf(ctx, collectionID, clusteringCompaction) + } else { + r0 = ret.Get(0).(int64) + } + + if rf, ok := ret.Get(1).(func(context.Context, int64, bool) error); ok { + r1 = rf(ctx, collectionID, clusteringCompaction) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockTriggerManager_ManualTrigger_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ManualTrigger' +type MockTriggerManager_ManualTrigger_Call struct { + *mock.Call +} + +// ManualTrigger is a helper method to define mock.On call +// - ctx context.Context +// - collectionID int64 +// - clusteringCompaction bool +func (_e *MockTriggerManager_Expecter) ManualTrigger(ctx interface{}, collectionID interface{}, clusteringCompaction interface{}) *MockTriggerManager_ManualTrigger_Call { + return &MockTriggerManager_ManualTrigger_Call{Call: _e.mock.On("ManualTrigger", ctx, collectionID, clusteringCompaction)} +} + +func (_c *MockTriggerManager_ManualTrigger_Call) Run(run func(ctx context.Context, collectionID int64, clusteringCompaction bool)) *MockTriggerManager_ManualTrigger_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64), args[2].(bool)) + }) + return _c +} + +func (_c *MockTriggerManager_ManualTrigger_Call) Return(_a0 int64, _a1 error) *MockTriggerManager_ManualTrigger_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockTriggerManager_ManualTrigger_Call) RunAndReturn(run func(context.Context, int64, bool) (int64, error)) *MockTriggerManager_ManualTrigger_Call { + _c.Call.Return(run) + return _c +} + +// Start provides a mock function with given fields: +func (_m *MockTriggerManager) Start() { + _m.Called() +} + +// MockTriggerManager_Start_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Start' +type MockTriggerManager_Start_Call struct { + *mock.Call +} + +// Start is a helper method to define mock.On call +func (_e *MockTriggerManager_Expecter) Start() *MockTriggerManager_Start_Call { + return &MockTriggerManager_Start_Call{Call: _e.mock.On("Start")} +} + +func (_c *MockTriggerManager_Start_Call) Run(run func()) *MockTriggerManager_Start_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockTriggerManager_Start_Call) Return() *MockTriggerManager_Start_Call { + _c.Call.Return() + return _c +} + +func (_c *MockTriggerManager_Start_Call) RunAndReturn(run func()) *MockTriggerManager_Start_Call { + _c.Call.Return(run) + return _c +} + +// Stop provides a mock function with given fields: +func (_m *MockTriggerManager) Stop() { + _m.Called() } -// MockTriggerManager_Notify_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Notify' -type MockTriggerManager_Notify_Call struct { +// MockTriggerManager_Stop_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Stop' +type MockTriggerManager_Stop_Call struct { *mock.Call } -// Notify is a helper method to define mock.On call -// - _a0 int64 -// - _a1 CompactionTriggerType -// - _a2 []CompactionView -func (_e *MockTriggerManager_Expecter) Notify(_a0 interface{}, _a1 interface{}, _a2 interface{}) *MockTriggerManager_Notify_Call { - return &MockTriggerManager_Notify_Call{Call: _e.mock.On("Notify", _a0, _a1, _a2)} +// Stop is a helper method to define mock.On call +func (_e *MockTriggerManager_Expecter) Stop() *MockTriggerManager_Stop_Call { + return &MockTriggerManager_Stop_Call{Call: _e.mock.On("Stop")} } -func (_c *MockTriggerManager_Notify_Call) Run(run func(_a0 int64, _a1 CompactionTriggerType, _a2 []CompactionView)) *MockTriggerManager_Notify_Call { +func (_c *MockTriggerManager_Stop_Call) Run(run func()) *MockTriggerManager_Stop_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(int64), args[1].(CompactionTriggerType), args[2].([]CompactionView)) + run() }) return _c } -func (_c *MockTriggerManager_Notify_Call) Return() *MockTriggerManager_Notify_Call { +func (_c *MockTriggerManager_Stop_Call) Return() *MockTriggerManager_Stop_Call { _c.Call.Return() return _c } -func (_c *MockTriggerManager_Notify_Call) RunAndReturn(run func(int64, CompactionTriggerType, []CompactionView)) *MockTriggerManager_Notify_Call { +func (_c *MockTriggerManager_Stop_Call) RunAndReturn(run func()) *MockTriggerManager_Stop_Call { _c.Call.Return(run) return _c } diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index b66c4fba75a81..46bdf8c1e3618 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -882,12 +882,12 @@ message CompactionTask { uint64 end_time = 7; int32 timeout_in_seconds = 8; CompactionType type = 9; - uint64 timetravel = 10; - int64 collection_ttl = 11; - int64 total_rows = 12; - int64 collectionID = 13; - int64 partitionID = 14; - string channel = 15; + int64 collection_ttl = 10; + int64 total_rows = 11; + int64 collectionID = 12; + int64 partitionID = 13; + string channel = 14; + schema.CollectionSchema schema = 15; schema.FieldSchema clustering_key_field = 16; int64 max_segment_rows = 17; int64 prefer_segment_rows = 18;