Skip to content

Commit

Permalink
sqlite, wallet: add full index mode
Browse files Browse the repository at this point in the history
  • Loading branch information
n8maninger committed Feb 21, 2024
1 parent 59281e3 commit d24cb5b
Show file tree
Hide file tree
Showing 10 changed files with 461 additions and 157 deletions.
51 changes: 51 additions & 0 deletions persist/sqlite/addresses.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package sqlite

import (
"database/sql"
"errors"
"fmt"

"go.sia.tech/core/types"
"go.sia.tech/walletd/wallet"
)

// AddressBalance returns the balance of a single address.
func (s *Store) AddressBalance(address types.Address) (balance wallet.Balance, err error) {
err = s.transaction(func(tx *txn) error {
const query = `SELECT siacoin_balance, immature_siacoin_balance, siafund_balance FROM sia_addresses WHERE sia_address=$1`
return tx.QueryRow(query, encode(address)).Scan(decode(&balance.Siacoins), decode(&balance.ImmatureSiacoins), &balance.Siafunds)
})
if errors.Is(err, sql.ErrNoRows) {
return wallet.Balance{}, wallet.ErrNotFound
}
return
}

// AddressEvents returns the events related to a single address.
func (s *Store) AddressEvents(address types.Address, offset, limit int) (events []wallet.Event, err error) {
err = s.transaction(func(tx *txn) error {
const query = `SELECT ev.id, ev.event_id, ev.maturity_height, ev.date_created, ci.height, ci.block_id, ev.event_type, ev.event_data
FROM events ev
INNER JOIN chain_indices ci ON (ev.index_id = ci.id)
INNER JOIN event_addresses ea ON (ev.id = ea.event_id)
INNER JOIN sia_addresses sa ON (ea.address_id = sa.id)
WHERE sa.sia_address = $1
ORDER BY ev.maturity_height DESC
LIMIT $2 OFFSET $3`
rows, err := tx.Query(query, encode(address), limit, offset)
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
_, event, err := scanEvent(rows)
if err != nil {
return fmt.Errorf("failed to scan event: %w", err)
}
event.Relevant = []types.Address{address} // only the address is relevant
events = append(events, event)
}
return rows.Err()
})
return
}
7 changes: 4 additions & 3 deletions persist/sqlite/config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package sqlite

import (
"go.sia.tech/walletd/wallet"
"go.uber.org/zap"
)

Expand All @@ -15,8 +14,10 @@ func WithLogger(log *zap.Logger) Option {
}
}

