Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add index mode #46

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestWallet(t *testing.T) {
}
cm := chain.NewManager(dbstore, tipState)

ws, err := sqlite.OpenDatabase(filepath.Join(t.TempDir(), "wallets.db"), log.Named("sqlite3"))
ws, err := sqlite.OpenDatabase(filepath.Join(t.TempDir(), "wallets.db"), sqlite.WithLogger(log.Named("sqlite3")))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -254,7 +254,7 @@ func TestV2(t *testing.T) {
t.Fatal(err)
}
cm := chain.NewManager(dbstore, tipState)
ws, err := sqlite.OpenDatabase(filepath.Join(t.TempDir(), "wallets.db"), log.Named("sqlite3"))
ws, err := sqlite.OpenDatabase(filepath.Join(t.TempDir(), "wallets.db"), sqlite.WithLogger(log.Named("sqlite3")))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -465,7 +465,7 @@ func TestP2P(t *testing.T) {
}
log1 := logger.Named("one")
cm1 := chain.NewManager(dbstore1, tipState)
store1, err := sqlite.OpenDatabase(filepath.Join(t.TempDir(), "wallets.db"), log1.Named("sqlite3"))
store1, err := sqlite.OpenDatabase(filepath.Join(t.TempDir(), "wallets.db"), sqlite.WithLogger(log1.Named("sqlite3")))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -504,7 +504,7 @@ func TestP2P(t *testing.T) {
}
log2 := logger.Named("two")
cm2 := chain.NewManager(dbstore2, tipState)
store2, err := sqlite.OpenDatabase(filepath.Join(t.TempDir(), "wallets.db"), log2.Named("sqlite3"))
store2, err := sqlite.OpenDatabase(filepath.Join(t.TempDir(), "wallets.db"), sqlite.WithLogger(log2.Named("sqlite3")))
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/walletd/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func newNode(addr, dir string, chainNetwork string, useUPNP bool, log *zap.Logge
syncerAddr = net.JoinHostPort("127.0.0.1", port)
}

store, err := sqlite.OpenDatabase(filepath.Join(dir, "walletd.sqlite3"), log.Named("sqlite3"))
store, err := sqlite.OpenDatabase(filepath.Join(dir, "walletd.sqlite3"), sqlite.WithLogger(log.Named("sqlite3")))
if err != nil {
return nil, fmt.Errorf("failed to open wallet database: %w", err)
}
Expand Down
51 changes: 51 additions & 0 deletions persist/sqlite/addresses.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package sqlite

import (
"database/sql"
"errors"
"fmt"

"go.sia.tech/core/types"
"go.sia.tech/walletd/wallet"
)

// AddressBalance returns the balance of a single address.
func (s *Store) AddressBalance(address types.Address) (balance wallet.Balance, err error) {
err = s.transaction(func(tx *txn) error {
const query = `SELECT siacoin_balance, immature_siacoin_balance, siafund_balance FROM sia_addresses WHERE sia_address=$1`
return tx.QueryRow(query, encode(address)).Scan(decode(&balance.Siacoins), decode(&balance.ImmatureSiacoins), &balance.Siafunds)
})
if errors.Is(err, sql.ErrNoRows) {
return wallet.Balance{}, wallet.ErrNotFound
}
return
}

// AddressEvents returns the events related to a single address.
func (s *Store) AddressEvents(address types.Address, offset, limit int) (events []wallet.Event, err error) {
err = s.transaction(func(tx *txn) error {
const query = `SELECT ev.id, ev.event_id, ev.maturity_height, ev.date_created, ci.height, ci.block_id, ev.event_type, ev.event_data
FROM events ev
INNER JOIN chain_indices ci ON (ev.index_id = ci.id)
INNER JOIN event_addresses ea ON (ev.id = ea.event_id)
INNER JOIN sia_addresses sa ON (ea.address_id = sa.id)
WHERE sa.sia_address = $1
ORDER BY ev.maturity_height DESC
LIMIT $2 OFFSET $3`
rows, err := tx.Query(query, encode(address), limit, offset)
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
_, event, err := scanEvent(rows)
if err != nil {
return fmt.Errorf("failed to scan event: %w", err)
}
event.Relevant = []types.Address{address} // only the address is relevant
events = append(events, event)
}
return rows.Err()
})
return
}
23 changes: 23 additions & 0 deletions persist/sqlite/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package sqlite

