Skip to content

Commit

Permalink
meta: fix the issue that truncated slices may never be compacted (jui…
Browse files Browse the repository at this point in the history
  • Loading branch information
SandyXSD authored Jan 10, 2024
1 parent 113c836 commit 7b85645
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 5 deletions.
2 changes: 2 additions & 0 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ const (
nlocks = 1024
)

var maxCompactSlices = 1000

type engine interface {
// Get the value of counter name.
getCounter(name string) (int64, error)
Expand Down
9 changes: 8 additions & 1 deletion pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -2912,7 +2912,7 @@ func (m *redisMeta) compactChunk(inode Ino, indx uint32, force bool) {
m.Unlock()

var ctx = Background
vals, err := m.rdb.LRange(ctx, m.chunkKey(inode, indx), 0, 1000).Result()
vals, err := m.rdb.LRange(ctx, m.chunkKey(inode, indx), 0, int64(maxCompactSlices)).Result()
if err != nil {
return
}
Expand All @@ -2923,11 +2923,18 @@ func (m *redisMeta) compactChunk(inode Ino, indx uint32, force bool) {
return
}
skipped := skipSome(ss)
var last *slice
if skipped > 0 {
last = ss[skipped-1]
}
ss = ss[skipped:]
pos, size, slices := compactChunk(ss)
if len(ss) < 2 || size == 0 {
return
}
if last != nil && last.pos+last.len > pos {
panic(fmt.Sprintf("invalid compaction: last skipped slice %+v, pos %d", last, pos))
}

var id uint64
st := m.NewSlice(ctx, &id)
Expand Down
4 changes: 3 additions & 1 deletion pkg/meta/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,9 @@ func buildSlice(ss []*slice) []Slice {
func compactChunk(ss []*slice) (uint32, uint32, []Slice) {
var chunk = buildSlice(ss)
var pos uint32
if len(chunk) > 0 && chunk[0].Id == 0 {
if len(chunk) == 1 && chunk[0].Id == 0 {
chunk[0].Len = 1
} else if len(chunk) > 1 && chunk[0].Id == 0 {
pos = chunk[0].Len
chunk = chunk[1:]
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/meta/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -2773,18 +2773,28 @@ func (m *dbMeta) compactChunk(inode Ino, indx uint32, force bool) {
if err != nil {
return
}
if len(c.Slices) > sliceBytes*maxCompactSlices {
c.Slices = c.Slices[:sliceBytes*maxCompactSlices]
}

ss := readSliceBuf(c.Slices)
if ss == nil {
logger.Errorf("Corrupt value for inode %d chunk indx %d", inode, indx)
return
}
skipped := skipSome(ss)
var last *slice
if skipped > 0 {
last = ss[skipped-1]
}
ss = ss[skipped:]
pos, size, slices := compactChunk(ss)
if len(ss) < 2 || size == 0 {
return
}
if last != nil && last.pos+last.len > pos {
panic(fmt.Sprintf("invalid compaction: last skipped slice %+v, pos %d", last, pos))
}

var id uint64
st := m.NewSlice(Background, &id)
Expand Down
13 changes: 10 additions & 3 deletions pkg/meta/tkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -2400,21 +2400,28 @@ func (m *kvMeta) compactChunk(inode Ino, indx uint32, force bool) {
if err != nil {
return
}

if len(buf) > sliceBytes*100 {
buf = buf[:sliceBytes*100]
if len(buf) > sliceBytes*maxCompactSlices {
buf = buf[:sliceBytes*maxCompactSlices]
}

ss := readSliceBuf(buf)
if ss == nil {
logger.Errorf("Corrupt value for inode %d chunk indx %d", inode, indx)
return
}
skipped := skipSome(ss)
var last *slice
if skipped > 0 {
last = ss[skipped-1]
}
ss = ss[skipped:]
pos, size, slices := compactChunk(ss)
if len(ss) < 2 || size == 0 {
return
}
if last != nil && last.pos+last.len > pos {
panic(fmt.Sprintf("invalid compaction: last skipped slice %+v, pos %d", last, pos))
}

var id uint64
st := m.NewSlice(Background, &id)
Expand Down
1 change: 1 addition & 0 deletions pkg/meta/tkv_etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ func newEtcdClient(addr string) (tkvClient, error) {
if err != nil {
return nil, err
}
maxCompactSlices = 100
var prefix string = u.Path + "\xFD"
return withPrefix(&etcdClient{c, etcd.NewKV(c)}, []byte(prefix)), nil
}
Expand Down

0 comments on commit 7b85645

Please sign in to comment.