Skip to content

Commit

Permalink
issueTx refactor (#357)
Browse files Browse the repository at this point in the history
* plugin/evm: add size and gas limit check to atomic mempool

* plugin/evm: fix test that relies on block building logic

* Fix tests

* plugin/evm: remove issueTx
  • Loading branch information
aaronbuchwald authored Oct 23, 2023
1 parent 1b205eb commit 3a0283f
Show file tree
Hide file tree
Showing 14 changed files with 120 additions and 169 deletions.
2 changes: 1 addition & 1 deletion plugin/evm/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (b *Block) Reject(context.Context) error {
log.Debug(fmt.Sprintf("Rejecting block %s (%s) at height %d", b.ID().Hex(), b.ID(), b.Height()))
for _, tx := range b.atomicTxs {
b.vm.mempool.RemoveTx(tx)
if err := b.vm.issueTx(tx, false /* set local to false when re-issuing */); err != nil {
if err := b.vm.mempool.AddTx(tx); err != nil {
log.Debug("Failed to re-issue transaction in rejected block", "txID", tx.ID(), "err", err)
}
}
Expand Down
8 changes: 4 additions & 4 deletions plugin/evm/export_tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func createExportTxOptions(t *testing.T, vm *VM, issuer chan engCommon.Message,
t.Fatal(err)
}

if err := vm.issueTx(importTx, true /*=local*/); err != nil {
if err := vm.mempool.AddLocalTx(importTx); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -371,7 +371,7 @@ func TestExportTxEVMStateTransfer(t *testing.T) {
t.Fatal(err)
}

if err := vm.issueTx(tx, true /*=local*/); err != nil {
if err := vm.mempool.AddLocalTx(tx); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -1726,7 +1726,7 @@ func TestNewExportTx(t *testing.T) {
t.Fatal(err)
}

if err := vm.issueTx(tx, true /*=local*/); err != nil {
if err := vm.mempool.AddLocalTx(tx); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -1916,7 +1916,7 @@ func TestNewExportTxMulticoin(t *testing.T) {
t.Fatal(err)
}

if err := vm.issueTx(tx, false); err != nil {
if err := vm.mempool.AddTx(tx); err != nil {
t.Fatal(err)
}

Expand Down
2 changes: 1 addition & 1 deletion plugin/evm/gossip_mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestAtomicMempoolIterate(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require := require.New(t)
m, err := NewMempool(ids.Empty, 10)
m, err := NewMempool(ids.Empty, 10, nil)
require.NoError(err)

for _, add := range tt.add {
Expand Down
2 changes: 1 addition & 1 deletion plugin/evm/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ func (h *GossipHandler) HandleAtomicTx(nodeID ids.NodeID, msg message.AtomicTxGo
}

h.stats.IncAtomicGossipReceivedNew()
if err := h.vm.issueTx(&tx, false /*=local*/); err != nil {
if err := h.vm.mempool.AddTx(&tx); err != nil {
log.Trace(
"AppGossip provided invalid transaction",
"peerID", nodeID,
Expand Down
4 changes: 2 additions & 2 deletions plugin/evm/gossiper_atomic_gossiping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestMempoolAtmTxsIssueTxAndGossiping(t *testing.T) {
assert.NoError(vm.SetState(context.Background(), snow.NormalOp))

// Optimistically gossip raw tx
assert.NoError(vm.issueTx(tx, true /*=local*/))
assert.NoError(vm.mempool.AddLocalTx(tx))
time.Sleep(500 * time.Millisecond)
gossipedLock.Lock()
assert.Equal(1, gossiped)
Expand All @@ -74,7 +74,7 @@ func TestMempoolAtmTxsIssueTxAndGossiping(t *testing.T) {
gossipedLock.Unlock()

// Attempt to gossip conflicting tx
assert.ErrorIs(vm.issueTx(conflictingTx, true /*=local*/), errConflictingAtomicTx)
assert.ErrorIs(vm.mempool.AddLocalTx(conflictingTx), errConflictingAtomicTx)
gossipedLock.Lock()
assert.Equal(1, gossiped)
gossipedLock.Unlock()
Expand Down
28 changes: 27 additions & 1 deletion plugin/evm/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,12 @@ type Mempool struct {
bloom *gossip.BloomFilter

metrics *mempoolMetrics

verify func(tx *Tx) error
}

// NewMempool returns a Mempool with [maxSize]
func NewMempool(AVAXAssetID ids.ID, maxSize int) (*Mempool, error) {
func NewMempool(AVAXAssetID ids.ID, maxSize int, verify func(tx *Tx) error) (*Mempool, error) {
bloom, err := gossip.NewBloomFilter(txGossipBloomMaxItems, txGossipBloomFalsePositiveRate)
if err != nil {
return nil, fmt.Errorf("failed to initialize bloom filter: %w", err)
Expand All @@ -95,6 +97,7 @@ func NewMempool(AVAXAssetID ids.ID, maxSize int) (*Mempool, error) {
utxoSpenders: make(map[ids.ID]*Tx),
bloom: bloom,
metrics: newMempoolMetrics(),
verify: verify,
}, nil
}

Expand Down Expand Up @@ -145,6 +148,24 @@ func (m *Mempool) AddTx(tx *Tx) error {
m.lock.Lock()
defer m.lock.Unlock()

err := m.addTx(tx, false)
if err != nil {
// unlike local txs, invalid remote txs are recorded as discarded
// so that they won't be requested again
txID := tx.ID()
m.discardedTxs.Put(tx.ID(), tx)
log.Debug("failed to issue remote tx to mempool",
"txID", txID,
"err", err,
)
}
return err
}

func (m *Mempool) AddLocalTx(tx *Tx) error {
m.lock.Lock()
defer m.lock.Unlock()

return m.addTx(tx, false)
}

Expand Down Expand Up @@ -203,6 +224,11 @@ func (m *Mempool) addTx(tx *Tx, force bool) error {
if _, exists := m.txHeap.Get(txID); exists {
return nil
}
if !force && m.verify != nil {
if err := m.verify(tx); err != nil {
return err
}
}

utxoSet := tx.InputUTXOs()
gasPrice, _ := m.atomicTxGasPrice(tx)
Expand Down
94 changes: 22 additions & 72 deletions plugin/evm/mempool_atomic_gossiping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,12 @@ package evm

import (
"context"
"math/big"
"testing"

"github.com/ava-labs/coreth/params"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils"
"github.com/ava-labs/avalanchego/utils/crypto/secp256k1"
"github.com/ava-labs/avalanchego/vms/components/avax"
"github.com/ava-labs/avalanchego/vms/components/chain"
"github.com/ava-labs/avalanchego/vms/secp256k1fx"

"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -49,13 +45,13 @@ func TestMempoolAddLocallyCreateAtomicTx(t *testing.T) {
conflictingTxID := conflictingTx.ID()

// add a tx to the mempool
err := vm.issueTx(tx, true /*=local*/)
err := vm.mempool.AddLocalTx(tx)
assert.NoError(err)
has := mempool.has(txID)
assert.True(has, "valid tx not recorded into mempool")

// try to add a conflicting tx
err = vm.issueTx(conflictingTx, true /*=local*/)
err = vm.mempool.AddLocalTx(conflictingTx)
assert.ErrorIs(err, errConflictingAtomicTx)
has = mempool.has(conflictingTxID)
assert.False(has, "conflicting tx in mempool")
Expand Down Expand Up @@ -118,88 +114,42 @@ func TestMempoolMaxMempoolSizeHandling(t *testing.T) {
assert.True(mempool.has(tx.ID()))
}

func createImportTx(t *testing.T, vm *VM, txID ids.ID, feeAmount uint64) *Tx {
var importAmount uint64 = 10000000
importTx := &UnsignedImportTx{
NetworkID: testNetworkID,
BlockchainID: testCChainID,
SourceChain: testXChainID,
ImportedInputs: []*avax.TransferableInput{
{
UTXOID: avax.UTXOID{
TxID: txID,
OutputIndex: uint32(0),
},
Asset: avax.Asset{ID: testAvaxAssetID},
In: &secp256k1fx.TransferInput{
Amt: importAmount,
Input: secp256k1fx.Input{
SigIndices: []uint32{0},
},
},
},
{
UTXOID: avax.UTXOID{
TxID: txID,
OutputIndex: uint32(1),
},
Asset: avax.Asset{ID: testAvaxAssetID},
In: &secp256k1fx.TransferInput{
Amt: importAmount,
Input: secp256k1fx.Input{
SigIndices: []uint32{0},
},
},
},
},
Outs: []EVMOutput{
{
Address: testEthAddrs[0],
Amount: importAmount - feeAmount,
AssetID: testAvaxAssetID,
},
{
Address: testEthAddrs[1],
Amount: importAmount,
AssetID: testAvaxAssetID,
},
},
}

// Sort the inputs and outputs to ensure the transaction is canonical
utils.Sort(importTx.ImportedInputs)
utils.Sort(importTx.Outs)

tx := &Tx{UnsignedAtomicTx: importTx}
// Sign with the correct key
if err := tx.Sign(vm.codec, [][]*secp256k1.PrivateKey{{testKeys[0]}}); err != nil {
t.Fatal(err)
}

return tx
}

// mempool will drop transaction with the lowest fee
func TestMempoolPriorityDrop(t *testing.T) {
assert := assert.New(t)

// we use AP3 genesis here to not trip any block fees
_, vm, _, _, _ := GenesisVM(t, true, genesisJSONApricotPhase3, "", "")
importAmount := uint64(50000000)
_, vm, _, _, _ := GenesisVMWithUTXOs(t, true, genesisJSONApricotPhase3, "", "", map[ids.ShortID]uint64{
testShortIDAddrs[0]: importAmount,
testShortIDAddrs[1]: importAmount,
})
defer func() {
err := vm.Shutdown(context.Background())
assert.NoError(err)
}()
mempool := vm.mempool
mempool.maxSize = 1

tx1 := createImportTx(t, vm, ids.ID{1}, params.AvalancheAtomicTxFee)
tx1, err := vm.newImportTx(vm.ctx.XChainID, testEthAddrs[0], initialBaseFee, []*secp256k1.PrivateKey{testKeys[0]})
if err != nil {
t.Fatal(err)
}
assert.NoError(mempool.AddTx(tx1))
assert.True(mempool.has(tx1.ID()))
tx2 := createImportTx(t, vm, ids.ID{2}, params.AvalancheAtomicTxFee)

tx2, err := vm.newImportTx(vm.ctx.XChainID, testEthAddrs[1], initialBaseFee, []*secp256k1.PrivateKey{testKeys[1]})
if err != nil {
t.Fatal(err)
}
assert.ErrorIs(mempool.AddTx(tx2), errInsufficientAtomicTxFee)
assert.True(mempool.has(tx1.ID()))
assert.False(mempool.has(tx2.ID()))
tx3 := createImportTx(t, vm, ids.ID{3}, 2*params.AvalancheAtomicTxFee)

tx3, err := vm.newImportTx(vm.ctx.XChainID, testEthAddrs[1], new(big.Int).Mul(initialBaseFee, big.NewInt(2)), []*secp256k1.PrivateKey{testKeys[1]})
if err != nil {
t.Fatal(err)
}
assert.NoError(mempool.AddTx(tx3))
assert.False(mempool.has(tx1.ID()))
assert.False(mempool.has(tx2.ID()))
Expand Down
2 changes: 1 addition & 1 deletion plugin/evm/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

func TestMempoolAddTx(t *testing.T) {
require := require.New(t)
m, err := NewMempool(ids.Empty, 5_000)
m, err := NewMempool(ids.Empty, 5_000, nil)
require.NoError(err)

txs := make([]*GossipAtomicTx, 0)
Expand Down
6 changes: 3 additions & 3 deletions plugin/evm/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (service *AvaxAPI) Import(_ *http.Request, args *ImportArgs, response *api.
}

response.TxID = tx.ID()
return service.vm.issueTx(tx, true /*=local*/)
return service.vm.mempool.AddLocalTx(tx)
}

// ExportAVAXArgs are the arguments to ExportAVAX
Expand Down Expand Up @@ -343,7 +343,7 @@ func (service *AvaxAPI) Export(_ *http.Request, args *ExportArgs, response *api.
}

response.TxID = tx.ID()
return service.vm.issueTx(tx, true /*=local*/)
return service.vm.mempool.AddLocalTx(tx)
}

// GetUTXOs gets all utxos for passed in addresses
Expand Down Expand Up @@ -449,7 +449,7 @@ func (service *AvaxAPI) IssueTx(r *http.Request, args *api.FormattedTx, response
service.vm.ctx.Lock.Lock()
defer service.vm.ctx.Lock.Unlock()

return service.vm.issueTx(tx, true /*=local*/)
return service.vm.mempool.AddLocalTx(tx)
}

// GetAtomicTxStatusReply defines the GetAtomicTxStatus replies returned from the API
Expand Down
4 changes: 2 additions & 2 deletions plugin/evm/syncervm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func createSyncServerAndClientVMs(t *testing.T, test syncTest) *syncVMSetup {
// spend the UTXOs from shared memory
importTx, err = serverVM.newImportTx(serverVM.ctx.XChainID, testEthAddrs[0], initialBaseFee, []*secp256k1.PrivateKey{testKeys[0]})
require.NoError(err)
require.NoError(serverVM.issueTx(importTx, true /*=local*/))
require.NoError(serverVM.mempool.AddLocalTx(importTx))
case 1:
// export some of the imported UTXOs to test exportTx is properly synced
exportTx, err = serverVM.newExportTx(
Expand All @@ -299,7 +299,7 @@ func createSyncServerAndClientVMs(t *testing.T, test syncTest) *syncVMSetup {
[]*secp256k1.PrivateKey{testKeys[0]},
)
require.NoError(err)
require.NoError(serverVM.issueTx(exportTx, true /*=local*/))
require.NoError(serverVM.mempool.AddLocalTx(exportTx))
default: // Generate simple transfer transactions.
pk := testKeys[0].ToECDSA()
tx := types.NewTransaction(gen.TxNonce(testEthAddrs[0]), testEthAddrs[1], common.Big1, params.TxGas, initialBaseFee, nil)
Expand Down
4 changes: 2 additions & 2 deletions plugin/evm/tx_gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestEthTxGossip(t *testing.T) {

importTx, err := vm.newImportTx(vm.ctx.XChainID, testEthAddrs[0], initialBaseFee, []*secp256k1.PrivateKey{testKeys[0]})
require.NoError(err)
require.NoError(vm.issueTx(importTx, true))
require.NoError(vm.mempool.AddLocalTx(importTx))
<-issuer

blk, err := vm.BuildBlock(context.Background())
Expand Down Expand Up @@ -228,7 +228,7 @@ func TestAtomicTxGossip(t *testing.T) {
importTx, err := vm.newImportTx(vm.ctx.XChainID, testEthAddrs[0], initialBaseFee, []*secp256k1.PrivateKey{testKeys[0]})
require.NoError(err)

require.NoError(vm.issueTx(importTx, true /*=local*/))
require.NoError(vm.mempool.AddLocalTx(importTx))
<-issuer

// wait so we aren't throttled by the vm
Expand Down
2 changes: 1 addition & 1 deletion plugin/evm/tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func executeTxTest(t *testing.T, test atomicTxTest) {
}
}

if err := vm.issueTx(tx, true /*=local*/); err != nil {
if err := vm.mempool.AddLocalTx(tx); err != nil {
t.Fatal(err)
}
<-issuer
Expand Down
Loading

0 comments on commit 3a0283f

Please sign in to comment.