Skip to content

Commit

Permalink
fix compact error
Browse files Browse the repository at this point in the history
  • Loading branch information
wayblink committed May 27, 2024
1 parent 230c3e1 commit 2fe7139
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 114 deletions.
136 changes: 38 additions & 98 deletions internal/datanode/clustering_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package datanode
import (
"context"
"fmt"
iter "github.com/milvus-io/milvus/internal/datanode/iterators"
"math"
"path"
"sort"
Expand All @@ -39,6 +38,7 @@ import (
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/datanode/compaction"
"github.com/milvus-io/milvus/internal/datanode/io"
iter "github.com/milvus-io/milvus/internal/datanode/iterators"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
Expand All @@ -63,10 +63,6 @@ import (

var _ compaction.Compactor = (*clusteringCompactionTask)(nil)

var (
errIllegalCompactionPlan = errors.New("compaction plan illegal")
)

type clusteringCompactionTask struct {
io io.BinlogIO
allocator allocator.Allocator
Expand Down Expand Up @@ -106,20 +102,14 @@ type clusteringCompactionTask struct {
}

type ClusterBuffer struct {
id int
bufferSize atomic.Int64
buffer *InsertData
writer *compaction.SegmentWriter
id int

bufferRowNum atomic.Int64
bufferTimeStampFrom int64
bufferTimeStampTo int64
writer *compaction.SegmentWriter
bufferSize atomic.Int64
bufferRowNum atomic.Int64

currentSegmentID int64
currentSpillSize int64
currentSpillRowNum int64
currentSpillBinlogs map[UniqueID]*datapb.FieldBinlog
currentPKStats *storage.PrimaryKeyStats
flushedRowNum int64
flushedBinlogs map[UniqueID]*datapb.FieldBinlog

uploadedSegments []*datapb.CompactionSegment
uploadedSegmentStats map[UniqueID]storage.SegmentStats
Expand Down Expand Up @@ -211,7 +201,7 @@ func (t *clusteringCompactionTask) Compact() (*datapb.CompactionPlanResult, erro
if t.plan.GetType() != datapb.CompactionType_ClusteringCompaction {
// this shouldn't be reached
log.Warn("compact wrong, illegal compaction type")
return nil, errIllegalCompactionPlan
return nil, merr.WrapErrIllegalCompactionPlan()
}
log.Info("Clustering compaction", zap.Duration("wait in queue elapse", t.tr.RecordSpan()))
if !funcutil.CheckCtxValid(ctx) {
Expand Down Expand Up @@ -306,10 +296,6 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e
for _, key := range bucket {
fieldStats.UpdateMinMax(storage.NewScalarFieldValue(t.clusteringKeyField.DataType, key))
}
//writeBuffer, err := storage.NewInsertData(t.collectionMeta.GetSchema())
//if err != nil {
// return err
//}
buffer := &ClusterBuffer{
id: id,
uploadedSegments: make([]*datapb.CompactionSegment, 0),
Expand Down Expand Up @@ -439,7 +425,7 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context,
return resultSegments, resultPartitionStats, nil
}

// read insert log of one segment, mappingSegment it into buckets according to partitionKey. Spill data to file when necessary
// read the insert logs of one segment, map each row into buckets according to the clusteringKey, and flush buffered data to file when necessary
func (t *clusteringCompactionTask) mappingSegment(
ctx context.Context,
segment *datapb.CompactionSegmentBinlogs,
Expand Down Expand Up @@ -495,7 +481,7 @@ func (t *clusteringCompactionTask) mappingSegment(
// Unable to deal with all empty segments cases, so return error
if binlogNum == 0 {
log.Warn("compact wrong, all segments' binlogs are empty")
return errIllegalCompactionPlan
return merr.WrapErrIllegalCompactionPlan()
}
for idx := 0; idx < binlogNum; idx++ {
var ps []string
Expand All @@ -508,8 +494,10 @@ func (t *clusteringCompactionTask) mappingSegment(
for _, path := range fieldBinlogPaths {
bytesArr, err := t.io.Download(ctx, path)
blobs := make([]*Blob, len(bytesArr))
var segmentSize int64
for i := range bytesArr {
blobs[i] = &Blob{Value: bytesArr[i]}
segmentSize = segmentSize + int64(len(bytesArr[i]))
}
if err != nil {
log.Warn("download insertlogs wrong", zap.Strings("path", path), zap.Error(err))
Expand All @@ -522,6 +510,7 @@ func (t *clusteringCompactionTask) mappingSegment(
return err
}

averageRowSize := pkIter.DataSize() / pkIter.RowNum()
var offset int64 = -1
for pkIter.HasNext() {
vInter, _ := pkIter.Next()
Expand Down Expand Up @@ -557,7 +546,7 @@ func (t *clusteringCompactionTask) mappingSegment(
} else {
clusterBuffer = t.keyToBufferFunc(clusteringKey)
}
err = t.writeToBuffer(ctx, clusterBuffer, v)
err = t.writeToBuffer(ctx, clusterBuffer, v, averageRowSize)
if err != nil {
return err
}
Expand Down Expand Up @@ -606,15 +595,7 @@ func (t *clusteringCompactionTask) mappingSegment(
return nil
}

func (t *clusteringCompactionTask) writeToBuffer(ctx context.Context, clusterBuffer *ClusterBuffer, value *storage.Value) error {
pk := value.PK
timestamp := value.Timestamp
row, ok := value.Value.(map[UniqueID]interface{})
if !ok {
log.Warn("transfer interface to map wrong")
return errors.New("unexpected error")
}

func (t *clusteringCompactionTask) writeToBuffer(ctx context.Context, clusterBuffer *ClusterBuffer, value *storage.Value, rowSize int) error {
t.clusterBufferLocks.Lock(clusterBuffer.id)
defer t.clusterBufferLocks.Unlock(clusterBuffer.id)
// prepare
Expand All @@ -624,40 +605,10 @@ func (t *clusteringCompactionTask) writeToBuffer(ctx context.Context, clusterBuf
return err
}
}
//if clusterBuffer.currentSegmentID == 0 {
// segmentID, err := t.allocator.AllocOne()
// if err != nil {
// return err
// }
// clusterBuffer.currentSegmentID = segmentID
// clusterBuffer.currentSpillBinlogs = make(map[UniqueID]*datapb.FieldBinlog, 0)
// clusterBuffer.bufferTimeStampFrom = -1
// clusterBuffer.bufferTimeStampTo = -1
//}
if clusterBuffer.currentPKStats == nil {
stats, err := storage.NewPrimaryKeyStats(t.primaryKeyField.FieldID, int64(t.primaryKeyField.DataType), 1)
if err != nil {
return err
}
clusterBuffer.currentPKStats = stats
}

// Update timestampFrom, timestampTo
if timestamp < clusterBuffer.bufferTimeStampFrom || clusterBuffer.bufferTimeStampFrom == -1 {
clusterBuffer.bufferTimeStampFrom = timestamp
}
if timestamp > clusterBuffer.bufferTimeStampTo || clusterBuffer.bufferTimeStampFrom == -1 {
clusterBuffer.bufferTimeStampTo = timestamp
}
if err := clusterBuffer.buffer.Append(row); err != nil {
return err
}
rowSize := clusterBuffer.buffer.GetRowSize(clusterBuffer.buffer.GetRowNum() - 1)
clusterBuffer.bufferSize.Add(int64(rowSize))
clusterBuffer.bufferRowNum.Add(1)
clusterBuffer.currentPKStats.Update(pk)
t.totalBufferSize.Add(int64(rowSize))
return nil
return clusterBuffer.writer.Write(value)
}

func (t *clusteringCompactionTask) getWorkerPoolSize() int {
Expand Down Expand Up @@ -745,7 +696,7 @@ func (t *clusteringCompactionTask) spillAll(ctx context.Context) error {
log.Error("spill fail")
return err
}
err = t.packBuffersToSegments(ctx, buffer)
err = t.packBufferToSegment(ctx, buffer)
return err
}()
if err != nil {
Expand All @@ -756,24 +707,24 @@ func (t *clusteringCompactionTask) spillAll(ctx context.Context) error {
return nil
}

func (t *clusteringCompactionTask) packBuffersToSegments(ctx context.Context, buffer *ClusterBuffer) error {
if len(buffer.currentSpillBinlogs) == 0 {
func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buffer *ClusterBuffer) error {
if len(buffer.flushedBinlogs) == 0 {
return nil
}
insertLogs := make([]*datapb.FieldBinlog, 0)
for _, fieldBinlog := range buffer.currentSpillBinlogs {
for _, fieldBinlog := range buffer.flushedBinlogs {
insertLogs = append(insertLogs, fieldBinlog)
}
statPaths, err := t.statSerializeWrite(ctx, buffer.writer, buffer.currentSpillRowNum)
statPaths, err := t.statSerializeWrite(ctx, buffer.writer, buffer.flushedRowNum)
if err != nil {
return err
}

// pack current spill data into a segment
seg := &datapb.CompactionSegment{
PlanID: t.plan.GetPlanID(),
SegmentID: buffer.currentSegmentID,
NumOfRows: buffer.currentSpillRowNum,
SegmentID: buffer.writer.GetSegmentID(),
NumOfRows: buffer.flushedRowNum,
InsertLogs: insertLogs,
Field2StatslogPaths: []*datapb.FieldBinlog{statPaths},
Channel: t.plan.GetChannel(),
Expand All @@ -788,20 +739,14 @@ func (t *clusteringCompactionTask) packBuffersToSegments(ctx context.Context, bu
BF: buffer.clusteringKeyFieldStats.BF,
Centroids: buffer.clusteringKeyFieldStats.Centroids,
}},
NumRows: int(buffer.currentSpillRowNum),
NumRows: int(buffer.flushedRowNum),
}
buffer.uploadedSegmentStats[buffer.currentSegmentID] = segmentStats
buffer.uploadedSegmentStats[buffer.writer.GetSegmentID()] = segmentStats
// refresh
buffer.currentSpillRowNum = 0
buffer.currentSpillSize = 0
buffer.currentPKStats = nil
segmentID, err := t.allocator.AllocOne()
if err != nil {
return err
}
buffer.currentSegmentID = segmentID
buffer.currentSpillBinlogs = make(map[UniqueID]*datapb.FieldBinlog, 0)
log.Debug("finish pack segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", buffer.currentSegmentID), zap.String("seg", seg.String()), zap.Any("segStats", segmentStats))
t.refreshBufferWriter(buffer)
buffer.flushedRowNum = 0
buffer.flushedBinlogs = make(map[UniqueID]*datapb.FieldBinlog, 0)
log.Debug("finish pack segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", buffer.writer.GetSegmentID()), zap.String("seg", seg.String()), zap.Any("segStats", segmentStats))
return nil
}

Expand All @@ -820,32 +765,25 @@ func (t *clusteringCompactionTask) spill(ctx context.Context, buffer *ClusterBuf
}

for fID, path := range partialBinlogs {
for _, binlog := range path.GetBinlogs() {
binlog.TimestampTo = uint64(buffer.bufferTimeStampTo)
binlog.TimestampFrom = uint64(buffer.bufferTimeStampFrom)
}
tmpBinlog, ok := buffer.currentSpillBinlogs[fID]
tmpBinlog, ok := buffer.flushedBinlogs[fID]
if !ok {
tmpBinlog = path
} else {
tmpBinlog.Binlogs = append(tmpBinlog.Binlogs, path.GetBinlogs()...)
}
buffer.currentSpillBinlogs[fID] = tmpBinlog
buffer.flushedBinlogs[fID] = tmpBinlog
}
buffer.currentSpillRowNum = buffer.currentSpillRowNum + buffer.bufferRowNum.Load()
buffer.flushedRowNum = buffer.flushedRowNum + buffer.bufferRowNum.Load()

// clean buffer
t.totalBufferSize.Add(-buffer.bufferSize.Load())

err = t.refreshBufferWriter(buffer)
if err != nil {
return err
}
buffer.bufferSize.Store(0)
buffer.bufferRowNum.Store(0)

if buffer.currentSpillRowNum > t.plan.GetMaxSegmentRows() {
if err := t.packBuffersToSegments(ctx, buffer); err != nil {
if buffer.flushedRowNum > t.plan.GetMaxSegmentRows() {
if err := t.packBufferToSegment(ctx, buffer); err != nil {
return err
}
}
Expand Down Expand Up @@ -971,7 +909,7 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
// Unable to deal with all empty segments cases, so return error
if binlogNum == 0 {
log.Warn("compact wrong, all segments' binlogs are empty")
return nil, errIllegalCompactionPlan
return nil, merr.WrapErrIllegalCompactionPlan()
}
log.Debug("binlogNum", zap.Int("binlogNum", binlogNum))
for idx := 0; idx < binlogNum; idx++ {
Expand Down Expand Up @@ -1094,8 +1032,10 @@ func (t *clusteringCompactionTask) refreshBufferWriter(buffer *ClusterBuffer) er
if err != nil {
return err
}
writer, err := compaction.NewSegmentWriter(t.plan.GetSchema(), 0, segmentID, t.partitionID, t.collectionID)
writer, err := compaction.NewSegmentWriter(t.plan.GetSchema(), t.plan.MaxSegmentRows, segmentID, t.partitionID, t.collectionID)
buffer.writer = writer
buffer.bufferSize.Store(0)
buffer.bufferRowNum.Store(0)
return nil
}

Expand Down
15 changes: 7 additions & 8 deletions pkg/util/merr/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,13 @@ var (

// Compaction
ErrCompactionReadDeltaLogErr = newMilvusError("fail to read delta log", 2300, false)
ErrClusteringCompactionClusterNotSupport = newMilvusError("milvus cluster not support clustering compaction", 2301, false)
ErrClusteringCompactionCollectionNotSupport = newMilvusError("collection not support clustering compaction", 2302, false)
ErrClusteringCompactionCollectionIsCompacting = newMilvusError("collection is compacting", 2303, false)
ErrClusteringCompactionNotSupportVector = newMilvusError("vector field clustering compaction is not supported", 2304, false)
ErrClusteringCompactionSubmitTaskFail = newMilvusError("fail to submit task", 2305, true)
ErrClusteringCompactionCompactionTaskLost = newMilvusError("compaction task lost", 2306, true)
ErrClusteringCompactionGetCollectionFail = newMilvusError("fail to get collection in compaction", 2307, true)
ErrClusteringCompactionMetaError = newMilvusError("fail to update meta in clustering compaction", 2308, true)
ErrIllegalCompactionPlan = newMilvusError("compaction plan illegal", 2301, false)
ErrClusteringCompactionClusterNotSupport = newMilvusError("milvus cluster not support clustering compaction", 2302, false)
ErrClusteringCompactionCollectionNotSupport = newMilvusError("collection not support clustering compaction", 2303, false)
ErrClusteringCompactionCollectionIsCompacting = newMilvusError("collection is compacting", 2304, false)
ErrClusteringCompactionNotSupportVector = newMilvusError("vector field clustering compaction is not supported", 2305, false)
ErrClusteringCompactionSubmitTaskFail = newMilvusError("fail to submit task", 2306, true)
ErrClusteringCompactionMetaError = newMilvusError("fail to update meta in clustering compaction", 2307, true)

// General
ErrOperationNotSupported = newMilvusError("unsupported operation", 3000, false)
Expand Down
16 changes: 8 additions & 8 deletions pkg/util/merr/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1048,6 +1048,14 @@ func WrapErrCompactionReadDeltaLogErr(msg ...string) error {
return err
}

func WrapErrIllegalCompactionPlan(msg ...string) error {
err := error(ErrIllegalCompactionPlan)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "->"))
}
return err
}

func WrapErrClusteringCompactionClusterNotSupport(msg ...string) error {
err := error(ErrClusteringCompactionClusterNotSupport)
if len(msg) > 0 {
Expand Down Expand Up @@ -1079,14 +1087,6 @@ func WrapErrClusteringCompactionSubmitTaskFail(taskType string, err error) error
return wrapFieldsWithDesc(ErrClusteringCompactionSubmitTaskFail, err.Error(), value("taskType", taskType))
}

func WrapErrClusteringCompactionCompactionTaskLost(planID int64) error {
return wrapFields(ErrClusteringCompactionCompactionTaskLost, value("planID", planID))
}

func WrapErrClusteringCompactionGetCollectionFail(collectionID int64, err error) error {
return wrapFieldsWithDesc(ErrClusteringCompactionGetCollectionFail, err.Error(), value("collectionID", collectionID))
}

func WrapErrClusteringCompactionMetaError(operation string, err error) error {
return wrapFieldsWithDesc(ErrClusteringCompactionMetaError, err.Error(), value("operation", operation))
}

0 comments on commit 2fe7139

Please sign in to comment.