diff --git a/pkg/meta/sql.go b/pkg/meta/sql.go index 32322774a38b..c1d43d06fb91 100644 --- a/pkg/meta/sql.go +++ b/pkg/meta/sql.go @@ -795,6 +795,11 @@ func mustInsert(s *xorm.Session, beans ...interface{}) error { var errBusy error func (m *dbMeta) shouldRetry(err error) bool { + if m.Name() == "mysql" && err == syscall.EBUSY { + // Retry transaction when parent node update return 0 rows in MySQL + return true + } + msg := strings.ToLower(err.Error()) if strings.Contains(msg, "too many connections") || strings.Contains(msg, "too many clients") { logger.Warnf("transaction failed: %s, will retry it. please increase the max number of connections in your database, or use a connection pool.", msg) @@ -807,7 +812,8 @@ func (m *dbMeta) shouldRetry(err error) bool { // MySQL, MariaDB or TiDB // error 1020 for MariaDB when conflict return strings.Contains(msg, "try restarting transaction") || strings.Contains(msg, "try again later") || - strings.Contains(msg, "duplicate entry") || strings.Contains(msg, "error 1020 (hy000)") + strings.Contains(msg, "duplicate entry") || strings.Contains(msg, "error 1020 (hy000)") || + strings.Contains(msg, "invalid connection") || strings.Contains(msg, "bad connection") || errors.Is(err, io.EOF) // could not send data to client: No buffer space available case "postgres": return strings.Contains(msg, "current transaction is aborted") || strings.Contains(msg, "deadlock detected") || strings.Contains(msg, "duplicate key value") || strings.Contains(msg, "could not serialize access") || @@ -1383,7 +1389,7 @@ func (m *dbMeta) doReadlink(ctx Context, inode Ino, noatime bool) (atime int64, func (m *dbMeta) doMknod(ctx Context, parent Ino, name string, _type uint8, mode, cumask uint16, path string, inode *Ino, attr *Attr) syscall.Errno { return errno(m.txn(func(s *xorm.Session) error { var pn = node{Inode: parent} - ok, err := s.ForUpdate().Get(&pn) + ok, err := s.Get(&pn) if err != nil { return err } @@ -1473,15 +1479,19 @@ func (m *dbMeta) doMknod(ctx Context, parent Ino, name string, _type uint8, mode } var updateParent bool + var nlinkAdjust int32 now := time.Now().UnixNano() if parent != TrashInode { if _type == TypeDirectory { pn.Nlink++ updateParent = true + nlinkAdjust++ } if updateParent || time.Duration(now-pn.Mtime*1e3-int64(pn.Mtimensec)) >= m.conf.SkipDirMtime { pn.Mtime = now / 1e3 pn.Ctime = now / 1e3 + pn.Mtimensec = int16(now % 1e3) + pn.Ctimensec = int16(now % 1e3) updateParent = true } } @@ -1513,11 +1523,6 @@ func (m *dbMeta) doMknod(ctx Context, parent Ino, name string, _type uint8, mode if err = mustInsert(s, &edge{Parent: parent, Name: []byte(name), Inode: *inode, Type: _type}, &n); err != nil { return err } - if updateParent { - if _, err := s.Cols("nlink", "mtime", "ctime", "mtimensec", "ctimensec").Update(&pn, &node{Inode: pn.Inode}); err != nil { - return err - } - } if _type == TypeSymlink { if err = mustInsert(s, &symlink{Inode: *inode, Target: []byte(path)}); err != nil { return err @@ -1528,9 +1533,24 @@ func (m *dbMeta) doMknod(ctx Context, parent Ino, name string, _type uint8, mode return err } } + if updateParent { + if _n, err := s.SetExpr("nlink", fmt.Sprintf("nlink + (%d)", nlinkAdjust)).Cols("nlink", "mtime", "ctime", "mtimensec", "ctimensec").Update(&pn, &node{Inode: pn.Inode}); err != nil || _n == 0 { + if err == nil { + logger.Infof("Update parent node affected rows = %d should be 1 for inode = %d .", _n, pn.Inode) + if m.Name() == "mysql" { + err = syscall.EBUSY + } else { + err = syscall.ENOENT + } + } + if err != nil { + return err + } + } + } m.parseAttr(&n, attr) return nil - }, parent)) + })) } func (m *dbMeta) doUnlink(ctx Context, parent Ino, name string, attr *Attr, skipCheckTrash ...bool) syscall.Errno { @@ -1547,7 +1567,7 @@ func (m *dbMeta) doUnlink(ctx Context, parent Ino, name string, attr *Attr, skip opened = false newSpace, newInode = 0, 0 var pn = node{Inode: parent} - ok, err := s.ForUpdate().Get(&pn) + ok, err := s.Get(&pn) if err != nil { return err } @@ -1631,11 +1651,7 @@ func (m *dbMeta) doUnlink(ctx Context, parent Ino, name string, attr *Attr, skip if _, err := s.Delete(&edge{Parent: parent, Name: e.Name}); err != nil { return err } - if updateParent { - if _, err = s.Cols("mtime", "ctime", "mtimensec", "ctimensec").Update(&pn, &node{Inode: pn.Inode}); err != nil { - return err - } - } + if n.Nlink > 0 { if _, err := s.Cols("nlink", "ctime", "ctimensec", "parent").Update(&n, &node{Inode: e.Inode}); err != nil { return err @@ -1679,8 +1695,24 @@ func (m *dbMeta) doUnlink(ctx Context, parent Ino, name string, attr *Attr, skip return err } } + if updateParent { + var _n int64 + if _n, err = s.Cols("mtime", "ctime", "mtimensec", "ctimensec").Update(&pn, &node{Inode: pn.Inode}); err != nil || _n == 0 { + if err == nil { + logger.Infof("Update parent node affected rows = %d should be 1 for inode = %d .", _n, pn.Inode) + if m.Name() == "mysql" { + err = syscall.EBUSY + } else { + err = syscall.ENOENT + } + } + if err != nil { + return err + } + } + } return err - }, parent) + }) if err == nil && trash == 0 { if n.Type == TypeFile && n.Nlink == 0 { m.fileDeleted(opened, isTrash(parent), n.Inode, n.Length) @@ -1702,7 +1734,7 @@ func (m *dbMeta) doRmdir(ctx Context, parent Ino, name string, pinode *Ino, skip } err := m.txn(func(s *xorm.Session) error { var pn = node{Inode: parent} - ok, err := s.ForUpdate().Get(&pn) + ok, err := s.Get(&pn) if err != nil { return err } @@ -1801,10 +1833,10 @@ func (m *dbMeta) doRmdir(ctx Context, parent Ino, name string, pinode *Ino, skip } } if !isTrash(parent) { - _, err = s.Cols("nlink", "mtime", "ctime", "mtimensec", "ctimensec").Update(&pn, &node{Inode: pn.Inode}) + _, err = s.SetExpr("nlink", "nlink - 1").Cols("nlink", "mtime", "ctime", "mtimensec", "ctimensec").Update(&pn, &node{Inode: pn.Inode}) } return err - }, parent) + }) if err == nil && trash == 0 { m.updateStats(-align4K(0), -1) } @@ -1826,6 +1858,19 @@ func (m *dbMeta) getNodesForUpdate(s *xorm.Session, nodes ...*node) error { return nil } +func (m *dbMeta) getNodes(s *xorm.Session, nodes ...*node) error { + for i := range nodes { + ok, err := s.Get(nodes[i]) + if err != nil { + return err + } + if !ok { + return syscall.ENOENT + } + } + return nil +} + func (m *dbMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst Ino, nameDst string, flags uint32, inode, tInode *Ino, attr, tAttr *Attr) syscall.Errno { var trash Ino if st := m.checkTrash(parentDst, &trash); st != 0 { @@ -1836,17 +1881,13 @@ func (m *dbMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst var dino Ino var dn node var newSpace, newInode int64 - lockParent := parentSrc - if isTrash(lockParent) { - lockParent = parentDst - } err := m.txn(func(s *xorm.Session) error { opened = false dino = 0 newSpace, newInode = 0, 0 var spn = node{Inode: parentSrc} var dpn = node{Inode: parentDst} - err := m.getNodesForUpdate(s, &spn, &dpn) + err := m.getNodes(s, &spn, &dpn) if err != nil { return err } @@ -1929,6 +1970,7 @@ func (m *dbMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst } } var supdate, dupdate bool + var srcnlink, dstnlink int32 now := time.Now().UnixNano() dn = node{Inode: de.Inode} if ok { @@ -1954,7 +1996,9 @@ func (m *dbMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst if de.Type == TypeDirectory { dn.Parent = parentSrc dpn.Nlink-- + dstnlink-- spn.Nlink++ + srcnlink++ supdate, dupdate = true, true } else if dn.Parent > 0 { dn.Parent = parentSrc @@ -1970,6 +2014,7 @@ func (m *dbMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst return syscall.ENOTEMPTY } dpn.Nlink-- + dstnlink-- dupdate = true if trash > 0 { dn.Parent = trash @@ -2002,7 +2047,9 @@ func (m *dbMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst if se.Type == TypeDirectory { sn.Parent = parentDst spn.Nlink-- + srcnlink-- dpn.Nlink++ + dstnlink++ supdate, dupdate = true, true } else if sn.Parent > 0 { sn.Parent = parentDst @@ -2108,21 +2155,61 @@ func (m *dbMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst return err } } - if parentDst != parentSrc && !isTrash(parentSrc) && supdate { - if _, err := s.Cols("nlink", "mtime", "ctime", "mtimensec", "ctimensec").Update(&spn, &node{Inode: parentSrc}); err != nil { - return err - } - } + if _, err := s.Cols("ctime", "ctimensec", "parent").Update(&sn, &node{Inode: sn.Inode}); err != nil { return err } + + if parentDst != parentSrc && !isTrash(parentSrc) && supdate { + if dupdate && dpn.Inode < spn.Inode { + if _n, err := s.SetExpr("nlink", fmt.Sprintf("nlink + (%d)", dstnlink)).Cols("nlink", "mtime", "ctime", "mtimensec", "ctimensec").Update(&dpn, &node{Inode: parentDst}); err != nil || _n == 0 { + if err == nil { + logger.Infof("Update parent node affected rows = %d should be 1 for inode = %d .", _n, dpn.Inode) + if m.Name() == "mysql" { + err = syscall.EBUSY + } else { + err = syscall.ENOENT + } + } + if err != nil { + return err + } + } + dupdate = false + } + + if _n, err := s.SetExpr("nlink", fmt.Sprintf("nlink + (%d)", srcnlink)).Cols("nlink", "mtime", "ctime", "mtimensec", "ctimensec").Update(&spn, &node{Inode: parentSrc}); err != nil || _n == 0 { + if err == nil { + logger.Infof("Update parent node affected rows = %d should be 1 for inode = %d .", _n, spn.Inode) + if m.Name() == "mysql" { + err = syscall.EBUSY + } else { + err = syscall.ENOENT + } + } + if err != nil { + return err + } + } + } + if dupdate { - if _, err := s.Cols("nlink", "mtime", "ctime", "mtimensec", "ctimensec").Update(&dpn, &node{Inode: parentDst}); err != nil { - return err + if _n, err := s.SetExpr("nlink", fmt.Sprintf("nlink + (%d)", dstnlink)).Cols("nlink", "mtime", "ctime", "mtimensec", "ctimensec").Update(&dpn, &node{Inode: parentDst}); err != nil || _n == 0 { + if err == nil { + logger.Infof("Update parent node affected rows = %d should be 1 for inode = %d .", _n, dpn.Inode) + if m.Name() == "mysql" { + err = syscall.EBUSY + } else { + err = syscall.ENOENT + } + } + if err != nil { + return err + } } } return err - }, lockParent) + }) if err == nil && !exchange && trash == 0 { if dino > 0 && dn.Type == TypeFile && dn.Nlink == 0 { m.fileDeleted(opened, false, dino, dn.Length) @@ -2135,7 +2222,7 @@ func (m *dbMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst func (m *dbMeta) doLink(ctx Context, inode, parent Ino, name string, attr *Attr) syscall.Errno { return errno(m.txn(func(s *xorm.Session) error { var pn = node{Inode: parent} - ok, err := s.ForUpdate().Get(&pn) + ok, err := s.Get(&pn) if err != nil { return err } @@ -2185,6 +2272,8 @@ func (m *dbMeta) doLink(ctx Context, inode, parent Ino, name string, attr *Attr) if time.Duration(now-pn.Mtime*1e3-int64(pn.Mtimensec)) >= m.conf.SkipDirMtime { pn.Mtime = now / 1e3 pn.Ctime = now / 1e3 + pn.Mtimensec = int16(now % 1e3) + pn.Ctimensec = int16(now % 1e3) updateParent = true } n.Parent = 0 @@ -2194,19 +2283,28 @@ func (m *dbMeta) doLink(ctx Context, inode, parent Ino, name string, attr *Attr) if err = mustInsert(s, &edge{Parent: parent, Name: []byte(name), Inode: inode, Type: n.Type}); err != nil { return err } - if updateParent { - if _, err := s.Cols("mtime", "ctime", "mtimensec", "ctimensec").Update(&pn, &node{Inode: parent}); err != nil { - return err - } - } if _, err := s.Cols("nlink", "ctime", "ctimensec", "parent").Update(&n, node{Inode: inode}); err != nil { return err } - if err == nil { - m.parseAttr(&n, attr) + if updateParent { + if _n, err := s.Cols("mtime", "ctime", "mtimensec", "ctimensec").Update(&pn, &node{Inode: parent}); err != nil || _n == 0 { + if err == nil { + logger.Infof("Update parent node affected rows = %d should be 1 for inode = %d .", _n, pn.Inode) + if m.Name() == "mysql" { + err = syscall.EBUSY + } else { + err = syscall.ENOENT + } + } + if err != nil { + return err + } + } } + + m.parseAttr(&n, attr) return err - }, parent)) + }, inode)) } func (m *dbMeta) doReaddir(ctx Context, inode Ino, plus uint8, entries *[]*Entry, limit int) syscall.Errno {