Skip to content

Commit

Permalink
fix clustering compactor
Browse files: browse the repository at this point in the history
  • Loading branch information
wayblink committed May 28, 2024
1 parent 71bd0a1 commit 89a5c7d
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 8 deletions.
17 changes: 10 additions & 7 deletions internal/datanode/clustering_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,10 +298,10 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e
}
buffer := &ClusterBuffer{
id: id,
flushedBinlogs: make(map[UniqueID]*datapb.FieldBinlog, 0),
uploadedSegments: make([]*datapb.CompactionSegment, 0),
uploadedSegmentStats: make(map[UniqueID]storage.SegmentStats, 0),
clusteringKeyFieldStats: fieldStats,
//buffer: writeBuffer, // no need
}
t.clusterBuffers = append(t.clusterBuffers, buffer)
for _, key := range bucket {
Expand Down Expand Up @@ -346,6 +346,7 @@ func (t *clusteringCompactionTask) getVectorAnalyzeResult(ctx context.Context) e
fieldStats.SetVectorCentroids(storage.NewVectorFieldValue(t.clusteringKeyField.DataType, centroid))
clusterBuffer := &ClusterBuffer{
id: id,
flushedBinlogs: make(map[UniqueID]*datapb.FieldBinlog, 0),
uploadedSegments: make([]*datapb.CompactionSegment, 0),
uploadedSegmentStats: make(map[UniqueID]storage.SegmentStats, 0),
clusteringKeyFieldStats: fieldStats,
Expand Down Expand Up @@ -605,10 +606,14 @@ func (t *clusteringCompactionTask) writeToBuffer(ctx context.Context, clusterBuf
return err
}
}
err := clusterBuffer.writer.Write(value)
if err != nil {
return err
}
clusterBuffer.bufferSize.Add(int64(rowSize))
clusterBuffer.bufferRowNum.Add(1)
t.totalBufferSize.Add(int64(rowSize))
return clusterBuffer.writer.Write(value)
return nil
}

func (t *clusteringCompactionTask) getWorkerPoolSize() int {
Expand Down Expand Up @@ -746,7 +751,7 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff
t.refreshBufferWriter(buffer)
buffer.flushedRowNum = 0
buffer.flushedBinlogs = make(map[UniqueID]*datapb.FieldBinlog, 0)
log.Debug("finish pack segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", buffer.writer.GetSegmentID()), zap.String("seg", seg.String()), zap.Any("segStats", segmentStats))
log.Info("finish pack segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", buffer.writer.GetSegmentID()), zap.String("seg", seg.String()), zap.Any("segStats", segmentStats))
return nil
}

Expand Down Expand Up @@ -777,10 +782,8 @@ func (t *clusteringCompactionTask) spill(ctx context.Context, buffer *ClusterBuf

// clean buffer
t.totalBufferSize.Add(-buffer.bufferSize.Load())
err = t.refreshBufferWriter(buffer)
if err != nil {
return err
}
buffer.bufferSize.Store(0)
buffer.bufferRowNum.Store(0)

if buffer.flushedRowNum > t.plan.GetMaxSegmentRows() {
if err := t.packBufferToSegment(ctx, buffer); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/datanode/io/binlog_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (b *BinlogIoImpl) Upload(ctx context.Context, kvs map[string][]byte) error
innerK, innerV := k, v
future := b.pool.Submit(func() (any, error) {
var err error
log.Debug("BinlogIO upload", zap.String("paths", innerK))
log.Info("BinlogIO upload", zap.String("paths", innerK))
err = retry.Do(ctx, func() error {
err = b.Write(ctx, innerK, innerV)
if err != nil {
Expand Down

0 comments on commit 89a5c7d

Please sign in to comment.