From f4ec590b3122b047ecf2f72a8858ecc82966d61c Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Sat, 10 Aug 2024 20:13:44 +0800 Subject: [PATCH] scan tikv as a stream --- pkg/meta/tkv.go | 134 +++++++++++++++++-------------------------- pkg/meta/tkv_tikv.go | 24 +++++++- 2 files changed, 75 insertions(+), 83 deletions(-) diff --git a/pkg/meta/tkv.go b/pkg/meta/tkv.go index 47d4427936c0..1cc4e9e2b13a 100644 --- a/pkg/meta/tkv.go +++ b/pkg/meta/tkv.go @@ -2169,19 +2169,14 @@ func (m *kvMeta) doGetDirStat(ctx Context, ino Ino, trySync bool) (*dirStat, sys func (m *kvMeta) doFindDeletedFiles(ts int64, limit int) (map[Ino]uint64, error) { klen := 1 + 8 + 8 - vals, err := m.scanValues(m.fmtKey("D"), limit, func(k, v []byte) bool { - // filter out invalid ones - return len(k) == klen && len(v) == 8 && m.parseInt64(v) < ts + files := make(map[Ino]uint64) + err := m.client.scan(m.fmtKey("D"), func(k, v []byte) { + if len(k) == klen && len(v) == 8 && m.parseInt64(v) < ts { + rb := utils.FromBuffer([]byte(k)[1:]) + files[m.decodeInode(rb.Get(8))] = rb.Get64() + } }) - if err != nil { - return nil, err - } - files := make(map[Ino]uint64, len(vals)) - for k := range vals { - rb := utils.FromBuffer([]byte(k)[1:]) - files[m.decodeInode(rb.Get(8))] = rb.Get64() - } - return files, nil + return files, err } func (m *kvMeta) doCleanupSlices() { @@ -2189,21 +2184,19 @@ func (m *kvMeta) doCleanupSlices() { m.client.gc() } klen := 1 + 8 + 4 - vals, _ := m.scanValues(m.fmtKey("K"), -1, func(k, v []byte) bool { - // filter out invalid ones - return len(k) == klen && len(v) == 8 && parseCounter(v) <= 0 - }) - for k, v := range vals { - rb := utils.FromBuffer([]byte(k)[1:]) - id := rb.Get64() - size := rb.Get32() - refs := parseCounter(v) - if refs < 0 { - m.deleteSlice(id, size) - } else { - m.cleanupZeroRef(id, size) + m.client.scan(m.fmtKey("K"), func(k, v []byte) { + if len(k) == klen && len(v) == 8 && parseCounter(v) <= 0 { + rb := utils.FromBuffer([]byte(k)[1:]) + id := rb.Get64() + size := rb.Get32() + refs := parseCounter(v) + if refs < 0 { + m.deleteSlice(id, size) + } else { + m.cleanupZeroRef(id, size) + } } - } + }) } func (m *kvMeta) deleteChunk(inode Ino, indx uint32) error { @@ -2401,19 +2394,15 @@ func (m *kvMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool, sh } // AiiiiiiiiCnnnn file chunks klen := 1 + 8 + 1 + 4 - result, err := m.scanValues(m.fmtKey("A"), -1, func(k, v []byte) bool { - return len(k) == klen && k[1+8] == 'C' - }) - if err != nil { - logger.Warnf("scan chunks: %s", err) - return errno(err) - } - for key, value := range result { + m.client.scan(m.fmtKey("A"), func(key, value []byte) { + if len(key) != klen || key[1+8] != 'C' { + return + } inode := m.decodeInode([]byte(key)[1:9]) ss := readSliceBuf(value) if ss == nil { logger.Errorf("Corrupt value for inode %d chunk key %s", inode, key) - continue + return } for _, s := range ss { if s.id > 0 { @@ -2423,7 +2412,7 @@ func (m *kvMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool, sh } } } - } + }) if m.getFormat().TrashDays == 0 { return 0 } @@ -2446,18 +2435,14 @@ func (m *kvMeta) scanTrashSlices(ctx Context, scan trashSliceScan) error { // delayed slices: Lttttttttcccccccc klen := 1 + 8 + 8 - keys, err := m.scanKeys(m.fmtKey("L")) - if err != nil { - return err - } - var ss []Slice var rs []int64 - for _, key := range keys { - if len(key) != klen { - continue + return m.client.scan(m.fmtKey("L"), func(key, value []byte) { + if len(key) != klen || len(value) == 0 { + return } var clean bool + var err error err = m.txn(func(tx *kvTxn) error { ss := ss[:0] rs := rs[:0] @@ -2481,7 +2466,8 @@ func (m *kvMeta) scanTrashSlices(ctx Context, scan trashSliceScan) error { return nil }) if err != nil { - return err + logger.Warnf("scan trash slices %s: %s", key, err) + return } if clean && len(rs) == len(ss) { for i, s := range ss { @@ -2490,9 +2476,7 @@ func (m *kvMeta) scanTrashSlices(ctx Context, scan trashSliceScan) error { } } } - - } - return nil + }) } func (m *kvMeta) scanPendingSlices(ctx Context, scan pendingSliceScan) error { @@ -2502,29 +2486,24 @@ func (m *kvMeta) scanPendingSlices(ctx Context, scan pendingSliceScan) error { // slice refs: Kiiiiiiiissss klen := 1 + 8 + 4 - pairs, err := m.scanValues(m.fmtKey("K"), -1, func(k, v []byte) bool { + return m.client.scan(m.fmtKey("K"), func(key, v []byte) { refs := parseCounter(v) - return len(k) == klen && refs < 0 - }) - if err != nil { - return err - } - - for key := range pairs { - b := utils.ReadBuffer([]byte(key)[1:]) - id := b.Get64() - size := b.Get32() - clean, err := scan(id, size) - if err != nil { - return errors.Wrap(err, "scan pending deleted slices") - } - if clean { - // TODO: m.deleteSlice(id, size) - // avoid lint warning - _ = clean + if len(key) == klen && refs < 0 { + b := utils.ReadBuffer([]byte(key)[1:]) + id := b.Get64() + size := b.Get32() + clean, err := scan(id, size) + if err != nil { + logger.Warnf("scan pending deleted slices %d %d: %s", id, size, err) + return + } + if clean { + // TODO: m.deleteSlice(id, size) + // avoid lint warning + _ = clean + } } - } - return nil + }) } func (m *kvMeta) scanPendingFiles(ctx Context, scan pendingFileScan) error { @@ -2533,29 +2512,22 @@ func (m *kvMeta) scanPendingFiles(ctx Context, scan pendingFileScan) error { } // deleted files: Diiiiiiiissssssss klen := 1 + 8 + 8 - pairs, err := m.scanValues(m.fmtKey("D"), -1, func(k, v []byte) bool { - return len(k) == klen - }) - if err != nil { - return err - } - - for key, value := range pairs { + return m.client.scan(m.fmtKey("D"), func(key, value []byte) { if len(key) != klen { - return fmt.Errorf("invalid key %x", key) + return } ino := m.decodeInode([]byte(key)[1:9]) size := binary.BigEndian.Uint64([]byte(key)[9:]) ts := m.parseInt64(value) clean, err := scan(ino, size, ts) if err != nil { - return err + logger.Warnf("scan pending deleted files %d %d %d: %s", ino, size, ts, err) + return } if clean { m.doDeleteFileData(ino, size) } - } - return nil + }) } func (m *kvMeta) doRepair(ctx Context, inode Ino, attr *Attr) syscall.Errno { diff --git a/pkg/meta/tkv_tikv.go b/pkg/meta/tkv_tikv.go index c682bd7095be..1aeec2997832 100644 --- a/pkg/meta/tkv_tikv.go +++ b/pkg/meta/tkv_tikv.go @@ -211,19 +211,39 @@ func (c *tikvClient) scan(prefix []byte, handler func(key, value []byte)) error if err != nil { return err } + end := nextKey(prefix) snap := c.client.GetSnapshot(ts) snap.SetScanBatchSize(10240) snap.SetNotFillCache(true) snap.SetPriority(txnutil.PriorityLow) - it, err := snap.Iter(prefix, nextKey(prefix)) + it, err := snap.Iter(prefix, end) if err != nil { return err } defer it.Close() + var lastKey []byte for it.Valid() { handler(it.Key(), it.Value()) + lastKey = it.Key() if err = it.Next(); err != nil { - return err + if _, ok := err.(*tikverr.ErrGCTooEarly); !ok { + logger.Warnf("scan next key: %s", err) + return err + } + it.Close() + // restart scan + ts, err = c.client.CurrentTimestamp("global") + if err != nil { + return err + } + snap = c.client.GetSnapshot(ts) + snap.SetScanBatchSize(10240) + snap.SetNotFillCache(true) + snap.SetPriority(txnutil.PriorityLow) + it, err = snap.Iter(nextKey(lastKey), end) + if err != nil { + return err + } } } return nil