diff --git a/.github/scripts/prepare_db.sh b/.github/scripts/prepare_db.sh
index 26771faa8b21..58c4b57c505d 100755
--- a/.github/scripts/prepare_db.sh
+++ b/.github/scripts/prepare_db.sh
@@ -41,7 +41,7 @@ install_etcd(){
         quay.io/coreos/etcd:v3.5.7 \
         /usr/local/bin/etcd --data-dir=/etcd-data --name node1 \
         --listen-client-urls http://0.0.0.0:2379 \
-        --advertise-client-urls http://0.0.0.0:2379 \
+        --advertise-client-urls http://0.0.0.0:3379 \
         --listen-peer-urls http://0.0.0.0:2380 \
         --initial-advertise-peer-urls http://0.0.0.0:2380 \
         --initial-cluster node1=http://0.0.0.0:2380
diff --git a/pkg/meta/base.go b/pkg/meta/base.go
index 53f641f934ae..8fec829a92c5 100644
--- a/pkg/meta/base.go
+++ b/pkg/meta/base.go
@@ -47,6 +47,7 @@ const (
 )
 
 var maxCompactSlices = 1000
+var maxSlices = 2500
 
 type engine interface {
 	// Get the value of counter name.
@@ -67,7 +68,7 @@ type engine interface {
 	doInit(format *Format, force bool) error
 
 	scanAllChunks(ctx Context, ch chan<- cchunk, bar *utils.Bar) error
-	compactChunk(inode Ino, indx uint32, force bool)
+	compactChunk(inode Ino, indx uint32, once, force bool)
 	doDeleteSustainedInode(sid uint64, inode Ino) error
 	doFindDeletedFiles(ts int64, limit int) (map[Ino]uint64, error) // limit < 0 means all
 	doDeleteFileData(inode Ino, length uint64)
@@ -2255,7 +2256,7 @@ func (m *baseMeta) CompactAll(ctx Context, threads int, bar *utils.Bar) syscall.
 		go func() {
 			for c := range ch {
 				logger.Debugf("Compacting chunk %d:%d (%d slices)", c.inode, c.indx, c.slices)
-				m.en.compactChunk(c.inode, c.indx, true)
+				m.en.compactChunk(c.inode, c.indx, false, true)
 				bar.Increment()
 			}
 			wg.Done()
@@ -2287,7 +2288,7 @@ func (m *baseMeta) Compact(ctx Context, inode Ino, concurrency int, preFunc, pos
 		go func() {
 			defer wg.Done()
 			for c := range chunkChan {
-				m.en.compactChunk(c.inode, c.indx, true)
+				m.en.compactChunk(c.inode, c.indx, false, true)
 				postFunc()
 			}
 		}()
diff --git a/pkg/meta/base_test.go b/pkg/meta/base_test.go
index 4f2ce8935710..dd69c02be310 100644
--- a/pkg/meta/base_test.go
+++ b/pkg/meta/base_test.go
@@ -1369,7 +1369,7 @@ func testCaseIncensi(t *testing.T, m Meta) {
 }
 
 type compactor interface {
-	compactChunk(inode Ino, indx uint32, force bool)
+	compactChunk(inode Ino, indx uint32, once, force bool)
 }
 
 func testCompaction(t *testing.T, m Meta, trash bool) {
@@ -1422,7 +1422,7 @@ func testCompaction(t *testing.T, m Meta, trash bool) {
 		t.Fatalf("expect 5 slices, but got %+v", cs1)
 	}
 	if c, ok := m.(compactor); ok {
-		c.compactChunk(inode, 1, true)
+		c.compactChunk(inode, 1, false, true)
 	}
 	var cs []Slice
 	_ = m.Read(ctx, inode, 1, &cs)
@@ -1441,7 +1441,7 @@ func testCompaction(t *testing.T, m Meta, trash bool) {
 		time.Sleep(time.Millisecond)
 	}
 	if c, ok := m.(compactor); ok {
-		c.compactChunk(inode, 0, true)
+		c.compactChunk(inode, 0, false, true)
 	}
 	var slices []Slice
 	if st := m.Read(ctx, inode, 0, &slices); st != 0 {
@@ -1494,7 +1494,7 @@ func testCompaction(t *testing.T, m Meta, trash bool) {
 		t.Fatalf("truncate file: %s", st)
 	}
 	if c, ok := m.(compactor); ok {
-		c.compactChunk(inode, 0, true)
+		c.compactChunk(inode, 0, false, true)
 	}
 	if st := m.Read(ctx, inode, 0, &slices); st != 0 {
 		t.Fatalf("read 0: %s", st)
@@ -1509,7 +1509,7 @@ func testCompaction(t *testing.T, m Meta, trash bool) {
 	m.NewSlice(ctx, &sliceId)
 	_ = m.Write(ctx, inode, 0, uint32(1<<20), Slice{Id: sliceId, Size: 2 << 20, Len: 2 << 20}, time.Now())
 	if c, ok := m.(compactor); ok {
-		c.compactChunk(inode, 0, true)
+		c.compactChunk(inode, 0, false, true)
 	}
 	if st := m.Read(ctx, inode, 0, &slices); st != 0 {
 		t.Fatalf("read 0: %s", st)
@@ -1527,7 +1527,7 @@ func testCompaction(t *testing.T, m Meta, trash bool) {
 	_ = m.Write(ctx, inode, 0, uint32(128<<10), Slice{Id: sliceId, Size: 2 << 20, Len: 128 << 10}, time.Now())
 	_ = m.Write(ctx, inode, 0, uint32(0), Slice{Id: 0, Size: 1 << 20, Len: 1 << 20}, time.Now())
 	if c, ok := m.(compactor); ok {
-		c.compactChunk(inode, 0, true)
+		c.compactChunk(inode, 0, false, true)
 	}
 	if st := m.Read(ctx, inode, 0, &slices); st != 0 {
 		t.Fatalf("read 0: %s", st)
@@ -1576,6 +1576,28 @@ func testConcurrentWrite(t *testing.T, m Meta) {
 	if errno != 0 {
 		t.Fatal()
 	}
+
+	var g2 sync.WaitGroup
+	for i := 0; i <= 10; i++ {
+		g2.Add(1)
+		go func() {
+			defer g2.Done()
+			for j := 0; j < 1000; j++ {
+				var sliceId uint64
+				m.NewSlice(ctx, &sliceId)
+				var slice = Slice{Id: sliceId, Size: 100, Len: 100}
+				st := m.Write(ctx, inode, 0, uint32(200*j), slice, time.Now())
+				if st != 0 {
+					errno = st
+					break
+				}
+			}
+		}()
+	}
+	g2.Wait()
+	if errno != 0 {
+		t.Fatal()
+	}
 }
 
 func testTruncateAndDelete(t *testing.T, m Meta) {
diff --git a/pkg/meta/redis.go b/pkg/meta/redis.go
index f855780b5d5a..d61885ea3a08 100644
--- a/pkg/meta/redis.go
+++ b/pkg/meta/redis.go
@@ -2339,7 +2339,7 @@ func (m *redisMeta) Read(ctx Context, inode Ino, indx uint32, slices *[]Slice) (
 	*slices = buildSlice(ss)
 	m.of.CacheChunk(inode, indx, *slices)
 	if !m.conf.ReadOnly && (len(vals) >= 5 || len(*slices) >= 5) {
-		go m.compactChunk(inode, indx, false)
+		go m.compactChunk(inode, indx, false, false)
 	}
 	return 0
 }
@@ -2354,7 +2354,7 @@ func (m *redisMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice
 	defer func() { m.of.InvalidateChunk(inode, indx) }()
 	var newLength, newSpace int64
 	var attr Attr
-	var needCompact bool
+	var numSlices int64
 	err := m.txn(ctx, func(tx *redis.Tx) error {
 		newLength, newSpace = 0, 0
 		attr = Attr{}
@@ -2393,15 +2393,19 @@ func (m *redisMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice
 			return nil
 		})
 		if err == nil {
-			needCompact = rpush.Val()%100 == 99 || rpush.Val() > 350
+			numSlices = rpush.Val()
 		}
 		return err
 	}, m.inodeKey(inode))
 	if err == nil {
-		if needCompact {
-			go m.compactChunk(inode, indx, false)
-		}
 		m.updateParentStat(ctx, inode, attr.Parent, newLength, newSpace)
+		if numSlices%100 == 99 || numSlices > 350 {
+			if numSlices < int64(maxSlices) {
+				go m.compactChunk(inode, indx, false, false)
+			} else {
+				m.compactChunk(inode, indx, true, false)
+			}
+		}
 	}
 	return errno(err)
 }
@@ -2996,11 +3000,11 @@ func (r *redisMeta) doCleanupDelayedSlices(edge int64) (int, error) {
 	return count, err
 }
 
-func (m *redisMeta) compactChunk(inode Ino, indx uint32, force bool) {
+func (m *redisMeta) compactChunk(inode Ino, indx uint32, once, force bool) {
 	// avoid too many or duplicated compaction
-	k := uint64(inode) + (uint64(indx) << 32)
+	k := uint64(inode) + (uint64(indx) << 40)
 	m.Lock()
-	if force {
+	if once || force {
 		for m.compacting[k] {
 			m.Unlock()
 			time.Sleep(time.Millisecond * 10)
@@ -3009,17 +3013,19 @@ func (m *redisMeta) compactChunk(inode Ino, indx uint32, force bool) {
 	} else if len(m.compacting) > 10 || m.compacting[k] {
 		m.Unlock()
 		return
-	} else {
-		m.compacting[k] = true
-		defer func() {
-			m.Lock()
-			delete(m.compacting, k)
-			m.Unlock()
-		}()
 	}
+	m.compacting[k] = true
+	defer func() {
+		m.Lock()
+		delete(m.compacting, k)
+		m.Unlock()
+	}()
 	m.Unlock()
 
 	var ctx = Background
+	if once && m.rdb.LLen(ctx, m.chunkKey(inode, indx)).Val() < int64(maxSlices) {
+		return
+	}
 	vals, err := m.rdb.LRange(ctx, m.chunkKey(inode, indx), 0, int64(maxCompactSlices)).Result()
 	if err != nil {
 		return
@@ -3134,7 +3140,10 @@ func (m *redisMeta) compactChunk(inode Ino, indx uint32, force bool) {
 	}
 
 	if force {
-		m.compactChunk(inode, indx, force)
+		m.Lock()
+		delete(m.compacting, k)
+		m.Unlock()
+		m.compactChunk(inode, indx, once, force)
 	}
 }
 
diff --git a/pkg/meta/sql.go b/pkg/meta/sql.go
index 064d48559799..61846e59ad7b 100644
--- a/pkg/meta/sql.go
+++ b/pkg/meta/sql.go
@@ -2408,7 +2408,7 @@ func (m *dbMeta) Read(ctx Context, inode Ino, indx uint32, slices *[]Slice) (rer
 	*slices = buildSlice(ss)
 	m.of.CacheChunk(inode, indx, *slices)
 	if !m.conf.ReadOnly && (len(c.Slices)/sliceBytes >= 5 || len(*slices) >= 5) {
-		go m.compactChunk(inode, indx, false)
+		go m.compactChunk(inode, indx, false, false)
 	}
 	return 0
 }
@@ -2422,7 +2422,7 @@ func (m *dbMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice Sl
 	}
 	defer func() { m.of.InvalidateChunk(inode, indx) }()
 	var newLength, newSpace int64
-	var needCompact bool
+	var numSlices int
 	var nodeAttr node
 	err := m.txn(func(s *xorm.Session) error {
 		newLength, newSpace = 0, 0
@@ -2464,16 +2464,19 @@ func (m *dbMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice Sl
 		if err == nil && !insert {
 			ck := chunk{Inode: inode, Indx: indx}
 			_, _ = s.MustCols("indx").Get(&ck)
-			ns := len(ck.Slices) / sliceBytes // number of slices
-			needCompact = ns%100 == 99 || ns > 350
+			numSlices = len(ck.Slices) / sliceBytes
 		}
 		return err
 	}, inode)
 	if err == nil {
-		if needCompact {
-			go m.compactChunk(inode, indx, false)
-		}
 		m.updateParentStat(ctx, inode, nodeAttr.Parent, newLength, newSpace)
+		if numSlices%100 == 99 || numSlices > 350 {
+			if numSlices < maxSlices {
+				go m.compactChunk(inode, indx, false, false)
+			} else {
+				m.compactChunk(inode, indx, true, false)
+			}
+		}
 	}
 	return errno(err)
 }
@@ -2919,11 +2922,11 @@ func (m *dbMeta) doCleanupDelayedSlices(edge int64) (int, error) {
 	return count, nil
 }
 
-func (m *dbMeta) compactChunk(inode Ino, indx uint32, force bool) {
+func (m *dbMeta) compactChunk(inode Ino, indx uint32, once, force bool) {
 	// avoid too many or duplicated compaction
-	k := uint64(inode) + (uint64(indx) << 32)
+	k := uint64(inode) + (uint64(indx) << 40)
 	m.Lock()
-	if force {
+	if once || force {
 		for m.compacting[k] {
 			m.Unlock()
 			time.Sleep(time.Millisecond * 10)
@@ -2932,14 +2935,13 @@ func (m *dbMeta) compactChunk(inode Ino, indx uint32, force bool) {
 	} else if len(m.compacting) > 10 || m.compacting[k] {
 		m.Unlock()
 		return
-	} else {
-		m.compacting[k] = true
-		defer func() {
-			m.Lock()
-			delete(m.compacting, k)
-			m.Unlock()
-		}()
 	}
+	m.compacting[k] = true
+	defer func() {
+		m.Lock()
+		delete(m.compacting, k)
+		m.Unlock()
+	}()
 	m.Unlock()
 
 	var c = chunk{Inode: inode, Indx: indx}
@@ -2950,6 +2952,9 @@ func (m *dbMeta) compactChunk(inode Ino, indx uint32, force bool) {
 	if err != nil {
 		return
 	}
+	if once && len(c.Slices) < sliceBytes*maxSlices {
+		return
+	}
 	if len(c.Slices) > sliceBytes*maxCompactSlices {
 		c.Slices = c.Slices[:sliceBytes*maxCompactSlices]
 	}
@@ -3078,7 +3083,10 @@ func (m *dbMeta) compactChunk(inode Ino, indx uint32, force bool) {
 	}
 
 	if force {
-		m.compactChunk(inode, indx, force)
+		m.Lock()
+		delete(m.compacting, k)
+		m.Unlock()
+		m.compactChunk(inode, indx, once, force)
 	}
 }
 
diff --git a/pkg/meta/tkv.go b/pkg/meta/tkv.go
index 222e7a55ae0c..f6a18aa576bb 100644
--- a/pkg/meta/tkv.go
+++ b/pkg/meta/tkv.go
@@ -2032,7 +2032,7 @@ func (m *kvMeta) Read(ctx Context, inode Ino, indx uint32, slices *[]Slice) (rer
 	*slices = buildSlice(ss)
 	m.of.CacheChunk(inode, indx, *slices)
 	if !m.conf.ReadOnly && (len(val)/sliceBytes >= 5 || len(*slices) >= 5) {
-		go m.compactChunk(inode, indx, false)
+		go m.compactChunk(inode, indx, false, false)
 	}
 	return 0
 }
@@ -2046,7 +2046,7 @@ func (m *kvMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice Sl
 	}
 	defer func() { m.of.InvalidateChunk(inode, indx) }()
 	var newLength, newSpace int64
-	var needCompact bool
+	var numSlices int
 	var attr Attr
 	err := m.txn(func(tx *kvTxn) error {
 		newLength, newSpace = 0, 0
@@ -2087,15 +2087,18 @@ func (m *kvMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice Sl
 		val = append(rs[1], val...)
 		tx.set(m.inodeKey(inode), m.marshal(&attr))
 		tx.set(m.chunkKey(inode, indx), val)
-		ns := len(val) / sliceBytes // number of slices
-		needCompact = ns%100 == 99 || ns > 350
+		numSlices = len(val) / sliceBytes
 		return nil
 	}, inode)
 	if err == nil {
-		if needCompact {
-			go m.compactChunk(inode, indx, false)
-		}
 		m.updateParentStat(ctx, inode, attr.Parent, newLength, newSpace)
+		if numSlices%100 == 99 || numSlices > 350 {
+			if numSlices < maxSlices {
+				go m.compactChunk(inode, indx, false, false)
+			} else {
+				m.compactChunk(inode, indx, true, false)
+			}
+		}
 	}
 	return errno(err)
 }
@@ -2489,11 +2492,11 @@ func (m *kvMeta) doCleanupDelayedSlices(edge int64) (int, error) {
 	return count, nil
 }
 
-func (m *kvMeta) compactChunk(inode Ino, indx uint32, force bool) {
+func (m *kvMeta) compactChunk(inode Ino, indx uint32, once, force bool) {
 	// avoid too many or duplicated compaction
-	k := uint64(inode) + (uint64(indx) << 32)
+	k := uint64(inode) + (uint64(indx) << 40)
 	m.Lock()
-	if force {
+	if once || force {
 		for m.compacting[k] {
 			m.Unlock()
 			time.Sleep(time.Millisecond * 10)
@@ -2502,20 +2505,22 @@ func (m *kvMeta) compactChunk(inode Ino, indx uint32, force bool) {
 	} else if len(m.compacting) > 10 || m.compacting[k] {
 		m.Unlock()
 		return
-	} else {
-		m.compacting[k] = true
-		defer func() {
-			m.Lock()
-			delete(m.compacting, k)
-			m.Unlock()
-		}()
 	}
+	m.compacting[k] = true
+	defer func() {
+		m.Lock()
+		delete(m.compacting, k)
+		m.Unlock()
+	}()
 	m.Unlock()
 
 	buf, err := m.get(m.chunkKey(inode, indx))
 	if err != nil {
 		return
 	}
+	if once && len(buf) < sliceBytes*maxSlices {
+		return
+	}
 	if len(buf) > sliceBytes*maxCompactSlices {
 		buf = buf[:sliceBytes*maxCompactSlices]
 	}
@@ -2621,7 +2626,10 @@ func (m *kvMeta) compactChunk(inode Ino, indx uint32, force bool) {
 	}
 
 	if force {
-		m.compactChunk(inode, indx, force)
+		m.Lock()
+		delete(m.compacting, k)
+		m.Unlock()
+		m.compactChunk(inode, indx, once, force)
 	}
 }
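
Note: the core of this patch is the same in all three engines (redis.go, sql.go, tkv.go): Write now records the slice count and, once a chunk crosses maxSlices (2500), compacts it synchronously with once=true to apply backpressure, while smaller chunks keep getting best-effort background compaction. Below is a minimal standalone Go sketch of the shared dedup-guard pattern; the type and names (meta, chunkKey) are hypothetical, not the actual JuiceFS code.

// Sketch of the per-chunk compaction guard, under assumed names.
package main

import (
	"fmt"
	"sync"
	"time"
)

type meta struct {
	sync.Mutex
	compacting map[uint64]bool
}

// chunkKey packs (inode, chunk index) into one map key; the patch widens
// the index shift from 32 to 40 bits, presumably so inode numbers above
// 2^32 cannot collide with the shifted chunk index.
func chunkKey(inode uint64, indx uint32) uint64 {
	return inode + uint64(indx)<<40
}

func (m *meta) compactChunk(inode uint64, indx uint32, once, force bool) {
	k := chunkKey(inode, indx)
	m.Lock()
	if once || force {
		// Synchronous callers wait for any in-flight compaction of this chunk.
		for m.compacting[k] {
			m.Unlock()
			time.Sleep(time.Millisecond * 10)
			m.Lock()
		}
	} else if len(m.compacting) > 10 || m.compacting[k] {
		// Background callers give up when too busy or already running.
		m.Unlock()
		return
	}
	// Every path now registers itself before working; the old code only
	// did this on the background path, so forced calls could overlap.
	m.compacting[k] = true
	defer func() {
		m.Lock()
		delete(m.compacting, k)
		m.Unlock()
	}()
	m.Unlock()

	// The real code loads the slice list here, returns early when
	// once && slices < maxSlices, and otherwise compacts the chunk.
	fmt.Printf("compacting chunk %d:%d (once=%v force=%v)\n", inode, indx, once, force)
}

func main() {
	m := &meta{compacting: make(map[uint64]bool)}
	m.compactChunk(2, 0, true, false)  // synchronous, skipped unless the chunk is large
	m.compactChunk(2, 0, false, true)  // forced, as from CompactAll
	m.compactChunk(2, 1, false, false) // best-effort background attempt
}

This also shows why the force path in the patch now deletes its key before recursing: the recursive call re-enters the wait loop, so without the delete it would deadlock on its own registration.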