From 89a5c7d4af20ac14ba4f218751258c9746486768 Mon Sep 17 00:00:00 2001
From: wayblink
Date: Tue, 28 May 2024 14:20:36 +0800
Subject: [PATCH] fix clustering compactor

---
 internal/datanode/clustering_compactor.go | 17 ++++++++++-------
 internal/datanode/io/binlog_io.go         |  2 +-
 2 files changed, 11 insertions(+), 8 deletions(-)

diff --git a/internal/datanode/clustering_compactor.go b/internal/datanode/clustering_compactor.go
index 13da49f4c59cb..3876f4a50398d 100644
--- a/internal/datanode/clustering_compactor.go
+++ b/internal/datanode/clustering_compactor.go
@@ -298,10 +298,10 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e
 		}
 		buffer := &ClusterBuffer{
 			id:                      id,
+			flushedBinlogs:          make(map[UniqueID]*datapb.FieldBinlog, 0),
 			uploadedSegments:        make([]*datapb.CompactionSegment, 0),
 			uploadedSegmentStats:    make(map[UniqueID]storage.SegmentStats, 0),
 			clusteringKeyFieldStats: fieldStats,
-			//buffer:                  writeBuffer, // no need
 		}
 		t.clusterBuffers = append(t.clusterBuffers, buffer)
 		for _, key := range bucket {
@@ -346,6 +346,7 @@ func (t *clusteringCompactionTask) getVectorAnalyzeResult(ctx context.Context) e
 		fieldStats.SetVectorCentroids(storage.NewVectorFieldValue(t.clusteringKeyField.DataType, centroid))
 		clusterBuffer := &ClusterBuffer{
 			id:                      id,
+			flushedBinlogs:          make(map[UniqueID]*datapb.FieldBinlog, 0),
 			uploadedSegments:        make([]*datapb.CompactionSegment, 0),
 			uploadedSegmentStats:    make(map[UniqueID]storage.SegmentStats, 0),
 			clusteringKeyFieldStats: fieldStats,
@@ -605,10 +606,14 @@ func (t *clusteringCompactionTask) writeToBuffer(ctx context.Context, clusterBuf
 			return err
 		}
 	}
+	err := clusterBuffer.writer.Write(value)
+	if err != nil {
+		return err
+	}
 	clusterBuffer.bufferSize.Add(int64(rowSize))
 	clusterBuffer.bufferRowNum.Add(1)
 	t.totalBufferSize.Add(int64(rowSize))
-	return clusterBuffer.writer.Write(value)
+	return nil
 }
 
 func (t *clusteringCompactionTask) getWorkerPoolSize() int {
@@ -746,7 +751,7 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff
 	t.refreshBufferWriter(buffer)
 	buffer.flushedRowNum = 0
 	buffer.flushedBinlogs = make(map[UniqueID]*datapb.FieldBinlog, 0)
-	log.Debug("finish pack segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", buffer.writer.GetSegmentID()), zap.String("seg", seg.String()), zap.Any("segStats", segmentStats))
+	log.Info("finish pack segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", buffer.writer.GetSegmentID()), zap.String("seg", seg.String()), zap.Any("segStats", segmentStats))
 	return nil
 }
 
@@ -777,10 +782,8 @@ func (t *clusteringCompactionTask) spill(ctx context.Context, buffer *ClusterBuf
 
 	// clean buffer
 	t.totalBufferSize.Add(-buffer.bufferSize.Load())
-	err = t.refreshBufferWriter(buffer)
-	if err != nil {
-		return err
-	}
+	buffer.bufferSize.Store(0)
+	buffer.bufferRowNum.Store(0)
 
 	if buffer.flushedRowNum > t.plan.GetMaxSegmentRows() {
 		if err := t.packBufferToSegment(ctx, buffer); err != nil {
diff --git a/internal/datanode/io/binlog_io.go b/internal/datanode/io/binlog_io.go
index 317f267978132..76623b7cfd74c 100644
--- a/internal/datanode/io/binlog_io.go
+++ b/internal/datanode/io/binlog_io.go
@@ -91,7 +91,7 @@ func (b *BinlogIoImpl) Upload(ctx context.Context, kvs map[string][]byte) error
 		innerK, innerV := k, v
 		future := b.pool.Submit(func() (any, error) {
 			var err error
-			log.Debug("BinlogIO upload", zap.String("paths", innerK))
+			log.Info("BinlogIO upload", zap.String("paths", innerK))
 			err = retry.Do(ctx, func() error {
 				err = b.Write(ctx, innerK, innerV)
 				if err != nil {
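
The core behavioral fix in the writeToBuffer hunk above is the ordering: the row is written first, and the buffer accounting (bufferSize, bufferRowNum, totalBufferSize) is updated only after the write succeeds, so a failed Write can no longer inflate the counters. Below is a minimal, self-contained Go sketch of that write-then-account pattern; the writer interface, buffer type, and failingWriter are illustrative stand-ins, not the actual Milvus types.

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// writer stands in for the ClusterBuffer segment writer.
type writer interface {
	Write(v any) error
}

// buffer stands in for ClusterBuffer's accounting fields.
type buffer struct {
	w            writer
	bufferSize   atomic.Int64
	bufferRowNum atomic.Int64
}

// writeToBuffer mirrors the patched control flow: attempt the write first,
// then update the counters only once it has succeeded.
func (b *buffer) writeToBuffer(v any, rowSize int64) error {
	if err := b.w.Write(v); err != nil {
		return err // counters stay untouched on failure
	}
	b.bufferSize.Add(rowSize)
	b.bufferRowNum.Add(1)
	return nil
}

// failingWriter simulates a writer whose Write always errors.
type failingWriter struct{}

func (failingWriter) Write(any) error { return errors.New("write failed") }

func main() {
	b := &buffer{w: failingWriter{}}
	err := b.writeToBuffer("row", 16)
	// With the patched ordering, both counters remain zero after the failure.
	fmt.Println(err, b.bufferSize.Load(), b.bufferRowNum.Load())
}

The same concern drives the spill hunk: after a flush, the counters are zeroed directly with Store(0) instead of rebuilding the writer, and the old refreshBufferWriter error path goes away with it.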