Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: move utxo selection out of API and into wallet manager #210

Merged
merged 14 commits into from
Jan 9, 2025
Merged
1 change: 0 additions & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ type BalanceResponse wallet.Balance
type WalletReserveRequest struct {
SiacoinOutputs []types.SiacoinOutputID `json:"siacoinOutputs"`
SiafundOutputs []types.SiafundOutputID `json:"siafundOutputs"`
Duration time.Duration `json:"duration"`
}

// A WalletUpdateRequest is a request to update a wallet
Expand Down
1 change: 0 additions & 1 deletion api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,6 @@ func (c *WalletClient) Reserve(sc []types.SiacoinOutputID, sf []types.SiafundOut
err = c.c.POST(fmt.Sprintf("/wallets/%v/reserve", c.id), WalletReserveRequest{
SiacoinOutputs: sc,
SiafundOutputs: sf,
Duration: duration,
}, nil)
return
}
Expand Down
183 changes: 56 additions & 127 deletions api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@ import (
"fmt"
"net/http"
"net/http/pprof"
"reflect"
"runtime"
"sync"
"time"

"go.sia.tech/jape"
"go.uber.org/zap"
"lukechampine.com/frand"

"go.sia.tech/core/consensus"
"go.sia.tech/core/gateway"
Expand Down Expand Up @@ -101,6 +99,8 @@ type (
Addresses(id wallet.ID) ([]wallet.Address, error)
WalletEvents(id wallet.ID, offset, limit int) ([]wallet.Event, error)
WalletUnconfirmedEvents(id wallet.ID) ([]wallet.Event, error)
SelectSiacoinElements(walletID wallet.ID, amount types.Currency, useUnconfirmed bool) ([]types.SiacoinElement, types.ChainIndex, types.Currency, error)
SelectSiafundElements(walletID wallet.ID, amount uint64) ([]types.SiafundElement, types.ChainIndex, uint64, error)
UnspentSiacoinOutputs(id wallet.ID, offset, limit int) ([]types.SiacoinElement, error)
UnspentSiafundOutputs(id wallet.ID, offset, limit int) ([]types.SiafundElement, error)
WalletBalance(id wallet.ID) (wallet.Balance, error)
Expand All @@ -116,7 +116,8 @@ type (
SiacoinElement(types.SiacoinOutputID) (types.SiacoinElement, error)
SiafundElement(types.SiafundOutputID) (types.SiafundElement, error)

Reserve(ids []types.Hash256, duration time.Duration) error
Reserve([]types.Hash256) error
Release([]types.Hash256)
}
)

Expand All @@ -131,10 +132,6 @@ type server struct {
s Syncer
wm WalletManager

// for walletsReserveHandler
mu sync.Mutex
used map[types.Hash256]bool

scanMu sync.Mutex // for resubscribe
scanInProgress bool
scanInfo RescanResponse
Expand Down Expand Up @@ -574,95 +571,62 @@ func (s *server) walletsReserveHandler(jc jape.Context) {
ids = append(ids, types.Hash256(id))
}

if jc.Check("couldn't reserve outputs", s.wm.Reserve(ids, wrr.Duration)) != nil {
if jc.Check("couldn't reserve outputs", s.wm.Reserve(ids)) != nil {
return
}
jc.EmptyResonse()
}

func (s *server) walletsReleaseHandler(jc jape.Context) {
var name string
var wrr WalletReleaseRequest
if jc.DecodeParam("name", &name) != nil || jc.Decode(&wrr) != nil {
if jc.Decode(&wrr) != nil {
return
}
s.mu.Lock()
defer s.mu.Unlock()

ids := make([]types.Hash256, 0, len(wrr.SiacoinOutputs)+len(wrr.SiafundOutputs))
for _, id := range wrr.SiacoinOutputs {
delete(s.used, types.Hash256(id))
ids = append(ids, types.Hash256(id))
}
for _, id := range wrr.SiafundOutputs {
delete(s.used, types.Hash256(id))
ids = append(ids, types.Hash256(id))
}
s.wm.Release(ids)
jc.EmptyResonse()
}

func (s *server) walletsFundHandler(jc jape.Context) {
fundTxn := func(txn *types.Transaction, amount types.Currency, utxos []types.SiacoinElement, changeAddr types.Address, pool []types.Transaction) ([]types.Hash256, error) {
s.mu.Lock()
defer s.mu.Unlock()
if amount.IsZero() {
return nil, nil
}
inPool := make(map[types.Hash256]bool)
for _, ptxn := range pool {
for _, in := range ptxn.SiacoinInputs {
inPool[types.Hash256(in.ParentID)] = true
}
}
frand.Shuffle(len(utxos), reflect.Swapper(utxos))
var outputSum types.Currency
var fundingElements []types.SiacoinElement
for _, sce := range utxos {
if s.used[types.Hash256(sce.ID)] || inPool[types.Hash256(sce.ID)] {
continue
}
fundingElements = append(fundingElements, sce)
outputSum = outputSum.Add(sce.SiacoinOutput.Value)
if outputSum.Cmp(amount) >= 0 {
break
}
}
if outputSum.Cmp(amount) < 0 {
return nil, errors.New("insufficient balance")
} else if outputSum.Cmp(amount) > 0 {
if changeAddr == types.VoidAddress {
return nil, errors.New("change address must be specified")
}
txn.SiacoinOutputs = append(txn.SiacoinOutputs, types.SiacoinOutput{
Value: outputSum.Sub(amount),
Address: changeAddr,
})
}

toSign := make([]types.Hash256, len(fundingElements))
for i, sce := range fundingElements {
txn.SiacoinInputs = append(txn.SiacoinInputs, types.SiacoinInput{
ParentID: types.SiacoinOutputID(sce.ID),
// UnlockConditions left empty for client to fill in
})
toSign[i] = types.Hash256(sce.ID)
s.used[types.Hash256(sce.ID)] = true
}

return toSign, nil
}

var id wallet.ID
var wfr WalletFundRequest
if jc.DecodeParam("id", &id) != nil || jc.Decode(&wfr) != nil {
return
}
utxos, err := s.wm.UnspentSiacoinOutputs(id, 0, 1000)
utxos, _, change, err := s.wm.SelectSiacoinElements(id, wfr.Amount, false)
if jc.Check("couldn't get utxos to fund transaction", err) != nil {
return
}

txn := wfr.Transaction
toSign, err := fundTxn(&txn, wfr.Amount, utxos, wfr.ChangeAddress, s.cm.PoolTransactions())
if jc.Check("couldn't fund transaction", err) != nil {
return
if !change.IsZero() {
if wfr.ChangeAddress == types.VoidAddress {
jc.Error(errors.New("change address must be specified"), http.StatusBadRequest)
return
}

txn.SiacoinOutputs = append(txn.SiacoinOutputs, types.SiacoinOutput{
Value: change,
Address: wfr.ChangeAddress,
})
}

toSign := make([]types.Hash256, 0, len(utxos))
for _, sce := range utxos {
txn.SiacoinInputs = append(txn.SiacoinInputs, types.SiacoinInput{
ParentID: sce.ID,
// UnlockConditions left empty for client to fill in
})
toSign = append(toSign, types.Hash256(sce.ID))
}

jc.Encode(WalletFundResponse{
Transaction: txn,
ToSign: toSign,
Expand All @@ -671,71 +635,37 @@ func (s *server) walletsFundHandler(jc jape.Context) {
}

func (s *server) walletsFundSFHandler(jc jape.Context) {
fundTxn := func(txn *types.Transaction, amount uint64, utxos []types.SiafundElement, changeAddr, claimAddr types.Address, pool []types.Transaction) ([]types.Hash256, error) {
s.mu.Lock()
defer s.mu.Unlock()
if amount == 0 {
return nil, nil
}
inPool := make(map[types.Hash256]bool)
for _, ptxn := range pool {
for _, in := range ptxn.SiafundInputs {
inPool[types.Hash256(in.ParentID)] = true
}
}
frand.Shuffle(len(utxos), reflect.Swapper(utxos))
var outputSum uint64
var fundingElements []types.SiafundElement
for _, sfe := range utxos {
if s.used[types.Hash256(sfe.ID)] || inPool[types.Hash256(sfe.ID)] {
continue
}
fundingElements = append(fundingElements, sfe)
outputSum += sfe.SiafundOutput.Value
if outputSum >= amount {
break
}
}
if outputSum < amount {
return nil, errors.New("insufficient balance")
} else if outputSum > amount {
if changeAddr == types.VoidAddress {
return nil, errors.New("change address must be specified")
}
txn.SiafundOutputs = append(txn.SiafundOutputs, types.SiafundOutput{
Value: outputSum - amount,
Address: changeAddr,
})
}

toSign := make([]types.Hash256, len(fundingElements))
for i, sfe := range fundingElements {
txn.SiafundInputs = append(txn.SiafundInputs, types.SiafundInput{
ParentID: types.SiafundOutputID(sfe.ID),
ClaimAddress: claimAddr,
// UnlockConditions left empty for client to fill in
})
toSign[i] = types.Hash256(sfe.ID)
s.used[types.Hash256(sfe.ID)] = true
}

return toSign, nil
}

var id wallet.ID
var wfr WalletFundSFRequest
if jc.DecodeParam("id", &id) != nil || jc.Decode(&wfr) != nil {
return
}
utxos, err := s.wm.UnspentSiafundOutputs(id, 0, 1000)
utxos, _, change, err := s.wm.SelectSiafundElements(id, wfr.Amount)
if jc.Check("couldn't get utxos to fund transaction", err) != nil {
return
}

txn := wfr.Transaction
toSign, err := fundTxn(&txn, wfr.Amount, utxos, wfr.ChangeAddress, wfr.ClaimAddress, s.cm.PoolTransactions())
if jc.Check("couldn't fund transaction", err) != nil {
return
if change > 0 {
if wfr.ChangeAddress == types.VoidAddress {
jc.Error(errors.New("change address must be specified"), http.StatusBadRequest)
return
}

txn.SiafundOutputs = append(txn.SiafundOutputs, types.SiafundOutput{
Value: change,
Address: wfr.ChangeAddress,
})
}

toSign := make([]types.Hash256, 0, len(utxos))
for _, sce := range utxos {
txn.SiafundInputs = append(txn.SiafundInputs, types.SiafundInput{
ParentID: sce.ID,
ClaimAddress: wfr.ChangeAddress,
// UnlockConditions left empty for client to fill in
})
toSign = append(toSign, types.Hash256(sce.ID))
}
jc.Encode(WalletFundResponse{
Transaction: txn,
Expand Down Expand Up @@ -924,10 +854,9 @@ func NewServer(cm ChainManager, s Syncer, wm WalletManager, opts ...ServerOption
publicEndpoints: false,
startTime: time.Now(),

cm: cm,
s: s,
wm: wm,
used: make(map[types.Hash256]bool),
cm: cm,
s: s,
wm: wm,
}
for _, opt := range opts {
opt(&srv)
Expand Down
1 change: 1 addition & 0 deletions knope.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ command = "git switch -c release"

[[workflows.steps]]
type = "PrepareRelease"
ignore_conventional_commits = true

[[workflows.steps]]
type = "Command"
Expand Down
66 changes: 47 additions & 19 deletions persist/sqlite/wallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,24 @@ func (s *Store) RemoveWalletAddress(id wallet.ID, address types.Address) error {
})
}

// WalletAddress returns an address registered to the wallet.
func (s *Store) WalletAddress(id wallet.ID, address types.Address) (addr wallet.Address, err error) {
err = s.transaction(func(tx *txn) error {
if err := walletExists(tx, id); err != nil {
return err
}

const query = `SELECT sa.sia_address, wa.description, wa.spend_policy, wa.extra_data
FROM wallet_addresses wa
INNER JOIN sia_addresses sa ON (sa.id = wa.address_id)
WHERE wa.wallet_id=$1 AND sa.sia_address=$2`

addr, err = scanWalletAddress(tx.QueryRow(query, id, encode(address)))
return err
})
return
}

// WalletAddresses returns a slice of addresses registered to the wallet.
func (s *Store) WalletAddresses(id wallet.ID) (addresses []wallet.Address, err error) {
err = s.transaction(func(tx *txn) error {
Expand All @@ -195,27 +213,11 @@ WHERE wa.wallet_id=$1`
defer rows.Close()

for rows.Next() {
var address wallet.Address
var decodedPolicy any
if err := rows.Scan(decode(&address.Address), &address.Description, &decodedPolicy, (*[]byte)(&address.Metadata)); err != nil {
addr, err := scanWalletAddress(rows)
if err != nil {
return fmt.Errorf("failed to scan address: %w", err)
}

if decodedPolicy != nil {
switch v := decodedPolicy.(type) {
case []byte:
dec := types.NewBufDecoder(v)
address.SpendPolicy = new(types.SpendPolicy)
address.SpendPolicy.DecodeFrom(dec)
if err := dec.Err(); err != nil {
return fmt.Errorf("failed to decode spend policy: %w", err)
}
default:
return fmt.Errorf("unexpected spend policy type: %T", decodedPolicy)
}
}

addresses = append(addresses, address)
addresses = append(addresses, addr)
}
return rows.Err()
})
Expand Down Expand Up @@ -611,6 +613,32 @@ RETURNING id`
return
}

func scanWalletAddress(s scanner) (wallet.Address, error) {
var address wallet.Address
var decodedPolicy any
if err := s.Scan(decode(&address.Address), &address.Description, &decodedPolicy, (*[]byte)(&address.Metadata)); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return wallet.Address{}, wallet.ErrNotFound
}
return wallet.Address{}, fmt.Errorf("failed to scan address: %w", err)
}

if decodedPolicy != nil {
switch v := decodedPolicy.(type) {
case []byte:
dec := types.NewBufDecoder(v)
address.SpendPolicy = new(types.SpendPolicy)
address.SpendPolicy.DecodeFrom(dec)
if err := dec.Err(); err != nil {
return wallet.Address{}, fmt.Errorf("failed to decode spend policy: %w", err)
}
default:
return wallet.Address{}, fmt.Errorf("unexpected spend policy type: %T", decodedPolicy)
}
}
return address, nil
}

func fillElementProofs(tx *txn, indices []uint64) (proofs [][]types.Hash256, _ error) {
if len(indices) == 0 {
return nil, nil
Expand Down
Loading
Loading