Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

meta/sql: improve MySQL/PostgreSQL meta concurrency #5460

Merged
merged 10 commits into from
Jan 2, 2025
178 changes: 138 additions & 40 deletions pkg/meta/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,11 @@ func mustInsert(s *xorm.Session, beans ...interface{}) error {
var errBusy error

func (m *dbMeta) shouldRetry(err error) bool {
if m.Name() == "mysql" && err == syscall.EBUSY {
// Retry transaction when parent node update return 0 rows in MySQL
return true
}

msg := strings.ToLower(err.Error())
if strings.Contains(msg, "too many connections") || strings.Contains(msg, "too many clients") {
logger.Warnf("transaction failed: %s, will retry it. please increase the max number of connections in your database, or use a connection pool.", msg)
Expand All @@ -807,7 +812,8 @@ func (m *dbMeta) shouldRetry(err error) bool {
// MySQL, MariaDB or TiDB
// error 1020 for MariaDB when conflict
return strings.Contains(msg, "try restarting transaction") || strings.Contains(msg, "try again later") ||
strings.Contains(msg, "duplicate entry") || strings.Contains(msg, "error 1020 (hy000)")
strings.Contains(msg, "duplicate entry") || strings.Contains(msg, "error 1020 (hy000)") ||
strings.Contains(msg, "invalid connection") || strings.Contains(msg, "bad connection") || errors.Is(err, io.EOF) // could not send data to client: No buffer space available
case "postgres":
return strings.Contains(msg, "current transaction is aborted") || strings.Contains(msg, "deadlock detected") ||
strings.Contains(msg, "duplicate key value") || strings.Contains(msg, "could not serialize access") ||
Expand Down Expand Up @@ -1383,7 +1389,7 @@ func (m *dbMeta) doReadlink(ctx Context, inode Ino, noatime bool) (atime int64,
func (m *dbMeta) doMknod(ctx Context, parent Ino, name string, _type uint8, mode, cumask uint16, path string, inode *Ino, attr *Attr) syscall.Errno {
return errno(m.txn(func(s *xorm.Session) error {
var pn = node{Inode: parent}
ok, err := s.ForUpdate().Get(&pn)
ok, err := s.Get(&pn)
if err != nil {
return err
}
Expand Down Expand Up @@ -1473,15 +1479,19 @@ func (m *dbMeta) doMknod(ctx Context, parent Ino, name string, _type uint8, mode
}

var updateParent bool
var nlinkAdjust int32
now := time.Now().UnixNano()
if parent != TrashInode {
if _type == TypeDirectory {
pn.Nlink++
updateParent = true
nlinkAdjust++
}
if updateParent || time.Duration(now-pn.Mtime*1e3-int64(pn.Mtimensec)) >= m.conf.SkipDirMtime {
pn.Mtime = now / 1e3
pn.Ctime = now / 1e3
pn.Mtimensec = int16(now % 1e3)
pn.Ctimensec = int16(now % 1e3)
updateParent = true
}
}
Expand Down Expand Up @@ -1513,11 +1523,6 @@ func (m *dbMeta) doMknod(ctx Context, parent Ino, name string, _type uint8, mode
if err = mustInsert(s, &edge{Parent: parent, Name: []byte(name), Inode: *inode, Type: _type}, &n); err != nil {
return err
}
if updateParent {
if _, err := s.Cols("nlink", "mtime", "ctime", "mtimensec", "ctimensec").Update(&pn, &node{Inode: pn.Inode}); err != nil {
return err
}
}
if _type == TypeSymlink {
if err = mustInsert(s, &symlink{Inode: *inode, Target: []byte(path)}); err != nil {
return err
Expand All @@ -1528,9 +1533,24 @@ func (m *dbMeta) doMknod(ctx Context, parent Ino, name string, _type uint8, mode
return err
}
}
if updateParent {
if _n, err := s.SetExpr("nlink", fmt.Sprintf("nlink + (%d)", nlinkAdjust)).Cols("nlink", "mtime", "ctime", "mtimensec", "ctimensec").Update(&pn, &node{Inode: pn.Inode}); err != nil || _n == 0 {
if err == nil {
logger.Infof("Update parent node affected rows = %d should be 1 for inode = %d .", _n, pn.Inode)
if m.Name() == "mysql" {
err = syscall.EBUSY
} else {
err = syscall.ENOENT
}
}
if err != nil {
return err
}
}
}
m.parseAttr(&n, attr)
return nil
}, parent))
}))
}

func (m *dbMeta) doUnlink(ctx Context, parent Ino, name string, attr *Attr, skipCheckTrash ...bool) syscall.Errno {
Expand All @@ -1547,7 +1567,7 @@ func (m *dbMeta) doUnlink(ctx Context, parent Ino, name string, attr *Attr, skip
opened = false
newSpace, newInode = 0, 0
var pn = node{Inode: parent}
ok, err := s.ForUpdate().Get(&pn)
ok, err := s.Get(&pn)
if err != nil {
return err
}
Expand Down Expand Up @@ -1631,11 +1651,7 @@ func (m *dbMeta) doUnlink(ctx Context, parent Ino, name string, attr *Attr, skip
if _, err := s.Delete(&edge{Parent: parent, Name: e.Name}); err != nil {
return err
}
if updateParent {
if _, err = s.Cols("mtime", "ctime", "mtimensec", "ctimensec").Update(&pn, &node{Inode: pn.Inode}); err != nil {
return err
}
}

if n.Nlink > 0 {
if _, err := s.Cols("nlink", "ctime", "ctimensec", "parent").Update(&n, &node{Inode: e.Inode}); err != nil {
return err
Expand Down Expand Up @@ -1679,8 +1695,24 @@ func (m *dbMeta) doUnlink(ctx Context, parent Ino, name string, attr *Attr, skip
return err
}
}
if updateParent {
var _n int64
if _n, err = s.Cols("mtime", "ctime", "mtimensec", "ctimensec").Update(&pn, &node{Inode: pn.Inode}); err != nil || _n == 0 {
if err == nil {
logger.Infof("Update parent node affected rows = %d should be 1 for inode = %d .", _n, pn.Inode)
if m.Name() == "mysql" {
err = syscall.EBUSY
} else {
err = syscall.ENOENT
}
}
if err != nil {
return err
}
}
}
return err
}, parent)
})
if err == nil && trash == 0 {
if n.Type == TypeFile && n.Nlink == 0 {
m.fileDeleted(opened, isTrash(parent), n.Inode, n.Length)
Expand All @@ -1702,7 +1734,7 @@ func (m *dbMeta) doRmdir(ctx Context, parent Ino, name string, pinode *Ino, skip
}
err := m.txn(func(s *xorm.Session) error {
var pn = node{Inode: parent}
ok, err := s.ForUpdate().Get(&pn)
ok, err := s.Get(&pn)
if err != nil {
return err
}
Expand Down Expand Up @@ -1801,10 +1833,10 @@ func (m *dbMeta) doRmdir(ctx Context, parent Ino, name string, pinode *Ino, skip
}
}
if !isTrash(parent) {
_, err = s.Cols("nlink", "mtime", "ctime", "mtimensec", "ctimensec").Update(&pn, &node{Inode: pn.Inode})
_, err = s.SetExpr("nlink", "nlink - 1").Cols("nlink", "mtime", "ctime", "mtimensec", "ctimensec").Update(&pn, &node{Inode: pn.Inode})
}
return err
}, parent)
})
if err == nil && trash == 0 {
m.updateStats(-align4K(0), -1)
}
Expand All @@ -1826,6 +1858,19 @@ func (m *dbMeta) getNodesForUpdate(s *xorm.Session, nodes ...*node) error {
return nil
}

func (m *dbMeta) getNodes(s *xorm.Session, nodes ...*node) error {
for i := range nodes {
ok, err := s.Get(nodes[i])
if err != nil {
return err
}
if !ok {
return syscall.ENOENT
}
}
return nil
}

func (m *dbMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst Ino, nameDst string, flags uint32, inode, tInode *Ino, attr, tAttr *Attr) syscall.Errno {
var trash Ino
if st := m.checkTrash(parentDst, &trash); st != 0 {
Expand All @@ -1836,17 +1881,13 @@ func (m *dbMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst
var dino Ino
var dn node
var newSpace, newInode int64
lockParent := parentSrc
if isTrash(lockParent) {
lockParent = parentDst
}
err := m.txn(func(s *xorm.Session) error {
opened = false
dino = 0
newSpace, newInode = 0, 0
var spn = node{Inode: parentSrc}
var dpn = node{Inode: parentDst}
err := m.getNodesForUpdate(s, &spn, &dpn)
err := m.getNodes(s, &spn, &dpn)
if err != nil {
return err
}
Expand Down Expand Up @@ -1929,6 +1970,7 @@ func (m *dbMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst
}
}
var supdate, dupdate bool
var srcnlink, dstnlink int32
now := time.Now().UnixNano()
dn = node{Inode: de.Inode}
if ok {
Expand All @@ -1954,7 +1996,9 @@ func (m *dbMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst
if de.Type == TypeDirectory {
dn.Parent = parentSrc
dpn.Nlink--
dstnlink--
spn.Nlink++
srcnlink++
supdate, dupdate = true, true
} else if dn.Parent > 0 {
dn.Parent = parentSrc
Expand All @@ -1970,6 +2014,7 @@ func (m *dbMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst
return syscall.ENOTEMPTY
}
dpn.Nlink--
dstnlink--
dupdate = true
if trash > 0 {
dn.Parent = trash
Expand Down Expand Up @@ -2002,7 +2047,9 @@ func (m *dbMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst
if se.Type == TypeDirectory {
sn.Parent = parentDst
spn.Nlink--
srcnlink--
dpn.Nlink++
dstnlink++
supdate, dupdate = true, true
} else if sn.Parent > 0 {
sn.Parent = parentDst
Expand Down Expand Up @@ -2108,21 +2155,61 @@ func (m *dbMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst
return err
}
}
if parentDst != parentSrc && !isTrash(parentSrc) && supdate {
if _, err := s.Cols("nlink", "mtime", "ctime", "mtimensec", "ctimensec").Update(&spn, &node{Inode: parentSrc}); err != nil {
return err
}
}

if _, err := s.Cols("ctime", "ctimensec", "parent").Update(&sn, &node{Inode: sn.Inode}); err != nil {
return err
}

if parentDst != parentSrc && !isTrash(parentSrc) && supdate {
if dupdate && dpn.Inode < spn.Inode {
if _n, err := s.SetExpr("nlink", fmt.Sprintf("nlink + (%d)", dstnlink)).Cols("nlink", "mtime", "ctime", "mtimensec", "ctimensec").Update(&dpn, &node{Inode: parentDst}); err != nil || _n == 0 {
if err == nil {
logger.Infof("Update parent node affected rows = %d should be 1 for inode = %d .", _n, dpn.Inode)
if m.Name() == "mysql" {
err = syscall.EBUSY
} else {
err = syscall.ENOENT
}
}
if err != nil {
return err
}
}
dupdate = false
}

if _n, err := s.SetExpr("nlink", fmt.Sprintf("nlink + (%d)", srcnlink)).Cols("nlink", "mtime", "ctime", "mtimensec", "ctimensec").Update(&spn, &node{Inode: parentSrc}); err != nil || _n == 0 {
if err == nil {
logger.Infof("Update parent node affected rows = %d should be 1 for inode = %d .", _n, spn.Inode)
if m.Name() == "mysql" {
err = syscall.EBUSY
} else {
err = syscall.ENOENT
}
}
if err != nil {
return err
}
}
}

if dupdate {
if _, err := s.Cols("nlink", "mtime", "ctime", "mtimensec", "ctimensec").Update(&dpn, &node{Inode: parentDst}); err != nil {
return err
if _n, err := s.SetExpr("nlink", fmt.Sprintf("nlink + (%d)", dstnlink)).Cols("nlink", "mtime", "ctime", "mtimensec", "ctimensec").Update(&dpn, &node{Inode: parentDst}); err != nil || _n == 0 {
if err == nil {
logger.Infof("Update parent node affected rows = %d should be 1 for inode = %d .", _n, dpn.Inode)
if m.Name() == "mysql" {
err = syscall.EBUSY
} else {
err = syscall.ENOENT
}
}
if err != nil {
return err
}
}
}
return err
}, lockParent)
})
if err == nil && !exchange && trash == 0 {
if dino > 0 && dn.Type == TypeFile && dn.Nlink == 0 {
m.fileDeleted(opened, false, dino, dn.Length)
Expand All @@ -2135,7 +2222,7 @@ func (m *dbMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst
func (m *dbMeta) doLink(ctx Context, inode, parent Ino, name string, attr *Attr) syscall.Errno {
return errno(m.txn(func(s *xorm.Session) error {
var pn = node{Inode: parent}
ok, err := s.ForUpdate().Get(&pn)
ok, err := s.Get(&pn)
if err != nil {
return err
}
Expand Down Expand Up @@ -2185,6 +2272,8 @@ func (m *dbMeta) doLink(ctx Context, inode, parent Ino, name string, attr *Attr)
if time.Duration(now-pn.Mtime*1e3-int64(pn.Mtimensec)) >= m.conf.SkipDirMtime {
pn.Mtime = now / 1e3
pn.Ctime = now / 1e3
pn.Mtimensec = int16(now % 1e3)
pn.Ctimensec = int16(now % 1e3)
updateParent = true
}
n.Parent = 0
Expand All @@ -2194,19 +2283,28 @@ func (m *dbMeta) doLink(ctx Context, inode, parent Ino, name string, attr *Attr)
if err = mustInsert(s, &edge{Parent: parent, Name: []byte(name), Inode: inode, Type: n.Type}); err != nil {
return err
}
if updateParent {
if _, err := s.Cols("mtime", "ctime", "mtimensec", "ctimensec").Update(&pn, &node{Inode: parent}); err != nil {
return err
}
}
if _, err := s.Cols("nlink", "ctime", "ctimensec", "parent").Update(&n, node{Inode: inode}); err != nil {
return err
}
if err == nil {
m.parseAttr(&n, attr)
if updateParent {
if _n, err := s.Cols("mtime", "ctime", "mtimensec", "ctimensec").Update(&pn, &node{Inode: parent}); err != nil || _n == 0 {
if err == nil {
davies marked this conversation as resolved.
Show resolved Hide resolved
logger.Infof("Update parent node affected rows = %d should be 1 for inode = %d .", _n, pn.Inode)
if m.Name() == "mysql" {
err = syscall.EBUSY
} else {
err = syscall.ENOENT
}
}
if err != nil {
return err
}
}
}

m.parseAttr(&n, attr)
return err
}, parent))
}, inode))
}

func (m *dbMeta) doReaddir(ctx Context, inode Ino, plus uint8, entries *[]*Entry, limit int) syscall.Errno {
Expand Down
Loading