Skip to content

Commit

Permalink
Merge pull request #46 from onflow/m4ksio/4547-ledger-paths
Browse files Browse the repository at this point in the history
Use new Ledger package
  • Loading branch information
Kay-Zee authored Oct 6, 2020
2 parents 830ee3f + 4a4244a commit 8b9dd47
Show file tree
Hide file tree
Showing 63 changed files with 1,398 additions and 996 deletions.
4 changes: 2 additions & 2 deletions cmd/bootstrap/run/execution_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"github.com/onflow/flow-go/crypto"
"github.com/onflow/flow-go/crypto/hash"
"github.com/onflow/flow-go/engine/execution/state/bootstrap"
ledger "github.com/onflow/flow-go/ledger/complete"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/storage/ledger"
)

// NOTE: this is now unused and should become part of another tool.
Expand All @@ -35,7 +35,7 @@ func GenerateExecutionState(
) (flow.StateCommitment, error) {
metricsCollector := &metrics.NoopCollector{}

ledgerStorage, err := ledger.NewMTrieStorage(dbDir, 100, metricsCollector, nil)
ledgerStorage, err := ledger.NewLedger(dbDir, 100, metricsCollector, zerolog.Nop(), nil)
defer ledgerStorage.CloseStorage()
if err != nil {
return nil, err
Expand Down
8 changes: 4 additions & 4 deletions cmd/execution/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/onflow/flow-go/engine/execution/state"
"github.com/onflow/flow-go/engine/execution/state/bootstrap"
"github.com/onflow/flow-go/fvm"
ledger "github.com/onflow/flow-go/ledger/complete"
wal "github.com/onflow/flow-go/ledger/complete/wal"
bootstrapFilenames "github.com/onflow/flow-go/model/bootstrap"
"github.com/onflow/flow-go/model/encoding"
"github.com/onflow/flow-go/model/flow"
Expand All @@ -40,14 +42,12 @@ import (
"github.com/onflow/flow-go/module/signature"
chainsync "github.com/onflow/flow-go/module/synchronization"
storage "github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/storage/ledger"
"github.com/onflow/flow-go/storage/ledger/wal"
)

func main() {

var (
ledgerStorage *ledger.MTrieStorage
ledgerStorage *ledger.Ledger
events *storage.Events
txResults *storage.TransactionResults
results *storage.ExecutionResults
Expand Down Expand Up @@ -171,7 +171,7 @@ func main() {
}
}

ledgerStorage, err = ledger.NewMTrieStorage(triedir, int(mTrieCacheSize), collector, node.MetricsRegisterer)
ledgerStorage, err = ledger.NewLedger(triedir, int(mTrieCacheSize), collector, node.Logger.With().Str("subcomponent", "ledger").Logger(), node.MetricsRegisterer)
return ledgerStorage, err
}).
Component("execution state ledger WAL compactor", func(node *cmd.FlowNodeBuilder) (module.ReadyDoneAware, error) {
Expand Down
36 changes: 9 additions & 27 deletions cmd/util/cmd/exec-data-json-export/delta_snapshot_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,10 @@ import (
"github.com/onflow/flow-go/storage/badger/operation"
)

type dMapping struct {
Hash string `json:"hash"`
Owner string `json:"owner"`
Key string `json:"key"`
Controller string `json:"controller"`
}

type dSnapshot struct {
DeltaJSONStr string `json:"delta_json_str"`
DeltaMapping []dMapping `json:"delta_mapping"`
Reads []string `json:"reads"`
SpockSecret string `json:"spock_secret_data"`
DeltaJSONStr string `json:"delta_json_str"`
Reads []string `json:"reads"`
SpockSecret string `json:"spock_secret_data"`
}

// ExportDeltaSnapshots exports all the delta snapshots
Expand Down Expand Up @@ -74,28 +66,18 @@ func ExportDeltaSnapshots(blockID flow.Identifier, dbPath string, outputPath str
return fmt.Errorf("could not load delta snapshot: %w", err)
}

dm := make([]dMapping, 0)
for k, v := range snap[0].Delta.ReadMappings {
dm = append(dm, dMapping{Hash: hex.EncodeToString([]byte(k)),
Owner: hex.EncodeToString([]byte(v.Owner)),
Key: hex.EncodeToString([]byte(v.Key)),
Controller: hex.EncodeToString([]byte(v.Controller))})
}
for k, v := range snap[0].Delta.WriteMappings {
dm = append(dm, dMapping{Hash: hex.EncodeToString([]byte(k)),
Owner: hex.EncodeToString([]byte(v.Owner)),
Key: hex.EncodeToString([]byte(v.Key)),
Controller: hex.EncodeToString([]byte(v.Controller))})
}

reads := make([]string, 0)
for _, r := range snap[0].Reads {
reads = append(reads, hex.EncodeToString(r[:]))

json, err := json.Marshal(r)
if err != nil {
return fmt.Errorf("could not create a json obj for a read registerID: %w", err)
}
reads = append(reads, string(json))
}

data := dSnapshot{
DeltaJSONStr: string(m),
DeltaMapping: dm,
Reads: reads,
SpockSecret: hex.EncodeToString(snap[0].SpockSecret),
}
Expand Down
23 changes: 19 additions & 4 deletions cmd/util/cmd/execution-state-extract/cmd.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package extract

import (
"fmt"

"github.com/rs/zerolog/log"
"github.com/spf13/cobra"

Expand All @@ -17,6 +15,7 @@ var (
flagOutputDir string
flagBlockHash string
flagDatadir string
flagMappingsFile string
)

var Cmd = &cobra.Command{
Expand All @@ -41,6 +40,10 @@ func init() {
Cmd.Flags().StringVar(&flagDatadir, "datadir", "",
"directory that stores the protocol state")
_ = Cmd.MarkFlagRequired("datadir")

Cmd.Flags().StringVar(&flagMappingsFile, "mappings-file", "",
"file containing mappings from previous Sporks")
_ = Cmd.MarkFlagRequired("mappings-file")
}

func run(*cobra.Command, []string) {
Expand All @@ -61,9 +64,21 @@ func run(*cobra.Command, []string) {
log.Fatal().Err(err).Msg("cannot get state commitment for block")
}

fmt.Printf("%x\n", stateCommitment)
mappings, err := getMappingsFromDatabase(db)
if err != nil {
log.Fatal().Err(err).Msg("cannot get mapping for a database")
}

mappingFromFile, err := readMegamappings(flagMappingsFile)
if err != nil {
log.Fatal().Err(err).Msg("cannot load mappings from a file")
}

for k, v := range mappingFromFile {
mappings[k] = v
}

err = extractExecutionState(flagExecutionStateDir, stateCommitment, flagOutputDir, log.Logger)
err = extractExecutionState(flagExecutionStateDir, stateCommitment, flagOutputDir, log.Logger, mappings)
if err != nil {
log.Fatal().Err(err).Msg("cannot generate checkpoint with state commitment")
}
Expand Down
168 changes: 155 additions & 13 deletions cmd/util/cmd/execution-state-extract/execution_state_extract.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,100 @@ package extract

import (
"bytes"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"path"
"time"

"github.com/dgraph-io/badger/v2"

"github.com/onflow/flow-go/engine/execution/state"
"github.com/onflow/flow-go/engine/execution/state/delta"
"github.com/onflow/flow-go/ledger/common/pathfinder"
"github.com/onflow/flow-go/ledger/complete/mtrie"
"github.com/onflow/flow-go/ledger/complete/mtrie/flattener"
"github.com/onflow/flow-go/ledger/complete/wal"
"github.com/onflow/flow-go/storage/badger/operation"

"github.com/rs/zerolog"

"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/storage/ledger"
"github.com/onflow/flow-go/storage/ledger/mtrie"
"github.com/onflow/flow-go/storage/ledger/mtrie/flattener"
oldLedger "github.com/onflow/flow-go/storage/ledger"
oldMtrie "github.com/onflow/flow-go/storage/ledger/mtrie"
oldFlattener "github.com/onflow/flow-go/storage/ledger/mtrie/flattener"
"github.com/onflow/flow-go/storage/ledger/mtrie/trie"
"github.com/onflow/flow-go/storage/ledger/wal"
oldWal "github.com/onflow/flow-go/storage/ledger/wal"
)

// getStateCommitment looks up the state commitment that was stored for the
// given block ID in the commits storage.
func getStateCommitment(commits storage.Commits, blockHash flow.Identifier) (flow.StateCommitment, error) {
	return commits.ByBlockID(blockHash)
}

func extractExecutionState(dir string, targetHash flow.StateCommitment, outputDir string, log zerolog.Logger) error {
// getMappingsFromDatabase collects register-key mappings recorded in legacy
// execution-state interaction snapshots stored in the given badger database.
// Read and write mappings from every visited interaction are merged into a
// single map; a later entry for the same key overwrites an earlier one.
func getMappingsFromDatabase(db *badger.DB) (map[string]delta.Mapping, error) {

	mappings := make(map[string]delta.Mapping)

	// NOTE(review): returning false from the callback presumably means "keep
	// iterating" so that every stored interaction is visited — confirm against
	// operation.FindLegacyExecutionStateInteractions. `found` accumulates the
	// matched snapshots but is otherwise unused here.
	var found [][]*delta.LegacySnapshot
	err := db.View(operation.FindLegacyExecutionStateInteractions(func(interactions []*delta.LegacySnapshot) bool {

		for _, interaction := range interactions {
			for k, mapping := range interaction.Delta.ReadMappings {
				mappings[k] = mapping
			}
			for k, mapping := range interaction.Delta.WriteMappings {
				mappings[k] = mapping
			}
		}

		return false
	}, &found))

	return mappings, err
}

// readMegamappings loads register-key mappings from a JSON file (produced for
// previous Sporks). The file is expected to map hex-encoded register keys to
// delta.Mapping values; each key is hex-decoded into its raw byte form before
// being returned.
//
// Returns an error if the file cannot be read, the JSON cannot be parsed, or
// any key is not valid hex.
func readMegamappings(filename string) (map[string]delta.Mapping, error) {
	var readMappings = map[string]delta.Mapping{}
	var hexencodedRead map[string]delta.Mapping

	// renamed from `bytes`: the original local shadowed the imported
	// "bytes" package.
	data, err := ioutil.ReadFile(filename)
	if err != nil {
		return nil, fmt.Errorf("cannot open mappings file: %w", err)
	}
	err = json.Unmarshal(data, &hexencodedRead)
	if err != nil {
		return nil, fmt.Errorf("cannot unmarshall mappings: %w", err)
	}

	// Re-key the map with the raw (decoded) form of each hex key.
	for k, mapping := range hexencodedRead {
		decodedKey, err := hex.DecodeString(k)
		if err != nil {
			return nil, fmt.Errorf("cannot decode key: %w", err)
		}
		readMappings[string(decodedKey)] = mapping
	}

	return readMappings, nil
}

// extractExecutionState replays the execution-state WAL found in dir into the
// old-format forest, then converts the trie for targetHash into the new ledger
// format using the supplied key mappings, and writes a root checkpoint.
func extractExecutionState(dir string, targetHash flow.StateCommitment, outputDir string, log zerolog.Logger, mappings map[string]delta.Mapping) error {

w, err := oldWal.NewWAL(nil, nil, dir, oldLedger.CacheSize, oldLedger.RegisterKeySize, oldWal.SegmentSize)
if err != nil {
return fmt.Errorf("cannot create WAL: %w", err)
}
defer func() {
_ = w.Close()
}()

mForest, err := mtrie.NewMForest(ledger.RegisterKeySize, outputDir, 1000, &metrics.NoopCollector{}, func(evictedTrie *trie.MTrie) error { return nil })
oldMForest, err := oldMtrie.NewMForest(oldLedger.RegisterKeySize, outputDir, 1000, &metrics.NoopCollector{}, func(evictedTrie *trie.MTrie) error { return nil })
if err != nil {
return fmt.Errorf("cannot create mForest: %w", err)
}
Expand All @@ -48,21 +110,23 @@ func extractExecutionState(dir string, targetHash flow.StateCommitment, outputDi

FoundHashError := fmt.Errorf("found hash %s", targetHash)

log.Info().Msg("Replaying WAL")

err = w.ReplayLogsOnly(
func(forestSequencing *flattener.FlattenedForest) error {
rebuiltTries, err := flattener.RebuildTries(forestSequencing)
func(forestSequencing *oldFlattener.FlattenedForest) error {
rebuiltTries, err := oldFlattener.RebuildTries(forestSequencing)
if err != nil {
return fmt.Errorf("rebuilding forest from sequenced nodes failed: %w", err)
}
err = mForest.AddTries(rebuiltTries)
err = oldMForest.AddTries(rebuiltTries)
if err != nil {
return fmt.Errorf("adding rebuilt tries to forest failed: %w", err)
}
return nil
},
func(stateCommitment flow.StateCommitment, keys [][]byte, values [][]byte) error {

newTrie, err := mForest.Update(stateCommitment, keys, values)
newTrie, err := oldMForest.Update(stateCommitment, keys, values)

for _, value := range values {
valuesSize += len(value)
Expand Down Expand Up @@ -100,12 +164,90 @@ func extractExecutionState(dir string, targetHash flow.StateCommitment, outputDi
return fmt.Errorf("no value found: %w", err)
}

log.Info().Int("values_count", valuesCount).Int("values_size_bytes", valuesSize).Int("updates_count", i).Float64("total_time_s", duration.Seconds()).Msg("finished seeking")
log.Info().Int("values_count", valuesCount).Int("values_size_bytes", valuesSize).Int("updates_count", i).Float64("total_time_s", duration.Seconds()).Msg("finished replaying")

//remove other tries
tries, err := oldMForest.GetTries()
if err != nil {
return fmt.Errorf("cannot get tries: %w", err)
}

for _, mTrie := range tries {
if !bytes.Equal(mTrie.RootHash(), targetHash) {
oldMForest.RemoveTrie(mTrie.RootHash())
}
}

// check if we have only one trie
tries, err = oldMForest.GetTries()
if err != nil {
return fmt.Errorf("cannot get tries again: %w", err)
}

if len(tries) != 1 {
return fmt.Errorf("too many tries left after filtering: %w", err)
}

// converting data
log.Info().Msg("Converting data")
rootTrie := tries[0]

iterator := oldFlattener.NewNodeIterator(rootTrie)

forest, err := mtrie.NewForest(pathfinder.PathByteSize, "", 1000, &metrics.NoopCollector{}, nil)
if err != nil {
return fmt.Errorf("cannot create new forest: %w", err)
}

paths := make([]ledger.Path, 0)
payloads := make([]*ledger.Payload, 0)

for iterator.Next() {
node := iterator.Value()

if !node.IsLeaf() {
continue
}

oldKey := node.Key()
oldValue := node.Value()

mapping, ok := mappings[string(oldKey)]
if !ok {
return fmt.Errorf("mapping not found for key %x", oldKey)
}

registerID := flow.NewRegisterID(mapping.Owner, mapping.Controller, mapping.Key)
key := state.RegisterIDToKey(registerID)

path, err := pathfinder.KeyToPath(key, 0)
if err != nil {
return fmt.Errorf("cannot convert key to path: %w", err)
}

paths = append(paths, path)
payload := ledger.NewPayload(key, oldValue)
payloads = append(payloads, payload)
}

update := &ledger.TrieUpdate{
RootHash: forest.GetEmptyRootHash(),
Paths: paths,
Payloads: payloads,
}

newHash, err := forest.Update(update)
if err != nil {
return fmt.Errorf("cannot update trie with new values: %w", err)
}

log.Info().Msgf("NEW STATE COMMITMENT: %s", hex.EncodeToString(newHash))

log.Info().Msg("writing root checkpoint")

startTime = time.Now()

flattenForest, err := flattener.FlattenForest(mForest)
flattenForest, err := flattener.FlattenForest(forest)
if err != nil {
return fmt.Errorf("cannot flatten forest: %w", err)
}
Expand Down
Loading

0 comments on commit 8b9dd47

Please sign in to comment.