meta: force compaction when there's too many slices in one chunk (#4573)
SandyXSD authored Mar 26, 2024
1 parent 3f3b129 commit 11d436e
Showing 6 changed files with 111 additions and 63 deletions.
.github/scripts/prepare_db.sh (2 changes: 1 addition, 1 deletion)
@@ -41,7 +41,7 @@ install_etcd(){
     quay.io/coreos/etcd:v3.5.7 \
     /usr/local/bin/etcd --data-dir=/etcd-data --name node1 \
     --listen-client-urls http://0.0.0.0:2379 \
-    --advertise-client-urls http://0.0.0.0:2379 \
+    --advertise-client-urls http://0.0.0.0:3379 \
     --listen-peer-urls http://0.0.0.0:2380 \
     --initial-advertise-peer-urls http://0.0.0.0:2380 \
     --initial-cluster node1=http://0.0.0.0:2380
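A note on the port change: presumably the script publishes the container's client port 2379 on host port 3379 elsewhere in this file (the docker port mapping is not shown in this hunk), so the advertised client URL must name the port that clients actually dial on the host; the listen URL inside the container stays at 2379.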

pkg/meta/base.go (7 changes: 4 additions, 3 deletions)
@@ -47,6 +47,7 @@ const (
 )
 
 var maxCompactSlices = 1000
+var maxSlices = 2500
 
 type engine interface {
     // Get the value of counter name.
@@ -67,7 +68,7 @@ type engine interface {
     doInit(format *Format, force bool) error
 
     scanAllChunks(ctx Context, ch chan<- cchunk, bar *utils.Bar) error
-    compactChunk(inode Ino, indx uint32, force bool)
+    compactChunk(inode Ino, indx uint32, once, force bool)
     doDeleteSustainedInode(sid uint64, inode Ino) error
     doFindDeletedFiles(ts int64, limit int) (map[Ino]uint64, error) // limit < 0 means all
     doDeleteFileData(inode Ino, length uint64)
@@ -2255,7 +2256,7 @@ func (m *baseMeta) CompactAll(ctx Context, threads int, bar *utils.Bar) syscall.
     go func() {
         for c := range ch {
             logger.Debugf("Compacting chunk %d:%d (%d slices)", c.inode, c.indx, c.slices)
-            m.en.compactChunk(c.inode, c.indx, true)
+            m.en.compactChunk(c.inode, c.indx, false, true)
             bar.Increment()
         }
         wg.Done()
@@ -2287,7 +2288,7 @@ func (m *baseMeta) Compact(ctx Context, inode Ino, concurrency int, preFunc, pos
     go func() {
         defer wg.Done()
         for c := range chunkChan {
-            m.en.compactChunk(c.inode, c.indx, true)
+            m.en.compactChunk(c.inode, c.indx, false, true)
             postFunc()
         }
     }()
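For readers of the interface change above, the two new booleans are not symmetric. A rough summary of their semantics, inferred from the call sites in this commit (the doc comment is ours, not repository documentation):

    // compactChunk merges the slice list of chunk (inode, indx).
    //   force: used by CompactAll/Compact; waits out any in-flight compaction
    //          of this chunk, then keeps compacting until it is fully merged.
    //   once:  used on the write path when a chunk reaches maxSlices (2500);
    //          waits like force, but runs a single synchronous pass and bails
    //          out early if the chunk has meanwhile shrunk below maxSlices.
    //   both false: best-effort background pass that gives up if this chunk
    //          is already being compacted or >10 compactions are running.
    compactChunk(inode Ino, indx uint32, once, force bool)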

pkg/meta/base_test.go (34 changes: 28 additions, 6 deletions)
@@ -1369,7 +1369,7 @@ func testCaseIncensi(t *testing.T, m Meta) {
 }
 
 type compactor interface {
-    compactChunk(inode Ino, indx uint32, force bool)
+    compactChunk(inode Ino, indx uint32, once, force bool)
 }
 
 func testCompaction(t *testing.T, m Meta, trash bool) {
@@ -1422,7 +1422,7 @@ func testCompaction(t *testing.T, m Meta, trash bool) {
         t.Fatalf("expect 5 slices, but got %+v", cs1)
     }
     if c, ok := m.(compactor); ok {
-        c.compactChunk(inode, 1, true)
+        c.compactChunk(inode, 1, false, true)
     }
     var cs []Slice
     _ = m.Read(ctx, inode, 1, &cs)
@@ -1441,7 +1441,7 @@ func testCompaction(t *testing.T, m Meta, trash bool) {
         time.Sleep(time.Millisecond)
     }
     if c, ok := m.(compactor); ok {
-        c.compactChunk(inode, 0, true)
+        c.compactChunk(inode, 0, false, true)
     }
     var slices []Slice
     if st := m.Read(ctx, inode, 0, &slices); st != 0 {
@@ -1494,7 +1494,7 @@ func testCompaction(t *testing.T, m Meta, trash bool) {
         t.Fatalf("truncate file: %s", st)
     }
     if c, ok := m.(compactor); ok {
-        c.compactChunk(inode, 0, true)
+        c.compactChunk(inode, 0, false, true)
     }
     if st := m.Read(ctx, inode, 0, &slices); st != 0 {
         t.Fatalf("read 0: %s", st)
@@ -1509,7 +1509,7 @@ func testCompaction(t *testing.T, m Meta, trash bool) {
     m.NewSlice(ctx, &sliceId)
     _ = m.Write(ctx, inode, 0, uint32(1<<20), Slice{Id: sliceId, Size: 2 << 20, Len: 2 << 20}, time.Now())
     if c, ok := m.(compactor); ok {
-        c.compactChunk(inode, 0, true)
+        c.compactChunk(inode, 0, false, true)
     }
     if st := m.Read(ctx, inode, 0, &slices); st != 0 {
         t.Fatalf("read 0: %s", st)
@@ -1527,7 +1527,7 @@ func testCompaction(t *testing.T, m Meta, trash bool) {
     _ = m.Write(ctx, inode, 0, uint32(128<<10), Slice{Id: sliceId, Size: 2 << 20, Len: 128 << 10}, time.Now())
     _ = m.Write(ctx, inode, 0, uint32(0), Slice{Id: 0, Size: 1 << 20, Len: 1 << 20}, time.Now())
     if c, ok := m.(compactor); ok {
-        c.compactChunk(inode, 0, true)
+        c.compactChunk(inode, 0, false, true)
     }
     if st := m.Read(ctx, inode, 0, &slices); st != 0 {
         t.Fatalf("read 0: %s", st)
@@ -1576,6 +1576,28 @@ func testConcurrentWrite(t *testing.T, m Meta) {
     if errno != 0 {
         t.Fatal()
     }
+
+    var g2 sync.WaitGroup
+    for i := 0; i <= 10; i++ {
+        g2.Add(1)
+        go func() {
+            defer g2.Done()
+            for j := 0; j < 1000; j++ {
+                var sliceId uint64
+                m.NewSlice(ctx, &sliceId)
+                var slice = Slice{Id: sliceId, Size: 100, Len: 100}
+                st := m.Write(ctx, inode, 0, uint32(200*j), slice, time.Now())
+                if st != 0 {
+                    errno = st
+                    break
+                }
+            }
+        }()
+    }
+    g2.Wait()
+    if errno != 0 {
+        t.Fatal()
+    }
 }
 
 func testTruncateAndDelete(t *testing.T, m Meta) {
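A note on the new testConcurrentWrite stanza: eleven goroutines each append 1000 tiny slices to chunk 0, so the chunk can accumulate on the order of 11,000 slice entries, far past maxSlices (2500). That makes the test exercise the new blocking compaction on the write path, which is exactly the situation the commit title describes: before this change such a chunk could keep growing because only best-effort background compaction was ever triggered.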

pkg/meta/redis.go (43 changes: 26 additions, 17 deletions)
@@ -2339,7 +2339,7 @@ func (m *redisMeta) Read(ctx Context, inode Ino, indx uint32, slices *[]Slice) (
     *slices = buildSlice(ss)
     m.of.CacheChunk(inode, indx, *slices)
     if !m.conf.ReadOnly && (len(vals) >= 5 || len(*slices) >= 5) {
-        go m.compactChunk(inode, indx, false)
+        go m.compactChunk(inode, indx, false, false)
     }
     return 0
 }
@@ -2354,7 +2354,7 @@ func (m *redisMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice
     defer func() { m.of.InvalidateChunk(inode, indx) }()
     var newLength, newSpace int64
     var attr Attr
-    var needCompact bool
+    var numSlices int64
     err := m.txn(ctx, func(tx *redis.Tx) error {
         newLength, newSpace = 0, 0
         attr = Attr{}
@@ -2393,15 +2393,19 @@ func (m *redisMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice
             return nil
         })
         if err == nil {
-            needCompact = rpush.Val()%100 == 99 || rpush.Val() > 350
+            numSlices = rpush.Val()
         }
         return err
     }, m.inodeKey(inode))
     if err == nil {
-        if needCompact {
-            go m.compactChunk(inode, indx, false)
-        }
         m.updateParentStat(ctx, inode, attr.Parent, newLength, newSpace)
+        if numSlices%100 == 99 || numSlices > 350 {
+            if numSlices < int64(maxSlices) {
+                go m.compactChunk(inode, indx, false, false)
+            } else {
+                m.compactChunk(inode, indx, true, false)
+            }
+        }
     }
     return errno(err)
 }
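The rewritten trigger amounts to a small policy: compaction still fires at the same thresholds as before, but once a chunk crosses maxSlices the writer itself blocks on a synchronous pass instead of spawning yet another goroutine. A minimal standalone sketch of that rule (shouldCompact is our name; the literals mirror the code above):

    package main

    import "fmt"

    const maxSlices = 2500 // mirrors pkg/meta/base.go

    // shouldCompact reports how the write path reacts to a chunk that now
    // holds numSlices slices: background compaction, blocking, or neither.
    func shouldCompact(numSlices int64) (background, blocking bool) {
        if numSlices%100 != 99 && numSlices <= 350 {
            return false, false // not fragmented enough to bother
        }
        if numSlices < maxSlices {
            return true, false // async: the writer continues immediately
        }
        return false, true // too many slices: throttle the writer
    }

    func main() {
        for _, n := range []int64{99, 200, 351, 2500} {
            bg, bl := shouldCompact(n)
            fmt.Println(n, bg, bl) // 99 and 351 go async, 2500 blocks
        }
    }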
@@ -2996,11 +3000,11 @@ func (r *redisMeta) doCleanupDelayedSlices(edge int64) (int, error) {
     return count, err
 }
 
-func (m *redisMeta) compactChunk(inode Ino, indx uint32, force bool) {
+func (m *redisMeta) compactChunk(inode Ino, indx uint32, once, force bool) {
     // avoid too many or duplicated compaction
-    k := uint64(inode) + (uint64(indx) << 32)
+    k := uint64(inode) + (uint64(indx) << 40)
     m.Lock()
-    if force {
+    if once || force {
         for m.compacting[k] {
             m.Unlock()
             time.Sleep(time.Millisecond * 10)
@@ -3009,17 +3013,19 @@ func (m *redisMeta) compactChunk(inode Ino, indx uint32, force bool) {
     } else if len(m.compacting) > 10 || m.compacting[k] {
         m.Unlock()
         return
-    } else {
-        m.compacting[k] = true
-        defer func() {
-            m.Lock()
-            delete(m.compacting, k)
-            m.Unlock()
-        }()
     }
+    m.compacting[k] = true
+    defer func() {
+        m.Lock()
+        delete(m.compacting, k)
+        m.Unlock()
+    }()
     m.Unlock()
 
     var ctx = Background
+    if once && m.rdb.LLen(ctx, m.chunkKey(inode, indx)).Val() < int64(maxSlices) {
+        return
+    }
     vals, err := m.rdb.LRange(ctx, m.chunkKey(inode, indx), 0, int64(maxCompactSlices)).Result()
     if err != nil {
         return
@@ -3134,7 +3140,10 @@ func (m *redisMeta) compactChunk(inode Ino, indx uint32, force bool) {
     }
 
     if force {
-        m.compactChunk(inode, indx, force)
+        m.Lock()
+        delete(m.compacting, k)
+        m.Unlock()
+        m.compactChunk(inode, indx, once, force)
     }
 }
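Two details in this function are easy to miss. First, the de-duplication key changed from indx<<32 to indx<<40: with a 32-bit shift, an inode numbered above 2^32 can produce the same key as a different (inode, indx) pair, so unrelated chunks would wrongly share one compaction slot. A self-contained illustration (ours, not repository code):

    package main

    import "fmt"

    func main() {
        key32 := func(inode, indx uint64) uint64 { return inode + indx<<32 }
        key40 := func(inode, indx uint64) uint64 { return inode + indx<<40 }
        // chunk 0 of inode 2^32+1 vs chunk 1 of inode 1:
        fmt.Println(key32(1<<32+1, 0) == key32(1, 1)) // true: both are 2^32 + 1
        fmt.Println(key40(1<<32+1, 0) == key40(1, 1)) // false: keys now differ
    }

Second, the old code registered the chunk in m.compacting only on the background path, so a force call could recurse directly; now every caller registers itself, and the force retry at the bottom must delete its own key before recursing, otherwise the recursive call would spin forever in the wait loop on its own registration.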

pkg/meta/sql.go (44 changes: 26 additions, 18 deletions)
@@ -2408,7 +2408,7 @@ func (m *dbMeta) Read(ctx Context, inode Ino, indx uint32, slices *[]Slice) (rer
     *slices = buildSlice(ss)
     m.of.CacheChunk(inode, indx, *slices)
     if !m.conf.ReadOnly && (len(c.Slices)/sliceBytes >= 5 || len(*slices) >= 5) {
-        go m.compactChunk(inode, indx, false)
+        go m.compactChunk(inode, indx, false, false)
     }
     return 0
 }
@@ -2422,7 +2422,7 @@ func (m *dbMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice Sl
     }
     defer func() { m.of.InvalidateChunk(inode, indx) }()
     var newLength, newSpace int64
-    var needCompact bool
+    var numSlices int
     var nodeAttr node
     err := m.txn(func(s *xorm.Session) error {
         newLength, newSpace = 0, 0
@@ -2464,16 +2464,19 @@ func (m *dbMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice Sl
         if err == nil && !insert {
             ck := chunk{Inode: inode, Indx: indx}
             _, _ = s.MustCols("indx").Get(&ck)
-            ns := len(ck.Slices) / sliceBytes // number of slices
-            needCompact = ns%100 == 99 || ns > 350
+            numSlices = len(ck.Slices) / sliceBytes
         }
         return err
     }, inode)
     if err == nil {
-        if needCompact {
-            go m.compactChunk(inode, indx, false)
-        }
         m.updateParentStat(ctx, inode, nodeAttr.Parent, newLength, newSpace)
+        if numSlices%100 == 99 || numSlices > 350 {
+            if numSlices < maxSlices {
+                go m.compactChunk(inode, indx, false, false)
+            } else {
+                m.compactChunk(inode, indx, true, false)
+            }
+        }
     }
     return errno(err)
 }
@@ -2919,11 +2922,11 @@ func (m *dbMeta) doCleanupDelayedSlices(edge int64) (int, error) {
     return count, nil
 }
 
-func (m *dbMeta) compactChunk(inode Ino, indx uint32, force bool) {
+func (m *dbMeta) compactChunk(inode Ino, indx uint32, once, force bool) {
     // avoid too many or duplicated compaction
-    k := uint64(inode) + (uint64(indx) << 32)
+    k := uint64(inode) + (uint64(indx) << 40)
     m.Lock()
-    if force {
+    if once || force {
         for m.compacting[k] {
             m.Unlock()
             time.Sleep(time.Millisecond * 10)
@@ -2932,14 +2935,13 @@ func (m *dbMeta) compactChunk(inode Ino, indx uint32, force bool) {
     } else if len(m.compacting) > 10 || m.compacting[k] {
         m.Unlock()
         return
-    } else {
-        m.compacting[k] = true
-        defer func() {
-            m.Lock()
-            delete(m.compacting, k)
-            m.Unlock()
-        }()
     }
+    m.compacting[k] = true
+    defer func() {
+        m.Lock()
+        delete(m.compacting, k)
+        m.Unlock()
+    }()
     m.Unlock()
 
     var c = chunk{Inode: inode, Indx: indx}
@@ -2950,6 +2952,9 @@ func (m *dbMeta) compactChunk(inode Ino, indx uint32, force bool) {
     if err != nil {
         return
     }
+    if once && len(c.Slices) < sliceBytes*maxSlices {
+        return
+    }
     if len(c.Slices) > sliceBytes*maxCompactSlices {
         c.Slices = c.Slices[:sliceBytes*maxCompactSlices]
     }
@@ -3078,7 +3083,10 @@ func (m *dbMeta) compactChunk(inode Ino, indx uint32, force bool) {
     }
 
     if force {
-        m.compactChunk(inode, indx, force)
+        m.Lock()
+        delete(m.compacting, k)
+        m.Unlock()
+        m.compactChunk(inode, indx, once, force)
    }
 }
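Unlike the Redis engine, which can ask for a list length with LLEN before fetching anything, the SQL engine stores a chunk's slices as one byte blob of fixed-width records, so the equivalent once gate has to load the row and compare len(c.Slices) against sliceBytes*maxSlices. A sketch of the arithmetic behind the len(c.Slices)/sliceBytes expressions above (sliceBytes being 24 bytes per encoded slice is our assumption about this package, not shown in the diff):

    const sliceBytes = 24 // assumed per-slice record width in the blob

    // sliceCount converts a chunk's packed Slices blob into a slice count,
    // the quantity both the once gate and the write-path trigger reason about.
    func sliceCount(blob []byte) int { return len(blob) / sliceBytes }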