-
Notifications
You must be signed in to change notification settings - Fork 24
/
shard_persist.go
100 lines (90 loc) · 2.75 KB
/
shard_persist.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package dagstore
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/url"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/shard"
ds "github.com/ipfs/go-datastore"
)
// PersistedShard is the persistent representation of the Shard. It is the
// value that Shard.MarshalJSON encodes and Shard.UnmarshalJSON decodes; the
// single-letter JSON tags are part of the stored format.
type PersistedShard struct {
// Key is the string form of the shard key (s.key.String()); it is turned
// back into a key with shard.KeyFromString when loading.
Key string `json:"k"`
// URL is the string form of the URL returned by the mount registry's
// Represent call for the shard's mount; on load it is parsed with url.Parse
// and handed to the registry's Instantiate call.
URL string `json:"u"`
// TransientPath is the value reported by the mount's TransientPath method
// at save time; on load it is passed through to mount.Upgrade.
TransientPath string `json:"t"`
// State is the shard's state field at save time.
State ShardState `json:"s"`
// Lazy mirrors the shard's lazy field.
Lazy bool `json:"l"`
// Error holds the message of the shard's error, or the empty string when
// the shard had no error.
Error string `json:"e"`
}
// MarshalJSON encodes the shard as the JSON form of a PersistedShard.
//
// It reads mutable shard fields (mount, state, lazy, err), so the caller must
// hold a shard lock (a read lock is enough), for example by calling it from
// inside the event loop.
//
// An error is returned when the mount registry cannot represent the shard's
// mount as a URL, or when JSON encoding fails.
func (s *Shard) MarshalJSON() ([]byte, error) {
	mountURL, err := s.d.mounts.Represent(s.mount)
	if err != nil {
		return nil, fmt.Errorf("failed to encode mount: %w", err)
	}

	var out PersistedShard
	out.Key = s.key.String()
	out.URL = mountURL.String()
	out.State = s.state
	out.Lazy = s.lazy
	out.TransientPath = s.mount.TransientPath()

	// Error stays empty for a shard without an error.
	if s.err != nil {
		out.Error = s.err.Error()
	}

	// TODO maybe switch to CBOR, as it's probably faster. Sketch:
	//   var b bytes.Buffer
	//   if err := out.MarshalCBOR(&b); err != nil {
	//       return nil, err
	//   }
	//   return b.Bytes(), nil
	return json.Marshal(out)
}
// UnmarshalJSON restores the shard from the JSON form of a PersistedShard, as
// produced by MarshalJSON.
//
// The receiver's d field must already be set: s.d.mounts,
// s.d.throttleReaadyFetch and s.d.config.TransientsDir are read to rebuild the
// mount. The mount is rebuilt by parsing the persisted URL, instantiating it
// through s.d.mounts and wrapping the result with mount.Upgrade.
//
// Every fallible step runs before any field of s is written, so a call that
// returns an error leaves s unmodified. On success all persisted fields are
// overwritten; in particular s.err is reset to nil when the persisted error
// string is empty, so a reused receiver does not keep a stale error.
func (s *Shard) UnmarshalJSON(b []byte) error {
	var ps PersistedShard // TODO try to avoid this alloc by marshalling/unmarshalling directly.
	if err := json.Unmarshal(b, &ps); err != nil {
		return err
	}

	// Decode into locals first; s is only touched once nothing can fail anymore.
	key := shard.KeyFromString(ps.Key)

	// rebuild the mount.
	u, err := url.Parse(ps.URL)
	if err != nil {
		return fmt.Errorf("failed to parse mount URL: %w", err)
	}
	mnt, err := s.d.mounts.Instantiate(u)
	if err != nil {
		return fmt.Errorf("failed to instantiate mount from URL: %w", err)
	}
	upgraded, err := mount.Upgrade(mnt, s.d.throttleReaadyFetch, s.d.config.TransientsDir, key.String(), ps.TransientPath)
	if err != nil {
		return fmt.Errorf("failed to apply mount upgrader: %w", err)
	}

	// commit the restored state.
	s.key = key
	s.state = ps.State
	s.lazy = ps.Lazy
	s.mount = upgraded
	s.err = nil // an empty persisted error means "no error".
	if ps.Error != "" {
		s.err = errors.New(ps.Error)
	}
	return nil
}
// persist writes the shard's serialized state to the given Datastore under a
// key derived from the shard key, then asks the store to sync.
//
// Serialization goes through MarshalJSON, so the caller must hold a shard lock
// for this call to be safe. Any failure while serializing, putting or syncing
// is returned wrapped with a message naming the failed step.
func (s *Shard) persist(ctx context.Context, store ds.Datastore) error {
	encoded, err := s.MarshalJSON()
	if err != nil {
		return fmt.Errorf("failed to serialize shard state: %w", err)
	}

	// No namespacing is applied here; the supplied datastore is assumed to be
	// namespaced already if that is needed.
	dsKey := ds.NewKey(s.key.String())

	err = store.Put(ctx, dsKey, encoded)
	if err != nil {
		return fmt.Errorf("failed to put shard state: %w", err)
	}

	err = store.Sync(ctx, ds.Key{})
	if err != nil {
		return fmt.Errorf("failed to sync shard state to store: %w", err)
	}

	return nil
}