Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] scan tikv as a stream #5080

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 53 additions & 81 deletions pkg/meta/tkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -2169,41 +2169,34 @@

func (m *kvMeta) doFindDeletedFiles(ts int64, limit int) (map[Ino]uint64, error) {
klen := 1 + 8 + 8
vals, err := m.scanValues(m.fmtKey("D"), limit, func(k, v []byte) bool {
// filter out invalid ones
return len(k) == klen && len(v) == 8 && m.parseInt64(v) < ts
files := make(map[Ino]uint64)
err := m.client.scan(m.fmtKey("D"), func(k, v []byte) {
if len(k) == klen && len(v) == 8 && m.parseInt64(v) < ts {
rb := utils.FromBuffer([]byte(k)[1:])
files[m.decodeInode(rb.Get(8))] = rb.Get64()
}
})
if err != nil {
return nil, err
}
files := make(map[Ino]uint64, len(vals))
for k := range vals {
rb := utils.FromBuffer([]byte(k)[1:])
files[m.decodeInode(rb.Get(8))] = rb.Get64()
}
return files, nil
return files, err
}

func (m *kvMeta) doCleanupSlices() {
if m.Name() == "tikv" {
m.client.gc()
}
klen := 1 + 8 + 4
vals, _ := m.scanValues(m.fmtKey("K"), -1, func(k, v []byte) bool {
// filter out invalid ones
return len(k) == klen && len(v) == 8 && parseCounter(v) <= 0
})
for k, v := range vals {
rb := utils.FromBuffer([]byte(k)[1:])
id := rb.Get64()
size := rb.Get32()
refs := parseCounter(v)
if refs < 0 {
m.deleteSlice(id, size)
} else {
m.cleanupZeroRef(id, size)
m.client.scan(m.fmtKey("K"), func(k, v []byte) {

Check failure on line 2187 in pkg/meta/tkv.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `m.client.scan` is not checked (errcheck)
if len(k) == klen && len(v) == 8 && parseCounter(v) <= 0 {
rb := utils.FromBuffer([]byte(k)[1:])
id := rb.Get64()
size := rb.Get32()
refs := parseCounter(v)
if refs < 0 {
m.deleteSlice(id, size)
} else {
m.cleanupZeroRef(id, size)
}
}
}
})
}

func (m *kvMeta) deleteChunk(inode Ino, indx uint32) error {
Expand Down Expand Up @@ -2401,19 +2394,15 @@
}
// AiiiiiiiiCnnnn file chunks
klen := 1 + 8 + 1 + 4
result, err := m.scanValues(m.fmtKey("A"), -1, func(k, v []byte) bool {
return len(k) == klen && k[1+8] == 'C'
})
if err != nil {
logger.Warnf("scan chunks: %s", err)
return errno(err)
}
for key, value := range result {
m.client.scan(m.fmtKey("A"), func(key, value []byte) {

Check failure on line 2397 in pkg/meta/tkv.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `m.client.scan` is not checked (errcheck)
if len(key) != klen || key[1+8] != 'C' {
return
}
inode := m.decodeInode([]byte(key)[1:9])
ss := readSliceBuf(value)
if ss == nil {
logger.Errorf("Corrupt value for inode %d chunk key %s", inode, key)
continue
return
}
for _, s := range ss {
if s.id > 0 {
Expand All @@ -2423,7 +2412,7 @@
}
}
}
}
})
if m.getFormat().TrashDays == 0 {
return 0
}
Expand All @@ -2446,18 +2435,14 @@

// delayed slices: Lttttttttcccccccc
klen := 1 + 8 + 8
keys, err := m.scanKeys(m.fmtKey("L"))
if err != nil {
return err
}

var ss []Slice
var rs []int64
for _, key := range keys {
if len(key) != klen {
continue
return m.client.scan(m.fmtKey("L"), func(key, value []byte) {
if len(key) != klen || len(value) == 0 {
return
}
var clean bool
var err error
err = m.txn(func(tx *kvTxn) error {
ss := ss[:0]
rs := rs[:0]
Expand All @@ -2481,7 +2466,8 @@
return nil
})
if err != nil {
return err
logger.Warnf("scan trash slices %s: %s", key, err)
return
}
if clean && len(rs) == len(ss) {
for i, s := range ss {
Expand All @@ -2490,9 +2476,7 @@
}
}
}

}
return nil
})
}

func (m *kvMeta) scanPendingSlices(ctx Context, scan pendingSliceScan) error {
Expand All @@ -2502,29 +2486,24 @@

// slice refs: Kiiiiiiiissss
klen := 1 + 8 + 4
pairs, err := m.scanValues(m.fmtKey("K"), -1, func(k, v []byte) bool {
return m.client.scan(m.fmtKey("K"), func(key, v []byte) {
refs := parseCounter(v)
return len(k) == klen && refs < 0
})
if err != nil {
return err
}

for key := range pairs {
b := utils.ReadBuffer([]byte(key)[1:])
id := b.Get64()
size := b.Get32()
clean, err := scan(id, size)
if err != nil {
return errors.Wrap(err, "scan pending deleted slices")
}
if clean {
// TODO: m.deleteSlice(id, size)
// avoid lint warning
_ = clean
if len(key) == klen && refs < 0 {
b := utils.ReadBuffer([]byte(key)[1:])
id := b.Get64()
size := b.Get32()
clean, err := scan(id, size)
if err != nil {
logger.Warnf("scan pending deleted slices %d %d: %s", id, size, err)
return
}
if clean {
// TODO: m.deleteSlice(id, size)
// avoid lint warning
_ = clean
}
}
}
return nil
})
}

func (m *kvMeta) scanPendingFiles(ctx Context, scan pendingFileScan) error {
Expand All @@ -2533,29 +2512,22 @@
}
// deleted files: Diiiiiiiissssssss
klen := 1 + 8 + 8
pairs, err := m.scanValues(m.fmtKey("D"), -1, func(k, v []byte) bool {
return len(k) == klen
})
if err != nil {
return err
}

for key, value := range pairs {
return m.client.scan(m.fmtKey("D"), func(key, value []byte) {
if len(key) != klen {
return fmt.Errorf("invalid key %x", key)
return
}
ino := m.decodeInode([]byte(key)[1:9])
size := binary.BigEndian.Uint64([]byte(key)[9:])
ts := m.parseInt64(value)
clean, err := scan(ino, size, ts)
if err != nil {
return err
logger.Warnf("scan pending deleted files %d %d %d: %s", ino, size, ts, err)
return
}
if clean {
m.doDeleteFileData(ino, size)
}
}
return nil
})
}

func (m *kvMeta) doRepair(ctx Context, inode Ino, attr *Attr) syscall.Errno {
Expand Down
24 changes: 22 additions & 2 deletions pkg/meta/tkv_tikv.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,19 +211,39 @@ func (c *tikvClient) scan(prefix []byte, handler func(key, value []byte)) error
if err != nil {
return err
}
end := nextKey(prefix)
snap := c.client.GetSnapshot(ts)
snap.SetScanBatchSize(10240)
snap.SetNotFillCache(true)
snap.SetPriority(txnutil.PriorityLow)
it, err := snap.Iter(prefix, nextKey(prefix))
it, err := snap.Iter(prefix, end)
if err != nil {
return err
}
defer it.Close()
var lastKey []byte
for it.Valid() {
handler(it.Key(), it.Value())
lastKey = it.Key()
if err = it.Next(); err != nil {
return err
if _, ok := err.(*tikverr.ErrGCTooEarly); !ok {
logger.Warnf("scan next key: %s", err)
return err
}
it.Close()
// restart scan
ts, err = c.client.CurrentTimestamp("global")
if err != nil {
return err
}
snap = c.client.GetSnapshot(ts)
snap.SetScanBatchSize(10240)
snap.SetNotFillCache(true)
snap.SetPriority(txnutil.PriorityLow)
it, err = snap.Iter(nextKey(lastKey), end)
if err != nil {
return err
}
}
}
return nil
Expand Down
Loading