Skip to content

Commit

Permalink
meta/tkv: support new backup
Browse files Browse the repository at this point in the history
Signed-off-by: jiefenghuang <[email protected]>
  • Loading branch information
jiefenghuang committed Dec 11, 2024
1 parent d1c00a8 commit 9c79f07
Show file tree
Hide file tree
Showing 13 changed files with 774 additions and 113 deletions.
53 changes: 25 additions & 28 deletions pkg/meta/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,17 @@ const (

var errBakEOF = fmt.Errorf("reach backup EOF")

func getMessageNameFromType(typ int) protoreflect.FullName {
func getMessageFromType(typ int) (proto.Message, error) {
var name protoreflect.FullName
if typ == segTypeFormat {
return proto.MessageName(&pb.Format{})
name = proto.MessageName(&pb.Format{})
} else if typ < segTypeMax {
return proto.MessageName(&pb.Batch{})
} else {
return ""
name = proto.MessageName(&pb.Batch{})
}
if name == "" {
return nil, fmt.Errorf("unknown message type %d", typ)
}
return createMessageByName(name)
}

func createMessageByName(name protoreflect.FullName) (proto.Message, error) {
Expand Down Expand Up @@ -128,16 +131,7 @@ func (f *bakFormat) writeFooter(w io.Writer) error {
if err := f.writeEOS(w); err != nil {
return err
}

data, err := f.footer.Marshal()
if err != nil {
return err
}
n, err := w.Write(data)
if err != nil && n != len(data) {
return fmt.Errorf("failed to write footer: err %v, write len %d, expect len %d", err, n, len(data))
}
return nil
return f.footer.Marshal(w)
}

func (f *bakFormat) writeEOS(w io.Writer) error {
Expand All @@ -164,15 +158,21 @@ type bakFooter struct {
len uint64
}

func (h *bakFooter) Marshal() ([]byte, error) {
func (h *bakFooter) Marshal(w io.Writer) error {
data, err := proto.Marshal(h.msg)
if err != nil {
return nil, fmt.Errorf("failed to marshal footer: %w", err)
return fmt.Errorf("failed to marshal footer: %w", err)
}

if n, err := w.Write(data); err != nil && n != len(data) {
return fmt.Errorf("failed to write footer data: err %w, write len %d, expect len %d", err, n, len(data))
}

h.len = uint64(len(data))
data = binary.BigEndian.AppendUint64(data, h.len)
return data, nil
if n, err := w.Write(binary.BigEndian.AppendUint64(nil, h.len)); err != nil && n != 8 {
return fmt.Errorf("failed to write footer length: err %w, write len %d, expect len 8", err, n)
}
return nil
}

func (h *bakFooter) Unmarshal(r io.ReadSeeker) error {
Expand Down Expand Up @@ -205,7 +205,7 @@ type bakSegment struct {
}

func (s *bakSegment) String() string {
return string(proto.MessageName(s.val).Name())
return fmt.Sprintf("type-%d", s.typ)
}

func (s *bakSegment) num() uint64 {
Expand Down Expand Up @@ -310,13 +310,9 @@ func (s *bakSegment) Unmarshal(r io.Reader) error {
return fmt.Errorf("failed to read segment type: %v", err)
}

if s.typ == BakMagic {
if s.typ == BakEOS {
return errBakEOF
}
name := getMessageNameFromType(int(s.typ))
if name == "" {
return fmt.Errorf("segment type %d is unknown", s.typ)
}

if err := binary.Read(r, binary.BigEndian, &s.len); err != nil {
return fmt.Errorf("failed to read segment %s length: %v", s, err)
Expand All @@ -326,12 +322,13 @@ func (s *bakSegment) Unmarshal(r io.Reader) error {
if err != nil && n != int(s.len) {
return fmt.Errorf("failed to read segment value: err %v, read len %d, expect len %d", err, n, s.len)
}
msg, err := createMessageByName(name)

msg, err := getMessageFromType(int(s.typ))
if err != nil {
return fmt.Errorf("failed to create message %s: %v", name, err)
return fmt.Errorf("failed to create message by type %d: %w", s.typ, err)
}
if err = proto.Unmarshal(data, msg); err != nil {
return fmt.Errorf("failed to unmarshal segment msg %s: %v", name, err)
return fmt.Errorf("failed to unmarshal segment msg %d: %w", s.typ, err)
}
s.val = msg
return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/meta/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2367,7 +2367,7 @@ func setAttr(t *testing.T, m Meta, inode Ino, attr *Attr) {
return err
})
case *kvMeta:
err = m.txn(func(tx *kvTxn) error {
err = m.txn(Background, func(tx *kvTxn) error {
tx.set(m.inodeKey(inode), m.marshal(attr))
return nil
})
Expand Down Expand Up @@ -2834,7 +2834,7 @@ func testClone(t *testing.T, m Meta) {
t.Fatalf("remove tree error rootInode: %v", cloneDstIno)
}
removedItem = append(removedItem, m.detachedKey(cloneDstIno))
m.txn(func(tx *kvTxn) error {
m.txn(Background, func(tx *kvTxn) error {
for _, key := range removedItem {
if buf := tx.get(key.([]byte)); buf != nil {
t.Fatalf("has keys not removed: %v", removedItem)
Expand Down
20 changes: 18 additions & 2 deletions pkg/meta/load_dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ func TestLoadDumpV2(t *testing.T) {
"sqlite3": {"sqlite3://dev.db", "sqlite3://dev2.db"},
// "mysql": {"mysql://root:@/dev", "mysql://root:@/dev2"},
"redis": {"redis://127.0.0.1:6379/2", "redis://127.0.0.1:6379/3"},
// "tikv": {"tikv://127.0.0.1:2379/jfs-load-dump-1", "tikv://127.0.0.1:2379/jfs-load-dump-2"},
"tikv": {"tikv://127.0.0.1:2379/jfs-load-dump-1", "tikv://127.0.0.1:2379/jfs-load-dump-2"},
}

for name, addrs := range engines {
Expand Down Expand Up @@ -494,7 +494,7 @@ func BenchmarkLoadDumpV2(b *testing.B) {
engines := map[string]string{
"mysql": "mysql://root:@/dev",
"redis": "redis://127.0.0.1:6379/2",
"tikv": "tikv://127.0.0.1:2379/jfs-load-dump-1",
"tikv": "tikv://127.0.0.1:2379/jfs-load-dump-1",
}
sample := "../../1M_files_in_one_dir.dump"
Expand Down Expand Up @@ -551,6 +551,22 @@ func BenchmarkLoadDumpV2(b *testing.B) {
b.Fatalf("dump meta: %s", err)
}
fp.Sync()
b.StopTimer()
bak := &bakFormat{}
fp2, err := os.Open(path)
if err != nil {
b.Fatalf("open file: %s", path)
}
defer fp2.Close()
footer, err := bak.readFooter(fp2)
if err != nil {
b.Fatalf("read footer: %s", err)
}
for name, info := range footer.msg.Infos {
b.Logf("segment: %s, num: %d", name, info.Num)
}
b.StartTimer()
})
b.Run("LoadV2 "+name, func(b *testing.B) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ func (m *redisMeta) nextTrashKey() string {
}

func (m *redisMeta) counterKey(name string) string {
if name == "nextInode" || name == "nextChunk" || name == "nextSession" {
if name == "nextInode" || name == "nextChunk" || name == "nextSession" || name == "nextTrash" {
name = strings.ToLower(name)
}
return m.prefix + name
Expand Down
Loading

0 comments on commit 9c79f07

Please sign in to comment.