import (
"go.uber.org/zap"
)

// An Option is a functional option for configuring a Store.
type Option func(*Store)

// WithLogger sets the logger used by the Store.
func WithLogger(log *zap.Logger) Option {
return func(s *Store) {
s.log = log
}
}

// WithFullIndex sets the store to index all transactions and outputs, rather
// than just those relevant to the wallet.
func WithFullIndex() Option {
return func(s *Store) {
s.fullIndex = true
}
}
78 changes: 62 additions & 16 deletions persist/sqlite/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
type updateTx struct {
tx *txn

fullIndex bool
relevantAddresses map[types.Address]bool
}

Expand Down Expand Up @@ -49,6 +50,10 @@ func (ut *updateTx) SiacoinStateElements() ([]types.StateElement, error) {
}

func (ut *updateTx) UpdateSiacoinStateElements(elements []types.StateElement) error {
if ut.fullIndex {
panic("UpdateSiafundStateElements should not be called with full index enabled")
}

const query = `UPDATE siacoin_elements SET merkle_proof=$1, leaf_index=$2 WHERE id=$3 RETURNING id`
stmt, err := ut.tx.Prepare(query)
if err != nil {
Expand Down Expand Up @@ -86,6 +91,10 @@ func (ut *updateTx) SiafundStateElements() ([]types.StateElement, error) {
}

func (ut *updateTx) UpdateSiafundStateElements(elements []types.StateElement) error {
if ut.fullIndex {
panic("UpdateSiafundStateElements should not be called with full index enabled")
}

const query = `UPDATE siafund_elements SET merkle_proof=$1, leaf_index=$2 WHERE id=$3 RETURNING id`
stmt, err := ut.tx.Prepare(query)
if err != nil {
Expand All @@ -103,7 +112,32 @@ func (ut *updateTx) UpdateSiafundStateElements(elements []types.StateElement) er
return nil
}

// UpdateStateTree updates the state tree with the given changes.
func (ut *updateTx) UpdateStateTree(changes []wallet.TreeNodeUpdate) error {
if !ut.fullIndex {
panic("UpdateStateTree should not be called with full index disabled")
}

stmt, err := ut.tx.Prepare(`INSERT INTO state_tree (row, column, value) VALUES($1, $2, $3) ON CONFLICT (row, column) DO UPDATE SET value=EXCLUDED.value;`)
if err != nil {
return fmt.Errorf("failed to prepare statement: %w", err)
}
defer stmt.Close()

for _, change := range changes {
_, err := stmt.Exec(change.Row, change.Column, encode(change.Hash))
if err != nil {
return fmt.Errorf("failed to execute statement: %w", err)
}
}
return nil
}

func (ut *updateTx) AddressRelevant(addr types.Address) (bool, error) {
if ut.fullIndex {
panic("AddressRelevant should not be called with full index enabled")
}

if relevant, ok := ut.relevantAddresses[addr]; ok {
return relevant, nil
}
Expand All @@ -122,19 +156,25 @@ func (ut *updateTx) AddressRelevant(addr types.Address) (bool, error) {

func (ut *updateTx) AddressBalance(addr types.Address) (balance wallet.Balance, err error) {
err = ut.tx.QueryRow(`SELECT siacoin_balance, immature_siacoin_balance, siafund_balance FROM sia_addresses WHERE sia_address=$1`, encode(addr)).Scan(decode(&balance.Siacoins), decode(&balance.ImmatureSiacoins), &balance.Siafunds)
if errors.Is(err, sql.ErrNoRows) {
if ut.fullIndex {
return wallet.Balance{}, nil
}
return wallet.Balance{}, wallet.ErrNotFound
}
return
}

func (ut *updateTx) UpdateBalances(balances []wallet.AddressBalance) error {
const query = `UPDATE sia_addresses SET siacoin_balance=$1, immature_siacoin_balance=$2, siafund_balance=$3 WHERE sia_address=$4`
const query = `INSERT INTO sia_addresses (sia_address, siacoin_balance, immature_siacoin_balance, siafund_balance) VALUES ($1, $2, $3, $4) ON CONFLICT (sia_address) DO UPDATE SET siacoin_balance=EXCLUDED.siacoin_balance, immature_siacoin_balance=EXCLUDED.immature_siacoin_balance, siafund_balance=EXCLUDED.siafund_balance;`
stmt, err := ut.tx.Prepare(query)
if err != nil {
return fmt.Errorf("failed to prepare statement: %w", err)
}
defer stmt.Close()

for _, ab := range balances {
_, err := stmt.Exec(encode(ab.Balance.Siacoins), encode(ab.Balance.ImmatureSiacoins), ab.Balance.Siafunds, encode(ab.Address))
_, err := stmt.Exec(encode(ab.Address), encode(ab.Balance.Siacoins), encode(ab.Balance.ImmatureSiacoins), ab.Balance.Siafunds)
if err != nil {
return fmt.Errorf("failed to execute statement: %w", err)
}
Expand Down Expand Up @@ -170,20 +210,20 @@ func (ut *updateTx) AddSiacoinElements(elements []types.SiacoinElement) error {
}
defer addrStmt.Close()

inserStmt, err := ut.tx.Prepare(`INSERT INTO siacoin_elements (id, siacoin_value, merkle_proof, leaf_index, maturity_height, address_id) VALUES ($1, $2, $3, $4, $5, $6)`)
insertStmt, err := ut.tx.Prepare(`INSERT INTO siacoin_elements (id, siacoin_value, merkle_proof, leaf_index, maturity_height, address_id) VALUES ($1, $2, $3, $4, $5, $6)`)
if err != nil {
return fmt.Errorf("failed to prepare insert statement: %w", err)
}
defer inserStmt.Close()
defer insertStmt.Close()

for _, se := range elements {
var addressID int64
err := addrStmt.QueryRow(encode(se.SiacoinOutput.Address), encode(types.ZeroCurrency), 0).Scan(&addressID)
err = addrStmt.QueryRow(encode(se.SiacoinOutput.Address), encode(types.ZeroCurrency), 0).Scan(&addressID)
if err != nil {
return fmt.Errorf("failed to query address: %w", err)
}

_, err = inserStmt.Exec(encode(se.ID), encode(se.SiacoinOutput.Value), encodeSlice(se.MerkleProof), se.LeafIndex, se.MaturityHeight, addressID)
_, err = insertStmt.Exec(encode(se.ID), encode(se.SiacoinOutput.Value), encodeSlice(se.MerkleProof), se.LeafIndex, se.MaturityHeight, addressID)
if err != nil {
return fmt.Errorf("failed to execute statement: %w", err)
}
Expand Down Expand Up @@ -215,20 +255,20 @@ func (ut *updateTx) AddSiafundElements(elements []types.SiafundElement) error {
}
defer addrStmt.Close()

inserStmt, err := ut.tx.Prepare(`INSERT INTO siafund_elements (id, siafund_value, merkle_proof, leaf_index, claim_start, address_id) VALUES ($1, $2, $3, $4, $5, $6)`)
insertStmt, err := ut.tx.Prepare(`INSERT INTO siafund_elements (id, siafund_value, merkle_proof, leaf_index, claim_start, address_id) VALUES ($1, $2, $3, $4, $5, $6)`)
if err != nil {
return fmt.Errorf("failed to prepare statement: %w", err)
}
defer inserStmt.Close()
defer insertStmt.Close()

for _, se := range elements {
var addressID int64
err := addrStmt.QueryRow(encode(se.SiafundOutput.Address), encode(types.ZeroCurrency), 0).Scan(&addressID)
err = addrStmt.QueryRow(encode(se.SiafundOutput.Address), encode(types.ZeroCurrency), 0).Scan(&addressID)
if err != nil {
return fmt.Errorf("failed to query address: %w", err)
}

_, err = inserStmt.Exec(encode(se.ID), se.SiafundOutput.Value, encodeSlice(se.MerkleProof), se.LeafIndex, encode(se.ClaimStart), addressID)
_, err = insertStmt.Exec(encode(se.ID), se.SiafundOutput.Value, encodeSlice(se.MerkleProof), se.LeafIndex, encode(se.ClaimStart), addressID)
if err != nil {
return fmt.Errorf("failed to execute statement: %w", err)
}
Expand All @@ -254,7 +294,7 @@ func (ut *updateTx) RemoveSiafundElements(elements []types.SiafundOutputID) erro
}

func (ut *updateTx) AddEvents(events []wallet.Event) error {
indexStmt, err := ut.tx.Prepare(`INSERT INTO chain_indices (height, block_id) VALUES ($1, $2) ON CONFLICT (block_id) DO UPDATE SET height=EXCLUDED.height RETURNING id`)
indexStmt, err := insertIndexStmt(ut.tx)
if err != nil {
return fmt.Errorf("failed to prepare index statement: %w", err)
}
Expand Down Expand Up @@ -321,10 +361,10 @@ func (ut *updateTx) AddEvents(events []wallet.Event) error {
return nil
}

// RevertEvents reverts the events that were added in the given block.
func (ut *updateTx) RevertEvents(blockID types.BlockID) error {
// RevertEvents reverts any events that were added by the index
func (ut *updateTx) RevertEvents(index types.ChainIndex) error {
var id int64
err := ut.tx.QueryRow(`DELETE FROM chain_indices WHERE block_id=$1 RETURNING id`, encode(blockID)).Scan(&id)
err := ut.tx.QueryRow(`DELETE FROM chain_indices WHERE block_id=$1 AND height=$2 RETURNING id`, encode(index.ID), index.Height).Scan(&id)
if errors.Is(err, sql.ErrNoRows) {
return nil
}
Expand All @@ -341,10 +381,11 @@ func (s *Store) ProcessChainApplyUpdate(cau *chain.ApplyUpdate, mayCommit bool)
return s.transaction(func(tx *txn) error {
utx := &updateTx{
tx: tx,
fullIndex: s.fullIndex,
relevantAddresses: make(map[types.Address]bool),
}

if err := wallet.ApplyChainUpdates(utx, s.updates); err != nil {
if err := wallet.ApplyChainUpdates(utx, s.updates, s.fullIndex); err != nil {
return fmt.Errorf("failed to apply updates: %w", err)
} else if err := setLastCommittedIndex(tx, cau.State.Index); err != nil {
return fmt.Errorf("failed to set last committed index: %w", err)
Expand Down Expand Up @@ -373,10 +414,11 @@ func (s *Store) ProcessChainRevertUpdate(cru *chain.RevertUpdate) error {
return s.transaction(func(tx *txn) error {
utx := &updateTx{
tx: tx,
fullIndex: s.fullIndex,
relevantAddresses: make(map[types.Address]bool),
}

if err := wallet.RevertChainUpdate(utx, cru); err != nil {
if err := wallet.RevertChainUpdate(utx, cru, s.fullIndex); err != nil {
return fmt.Errorf("failed to revert update: %w", err)
} else if err := setLastCommittedIndex(tx, cru.State.Index); err != nil {
return fmt.Errorf("failed to set last committed index: %w", err)
Expand All @@ -399,3 +441,7 @@ func setLastCommittedIndex(tx *txn, index types.ChainIndex) error {
func insertAddressStatement(tx *txn) (*stmt, error) {
return tx.Prepare(`INSERT INTO sia_addresses (sia_address, siacoin_balance, immature_siacoin_balance, siafund_balance) VALUES ($1, $2, $2, $3) ON CONFLICT (sia_address) DO UPDATE SET sia_address=EXCLUDED.sia_address RETURNING id`)
}

func insertIndexStmt(tx *txn) (*stmt, error) {
return tx.Prepare(`INSERT INTO chain_indices (height, block_id) VALUES ($1, $2) ON CONFLICT (block_id) DO UPDATE SET height=EXCLUDED.height RETURNING id`)
}
Loading
Loading