Skip to content

Commit

Permalink
Fix clustering compactor
Browse files Browse the repository at this point in the history
  • Loading branch information
wayblink committed May 28, 2024
1 parent 71bd0a1 commit 5588e2e
Showing 1 changed file with 10 additions and 7 deletions.
17 changes: 10 additions & 7 deletions internal/datanode/clustering_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,10 +298,10 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e
}
buffer := &ClusterBuffer{
id: id,
flushedBinlogs: make(map[UniqueID]*datapb.FieldBinlog, 0),
uploadedSegments: make([]*datapb.CompactionSegment, 0),
uploadedSegmentStats: make(map[UniqueID]storage.SegmentStats, 0),
clusteringKeyFieldStats: fieldStats,
//buffer: writeBuffer, // no need
}
t.clusterBuffers = append(t.clusterBuffers, buffer)
for _, key := range bucket {
Expand Down Expand Up @@ -346,6 +346,7 @@ func (t *clusteringCompactionTask) getVectorAnalyzeResult(ctx context.Context) e
fieldStats.SetVectorCentroids(storage.NewVectorFieldValue(t.clusteringKeyField.DataType, centroid))
clusterBuffer := &ClusterBuffer{
id: id,
flushedBinlogs: make(map[UniqueID]*datapb.FieldBinlog, 0),
uploadedSegments: make([]*datapb.CompactionSegment, 0),
uploadedSegmentStats: make(map[UniqueID]storage.SegmentStats, 0),
clusteringKeyFieldStats: fieldStats,
Expand Down Expand Up @@ -605,10 +606,14 @@ func (t *clusteringCompactionTask) writeToBuffer(ctx context.Context, clusterBuf
return err
}
}
err := clusterBuffer.writer.Write(value)
if err != nil {
return err
}
clusterBuffer.bufferSize.Add(int64(rowSize))
clusterBuffer.bufferRowNum.Add(1)
t.totalBufferSize.Add(int64(rowSize))
return clusterBuffer.writer.Write(value)
return nil
}

func (t *clusteringCompactionTask) getWorkerPoolSize() int {
Expand Down Expand Up @@ -746,7 +751,7 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff
t.refreshBufferWriter(buffer)
buffer.flushedRowNum = 0
buffer.flushedBinlogs = make(map[UniqueID]*datapb.FieldBinlog, 0)
log.Debug("finish pack segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", buffer.writer.GetSegmentID()), zap.String("seg", seg.String()), zap.Any("segStats", segmentStats))
log.Info("finish pack segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", buffer.writer.GetSegmentID()), zap.String("seg", seg.String()), zap.Any("segStats", segmentStats))
return nil
}

Expand Down Expand Up @@ -777,10 +782,8 @@ func (t *clusteringCompactionTask) spill(ctx context.Context, buffer *ClusterBuf

// clean buffer
t.totalBufferSize.Add(-buffer.bufferSize.Load())
err = t.refreshBufferWriter(buffer)
if err != nil {
return err
}
buffer.bufferSize.Store(0)
buffer.bufferRowNum.Store(0)

if buffer.flushedRowNum > t.plan.GetMaxSegmentRows() {
if err := t.packBufferToSegment(ctx, buffer); err != nil {
Expand Down

0 comments on commit 5588e2e

Please sign in to comment.