diff --git a/internal/datanode/clustering_compactor.go b/internal/datanode/clustering_compactor.go
index da0625fff6210..13da49f4c59cb 100644
--- a/internal/datanode/clustering_compactor.go
+++ b/internal/datanode/clustering_compactor.go
@@ -19,7 +19,6 @@ package datanode
 import (
 	"context"
 	"fmt"
-	iter "github.com/milvus-io/milvus/internal/datanode/iterators"
 	"math"
 	"path"
 	"sort"
@@ -39,6 +38,7 @@ import (
 	"github.com/milvus-io/milvus/internal/datanode/allocator"
 	"github.com/milvus-io/milvus/internal/datanode/compaction"
 	"github.com/milvus-io/milvus/internal/datanode/io"
+	iter "github.com/milvus-io/milvus/internal/datanode/iterators"
 	"github.com/milvus-io/milvus/internal/datanode/metacache"
 	"github.com/milvus-io/milvus/internal/datanode/syncmgr"
 	"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
@@ -63,10 +63,6 @@ import (
 
 var _ compaction.Compactor = (*clusteringCompactionTask)(nil)
 
-var (
-	errIllegalCompactionPlan = errors.New("compaction plan illegal")
-)
-
 type clusteringCompactionTask struct {
 	io        io.BinlogIO
 	allocator allocator.Allocator
@@ -106,20 +102,14 @@ type clusteringCompactionTask struct {
 }
 
 type ClusterBuffer struct {
-	id         int
-	bufferSize atomic.Int64
-	buffer     *InsertData
-	writer     *compaction.SegmentWriter
+	id int
 
-	bufferRowNum        atomic.Int64
-	bufferTimeStampFrom int64
-	bufferTimeStampTo   int64
+	writer       *compaction.SegmentWriter
+	bufferSize   atomic.Int64
+	bufferRowNum atomic.Int64
 
-	currentSegmentID    int64
-	currentSpillSize    int64
-	currentSpillRowNum  int64
-	currentSpillBinlogs map[UniqueID]*datapb.FieldBinlog
-	currentPKStats      *storage.PrimaryKeyStats
+	flushedRowNum  int64
+	flushedBinlogs map[UniqueID]*datapb.FieldBinlog
 
 	uploadedSegments     []*datapb.CompactionSegment
 	uploadedSegmentStats map[UniqueID]storage.SegmentStats
@@ -211,7 +201,7 @@ func (t *clusteringCompactionTask) Compact() (*datapb.CompactionPlanResult, erro
 	if t.plan.GetType() != datapb.CompactionType_ClusteringCompaction {
 		// this shouldn't be reached
 		log.Warn("compact wrong, illegal compaction type")
-		return nil, errIllegalCompactionPlan
+		return nil, merr.WrapErrIllegalCompactionPlan()
 	}
 	log.Info("Clustering compaction", zap.Duration("wait in queue elapse", t.tr.RecordSpan()))
 	if !funcutil.CheckCtxValid(ctx) {
@@ -306,10 +296,6 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e
 		for _, key := range bucket {
 			fieldStats.UpdateMinMax(storage.NewScalarFieldValue(t.clusteringKeyField.DataType, key))
 		}
-		//writeBuffer, err := storage.NewInsertData(t.collectionMeta.GetSchema())
-		//if err != nil {
-		//	return err
-		//}
 		buffer := &ClusterBuffer{
 			id:               id,
 			uploadedSegments: make([]*datapb.CompactionSegment, 0),
@@ -439,7 +425,7 @@
 	return resultSegments, resultPartitionStats, nil
 }
 
-// read insert log of one segment, mappingSegment it into buckets according to partitionKey. Spill data to file when necessary
+// read insert log of one segment, map it into buckets according to clusteringKey, flush data to file when necessary
 func (t *clusteringCompactionTask) mappingSegment(
 	ctx context.Context,
 	segment *datapb.CompactionSegmentBinlogs,
@@ -495,7 +481,7 @@ func (t *clusteringCompactionTask) mappingSegment(
 	// Unable to deal with all empty segments cases, so return error
 	if binlogNum == 0 {
 		log.Warn("compact wrong, all segments' binlogs are empty")
-		return errIllegalCompactionPlan
+		return merr.WrapErrIllegalCompactionPlan()
 	}
 	for idx := 0; idx < binlogNum; idx++ {
 		var ps []string
@@ -508,8 +494,10 @@ func (t *clusteringCompactionTask) mappingSegment(
 	for _, path := range fieldBinlogPaths {
 		bytesArr, err := t.io.Download(ctx, path)
 		blobs := make([]*Blob, len(bytesArr))
+		var segmentSize int64
 		for i := range bytesArr {
 			blobs[i] = &Blob{Value: bytesArr[i]}
+			segmentSize = segmentSize + int64(len(bytesArr[i]))
 		}
 		if err != nil {
 			log.Warn("download insertlogs wrong", zap.Strings("path", path), zap.Error(err))
 			return err
 		}
@@ -522,6 +510,7 @@
+		averageRowSize := pkIter.DataSize() / pkIter.RowNum()
 		var offset int64 = -1
 		for pkIter.HasNext() {
 			vInter, _ := pkIter.Next()
@@ -557,7 +546,7 @@
 			} else {
 				clusterBuffer = t.keyToBufferFunc(clusteringKey)
 			}
-			err = t.writeToBuffer(ctx, clusterBuffer, v)
+			err = t.writeToBuffer(ctx, clusterBuffer, v, averageRowSize)
 			if err != nil {
 				return err
 			}
@@ -606,15 +595,7 @@
 	return nil
 }
 
-func (t *clusteringCompactionTask) writeToBuffer(ctx context.Context, clusterBuffer *ClusterBuffer, value *storage.Value) error {
-	pk := value.PK
-	timestamp := value.Timestamp
-	row, ok := value.Value.(map[UniqueID]interface{})
-	if !ok {
-		log.Warn("transfer interface to map wrong")
-		return errors.New("unexpected error")
-	}
-
+func (t *clusteringCompactionTask) writeToBuffer(ctx context.Context, clusterBuffer *ClusterBuffer, value *storage.Value, rowSize int) error {
 	t.clusterBufferLocks.Lock(clusterBuffer.id)
 	defer t.clusterBufferLocks.Unlock(clusterBuffer.id)
 	// prepare
@@ -624,40 +605,10 @@ func (t *clusteringCompactionTask) writeToBuf
 			return err
 		}
 	}
-	//if clusterBuffer.currentSegmentID == 0 {
-	//	segmentID, err := t.allocator.AllocOne()
-	//	if err != nil {
-	//		return err
-	//	}
-	//	clusterBuffer.currentSegmentID = segmentID
-	//	clusterBuffer.currentSpillBinlogs = make(map[UniqueID]*datapb.FieldBinlog, 0)
-	//	clusterBuffer.bufferTimeStampFrom = -1
-	//	clusterBuffer.bufferTimeStampTo = -1
-	//}
-	if clusterBuffer.currentPKStats == nil {
-		stats, err := storage.NewPrimaryKeyStats(t.primaryKeyField.FieldID, int64(t.primaryKeyField.DataType), 1)
-		if err != nil {
-			return err
-		}
-		clusterBuffer.currentPKStats = stats
-	}
-
-	// Update timestampFrom, timestampTo
-	if timestamp < clusterBuffer.bufferTimeStampFrom || clusterBuffer.bufferTimeStampFrom == -1 {
-		clusterBuffer.bufferTimeStampFrom = timestamp
-	}
-	if timestamp > clusterBuffer.bufferTimeStampTo || clusterBuffer.bufferTimeStampFrom == -1 {
-		clusterBuffer.bufferTimeStampTo = timestamp
-	}
-	if err := clusterBuffer.buffer.Append(row); err != nil {
-		return err
-	}
-	rowSize := clusterBuffer.buffer.GetRowSize(clusterBuffer.buffer.GetRowNum() - 1)
 	clusterBuffer.bufferSize.Add(int64(rowSize))
 	clusterBuffer.bufferRowNum.Add(1)
-	clusterBuffer.currentPKStats.Update(pk)
 	t.totalBufferSize.Add(int64(rowSize))
-	return nil
+	return clusterBuffer.writer.Write(value)
 }
 
 func (t *clusteringCompactionTask) getWorkerPoolSize() int {
@@ -745,7 +696,7 @@ func (t *clusteringCompactionTask) spillAll(ctx context.Context) error {
 				log.Error("spill fail")
 				return err
 			}
-			err = t.packBuffersToSegments(ctx, buffer)
+			err = t.packBufferToSegment(ctx, buffer)
 			return err
 		}()
 		if err != nil {
@@ -756,15 +707,15 @@
 	return nil
 }
 
-func (t *clusteringCompactionTask) packBuffersToSegments(ctx context.Context, buffer *ClusterBuffer) error {
-	if len(buffer.currentSpillBinlogs) == 0 {
+func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buffer *ClusterBuffer) error {
+	if len(buffer.flushedBinlogs) == 0 {
 		return nil
 	}
 	insertLogs := make([]*datapb.FieldBinlog, 0)
-	for _, fieldBinlog := range buffer.currentSpillBinlogs {
+	for _, fieldBinlog := range buffer.flushedBinlogs {
 		insertLogs = append(insertLogs, fieldBinlog)
 	}
-	statPaths, err := t.statSerializeWrite(ctx, buffer.writer, buffer.currentSpillRowNum)
+	statPaths, err := t.statSerializeWrite(ctx, buffer.writer, buffer.flushedRowNum)
 	if err != nil {
 		return err
 	}
@@ -772,8 +723,8 @@ func (t *clusteringCompactionTask) packBuffersToSegments(ctx context.Context, bu
 	// pack current spill data into a segment
 	seg := &datapb.CompactionSegment{
 		PlanID:              t.plan.GetPlanID(),
-		SegmentID:           buffer.currentSegmentID,
-		NumOfRows:           buffer.currentSpillRowNum,
+		SegmentID:           buffer.writer.GetSegmentID(),
+		NumOfRows:           buffer.flushedRowNum,
 		InsertLogs:          insertLogs,
 		Field2StatslogPaths: []*datapb.FieldBinlog{statPaths},
 		Channel:             t.plan.GetChannel(),
@@ -788,20 +739,14 @@
 			BF:        buffer.clusteringKeyFieldStats.BF,
 			Centroids: buffer.clusteringKeyFieldStats.Centroids,
 		}},
-		NumRows: int(buffer.currentSpillRowNum),
+		NumRows: int(buffer.flushedRowNum),
 	}
-	buffer.uploadedSegmentStats[buffer.currentSegmentID] = segmentStats
+	buffer.uploadedSegmentStats[buffer.writer.GetSegmentID()] = segmentStats
 
 	// refresh
-	buffer.currentSpillRowNum = 0
-	buffer.currentSpillSize = 0
-	buffer.currentPKStats = nil
-	segmentID, err := t.allocator.AllocOne()
-	if err != nil {
-		return err
-	}
-	buffer.currentSegmentID = segmentID
-	buffer.currentSpillBinlogs = make(map[UniqueID]*datapb.FieldBinlog, 0)
-	log.Debug("finish pack segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", buffer.currentSegmentID), zap.String("seg", seg.String()), zap.Any("segStats", segmentStats))
+	t.refreshBufferWriter(buffer)
+	buffer.flushedRowNum = 0
+	buffer.flushedBinlogs = make(map[UniqueID]*datapb.FieldBinlog, 0)
+	log.Debug("finish pack segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", buffer.writer.GetSegmentID()), zap.String("seg", seg.String()), zap.Any("segStats", segmentStats))
 	return nil
 }
 
@@ -820,32 +765,25 @@ func (t *clusteringCompactionTask) spill(ctx context.Context, buffer *ClusterBuf
 	}
 
 	for fID, path := range partialBinlogs {
-		for _, binlog := range path.GetBinlogs() {
-			binlog.TimestampTo = uint64(buffer.bufferTimeStampTo)
-			binlog.TimestampFrom = uint64(buffer.bufferTimeStampFrom)
-		}
-		tmpBinlog, ok := buffer.currentSpillBinlogs[fID]
+		tmpBinlog, ok := buffer.flushedBinlogs[fID]
 		if !ok {
 			tmpBinlog = path
 		} else {
 			tmpBinlog.Binlogs = append(tmpBinlog.Binlogs, path.GetBinlogs()...)
 		}
-		buffer.currentSpillBinlogs[fID] = tmpBinlog
+		buffer.flushedBinlogs[fID] = tmpBinlog
 	}
-	buffer.currentSpillRowNum = buffer.currentSpillRowNum + buffer.bufferRowNum.Load()
+	buffer.flushedRowNum = buffer.flushedRowNum + buffer.bufferRowNum.Load()
 
 	// clean buffer
 	t.totalBufferSize.Add(-buffer.bufferSize.Load())
-	err = t.refreshBufferWriter(buffer)
 	if err != nil {
 		return err
 	}
-	buffer.bufferSize.Store(0)
-	buffer.bufferRowNum.Store(0)
 
-	if buffer.currentSpillRowNum > t.plan.GetMaxSegmentRows() {
-		if err := t.packBuffersToSegments(ctx, buffer); err != nil {
+	if buffer.flushedRowNum > t.plan.GetMaxSegmentRows() {
+		if err := t.packBufferToSegment(ctx, buffer); err != nil {
 			return err
 		}
 	}
@@ -971,7 +909,7 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
 	// Unable to deal with all empty segments cases, so return error
 	if binlogNum == 0 {
 		log.Warn("compact wrong, all segments' binlogs are empty")
-		return nil, errIllegalCompactionPlan
+		return nil, merr.WrapErrIllegalCompactionPlan()
 	}
 	log.Debug("binlogNum", zap.Int("binlogNum", binlogNum))
 	for idx := 0; idx < binlogNum; idx++ {
@@ -1094,8 +1032,10 @@ func (t *clusteringCompactionTask) refreshBufferWriter(buffer *ClusterBuffer) er
 	if err != nil {
 		return err
 	}
-	writer, err := compaction.NewSegmentWriter(t.plan.GetSchema(), 0, segmentID, t.partitionID, t.collectionID)
+	writer, err := compaction.NewSegmentWriter(t.plan.GetSchema(), t.plan.MaxSegmentRows, segmentID, t.partitionID, t.collectionID)
 	buffer.writer = writer
+	buffer.bufferSize.Store(0)
+	buffer.bufferRowNum.Store(0)
 	return nil
 }
 
diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go
index 9db2fbc77ca45..1fe9af2e25c42 100644
--- a/pkg/util/merr/errors.go
+++ b/pkg/util/merr/errors.go
@@ -173,14 +173,13 @@ var (
 
 	// Compaction
 	ErrCompactionReadDeltaLogErr = newMilvusError("fail to read delta log", 2300, false)
 
-	ErrClusteringCompactionClusterNotSupport      = newMilvusError("milvus cluster not support clustering compaction", 2301, false)
-	ErrClusteringCompactionCollectionNotSupport   = newMilvusError("collection not support clustering compaction", 2302, false)
-	ErrClusteringCompactionCollectionIsCompacting = newMilvusError("collection is compacting", 2303, false)
-	ErrClusteringCompactionNotSupportVector       = newMilvusError("vector field clustering compaction is not supported", 2304, false)
-	ErrClusteringCompactionSubmitTaskFail         = newMilvusError("fail to submit task", 2305, true)
-	ErrClusteringCompactionCompactionTaskLost     = newMilvusError("compaction task lost", 2306, true)
-	ErrClusteringCompactionGetCollectionFail      = newMilvusError("fail to get collection in compaction", 2307, true)
-	ErrClusteringCompactionMetaError              = newMilvusError("fail to update meta in clustering compaction", 2308, true)
+	ErrIllegalCompactionPlan                      = newMilvusError("compaction plan illegal", 2301, false)
+	ErrClusteringCompactionClusterNotSupport      = newMilvusError("milvus cluster not support clustering compaction", 2302, false)
+	ErrClusteringCompactionCollectionNotSupport   = newMilvusError("collection not support clustering compaction", 2303, false)
+	ErrClusteringCompactionCollectionIsCompacting = newMilvusError("collection is compacting", 2304, false)
+	ErrClusteringCompactionNotSupportVector       = newMilvusError("vector field clustering compaction is not supported", 2305, false)
+	ErrClusteringCompactionSubmitTaskFail         = newMilvusError("fail to submit task", 2306, true)
+	ErrClusteringCompactionMetaError              = newMilvusError("fail to update meta in clustering compaction", 2307, true)
 
 	// General
 	ErrOperationNotSupported = newMilvusError("unsupported operation", 3000, false)
diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go
index fd97738f7d6de..0db310888b0fb 100644
--- a/pkg/util/merr/utils.go
+++ b/pkg/util/merr/utils.go
@@ -1048,6 +1048,14 @@ func WrapErrCompactionReadDeltaLogErr(msg ...string) error {
 	return err
 }
 
+func WrapErrIllegalCompactionPlan(msg ...string) error {
+	err := error(ErrIllegalCompactionPlan)
+	if len(msg) > 0 {
+		err = errors.Wrap(err, strings.Join(msg, "->"))
+	}
+	return err
+}
+
 func WrapErrClusteringCompactionClusterNotSupport(msg ...string) error {
 	err := error(ErrClusteringCompactionClusterNotSupport)
 	if len(msg) > 0 {
@@ -1079,14 +1087,6 @@ func WrapErrClusteringCompactionSubmitTaskFail(taskType string, err error) error
 	return wrapFieldsWithDesc(ErrClusteringCompactionSubmitTaskFail, err.Error(), value("taskType", taskType))
 }
 
-func WrapErrClusteringCompactionCompactionTaskLost(planID int64) error {
-	return wrapFields(ErrClusteringCompactionCompactionTaskLost, value("planID", planID))
-}
-
-func WrapErrClusteringCompactionGetCollectionFail(collectionID int64, err error) error {
-	return wrapFieldsWithDesc(ErrClusteringCompactionGetCollectionFail, err.Error(), value("collectionID", collectionID))
-}
-
func WrapErrClusteringCompactionMetaError(operation string, err error) error {
 	return wrapFieldsWithDesc(ErrClusteringCompactionMetaError, err.Error(), value("operation", operation))
 }
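
Usage note on the merr change: `WrapErrIllegalCompactionPlan` follows the same convention as the surrounding wrappers, so callers can match the sentinel with `errors.Is` just as they could with the removed package-local `errors.New` value, but now with a stable error code (2301) attached. Below is a minimal consumer-side sketch, assuming only the merr package as patched above; `checkPlanType` and its inputs are hypothetical stand-ins for the guards in `Compact`, `mappingSegment`, and `scalarAnalyzeSegment`:

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/errors" // the errors package merr itself uses

	"github.com/milvus-io/milvus/pkg/util/merr"
)

// checkPlanType is a hypothetical stand-in for the plan-type guard in Compact.
func checkPlanType(planType string) error {
	if planType != "ClusteringCompaction" {
		// variadic messages are joined with "->" by the wrapper
		return merr.WrapErrIllegalCompactionPlan("unexpected plan type", planType)
	}
	return nil
}

func main() {
	err := checkPlanType("MixCompaction")
	// errors.Is matches the wrapped sentinel, so error-handling branches
	// keep working after the migration from the package-local error.
	if errors.Is(err, merr.ErrIllegalCompactionPlan) {
		fmt.Println("illegal plan, registered as non-retriable:", err)
	}
}
```

Note that inserting `ErrIllegalCompactionPlan` at 2301 renumbers the clustering-compaction errors that follow it (now 2302-2307), which is safe only while those codes are not persisted or matched on outside this repository.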