Skip to content

Commit

Permalink
sqlite: remove txn interface
Browse files Browse the repository at this point in the history
  • Loading branch information
n8maninger committed Jan 24, 2024
1 parent f9a7cf7 commit 7823eb8
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 116 deletions.
30 changes: 15 additions & 15 deletions persist/sqlite/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ type proofUpdater interface {
UpdateElementProof(*types.StateElement)
}

func insertChainIndex(tx txn, index types.ChainIndex) (id int64, err error) {
func insertChainIndex(tx *txn, index types.ChainIndex) (id int64, err error) {
err = tx.QueryRow(`INSERT INTO chain_indices (height, block_id) VALUES ($1, $2) ON CONFLICT (block_id) DO UPDATE SET height=EXCLUDED.height RETURNING id`, index.Height, encode(index.ID)).Scan(&id)
return
}

func applyEvents(tx txn, events []wallet.Event) error {
func applyEvents(tx *txn, events []wallet.Event) error {
stmt, err := tx.Prepare(`INSERT INTO events (date_created, index_id, event_type, event_data) VALUES ($1, $2, $3, $4) RETURNING id`)
if err != nil {
return fmt.Errorf("failed to prepare statement: %w", err)
Expand Down Expand Up @@ -64,7 +64,7 @@ func applyEvents(tx txn, events []wallet.Event) error {
return nil
}

func deleteSiacoinOutputs(tx txn, spent []types.SiacoinElement) error {
func deleteSiacoinOutputs(tx *txn, spent []types.SiacoinElement) error {
addrStmt, err := tx.Prepare(`SELECT id, siacoin_balance FROM sia_addresses WHERE sia_address=$1 LIMIT 1`)
if err != nil {
return fmt.Errorf("failed to prepare lookup statement: %w", err)
Expand Down Expand Up @@ -108,7 +108,7 @@ func deleteSiacoinOutputs(tx txn, spent []types.SiacoinElement) error {
return nil
}

func applySiacoinOutputs(tx txn, added map[types.Hash256]types.SiacoinElement) error {
func applySiacoinOutputs(tx *txn, added map[types.Hash256]types.SiacoinElement) error {
addrStmt, err := tx.Prepare(`SELECT id, siacoin_balance FROM sia_addresses WHERE sia_address=$1 LIMIT 1`)
if err != nil {
return fmt.Errorf("failed to prepare lookup statement: %w", err)
Expand Down Expand Up @@ -152,7 +152,7 @@ func applySiacoinOutputs(tx txn, added map[types.Hash256]types.SiacoinElement) e
return nil
}

func deleteSiafundOutputs(tx txn, spent []types.SiafundElement) error {
func deleteSiafundOutputs(tx *txn, spent []types.SiafundElement) error {
addrStmt, err := tx.Prepare(`SELECT id, siafund_balance FROM sia_addresses WHERE sia_address=$1 LIMIT 1`)
if err != nil {
return fmt.Errorf("failed to prepare lookup statement: %w", err)
Expand Down Expand Up @@ -199,7 +199,7 @@ func deleteSiafundOutputs(tx txn, spent []types.SiafundElement) error {
return nil
}

func applySiafundOutputs(tx txn, added map[types.Hash256]types.SiafundElement) error {
func applySiafundOutputs(tx *txn, added map[types.Hash256]types.SiafundElement) error {
addrStmt, err := tx.Prepare(`SELECT id, siafund_balance FROM sia_addresses WHERE sia_address=$1 LIMIT 1`)
if err != nil {
return fmt.Errorf("failed to prepare lookup statement: %w", err)
Expand Down Expand Up @@ -245,13 +245,13 @@ func applySiafundOutputs(tx txn, added map[types.Hash256]types.SiafundElement) e
return nil
}

func updateLastIndexedTip(tx txn, tip types.ChainIndex) error {
func updateLastIndexedTip(tx *txn, tip types.ChainIndex) error {
_, err := tx.Exec(`UPDATE global_settings SET last_indexed_tip=$1`, encode(tip))
return err
}

func getStateElementBatch(stmt *loggedStmt, offset, limit int) ([]types.StateElement, error) {
rows, err := stmt.Query(limit, offset)
func getStateElementBatch(s *stmt, offset, limit int) ([]types.StateElement, error) {
rows, err := s.Query(limit, offset)
if err != nil {
return nil, fmt.Errorf("failed to query siacoin elements: %w", err)
}
Expand All @@ -269,8 +269,8 @@ func getStateElementBatch(stmt *loggedStmt, offset, limit int) ([]types.StateEle
return updated, nil
}

func updateStateElement(stmt *loggedStmt, se types.StateElement) error {
res, err := stmt.Exec(encodeSlice(se.MerkleProof), se.LeafIndex, encode(se.ID))
func updateStateElement(s *stmt, se types.StateElement) error {
res, err := s.Exec(encodeSlice(se.MerkleProof), se.LeafIndex, encode(se.ID))
if err != nil {
return fmt.Errorf("failed to update siacoin element %q: %w", se.ID, err)
} else if n, err := res.RowsAffected(); err != nil {
Expand All @@ -282,7 +282,7 @@ func updateStateElement(stmt *loggedStmt, se types.StateElement) error {
}

// how slow is this going to be 😬?
func updateElementProofs(tx txn, table string, updater proofUpdater) error {
func updateElementProofs(tx *txn, table string, updater proofUpdater) error {
stmt, err := tx.Prepare(`SELECT id, merkle_proof, leaf_index FROM ` + table + ` LIMIT $1 OFFSET $2`)
if err != nil {
return fmt.Errorf("failed to prepare batch statement: %w", err)
Expand Down Expand Up @@ -314,7 +314,7 @@ func updateElementProofs(tx txn, table string, updater proofUpdater) error {
}

// applyChainUpdates applies the given chain updates to the database.
func applyChainUpdates(tx txn, updates []*chain.ApplyUpdate) error {
func applyChainUpdates(tx *txn, updates []*chain.ApplyUpdate) error {
stmt, err := tx.Prepare(`SELECT id FROM sia_addresses WHERE sia_address=$1 LIMIT 1`)
if err != nil {
return fmt.Errorf("failed to prepare statement: %w", err)
Expand Down Expand Up @@ -404,7 +404,7 @@ func (s *Store) ProcessChainApplyUpdate(cau *chain.ApplyUpdate, mayCommit bool)
s.updates = append(s.updates, cau)

if mayCommit {
return s.transaction(func(tx txn) error {
return s.transaction(func(tx *txn) error {
if err := applyChainUpdates(tx, s.updates); err != nil {
return err
}
Expand All @@ -424,7 +424,7 @@ func (s *Store) ProcessChainRevertUpdate(cru *chain.RevertUpdate) error {
}

// update has been committed, revert it
return s.transaction(func(tx txn) error {
return s.transaction(func(tx *txn) error {
stmt, err := tx.Prepare(`SELECT id FROM sia_addresses WHERE sia_address=$1 LIMIT 1`)
if err != nil {
return fmt.Errorf("failed to prepare statement: %w", err)
Expand Down
6 changes: 3 additions & 3 deletions persist/sqlite/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ import (
//go:embed init.sql
var initDatabase string

func initializeSettings(tx txn, target int64) error {
func initializeSettings(tx *txn, target int64) error {
_, err := tx.Exec(`INSERT INTO global_settings (id, db_version, last_indexed_tip) VALUES (0, ?, ?)`, target, encode(types.ChainIndex{}))
return err
}

func (s *Store) initNewDatabase(target int64) error {
return s.transaction(func(tx txn) error {
return s.transaction(func(tx *txn) error {
if _, err := tx.Exec(initDatabase); err != nil {
return fmt.Errorf("failed to initialize database: %w", err)
} else if err := initializeSettings(tx, target); err != nil {
Expand All @@ -48,7 +48,7 @@ func (s *Store) upgradeDatabase(current, target int64) error {
}
}()

return s.transaction(func(tx txn) error {
return s.transaction(func(tx *txn) error {
for _, fn := range migrations[current-1:] {
current++
start := time.Now()
Expand Down
2 changes: 1 addition & 1 deletion persist/sqlite/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ import (
// migrations is a list of functions that are run to migrate the database from
// one version to the next. Migrations are used to update existing databases to
// match the schema in init.sql.
var migrations = []func(tx txn, log *zap.Logger) error{}
var migrations = []func(tx *txn, log *zap.Logger) error{}
16 changes: 8 additions & 8 deletions persist/sqlite/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,22 @@ import (
"go.uber.org/zap"
)

func getPeerInfo(tx txn, peer string) (syncer.PeerInfo, error) {
func getPeerInfo(tx *txn, peer string) (syncer.PeerInfo, error) {
const query = `SELECT first_seen, last_connect, synced_blocks, sync_duration FROM syncer_peers WHERE peer_address=$1`
var info syncer.PeerInfo
err := tx.QueryRow(query, peer).Scan(decode(&info.FirstSeen), decode(&info.LastConnect), &info.SyncedBlocks, &info.SyncDuration)
return info, err
}

func (s *Store) updatePeerInfo(tx txn, peer string, info syncer.PeerInfo) error {
func (s *Store) updatePeerInfo(tx *txn, peer string, info syncer.PeerInfo) error {
const query = `UPDATE syncer_peers SET first_seen=$1, last_connect=$2, synced_blocks=$3, sync_duration=$4 WHERE peer_address=$5 RETURNING peer_address`
err := tx.QueryRow(query, encode(info.FirstSeen), encode(info.LastConnect), info.SyncedBlocks, info.SyncDuration, peer).Scan(&peer)
return err
}

// AddPeer adds the given peer to the store.
func (s *Store) AddPeer(peer string) {
err := s.transaction(func(tx txn) error {
err := s.transaction(func(tx *txn) error {
const query = `INSERT INTO syncer_peers (peer_address, first_seen, last_connect, synced_blocks, sync_duration) VALUES ($1, $2, 0, 0, 0) ON CONFLICT (peer_address) DO NOTHING`
_, err := tx.Exec(query, peer, encode(time.Now()))
return err
Expand All @@ -40,7 +40,7 @@ func (s *Store) AddPeer(peer string) {

// Peers returns the addresses of all known peers.
func (s *Store) Peers() (peers []string) {
err := s.transaction(func(tx txn) error {
err := s.transaction(func(tx *txn) error {
const query = `SELECT peer_address FROM syncer_peers`
rows, err := tx.Query(query)
if err != nil {
Expand All @@ -64,7 +64,7 @@ func (s *Store) Peers() (peers []string) {

// UpdatePeerInfo updates the info for the given peer.
func (s *Store) UpdatePeerInfo(peer string, fn func(*syncer.PeerInfo)) {
err := s.transaction(func(tx txn) error {
err := s.transaction(func(tx *txn) error {
info, err := getPeerInfo(tx, peer)
if err != nil {
return fmt.Errorf("failed to get peer info: %w", err)
Expand All @@ -81,7 +81,7 @@ func (s *Store) UpdatePeerInfo(peer string, fn func(*syncer.PeerInfo)) {
func (s *Store) PeerInfo(peer string) (syncer.PeerInfo, bool) {
var info syncer.PeerInfo
var err error
err = s.transaction(func(tx txn) error {
err = s.transaction(func(tx *txn) error {
info, err = getPeerInfo(tx, peer)
return err
})
Expand Down Expand Up @@ -134,7 +134,7 @@ func (s *Store) Ban(peer string, duration time.Duration, reason string) {
s.log.Error("failed to normalize peer", zap.Error(err))
return
}
err = s.transaction(func(tx txn) error {
err = s.transaction(func(tx *txn) error {
const query = `INSERT INTO syncer_bans (net_cidr, expiration, reason) VALUES ($1, $2, $3) ON CONFLICT (net_cidr) DO UPDATE SET expiration=EXCLUDED.expiration, reason=EXCLUDED.reason`
_, err := tx.Exec(query, address, encode(time.Now().Add(duration)), reason)
return err
Expand Down Expand Up @@ -176,7 +176,7 @@ func (s *Store) Banned(peer string) (banned bool) {
checkSubnets = append(checkSubnets, subnet.String())
}

err = s.transaction(func(tx txn) error {
err = s.transaction(func(tx *txn) error {
query := `SELECT net_cidr, expiration FROM syncer_bans WHERE net_cidr IN (` + queryPlaceHolders(len(checkSubnets)) + `) ORDER BY expiration DESC LIMIT 1`

var subnet string
Expand Down
Loading

0 comments on commit 7823eb8

Please sign in to comment.