Commit: fix collection schema

wayblink committed May 27, 2024
1 parent 2fe7139 commit 1849b19
Showing 15 changed files with 542 additions and 521 deletions.
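In short: every compaction task type now copies the collection schema into the datapb.CompactionPlan it builds, so the plan that reaches the DataNode is self-describing. A minimal sketch of the pattern, using simplified stand-in types rather than the real protobuf-generated ones:

```go
package main

import "fmt"

// Simplified stand-ins for the protobuf-generated datapb.CompactionTask and
// datapb.CompactionPlan types; only the fields relevant to this commit.
type CollectionSchema struct{ Fields []string }

type CompactionTask struct {
	PlanID int64
	Schema *CollectionSchema
}

func (t *CompactionTask) GetSchema() *CollectionSchema { return t.Schema }

type CompactionPlan struct {
	PlanID int64
	Schema *CollectionSchema // the field this commit starts populating
}

// buildPlan mirrors the repeated one-line change: the plan inherits the
// task's schema, so the DataNode receives a self-describing request.
func buildPlan(t *CompactionTask) *CompactionPlan {
	return &CompactionPlan{
		PlanID: t.PlanID,
		Schema: t.GetSchema(),
	}
}

func main() {
	task := &CompactionTask{PlanID: 1, Schema: &CollectionSchema{Fields: []string{"pk", "vec"}}}
	fmt.Println(buildPlan(task).Schema.Fields) // [pk vec]
}
```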
1 change: 1 addition & 0 deletions internal/datacoord/clustering_compaction_task.go
@@ -324,6 +324,7 @@ func (task *clusteringCompactionTask) BuildCompactionRequest(handler *compaction
 		Channel:            task.GetChannel(),
 		CollectionTtl:      task.GetCollectionTtl(),
 		TotalRows:          task.GetTotalRows(),
+		Schema:             task.GetSchema(),
 		ClusteringKeyField: task.GetClusteringKeyField().GetFieldID(),
 		MaxSegmentRows:     task.GetMaxSegmentRows(),
 		PreferSegmentRows:  task.GetPreferSegmentRows(),
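Clustering compaction is the motivating case: the plan records the clustering key only as a field ID, so the executor needs the schema to resolve that ID to a name and data type. A hedged sketch of that lookup, again with simplified stand-ins for the schemapb types:

```go
package main

import (
	"errors"
	"fmt"
)

// Simplified stand-ins for schemapb.FieldSchema / schemapb.CollectionSchema.
type FieldSchema struct {
	FieldID  int64
	Name     string
	DataType string
}

type CollectionSchema struct{ Fields []*FieldSchema }

// resolveClusteringKey maps the plan's ClusteringKeyField ID back to a full
// field definition; with the schema embedded in the plan, no coordinator
// round-trip is needed at execution time.
func resolveClusteringKey(schema *CollectionSchema, fieldID int64) (*FieldSchema, error) {
	for _, f := range schema.Fields {
		if f.FieldID == fieldID {
			return f, nil
		}
	}
	return nil, errors.New("clustering key field not found in schema")
}

func main() {
	schema := &CollectionSchema{Fields: []*FieldSchema{
		{FieldID: 100, Name: "pk", DataType: "Int64"},
		{FieldID: 101, Name: "vec", DataType: "FloatVector"},
	}}
	key, err := resolveClusteringKey(schema, 101)
	fmt.Println(key.Name, err) // vec <nil>
}
```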
319 changes: 1 addition & 318 deletions internal/datacoord/compaction_task.go
@@ -17,17 +17,9 @@
 package datacoord

 import (
-	"github.com/cockroachdb/errors"
-	"github.com/samber/lo"
-	"go.opentelemetry.io/otel/trace"
-	"go.uber.org/zap"
-
-	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
-	"github.com/milvus-io/milvus/pkg/log"
-	"github.com/milvus-io/milvus/pkg/util/merr"
-	"github.com/milvus-io/milvus/pkg/util/tsoutil"
+	"go.opentelemetry.io/otel/trace"
 )

 type CompactionTask interface {
@@ -113,312 +105,3 @@ func cleanLogPath() compactionTaskOpt {
task.CleanLogPath()
}
}

var _ CompactionTask = (*defaultCompactionTask)(nil)

type defaultCompactionTask struct {
*datapb.CompactionTask
// deprecated
triggerInfo *compactionSignal
plan *datapb.CompactionPlan
dataNodeID int64
result *datapb.CompactionPlanResult
span trace.Span
}

func (task *defaultCompactionTask) ProcessTask(handler *compactionPlanHandler) error {
switch task.GetState() {
case datapb.CompactionTaskState_failed:
return nil
case datapb.CompactionTaskState_completed:
return nil
case datapb.CompactionTaskState_pipelining:
return task.processPipeliningTask(handler)
case datapb.CompactionTaskState_executing:
return task.processExecutingTask(handler)
case datapb.CompactionTaskState_timeout:
return task.processTimeoutTask(handler)
default:
return errors.New("not supported state")
}
return nil
}

func (task *defaultCompactionTask) processPipeliningTask(handler *compactionPlanHandler) error {
handler.scheduler.Submit(task)
handler.plans[task.GetPlanID()] = task.ShadowClone(setState(datapb.CompactionTaskState_executing))
log.Info("Compaction plan submited")
return nil
}

func (task *defaultCompactionTask) processExecutingTask(handler *compactionPlanHandler) error {
nodePlan, exist := handler.compactionResults[task.GetPlanID()]
if !exist {
// compaction task in DC but not found in DN means the compaction plan has failed
log.Info("compaction failed")
handler.plans[task.GetPlanID()] = task.ShadowClone(setState(datapb.CompactionTaskState_failed), endSpan())
handler.setSegmentsCompacting(task, false)
handler.scheduler.Finish(task.GetPlanID(), task)
return nil
}
planResult := nodePlan.B
switch planResult.GetState() {
case commonpb.CompactionState_Completed:
// channels are balanced to other nodes, yet the old datanode still has the compaction results
// task.dataNodeID == planState.A, but
// task.dataNodeID does not match the channel
// Mark this compaction as failed and skip processing the meta
if !handler.chManager.Match(task.GetNodeID(), task.GetChannel()) {
// Sync segments without CompactionFrom segmentIDs to make sure the DN clears the task
// without changing the meta
log.Warn("compaction failed for channel nodeID not match")
err := handler.sessions.SyncSegments(task.GetNodeID(), &datapb.SyncSegmentsRequest{PlanID: task.GetPlanID()})
if err != nil {
log.Warn("compaction failed to sync segments with node", zap.Error(err))
return err
}
handler.setSegmentsCompacting(task, false)
handler.plans[task.GetPlanID()] = task.ShadowClone(setState(datapb.CompactionTaskState_failed), cleanLogPath(), endSpan())
handler.scheduler.Finish(task.GetNodeID(), task)
return nil
}
switch task.GetType() {
case datapb.CompactionType_MergeCompaction, datapb.CompactionType_MixCompaction:
if err := handler.handleMergeCompactionResult(task.GetPlan(), planResult); err != nil {
return err
}
case datapb.CompactionType_Level0DeleteCompaction:
if err := handler.handleL0CompactionResult(task.GetPlan(), planResult); err != nil {
return err
}
}
UpdateCompactionSegmentSizeMetrics(planResult.GetSegments())
handler.plans[task.GetPlanID()] = task.ShadowClone(setState(datapb.CompactionTaskState_completed), setResult(planResult), cleanLogPath(), endSpan())
handler.scheduler.Finish(task.GetNodeID(), task)
case commonpb.CompactionState_Executing:
ts := tsoutil.GetCurrentTime()
if isTimeout(ts, task.GetStartTime(), task.GetTimeoutInSeconds()) {
log.Warn("compaction timeout",
zap.Int32("timeout in seconds", task.GetTimeoutInSeconds()),
zap.Uint64("startTime", task.GetStartTime()),
zap.Uint64("now", ts),
)
handler.plans[task.GetPlanID()] = task.ShadowClone(setState(datapb.CompactionTaskState_timeout), endSpan())
}
}
return nil
}
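
// Aside (editor's illustrative sketch, not part of the original file): the
// timeout check above compares hybrid TSO timestamps. Assuming the usual
// Milvus layout (physical Unix milliseconds shifted left by 18 logical bits),
// an equivalent standalone check would look like:
//
//	func physicalMillis(ts uint64) int64 { return int64(ts >> 18) }
//
//	func isTimedOut(now, start uint64, timeoutSeconds int32) bool {
//		elapsedMs := physicalMillis(now) - physicalMillis(start)
//		return timeoutSeconds > 0 && elapsedMs >= int64(timeoutSeconds)*1000
//	}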

func (task *defaultCompactionTask) processTimeoutTask(handler *compactionPlanHandler) error {
log := log.With(
zap.Int64("planID", task.GetPlanID()),
zap.Int64("nodeID", task.GetNodeID()),
zap.String("channel", task.GetChannel()),
)
planID := task.GetPlanID()
if nodePlan, ok := handler.compactionResults[task.GetPlanID()]; ok {
if nodePlan.B.GetState() == commonpb.CompactionState_Executing {
log.RatedInfo(1, "compaction timeout in DataCoord yet DataNode is still running")
}
} else {
// compaction task in DC but not found in DN means the compaction plan has failed
log.Info("compaction failed for timeout")
handler.plans[planID] = task.ShadowClone(setState(datapb.CompactionTaskState_failed), endSpan())
handler.setSegmentsCompacting(task, false)
handler.scheduler.Finish(task.GetNodeID(), task)
}
return nil
}

func (task *defaultCompactionTask) GetCollectionID() int64 {
return task.CompactionTask.GetCollectionID()
}

func (task *defaultCompactionTask) GetPartitionID() int64 {
return task.CompactionTask.GetPartitionID()
}

func (task *defaultCompactionTask) GetTriggerID() int64 {
return task.CompactionTask.GetTriggerID()
}

func (task *defaultCompactionTask) GetSpan() trace.Span {
return task.span
}

func (task *defaultCompactionTask) GetType() datapb.CompactionType {
return task.CompactionTask.GetType()
}

func (task *defaultCompactionTask) GetChannel() string {
return task.CompactionTask.GetChannel()
}

func (task *defaultCompactionTask) GetPlanID() int64 {
return task.CompactionTask.GetPlanID()
}

func (task *defaultCompactionTask) GetResult() *datapb.CompactionPlanResult {
return task.result
}

func (task *defaultCompactionTask) GetNodeID() int64 {
return task.dataNodeID
}

func (task *defaultCompactionTask) GetState() datapb.CompactionTaskState {
return task.CompactionTask.GetState()
}

func (task *defaultCompactionTask) GetPlan() *datapb.CompactionPlan {
return task.plan
}

func (task *defaultCompactionTask) ShadowClone(opts ...compactionTaskOpt) CompactionTask {
taskClone := &defaultCompactionTask{
CompactionTask: task.CompactionTask,
plan: task.plan,
dataNodeID: task.dataNodeID,
span: task.span,
result: task.result,
}
for _, opt := range opts {
opt(taskClone)
}
return taskClone
}
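
// Aside (editor's illustrative sketch, not part of the original file):
// ShadowClone is a copy-on-write update. Each compactionTaskOpt mutates a
// shallow copy, never the original, so readers holding the old task value
// are unaffected. The pattern in isolation:
//
//	type compactionTaskOpt func(task *defaultCompactionTask)
//
//	func setState(state datapb.CompactionTaskState) compactionTaskOpt {
//		return func(task *defaultCompactionTask) { task.State = state }
//	}
//
//	// usage: handler.plans[id] = task.ShadowClone(setState(datapb.CompactionTaskState_failed))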

func (task *defaultCompactionTask) EndSpan() {
if task.span != nil {
task.span.End()
}
}

func (task *defaultCompactionTask) SetNodeID(nodeID int64) {
task.dataNodeID = nodeID
}

func (task *defaultCompactionTask) SetState(state datapb.CompactionTaskState) {
task.State = state
}

func (task *defaultCompactionTask) SetTask(ct *datapb.CompactionTask) {
task.CompactionTask = ct
}

func (task *defaultCompactionTask) SetStartTime(startTime uint64) {
task.StartTime = startTime
}

func (task *defaultCompactionTask) SetResult(result *datapb.CompactionPlanResult) {
task.result = result
}

func (task *defaultCompactionTask) SetSpan(span trace.Span) {
task.span = span
}

func (task *defaultCompactionTask) SetPlan(plan *datapb.CompactionPlan) {
task.plan = plan
}

func (task *defaultCompactionTask) CleanLogPath() {
if task.plan.GetSegmentBinlogs() != nil {
for _, binlogs := range task.plan.GetSegmentBinlogs() {
binlogs.FieldBinlogs = nil
binlogs.Field2StatslogPaths = nil
binlogs.Deltalogs = nil
}
}
if task.result.GetSegments() != nil {
for _, segment := range task.result.GetSegments() {
segment.InsertLogs = nil
segment.Deltalogs = nil
segment.Field2StatslogPaths = nil
}
}
}

func (task *defaultCompactionTask) BuildCompactionRequest(handler *compactionPlanHandler) (*datapb.CompactionPlan, error) {
plan := &datapb.CompactionPlan{
PlanID: task.GetPlanID(),
StartTime: task.GetStartTime(),
TimeoutInSeconds: task.GetTimeoutInSeconds(),
Type: task.GetType(),
Channel: task.GetChannel(),
CollectionTtl: task.GetCollectionTtl(),
TotalRows: task.GetTotalRows(),
}
log := log.With(zap.Int64("taskID", task.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))
if task.GetType() == datapb.CompactionType_Level0DeleteCompaction {
for _, segID := range task.GetInputSegments() {
segInfo := handler.meta.GetHealthySegment(segID)
if segInfo == nil {
return nil, merr.WrapErrSegmentNotFound(segID)
}
plan.SegmentBinlogs = append(plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
SegmentID: segID,
CollectionID: segInfo.GetCollectionID(),
PartitionID: segInfo.GetPartitionID(),
Level: segInfo.GetLevel(),
InsertChannel: segInfo.GetInsertChannel(),
Deltalogs: segInfo.GetDeltalogs(),
})
}

// Select sealed L1 segments for LevelZero compaction that meet the condition:
// dmlPos < triggerInfo.pos
sealedSegments := handler.meta.SelectSegments(WithCollection(task.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool {
return (task.GetPartitionID() == -1 || info.GetPartitionID() == task.GetPartitionID()) &&
info.GetInsertChannel() == plan.GetChannel() &&
isFlushState(info.GetState()) &&
!info.isCompacting &&
!info.GetIsImporting() &&
info.GetLevel() != datapb.SegmentLevel_L0 &&
info.GetDmlPosition().GetTimestamp() < task.GetPos().GetTimestamp()
}))
if len(sealedSegments) == 0 {
return nil, errors.Errorf("Selected zero L1/L2 segments for the position=%v", task.GetPos())
}
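
// Aside (editor's illustrative sketch, not part of the original file): the
// predicate keeps only sealed, non-L0 segments whose dmlPosition precedes the
// compaction position, so the selected L0 deltalogs can apply to them.
// Conceptually, SelectSegments with a SegmentFilterFunc is a plain filter:
//
//	func filterSegments(all []*SegmentInfo, keep func(*SegmentInfo) bool) []*SegmentInfo {
//		var out []*SegmentInfo
//		for _, s := range all {
//			if keep(s) {
//				out = append(out, s)
//			}
//		}
//		return out
//	}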

sealedSegBinlogs := lo.Map(sealedSegments, func(segInfo *SegmentInfo, _ int) *datapb.CompactionSegmentBinlogs {
return &datapb.CompactionSegmentBinlogs{
SegmentID: segInfo.GetID(),
Level: segInfo.GetLevel(),
CollectionID: segInfo.GetCollectionID(),
PartitionID: segInfo.GetPartitionID(),
// no need for binlogs info, as L0Compaction only appends deltalogs to existing segments
}
})

plan.SegmentBinlogs = append(plan.SegmentBinlogs, sealedSegBinlogs...)
log.Info("Compaction handler refreshed level zero compaction plan",
zap.Any("target position", task.GetPos()),
zap.Any("target segments count", len(sealedSegBinlogs)))
return nil, nil
}

if task.GetType() == datapb.CompactionType_MixCompaction {
segIDMap := make(map[int64][]*datapb.FieldBinlog, len(plan.SegmentBinlogs))
for _, segID := range task.GetInputSegments() {
segInfo := handler.meta.GetHealthySegment(segID)
if segInfo == nil {
return nil, merr.WrapErrSegmentNotFound(segID)
}
plan.SegmentBinlogs = append(plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
SegmentID: segID,
CollectionID: segInfo.GetCollectionID(),
PartitionID: segInfo.GetPartitionID(),
Level: segInfo.GetLevel(),
InsertChannel: segInfo.GetInsertChannel(),
FieldBinlogs: segInfo.GetBinlogs(),
Field2StatslogPaths: segInfo.GetStatslogs(),
Deltalogs: segInfo.GetDeltalogs(),
})
segIDMap[segID] = segInfo.GetDeltalogs()
}
log.Info("Compaction handler refreshed mix compaction plan", zap.Any("segID2DeltaLogs", segIDMap))
}
handler.plans[task.GetPlanID()] = task.ShadowClone(setPlan(plan))
return plan, nil
}
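This commit removes defaultCompactionTask entirely; compaction_trigger.go below now constructs mixCompactionTask instead, and each compaction type (mix, L0, clustering) implements the CompactionTask interface on its own struct. A heavily abridged sketch of that shape (the real interface exposes many more accessors):

```go
package main

import "fmt"

// Abridged sketch of the CompactionTask interface; the real one also exposes
// state, span, result and node-assignment accessors.
type CompactionTask interface {
	GetPlanID() int64
	BuildCompactionRequest() (string, error)
}

type mixCompactionTask struct{ planID int64 }

func (t *mixCompactionTask) GetPlanID() int64 { return t.planID }
func (t *mixCompactionTask) BuildCompactionRequest() (string, error) {
	return "mix plan", nil // the real method assembles a *datapb.CompactionPlan
}

type l0CompactionTask struct{ planID int64 }

func (t *l0CompactionTask) GetPlanID() int64 { return t.planID }
func (t *l0CompactionTask) BuildCompactionRequest() (string, error) {
	return "l0 plan", nil
}

func main() {
	tasks := []CompactionTask{&mixCompactionTask{planID: 1}, &l0CompactionTask{planID: 2}}
	for _, task := range tasks {
		plan, _ := task.BuildCompactionRequest()
		fmt.Println(task.GetPlanID(), plan)
	}
}
```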
2 changes: 2 additions & 0 deletions internal/datacoord/compaction_task_l0.go
@@ -237,7 +237,9 @@ func (task *l0CompactionTask) BuildCompactionRequest(handler *compactionPlanHand
 		Channel:          task.GetChannel(),
 		CollectionTtl:    task.GetCollectionTtl(),
 		TotalRows:        task.GetTotalRows(),
+		Schema:           task.GetSchema(),
 	}
+
 	log := log.With(zap.Int64("taskID", task.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))
 	for _, segID := range task.GetInputSegments() {
 		segInfo := handler.meta.GetHealthySegment(segID)
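For the L0 (delete) path, the schema matters because deltalogs record deletions by primary key, so the compactor must know which field is the PK. A hedged sketch with a simplified stand-in for schemapb.FieldSchema:

```go
package main

import "fmt"

// Simplified stand-in for schemapb.FieldSchema.
type FieldSchema struct {
	Name         string
	IsPrimaryKey bool
}

// pkField returns the primary key field. An L0 (delete) compaction needs it
// because deltalogs record deletions as primary key plus timestamp.
func pkField(fields []*FieldSchema) *FieldSchema {
	for _, f := range fields {
		if f.IsPrimaryKey {
			return f
		}
	}
	return nil
}

func main() {
	fields := []*FieldSchema{{Name: "pk", IsPrimaryKey: true}, {Name: "vec"}}
	fmt.Println(pkField(fields).Name) // pk
}
```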
1 change: 1 addition & 0 deletions internal/datacoord/compaction_task_mix.go
@@ -238,6 +238,7 @@ func (task *mixCompactionTask) BuildCompactionRequest(handler *compactionPlanHan
 		Channel:          task.GetChannel(),
 		CollectionTtl:    task.GetCollectionTtl(),
 		TotalRows:        task.GetTotalRows(),
+		Schema:           task.GetSchema(),
 	}
 	log := log.With(zap.Int64("taskID", task.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))

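Note that carrying the schema inside the plan keeps the DataNode request self-contained: the executing node no longer needs a separate schema lookup, and the schema version the compaction was scheduled against travels with the plan. That appears to be the point of the fix, given the identical one-line addition across all three task types.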
7 changes: 4 additions & 3 deletions internal/datacoord/compaction_trigger.go
@@ -446,7 +446,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
 		start := time.Now()
 		planID := currentID
 		currentID++
-		task := &defaultCompactionTask{
+		task := &mixCompactionTask{
 			CompactionTask: &datapb.CompactionTask{
 				PlanID:    planID,
 				TriggerID: signal.id,
@@ -460,6 +460,7 @@
 				Channel:       group.channelName,
 				InputSegments: segIDs,
 				TotalRows:     totalRows,
+				Schema:        coll.Schema,
 			},
 		}
 		err := t.compactionHandler.enqueueCompaction(task)
@@ -564,7 +565,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
 		start := time.Now()
 		planID := currentID
 		currentID++
-		if err := t.compactionHandler.enqueueCompaction(&defaultCompactionTask{
+		if err := t.compactionHandler.enqueueCompaction(&mixCompactionTask{
 			CompactionTask: &datapb.CompactionTask{
 				PlanID:    planID,
 				TriggerID: signal.id,
@@ -578,7 +579,7 @@
 				Channel:       channel,
 				InputSegments: segmentIDS,
 				TotalRows:     totalRows,
-				// collectionSchema
+				Schema:        coll.Schema,
 			},
 		}); err != nil {
 			log.Warn("failed to execute compaction task",
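The trigger stamps coll.Schema at scheduling time, replacing the old // collectionSchema placeholder. A hedged sketch (simplified names, hypothetical helper) of the defensive guard one might pair with it, since a nil schema would otherwise only surface as a DataNode-side failure:

```go
package main

import (
	"errors"
	"fmt"
)

type Schema struct{ Name string }
type Collection struct{ Schema *Schema }

// schemaForCompaction pins the schema at trigger time so the resulting plan
// is self-contained; failing fast here beats a confusing DataNode-side error.
func schemaForCompaction(coll *Collection) (*Schema, error) {
	if coll == nil || coll.Schema == nil {
		return nil, errors.New("collection schema unavailable, skipping compaction trigger")
	}
	return coll.Schema, nil
}

func main() {
	s, err := schemaForCompaction(&Collection{Schema: &Schema{Name: "demo"}})
	fmt.Println(s.Name, err) // demo <nil>
}
```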