From 86b906b0ffc67940a93a8e0e36fac8d0892b3ace Mon Sep 17 00:00:00 2001 From: xixi Date: Fri, 13 Oct 2023 13:55:24 +0800 Subject: [PATCH] fix restore: count dir stats and quota (#4095) close #4044 ### Changes - restore with session - flush quotas before closing session --------- Signed-off-by: xixi --- cmd/restore.go | 7 ++++++ pkg/meta/base.go | 65 ++++++++++++++++++++++++------------------------ 2 files changed, 40 insertions(+), 32 deletions(-) diff --git a/cmd/restore.go b/cmd/restore.go index b2205d134f47..53b2c0ffad9b 100644 --- a/cmd/restore.go +++ b/cmd/restore.go @@ -58,6 +58,13 @@ func restore(ctx *cli.Context) error { } func doRestore(m meta.Meta, hour string, putBack bool, threads int) { + if err := m.NewSession(false); err != nil { + logger.Warningf("running without sessions because fail to new session: %s", err) + } else { + defer func() { + _ = m.CloseSession() + }() + } logger.Infof("restore files in %s ...", hour) ctx := meta.Background var parent meta.Ino diff --git a/pkg/meta/base.go b/pkg/meta/base.go index 1a5c730903f3..3fb6d73474db 100644 --- a/pkg/meta/base.go +++ b/pkg/meta/base.go @@ -609,6 +609,7 @@ func (m *baseMeta) CloseSession() error { return nil } m.doFlushDirStat() + m.doFlushQuotas() m.sesMu.Lock() m.umounting = true m.sesMu.Unlock() @@ -758,45 +759,45 @@ func (m *baseMeta) updateDirQuota(ctx Context, inode Ino, space, inodes int64) { } func (m *baseMeta) flushQuotas() { - quotas := make(map[Ino]*Quota) - var newSpace, newInodes int64 for { time.Sleep(time.Second * 3) - if !m.getFormat().DirStats { - continue + m.doFlushQuotas() + } +} + +func (m *baseMeta) doFlushQuotas() { + if !m.getFormat().DirStats { + return + } + stageMap := make(map[Ino]*Quota) + m.quotaMu.RLock() + for ino, q := range m.dirQuotas { + newSpace := atomic.LoadInt64(&q.newSpace) + newInodes := atomic.LoadInt64(&q.newInodes) + if newSpace != 0 || newInodes != 0 { + stageMap[ino] = &Quota{newSpace: newSpace, newInodes: newInodes} } + } + m.quotaMu.RUnlock() + if len(stageMap) == 0 { + return + } + + if err := m.en.doFlushQuotas(Background, stageMap); err != nil { + logger.Warnf("Flush quotas: %s", err) + } else { m.quotaMu.RLock() - for ino, q := range m.dirQuotas { - newSpace = atomic.LoadInt64(&q.newSpace) - newInodes = atomic.LoadInt64(&q.newInodes) - if newSpace != 0 || newInodes != 0 { - quotas[ino] = &Quota{newSpace: newSpace, newInodes: newInodes} + for ino, snap := range stageMap { + q := m.dirQuotas[ino] + if q == nil { + continue } + atomic.AddInt64(&q.newSpace, -snap.newSpace) + atomic.AddInt64(&q.UsedSpace, snap.newSpace) + atomic.AddInt64(&q.newInodes, -snap.newInodes) + atomic.AddInt64(&q.UsedInodes, snap.newInodes) } m.quotaMu.RUnlock() - if len(quotas) == 0 { - continue - } - - if err := m.en.doFlushQuotas(Background, quotas); err != nil { - logger.Warnf("Flush quotas: %s", err) - } else { - m.quotaMu.RLock() - for ino, snap := range quotas { - q := m.dirQuotas[ino] - if q == nil { - continue - } - atomic.AddInt64(&q.newSpace, -snap.newSpace) - atomic.AddInt64(&q.UsedSpace, snap.newSpace) - atomic.AddInt64(&q.newInodes, -snap.newInodes) - atomic.AddInt64(&q.UsedInodes, snap.newInodes) - } - m.quotaMu.RUnlock() - } - for ino := range quotas { - delete(quotas, ino) - } } }