diff --git a/pkg/meta/redis.go b/pkg/meta/redis.go index 0132f74c4448..529443a6f217 100644 --- a/pkg/meta/redis.go +++ b/pkg/meta/redis.go @@ -43,7 +43,7 @@ import ( Flock: lockf$inode -> { $sid_$owner -> ltype } POSIX lock: lockp$inode -> { $sid_$owner -> Plock(pid,ltype,start,end) } Sessions: sessions -> [ $sid -> heartbeat ] - Removed chunks: delchunks -> [($inode,$start,$end,$maxchunk) -> seconds] + Removed chunks: delchunks -> [$inode -> seconds] */ var logger = utils.GetLogger("juicefs") @@ -216,7 +216,7 @@ func (r *redisMeta) entryKey(parent Ino) string { } func (r *redisMeta) chunkKey(inode Ino, indx uint32) string { - return fmt.Sprintf("c%d_%d", inode, indx) + return "c" + inode.String() + "_" + strconv.FormatInt(int64(indx), 10) } func (r *redisMeta) xattrKey(inode Ino) string { @@ -389,10 +389,6 @@ func (r *redisMeta) txn(txf func(tx *redis.Tx) error, keys ...string) syscall.Er } func (r *redisMeta) Truncate(ctx Context, inode Ino, flags uint8, length uint64, attr *Attr) syscall.Errno { - maxchunk, err := r.rdb.IncrBy(c, "nextchunk", 0).Uint64() - if err != nil { - return errno(err) - } return r.txn(func(tx *redis.Tx) error { var t Attr a, err := tx.Get(c, r.inodeKey(inode)).Bytes() @@ -404,6 +400,37 @@ func (r *redisMeta) Truncate(ctx Context, inode Ino, flags uint8, length uint64, return syscall.EPERM } old := t.Length + var zeroChunks []uint32 + if length > old { + if (length-old)/ChunkSize >= 100 { + // super large + var cursor uint64 + var keys []string + for { + keys, cursor, err = tx.Scan(c, cursor, fmt.Sprintf("c%d_*", inode), 10000).Result() + if err != nil { + return err + } + for _, key := range keys { + indx, err := strconv.Atoi(strings.Split(key, "_")[1]) + if err != nil { + logger.Errorf("parse %s: %s", key, err) + continue + } + if uint64(indx) > old/ChunkSize && uint64(indx) < length/ChunkSize { + zeroChunks = append(zeroChunks, uint32(indx)) + } + } + if len(keys) < 10000 { + break + } + } + } else { + for i := old/ChunkSize + 1; i < length/ChunkSize; i++ { + zeroChunks = append(zeroChunks, uint32(i)) + } + } + } t.Length = length now := time.Now() t.Mtime = now.Unix() @@ -412,17 +439,37 @@ func (r *redisMeta) Truncate(ctx Context, inode Ino, flags uint8, length uint64, t.Ctimensec = uint32(now.Nanosecond()) _, err = tx.TxPipelined(c, func(pipe redis.Pipeliner) error { pipe.Set(c, r.inodeKey(inode), r.marshal(&t), 0) - if old > length { - pipe.ZAdd(c, delchunks, &redis.Z{Score: float64(now.Unix()), Member: r.delChunks(inode, length, old, maxchunk)}) - } else if length > (old/ChunkSize+1)*ChunkSize { - // zero out last chunks + if length > old { + // zero out from old to length w := utils.NewBuffer(24) w.Put32(uint32(old % ChunkSize)) w.Put64(0) w.Put32(0) w.Put32(0) - w.Put32(ChunkSize - uint32(old%ChunkSize)) + if length > (old/ChunkSize+1)*ChunkSize { + w.Put32(ChunkSize - uint32(old%ChunkSize)) + } else { + w.Put32(uint32(length - old)) + } pipe.RPush(c, r.chunkKey(inode, uint32(old/ChunkSize)), w.Bytes()) + w = utils.NewBuffer(24) + w.Put32(0) + w.Put64(0) + w.Put32(0) + w.Put32(0) + w.Put32(ChunkSize) + for _, indx := range zeroChunks { + pipe.RPush(c, r.chunkKey(inode, indx), w.Bytes()) + } + if length > (old/ChunkSize+1)*ChunkSize && length%ChunkSize > 0 { + w := utils.NewBuffer(24) + w.Put32(0) + w.Put64(0) + w.Put32(0) + w.Put32(0) + w.Put32(uint32(length % ChunkSize)) + pipe.RPush(c, r.chunkKey(inode, uint32(length/ChunkSize)), w.Bytes()) + } } pipe.IncrBy(c, usedSpace, align4K(length)-align4K(old)) return nil @@ -431,7 +478,6 @@ func (r *redisMeta) Truncate(ctx Context, inode Ino, flags uint8, length uint64, if attr != nil { *attr = t } - go r.deleteChunks(inode, length, old, maxchunk) } return err }, r.inodeKey(inode)) @@ -721,17 +767,10 @@ func (r *redisMeta) Unlink(ctx Context, parent Ino, name string) syscall.Errno { attr.Nlink-- var opened bool - var maxchunk uint64 if _type == TypeFile && attr.Nlink == 0 { r.Lock() opened = r.openFiles[inode] > 0 r.Unlock() - if !opened { - maxchunk, err = tx.IncrBy(c, "nextchunk", 0).Uint64() - if err != nil { - return err - } - } } _, err = tx.TxPipelined(c, func(pipe redis.Pipeliner) error { @@ -750,7 +789,7 @@ func (r *redisMeta) Unlink(ctx Context, parent Ino, name string) syscall.Errno { pipe.Set(c, r.inodeKey(inode), r.marshal(&attr), 0) pipe.SAdd(c, r.sessionKey(r.sid), strconv.Itoa(int(inode))) } else { - pipe.ZAdd(c, delchunks, &redis.Z{Score: float64(now.Unix()), Member: r.delChunks(inode, 0, attr.Length, maxchunk)}) + pipe.ZAdd(c, delchunks, &redis.Z{Score: float64(now.Unix()), Member: inode.String()}) pipe.Del(c, r.inodeKey(inode)) pipe.IncrBy(c, usedSpace, -align4K(attr.Length)) } @@ -765,7 +804,7 @@ func (r *redisMeta) Unlink(ctx Context, parent Ino, name string) syscall.Errno { r.removedFiles[inode] = true r.Unlock() } else { - go r.deleteChunks(inode, 0, attr.Length, maxchunk) + go r.deleteChunks(inode, inode.String()) } } return err @@ -869,7 +908,6 @@ func (r *redisMeta) Rename(ctx Context, parentSrc Ino, nameSrc string, parentDst } var tattr Attr var opened bool - var maxchunk uint64 if err == nil { typ1, dino1 := r.parseEntry(buf) if dino1 != dino || typ1 != dtyp { @@ -898,12 +936,6 @@ func (r *redisMeta) Rename(ctx Context, parentSrc Ino, nameSrc string, parentDst r.Lock() opened = r.openFiles[dino] > 0 r.Unlock() - if !opened { - maxchunk, err = tx.IncrBy(c, "nextchunk", 0).Uint64() - if err != nil { - return err - } - } } } } else { @@ -971,7 +1003,7 @@ func (r *redisMeta) Rename(ctx Context, parentSrc Ino, nameSrc string, parentDst pipe.Set(c, r.inodeKey(dino), r.marshal(&tattr), 0) pipe.SAdd(c, r.sessionKey(r.sid), strconv.Itoa(int(dino))) } else { - pipe.ZAdd(c, delchunks, &redis.Z{Score: float64(now.Unix()), Member: r.delChunks(dino, 0, tattr.Length, maxchunk)}) + pipe.ZAdd(c, delchunks, &redis.Z{Score: float64(now.Unix()), Member: dino.String()}) pipe.Del(c, r.inodeKey(dino)) pipe.IncrBy(c, usedSpace, -align4K(tattr.Length)) } @@ -994,7 +1026,7 @@ func (r *redisMeta) Rename(ctx Context, parentSrc Ino, nameSrc string, parentDst r.removedFiles[dino] = true r.Unlock() } else { - go r.deleteChunks(dino, 0, tattr.Length, maxchunk) + go r.deleteChunks(dino, dino.String()) } } return err @@ -1166,19 +1198,15 @@ func (r *redisMeta) deleteInode(inode Ino) error { if err != nil { return err } - maxchunk, err := r.rdb.IncrBy(c, "nextchunk", 0).Uint64() - if err != nil { - return err - } r.parseAttr(a, &attr) _, err = r.rdb.TxPipelined(c, func(pipe redis.Pipeliner) error { - pipe.ZAdd(c, delchunks, &redis.Z{Score: float64(time.Now().Unix()), Member: r.delChunks(inode, 0, attr.Length, maxchunk)}) + pipe.ZAdd(c, delchunks, &redis.Z{Score: float64(time.Now().Unix()), Member: inode.String()}) pipe.Del(c, r.inodeKey(inode)) pipe.IncrBy(c, usedSpace, -align4K(attr.Length)) return nil }) if err == nil { - go r.deleteChunks(inode, 0, attr.Length, maxchunk) + go r.deleteChunks(inode, inode.String()) } return err } @@ -1307,46 +1335,36 @@ func (r *redisMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice }, r.inodeKey(inode)) } -func (r *redisMeta) delChunks(inode Ino, start, end, maxchunkid uint64) string { - return fmt.Sprintf("%d:%d:%d:%d", inode, start, end, maxchunkid) -} - func (r *redisMeta) cleanupChunks() { for { now := time.Now() members, _ := r.rdb.ZRangeByScore(c, delchunks, &redis.ZRangeBy{Min: strconv.Itoa(0), Max: strconv.Itoa(int(now.Add(time.Hour).Unix())), Count: 1000}).Result() for _, member := range members { ps := strings.Split(member, ":") - if len(ps) != 4 { - logger.Errorf("invalid del chunks: %s", member) - continue - } inode, _ := strconv.Atoi(ps[0]) - start, _ := strconv.Atoi(ps[1]) - end, _ := strconv.Atoi(ps[2]) - maxchunk, _ := strconv.Atoi(ps[3]) - r.deleteChunks(Ino(inode), uint64(start), uint64(end), uint64(maxchunk)) + r.deleteChunks(Ino(inode), member) } time.Sleep(time.Minute) } } -func (r *redisMeta) deleteChunks(inode Ino, start, end, maxchunk uint64) { - var i uint32 - if start > 0 { - i = uint32((start-1)/ChunkSize) + 1 - } +func (r *redisMeta) deleteChunks(inode Ino, tracking string) { var rs []*redis.StringSliceCmd - for uint64(i)*ChunkSize <= end { + for { + keys, _, err := r.rdb.Scan(c, 0, fmt.Sprintf("c%d_*", inode), 1000).Result() + if err != nil { + return + } + if len(keys) == 0 { + break + } p := r.rdb.Pipeline() - var indx = i - for j := 0; uint64(i)*ChunkSize <= end && j < 1000; j++ { - rs = append(rs, p.LRange(c, r.chunkKey(inode, i), 0, 1000)) - i++ + for _, k := range keys { + rs = append(rs, p.LRange(c, k, 0, 1000)) } vals, err := p.Exec(c) if err != nil { - logger.Errorf("LRange %d[%d-%d]: %s", inode, start, end, err) + logger.Errorf("delete chunk of %d: %s", inode, err) return } for j := range vals { @@ -1354,46 +1372,40 @@ func (r *redisMeta) deleteChunks(inode Ino, start, end, maxchunk uint64) { if err == redis.Nil { continue } + indx, _ := strconv.Atoi(strings.Split(keys[j], "_")[1]) for _, cs := range val { rb := utils.ReadBuffer([]byte(cs)) _ = rb.Get32() // pos chunkid := rb.Get64() - if chunkid == 0 { - continue + cleng := rb.Get32() + var err error + if chunkid > 0 { + err = r.newMsg(DeleteChunk, chunkid, cleng) } - // there could be new data written after the chunk is marked for deletion, - // so we should not delete any chunk with id > maxchunk - if chunkid > maxchunk { - // mark this chunk is deleted - break + if err == nil { + err = r.txn(func(tx *redis.Tx) error { + val, err := tx.LRange(c, r.chunkKey(inode, uint32(indx)), 0, 0).Result() + if err != nil { + return err + } + if len(val) == 1 && val[0] == cs { + _, err = tx.TxPipelined(c, func(pipe redis.Pipeliner) error { + pipe.LPop(c, r.chunkKey(inode, uint32(indx))) + return nil + }) + return err + } + return fmt.Errorf("chunk %d %d changed", inode, uint32(indx)) + }, r.chunkKey(inode, uint32(indx))) } - cleng := rb.Get32() - err := r.newMsg(DeleteChunk, chunkid, cleng) - if err != nil { + if err != nil && err != syscall.Errno(0) { logger.Warnf("delete chunk %d fail: %s, retry later", inode, err) - now := time.Now() - key := r.delChunks(inode, uint64((indx+uint32(j)))*ChunkSize, uint64((indx+uint32(j)+1))*ChunkSize, maxchunk) - r.rdb.ZAdd(c, delchunks, &redis.Z{Score: float64(now.Unix()), Member: key}) return } - _ = r.txn(func(tx *redis.Tx) error { - val, err := tx.LRange(c, r.chunkKey(inode, indx+uint32(j)), 0, 1).Result() - if err != nil { - return err - } - if len(val) == 1 && val[0] == cs { - _, err = tx.TxPipelined(c, func(pipe redis.Pipeliner) error { - pipe.LPop(c, r.chunkKey(inode, indx+uint32(j))) - return nil - }) - return err - } - return nil - }, r.chunkKey(inode, indx+uint32(j))) } } } - r.rdb.ZRem(c, delchunks, r.delChunks(inode, start, end, maxchunk)) + r.rdb.ZRem(c, delchunks, tracking) } func (r *redisMeta) parseSlice(buf []byte) *slice { @@ -1488,8 +1500,7 @@ func (r *redisMeta) compact(inode Ino, indx uint32) { w.Put32(size) _, err = r.rdb.Pipelined(c, func(pipe redis.Pipeliner) error { pipe.RPush(c, r.chunkKey(0, 0), w.Bytes()) - key := r.delChunks(0, 0, uint64(size), chunkid) - r.rdb.ZAdd(c, delchunks, &redis.Z{Score: float64(time.Now().Unix()), Member: key}) + r.rdb.ZAdd(c, delchunks, &redis.Z{Score: float64(time.Now().Unix()), Member: "0"}) return nil }) if err != nil { diff --git a/pkg/meta/redis_test.go b/pkg/meta/redis_test.go index a97943359649..c8b50887ab5c 100644 --- a/pkg/meta/redis_test.go +++ b/pkg/meta/redis_test.go @@ -16,6 +16,7 @@ package meta import ( + "fmt" "sync" "syscall" "testing" @@ -314,3 +315,51 @@ func TestConcurrentWrite(t *testing.T) { t.Fatal() } } + +// nolint:errcheck +func TestTruncateAndDelete(t *testing.T) { + var conf RedisConfig + m, err := NewRedisMeta("redis://127.0.0.1/10", &conf) + if err != nil { + t.Logf("redis is not available: %s", err) + t.Skip() + } + m.OnMsg(DeleteChunk, func(args ...interface{}) error { + return nil + }) + _ = m.Init(Format{Name: "test"}, true) + ctx := Background + var inode Ino + var attr = &Attr{} + m.Unlink(ctx, 1, "f") + if st := m.Create(ctx, 1, "f", 0650, 022, &inode, attr); st != 0 { + t.Fatalf("create file %s", st) + } + defer m.Unlink(ctx, 1, "f") + if st := m.Write(ctx, inode, 0, 100, Slice{1, 100, 0, 100}); st != 0 { + t.Fatalf("write file %s", st) + } + if st := m.Truncate(ctx, inode, 0, 200<<20, attr); st != 0 { + t.Fatalf("truncate file %s", st) + } + if st := m.Truncate(ctx, inode, 0, (10<<40)+10, attr); st != 0 { + t.Fatalf("truncate file %s", st) + } + r := m.(*redisMeta) + keys, _, _ := r.rdb.Scan(c, 0, fmt.Sprintf("c%d_*", inode), 1000).Result() + if len(keys) != 5 { + for _, k := range keys { + println("key", k) + } + t.Fatalf("number of chunks: %d != 5", len(keys)) + } + m.Close(ctx, inode) + if st := m.Unlink(ctx, 1, "f"); st != 0 { + t.Fatalf("unlink file %s", st) + } + time.Sleep(time.Millisecond * 10) + keys, _, _ = r.rdb.Scan(c, 0, fmt.Sprintf("c%d_*", inode), 1000).Result() + if len(keys) != 0 { + t.Fatalf("number of chunks: %d != 0", len(keys)) + } +}