Skip to content

Commit

Permalink
delete all chunks together (#84)
Browse files Browse the repository at this point in the history
* delete all chunks together

* update comment

* cleanup

* fix lint error

* fix rename
  • Loading branch information
davies authored Jan 18, 2021
1 parent 8a474e5 commit 18c9746
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 89 deletions.
189 changes: 100 additions & 89 deletions pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
Flock: lockf$inode -> { $sid_$owner -> ltype }
POSIX lock: lockp$inode -> { $sid_$owner -> Plock(pid,ltype,start,end) }
Sessions: sessions -> [ $sid -> heartbeat ]
Removed chunks: delchunks -> [($inode,$start,$end,$maxchunk) -> seconds]
Removed chunks: delchunks -> [$inode -> seconds]
*/

var logger = utils.GetLogger("juicefs")
Expand Down Expand Up @@ -216,7 +216,7 @@ func (r *redisMeta) entryKey(parent Ino) string {
}

// chunkKey returns the Redis key holding the slice list of chunk indx of
// inode, formatted as "c$inode_$indx" (e.g. "c2_0"). Built with string
// concatenation and strconv rather than fmt.Sprintf to avoid boxing the
// arguments on this hot path.
func (r *redisMeta) chunkKey(inode Ino, indx uint32) string {
	return "c" + inode.String() + "_" + strconv.FormatInt(int64(indx), 10)
}

func (r *redisMeta) xattrKey(inode Ino) string {
Expand Down Expand Up @@ -389,10 +389,6 @@ func (r *redisMeta) txn(txf func(tx *redis.Tx) error, keys ...string) syscall.Er
}

func (r *redisMeta) Truncate(ctx Context, inode Ino, flags uint8, length uint64, attr *Attr) syscall.Errno {
maxchunk, err := r.rdb.IncrBy(c, "nextchunk", 0).Uint64()
if err != nil {
return errno(err)
}
return r.txn(func(tx *redis.Tx) error {
var t Attr
a, err := tx.Get(c, r.inodeKey(inode)).Bytes()
Expand All @@ -404,6 +400,37 @@ func (r *redisMeta) Truncate(ctx Context, inode Ino, flags uint8, length uint64,
return syscall.EPERM
}
old := t.Length
var zeroChunks []uint32
if length > old {
if (length-old)/ChunkSize >= 100 {
// super large
var cursor uint64
var keys []string
for {
keys, cursor, err = tx.Scan(c, cursor, fmt.Sprintf("c%d_*", inode), 10000).Result()
if err != nil {
return err
}
for _, key := range keys {
indx, err := strconv.Atoi(strings.Split(key, "_")[1])
if err != nil {
logger.Errorf("parse %s: %s", key, err)
continue
}
if uint64(indx) > old/ChunkSize && uint64(indx) < length/ChunkSize {
zeroChunks = append(zeroChunks, uint32(indx))
}
}
if len(keys) < 10000 {
break
}
}
} else {
for i := old/ChunkSize + 1; i < length/ChunkSize; i++ {
zeroChunks = append(zeroChunks, uint32(i))
}
}
}
t.Length = length
now := time.Now()
t.Mtime = now.Unix()
Expand All @@ -412,17 +439,37 @@ func (r *redisMeta) Truncate(ctx Context, inode Ino, flags uint8, length uint64,
t.Ctimensec = uint32(now.Nanosecond())
_, err = tx.TxPipelined(c, func(pipe redis.Pipeliner) error {
pipe.Set(c, r.inodeKey(inode), r.marshal(&t), 0)
if old > length {
pipe.ZAdd(c, delchunks, &redis.Z{Score: float64(now.Unix()), Member: r.delChunks(inode, length, old, maxchunk)})
} else if length > (old/ChunkSize+1)*ChunkSize {
// zero out last chunks
if length > old {
// zero out from old to length
w := utils.NewBuffer(24)
w.Put32(uint32(old % ChunkSize))
w.Put64(0)
w.Put32(0)
w.Put32(0)
w.Put32(ChunkSize - uint32(old%ChunkSize))
if length > (old/ChunkSize+1)*ChunkSize {
w.Put32(ChunkSize - uint32(old%ChunkSize))
} else {
w.Put32(uint32(length - old))
}
pipe.RPush(c, r.chunkKey(inode, uint32(old/ChunkSize)), w.Bytes())
w = utils.NewBuffer(24)
w.Put32(0)
w.Put64(0)
w.Put32(0)
w.Put32(0)
w.Put32(ChunkSize)
for _, indx := range zeroChunks {
pipe.RPush(c, r.chunkKey(inode, indx), w.Bytes())
}
if length > (old/ChunkSize+1)*ChunkSize && length%ChunkSize > 0 {
w := utils.NewBuffer(24)
w.Put32(0)
w.Put64(0)
w.Put32(0)
w.Put32(0)
w.Put32(uint32(length % ChunkSize))
pipe.RPush(c, r.chunkKey(inode, uint32(length/ChunkSize)), w.Bytes())
}
}
pipe.IncrBy(c, usedSpace, align4K(length)-align4K(old))
return nil
Expand All @@ -431,7 +478,6 @@ func (r *redisMeta) Truncate(ctx Context, inode Ino, flags uint8, length uint64,
if attr != nil {
*attr = t
}
go r.deleteChunks(inode, length, old, maxchunk)
}
return err
}, r.inodeKey(inode))
Expand Down Expand Up @@ -721,17 +767,10 @@ func (r *redisMeta) Unlink(ctx Context, parent Ino, name string) syscall.Errno {

attr.Nlink--
var opened bool
var maxchunk uint64
if _type == TypeFile && attr.Nlink == 0 {
r.Lock()
opened = r.openFiles[inode] > 0
r.Unlock()
if !opened {
maxchunk, err = tx.IncrBy(c, "nextchunk", 0).Uint64()
if err != nil {
return err
}
}
}

_, err = tx.TxPipelined(c, func(pipe redis.Pipeliner) error {
Expand All @@ -750,7 +789,7 @@ func (r *redisMeta) Unlink(ctx Context, parent Ino, name string) syscall.Errno {
pipe.Set(c, r.inodeKey(inode), r.marshal(&attr), 0)
pipe.SAdd(c, r.sessionKey(r.sid), strconv.Itoa(int(inode)))
} else {
pipe.ZAdd(c, delchunks, &redis.Z{Score: float64(now.Unix()), Member: r.delChunks(inode, 0, attr.Length, maxchunk)})
pipe.ZAdd(c, delchunks, &redis.Z{Score: float64(now.Unix()), Member: inode.String()})
pipe.Del(c, r.inodeKey(inode))
pipe.IncrBy(c, usedSpace, -align4K(attr.Length))
}
Expand All @@ -765,7 +804,7 @@ func (r *redisMeta) Unlink(ctx Context, parent Ino, name string) syscall.Errno {
r.removedFiles[inode] = true
r.Unlock()
} else {
go r.deleteChunks(inode, 0, attr.Length, maxchunk)
go r.deleteChunks(inode, inode.String())
}
}
return err
Expand Down Expand Up @@ -869,7 +908,6 @@ func (r *redisMeta) Rename(ctx Context, parentSrc Ino, nameSrc string, parentDst
}
var tattr Attr
var opened bool
var maxchunk uint64
if err == nil {
typ1, dino1 := r.parseEntry(buf)
if dino1 != dino || typ1 != dtyp {
Expand Down Expand Up @@ -898,12 +936,6 @@ func (r *redisMeta) Rename(ctx Context, parentSrc Ino, nameSrc string, parentDst
r.Lock()
opened = r.openFiles[dino] > 0
r.Unlock()
if !opened {
maxchunk, err = tx.IncrBy(c, "nextchunk", 0).Uint64()
if err != nil {
return err
}
}
}
}
} else {
Expand Down Expand Up @@ -971,7 +1003,7 @@ func (r *redisMeta) Rename(ctx Context, parentSrc Ino, nameSrc string, parentDst
pipe.Set(c, r.inodeKey(dino), r.marshal(&tattr), 0)
pipe.SAdd(c, r.sessionKey(r.sid), strconv.Itoa(int(dino)))
} else {
pipe.ZAdd(c, delchunks, &redis.Z{Score: float64(now.Unix()), Member: r.delChunks(dino, 0, tattr.Length, maxchunk)})
pipe.ZAdd(c, delchunks, &redis.Z{Score: float64(now.Unix()), Member: dino.String()})
pipe.Del(c, r.inodeKey(dino))
pipe.IncrBy(c, usedSpace, -align4K(tattr.Length))
}
Expand All @@ -994,7 +1026,7 @@ func (r *redisMeta) Rename(ctx Context, parentSrc Ino, nameSrc string, parentDst
r.removedFiles[dino] = true
r.Unlock()
} else {
go r.deleteChunks(dino, 0, tattr.Length, maxchunk)
go r.deleteChunks(dino, dino.String())
}
}
return err
Expand Down Expand Up @@ -1166,19 +1198,15 @@ func (r *redisMeta) deleteInode(inode Ino) error {
if err != nil {
return err
}
maxchunk, err := r.rdb.IncrBy(c, "nextchunk", 0).Uint64()
if err != nil {
return err
}
r.parseAttr(a, &attr)
_, err = r.rdb.TxPipelined(c, func(pipe redis.Pipeliner) error {
pipe.ZAdd(c, delchunks, &redis.Z{Score: float64(time.Now().Unix()), Member: r.delChunks(inode, 0, attr.Length, maxchunk)})
pipe.ZAdd(c, delchunks, &redis.Z{Score: float64(time.Now().Unix()), Member: inode.String()})
pipe.Del(c, r.inodeKey(inode))
pipe.IncrBy(c, usedSpace, -align4K(attr.Length))
return nil
})
if err == nil {
go r.deleteChunks(inode, 0, attr.Length, maxchunk)
go r.deleteChunks(inode, inode.String())
}
return err
}
Expand Down Expand Up @@ -1307,93 +1335,77 @@ func (r *redisMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice
}, r.inodeKey(inode))
}

// delChunks encodes a pending-deletion record as the colon-separated string
// "$inode:$start:$end:$maxchunkid", used as the member stored in the
// delchunks sorted set.
func (r *redisMeta) delChunks(inode Ino, start, end, maxchunkid uint64) string {
	fields := []string{
		strconv.FormatUint(uint64(inode), 10),
		strconv.FormatUint(start, 10),
		strconv.FormatUint(end, 10),
		strconv.FormatUint(maxchunkid, 10),
	}
	return strings.Join(fields, ":")
}

func (r *redisMeta) cleanupChunks() {
for {
now := time.Now()
members, _ := r.rdb.ZRangeByScore(c, delchunks, &redis.ZRangeBy{Min: strconv.Itoa(0), Max: strconv.Itoa(int(now.Add(time.Hour).Unix())), Count: 1000}).Result()
for _, member := range members {
ps := strings.Split(member, ":")
if len(ps) != 4 {
logger.Errorf("invalid del chunks: %s", member)
continue
}
inode, _ := strconv.Atoi(ps[0])
start, _ := strconv.Atoi(ps[1])
end, _ := strconv.Atoi(ps[2])
maxchunk, _ := strconv.Atoi(ps[3])
r.deleteChunks(Ino(inode), uint64(start), uint64(end), uint64(maxchunk))
r.deleteChunks(Ino(inode), member)
}
time.Sleep(time.Minute)
}
}

func (r *redisMeta) deleteChunks(inode Ino, start, end, maxchunk uint64) {
var i uint32
if start > 0 {
i = uint32((start-1)/ChunkSize) + 1
}
func (r *redisMeta) deleteChunks(inode Ino, tracking string) {
var rs []*redis.StringSliceCmd
for uint64(i)*ChunkSize <= end {
for {
keys, _, err := r.rdb.Scan(c, 0, fmt.Sprintf("c%d_*", inode), 1000).Result()
if err != nil {
return
}
if len(keys) == 0 {
break
}
p := r.rdb.Pipeline()
var indx = i
for j := 0; uint64(i)*ChunkSize <= end && j < 1000; j++ {
rs = append(rs, p.LRange(c, r.chunkKey(inode, i), 0, 1000))
i++
for _, k := range keys {
rs = append(rs, p.LRange(c, k, 0, 1000))
}
vals, err := p.Exec(c)
if err != nil {
logger.Errorf("LRange %d[%d-%d]: %s", inode, start, end, err)
logger.Errorf("delete chunk of %d: %s", inode, err)
return
}
for j := range vals {
val, err := rs[j].Result()
if err == redis.Nil {
continue
}
indx, _ := strconv.Atoi(strings.Split(keys[j], "_")[1])
for _, cs := range val {
rb := utils.ReadBuffer([]byte(cs))
_ = rb.Get32() // pos
chunkid := rb.Get64()
if chunkid == 0 {
continue
cleng := rb.Get32()
var err error
if chunkid > 0 {
err = r.newMsg(DeleteChunk, chunkid, cleng)
}
// there could be new data written after the chunk is marked for deletion,
// so we should not delete any chunk with id > maxchunk
if chunkid > maxchunk {
// mark this chunk is deleted
break
if err == nil {
err = r.txn(func(tx *redis.Tx) error {
val, err := tx.LRange(c, r.chunkKey(inode, uint32(indx)), 0, 0).Result()
if err != nil {
return err
}
if len(val) == 1 && val[0] == cs {
_, err = tx.TxPipelined(c, func(pipe redis.Pipeliner) error {
pipe.LPop(c, r.chunkKey(inode, uint32(indx)))
return nil
})
return err
}
return fmt.Errorf("chunk %d %d changed", inode, uint32(indx))
}, r.chunkKey(inode, uint32(indx)))
}
cleng := rb.Get32()
err := r.newMsg(DeleteChunk, chunkid, cleng)
if err != nil {
if err != nil && err != syscall.Errno(0) {
logger.Warnf("delete chunk %d fail: %s, retry later", inode, err)
now := time.Now()
key := r.delChunks(inode, uint64((indx+uint32(j)))*ChunkSize, uint64((indx+uint32(j)+1))*ChunkSize, maxchunk)
r.rdb.ZAdd(c, delchunks, &redis.Z{Score: float64(now.Unix()), Member: key})
return
}
_ = r.txn(func(tx *redis.Tx) error {
val, err := tx.LRange(c, r.chunkKey(inode, indx+uint32(j)), 0, 1).Result()
if err != nil {
return err
}
if len(val) == 1 && val[0] == cs {
_, err = tx.TxPipelined(c, func(pipe redis.Pipeliner) error {
pipe.LPop(c, r.chunkKey(inode, indx+uint32(j)))
return nil
})
return err
}
return nil
}, r.chunkKey(inode, indx+uint32(j)))
}
}
}
r.rdb.ZRem(c, delchunks, r.delChunks(inode, start, end, maxchunk))
r.rdb.ZRem(c, delchunks, tracking)
}

func (r *redisMeta) parseSlice(buf []byte) *slice {
Expand Down Expand Up @@ -1488,8 +1500,7 @@ func (r *redisMeta) compact(inode Ino, indx uint32) {
w.Put32(size)
_, err = r.rdb.Pipelined(c, func(pipe redis.Pipeliner) error {
pipe.RPush(c, r.chunkKey(0, 0), w.Bytes())
key := r.delChunks(0, 0, uint64(size), chunkid)
r.rdb.ZAdd(c, delchunks, &redis.Z{Score: float64(time.Now().Unix()), Member: key})
r.rdb.ZAdd(c, delchunks, &redis.Z{Score: float64(time.Now().Unix()), Member: "0"})
return nil
})
if err != nil {
Expand Down
Loading

0 comments on commit 18c9746

Please sign in to comment.