func WithIndexMode(mode wallet.IndexMode) Option {
// WithFullIndex sets the store to index all transactions and outputs, rather
// than just those relevant to the wallet.
func WithFullIndex() Option {
return func(s *Store) {
s.indexMode = mode
s.fullIndex = true
}
}
50 changes: 46 additions & 4 deletions persist/sqlite/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
type updateTx struct {
tx *txn

fullIndex bool
relevantAddresses map[types.Address]bool
}

Expand Down Expand Up @@ -49,6 +50,10 @@ func (ut *updateTx) SiacoinStateElements() ([]types.StateElement, error) {
}

func (ut *updateTx) UpdateSiacoinStateElements(elements []types.StateElement) error {
if ut.fullIndex {
panic("UpdateSiafundStateElements should not be called with full index enabled")
}

const query = `UPDATE siacoin_elements SET merkle_proof=$1, leaf_index=$2 WHERE id=$3 RETURNING id`
stmt, err := ut.tx.Prepare(query)
if err != nil {
Expand Down Expand Up @@ -86,6 +91,10 @@ func (ut *updateTx) SiafundStateElements() ([]types.StateElement, error) {
}

func (ut *updateTx) UpdateSiafundStateElements(elements []types.StateElement) error {
if ut.fullIndex {
panic("UpdateSiafundStateElements should not be called with full index enabled")
}

const query = `UPDATE siafund_elements SET merkle_proof=$1, leaf_index=$2 WHERE id=$3 RETURNING id`
stmt, err := ut.tx.Prepare(query)
if err != nil {
Expand All @@ -103,7 +112,32 @@ func (ut *updateTx) UpdateSiafundStateElements(elements []types.StateElement) er
return nil
}

// UpdateStateTree updates the state tree with the given changes.
func (ut *updateTx) UpdateStateTree(changes []wallet.TreeNodeUpdate) error {
if !ut.fullIndex {
panic("UpdateStateTree should not be called with full index disabled")
}

stmt, err := ut.tx.Prepare(`INSERT INTO state_tree (row, column, value) VALUES($1, $2, $3) ON CONFLICT (row, column) DO UPDATE SET value=EXCLUDED.value;`)
if err != nil {
return fmt.Errorf("failed to prepare statement: %w", err)
}
defer stmt.Close()

for _, change := range changes {
_, err := stmt.Exec(change.Row, change.Column, encode(change.Hash))
if err != nil {
return fmt.Errorf("failed to execute statement: %w", err)
}
}
return nil
}

func (ut *updateTx) AddressRelevant(addr types.Address) (bool, error) {
if ut.fullIndex {
panic("AddressRelevant should not be called with full index enabled")
}

if relevant, ok := ut.relevantAddresses[addr]; ok {
return relevant, nil
}
Expand All @@ -122,19 +156,25 @@ func (ut *updateTx) AddressRelevant(addr types.Address) (bool, error) {

func (ut *updateTx) AddressBalance(addr types.Address) (balance wallet.Balance, err error) {
err = ut.tx.QueryRow(`SELECT siacoin_balance, immature_siacoin_balance, siafund_balance FROM sia_addresses WHERE sia_address=$1`, encode(addr)).Scan(decode(&balance.Siacoins), decode(&balance.ImmatureSiacoins), &balance.Siafunds)
if errors.Is(err, sql.ErrNoRows) {
if ut.fullIndex {
return wallet.Balance{}, nil
}
return wallet.Balance{}, wallet.ErrNotFound
}
return
}

func (ut *updateTx) UpdateBalances(balances []wallet.AddressBalance) error {
const query = `UPDATE sia_addresses SET siacoin_balance=$1, immature_siacoin_balance=$2, siafund_balance=$3 WHERE sia_address=$4`
const query = `INSERT INTO sia_addresses (sia_address, siacoin_balance, immature_siacoin_balance, siafund_balance) VALUES ($1, $2, $3, $4) ON CONFLICT (sia_address) DO UPDATE SET siacoin_balance=EXCLUDED.siacoin_balance, immature_siacoin_balance=EXCLUDED.immature_siacoin_balance, siafund_balance=EXCLUDED.siafund_balance;`
stmt, err := ut.tx.Prepare(query)
if err != nil {
return fmt.Errorf("failed to prepare statement: %w", err)
}
defer stmt.Close()

for _, ab := range balances {
_, err := stmt.Exec(encode(ab.Balance.Siacoins), encode(ab.Balance.ImmatureSiacoins), ab.Balance.Siafunds, encode(ab.Address))
_, err := stmt.Exec(encode(ab.Address), encode(ab.Balance.Siacoins), encode(ab.Balance.ImmatureSiacoins), ab.Balance.Siafunds)
if err != nil {
return fmt.Errorf("failed to execute statement: %w", err)
}
Expand Down Expand Up @@ -341,10 +381,11 @@ func (s *Store) ProcessChainApplyUpdate(cau *chain.ApplyUpdate, mayCommit bool)
return s.transaction(func(tx *txn) error {
utx := &updateTx{
tx: tx,
fullIndex: s.fullIndex,
relevantAddresses: make(map[types.Address]bool),
}

if err := wallet.ApplyChainUpdates(utx, s.updates); err != nil {
if err := wallet.ApplyChainUpdates(utx, s.updates, s.fullIndex); err != nil {
return fmt.Errorf("failed to apply updates: %w", err)
} else if err := setLastCommittedIndex(tx, cau.State.Index); err != nil {
return fmt.Errorf("failed to set last committed index: %w", err)
Expand Down Expand Up @@ -373,10 +414,11 @@ func (s *Store) ProcessChainRevertUpdate(cru *chain.RevertUpdate) error {
return s.transaction(func(tx *txn) error {
utx := &updateTx{
tx: tx,
fullIndex: s.fullIndex,
relevantAddresses: make(map[types.Address]bool),
}

if err := wallet.RevertChainUpdate(utx, cru); err != nil {
if err := wallet.RevertChainUpdate(utx, cru, s.fullIndex); err != nil {
return fmt.Errorf("failed to revert update: %w", err)
} else if err := setLastCommittedIndex(tx, cru.State.Index); err != nil {
return fmt.Errorf("failed to set last committed index: %w", err)
Expand Down
154 changes: 154 additions & 0 deletions persist/sqlite/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,3 +588,157 @@ func TestV2(t *testing.T) {
t.Fatalf("expected address %v, got %v", addr, events[0].Relevant[0])
}
}

func TestFullIndex(t *testing.T) {
log := zaptest.NewLogger(t)
dir := t.TempDir()
db, err := sqlite.OpenDatabase(filepath.Join(dir, "walletd.sqlite3"),
sqlite.WithLogger(log.Named("sqlite3")),
sqlite.WithFullIndex())
if err != nil {
t.Fatal(err)
}
defer db.Close()

bdb, err := coreutils.OpenBoltChainDB(filepath.Join(dir, "consensus.db"))
if err != nil {
t.Fatal(err)
}
defer bdb.Close()

network, genesisBlock := testV1Network()

store, genesisState, err := chain.NewDBStore(bdb, network, genesisBlock)
if err != nil {
t.Fatal(err)
}
defer store.Close()

cm := chain.NewManager(store, genesisState)

if err := cm.AddSubscriber(db, types.ChainIndex{}); err != nil {
t.Fatal(err)
}

siacoinAirdropAddr, err := types.ParseAddress("addr:3d7f707d05f2e0ec7ccc9220ed7c8af3bc560fbee84d068c2cc28151d617899e1ee8bc069946")
if err != nil {
t.Fatal(err)
}
siacoinAirdropValue := types.Siacoins(1).Mul64(1e12)

siafundAirdropAddr, err := types.ParseAddress("addr:053b2def3cbdd078c19d62ce2b4f0b1a3c5e0ffbeeff01280efb1f8969b2f5bb4fdc680f0807")
if err != nil {
t.Fatal(err)
}
siafundAirdropValue := uint64(10000)

balance, err := db.AddressBalance(siacoinAirdropAddr)
if err != nil {
t.Fatal(err)
} else if !balance.Siacoins.Equals(siacoinAirdropValue) {
t.Fatalf("expected %v, got %v", siacoinAirdropValue, balance.Siacoins)
}

events, err := db.AddressEvents(siacoinAirdropAddr, 0, 100)
if err != nil {
t.Fatal(err)
} else if len(events) != 1 {
t.Fatalf("expected 1 event, got %v", len(events))
} else if events[0].Data.EventType() != wallet.EventTypeTransaction {
t.Fatalf("expected transaction event, got %v", events[0].Data.EventType())
} else if events[0].ID != types.Hash256(genesisBlock.Transactions[0].ID()) {
t.Fatalf("expected transaction ID %q got %q", genesisBlock.Transactions[0].ID(), events[0].ID)
}

tx, ok := events[0].Data.(*wallet.EventTransaction)
if !ok {
t.Fatalf("expected transaction event, got %v", events[0].Data.EventType())
} else if tx.SiacoinOutputs[0].SiacoinOutput.Address != siacoinAirdropAddr {
t.Fatalf("expected address %v, got %v", siacoinAirdropAddr, tx.SiacoinOutputs[0].SiacoinOutput.Address)
} else if !tx.SiacoinOutputs[0].SiacoinOutput.Value.Equals(siacoinAirdropValue) {
t.Fatalf("expected %v, got %v", siacoinAirdropValue, tx.SiacoinOutputs[0].SiacoinOutput.Value)
}

balance, err = db.AddressBalance(siafundAirdropAddr)
if err != nil {
t.Fatal(err)
} else if balance.Siafunds != siafundAirdropValue {
t.Fatalf("expected %v, got %v", siafundAirdropValue, balance.Siafunds)
}

pk := types.GeneratePrivateKey()
addr := types.StandardUnlockHash(pk.PublicKey())

expectedPayout := cm.TipState().BlockReward()
block := mineBlock(cm.TipState(), nil, addr)
minerPayoutID := block.ID().MinerOutputID(0)
// mine a block sending the payout to the wallet
if err := cm.AddBlocks([]types.Block{block}); err != nil {
t.Fatal(err)
}

// check that the payout was received
balance, err = db.AddressBalance(addr)
if err != nil {
t.Fatal(err)
} else if !balance.ImmatureSiacoins.Equals(expectedPayout) {
t.Fatalf("expected %v, got %v", expectedPayout, balance.ImmatureSiacoins)
}

// check that a payout event was recorded
events, err = db.AddressEvents(addr, 0, 100)
if err != nil {
t.Fatal(err)
} else if len(events) != 1 {
t.Fatalf("expected 1 event, got %v", len(events))
} else if events[0].Data.EventType() != wallet.EventTypeMinerPayout {
t.Fatalf("expected payout event, got %v", events[0].Data.EventType())
} else if events[0].ID != types.Hash256(minerPayoutID) {
t.Fatalf("expected %v, got %v", minerPayoutID, events[0].ID)
}

// mine until the payout matures
maturityHeight := cm.TipState().MaturityHeight() + 1
for i := cm.TipState().Index.Height; i < maturityHeight; i++ {
if err := cm.AddBlocks([]types.Block{mineBlock(cm.TipState(), nil, types.VoidAddress)}); err != nil {
t.Fatal(err)
}
}

// check that the balance matured
balance, err = db.AddressBalance(addr)
if err != nil {
t.Fatal(err)
} else if !balance.ImmatureSiacoins.IsZero() {
t.Fatalf("expected 0, got %v", balance.ImmatureSiacoins)
} else if !balance.Siacoins.Equals(expectedPayout) {
t.Fatalf("expected %v, got %v", expectedPayout, balance.Siacoins)
}

// add the address to a wallet
if err := db.AddWallet("test", nil); err != nil {
t.Fatal(err)
} else if err := db.AddAddress("test", addr, nil); err != nil {
t.Fatal(err)
}

// check that the wallet balance is correct
walletBalance, err := db.WalletBalance("test")
if err != nil {
t.Fatal(err)
} else if !walletBalance.Siacoins.Equals(expectedPayout) {
t.Fatalf("expected %v, got %v", expectedPayout, walletBalance.Siacoins)
}

// check that the payout event was associated with the wallet
events, err = db.WalletEvents("test", 0, 100)
if err != nil {
t.Fatal(err)
} else if len(events) != 1 {
t.Fatalf("expected 1 event, got %v", len(events))
} else if events[0].Data.EventType() != wallet.EventTypeMinerPayout {
t.Fatalf("expected payout event, got %v", events[0].Data.EventType())
} else if events[0].ID != types.Hash256(minerPayoutID) {
t.Fatalf("expected %v, got %v", minerPayoutID, events[0].ID)
}
}
7 changes: 7 additions & 0 deletions persist/sqlite/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ CREATE TABLE siafund_elements (
);
CREATE INDEX siafund_elements_address_id ON siafund_elements (address_id);

CREATE TABLE state_tree (
row INTEGER,
column INTEGER,
value BLOB NOT NULL,
PRIMARY KEY (row, column)
);

CREATE TABLE wallets (
id TEXT PRIMARY KEY NOT NULL,
extra_data BLOB NOT NULL
Expand Down
9 changes: 3 additions & 6 deletions persist/sqlite/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"time"

"go.sia.tech/coreutils/chain"
"go.sia.tech/walletd/wallet"
"go.uber.org/zap"
"lukechampine.com/frand"
)
Expand All @@ -21,7 +20,7 @@ type (
db *sql.DB

log *zap.Logger
indexMode wallet.IndexMode
fullIndex bool

updates []*chain.ApplyUpdate
}
Expand Down Expand Up @@ -116,10 +115,8 @@ func OpenDatabase(fp string, opts ...Option) (*Store, error) {
return nil, err
}
store := &Store{
db: db,

log: zap.NewNop(),
indexMode: wallet.IndexModeDefault,
db: db,
log: zap.NewNop(),
}
for _, opt := range opts {
opt(store)
Expand Down
Loading

0 comments on commit d24cb5b

Please sign in to comment.