Skip to content

Commit

Permalink
Merge pull request #39 from asonnino/remote-mempool-driver
Browse files Browse the repository at this point in the history
Remove async trait
  • Loading branch information
asonnino authored Mar 19, 2021
2 parents 1353070 + 6d59a04 commit 6455c7b
Show file tree
Hide file tree
Showing 36 changed files with 721 additions and 702 deletions.
2 changes: 1 addition & 1 deletion benchmark/aws/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def create_instances(self, instances):
client.run_instances(
ImageId=self._get_ami(client),
InstanceType=self.settings.instance_type,
KeyName='aws',
KeyName=self.settings.key_name,
MaxCount=instances,
MinCount=instances,
SecurityGroups=[self.SECURITY_GROUP_NAME],
Expand Down
16 changes: 11 additions & 5 deletions benchmark/aws/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,22 @@ class SettingsError(Exception):


class Settings:
def __init__(self, key_path, consensus_port, mempool_port, front_port, repo_name,
def __init__(self, key_name, key_path, consensus_port, mempool_port, front_port, repo_name,
repo_url, branch, instance_type, aws_regions):
regions = aws_regions if isinstance(aws_regions, list) else [aws_regions]
inputs_str = [key_path, repo_name, repo_url, branch, instance_type] + regions
regions = aws_regions if isinstance(
aws_regions, list) else [aws_regions]
inputs_str = [
key_name, key_path, repo_name, repo_url, branch, instance_type
]
inputs_str += regions
inputs_int = [consensus_port, mempool_port, front_port]
ok = all(isinstance(x, str) for x in inputs_str)
ok &= all(isinstance(x, int) for x in inputs_int)
ok &= len(regions) > 0
if not ok:
raise SettingsError('Invalid settings types')

self.key_name = key_name
self.key_path = key_path

self.consensus_port = consensus_port
Expand All @@ -37,7 +42,8 @@ def load(cls, filename):
data = load(f)

return cls(
data['key_path'],
data['key']['name'],
data['key']['path'],
data['ports']['consensus'],
data['ports']['mempool'],
data['ports']['front'],
Expand All @@ -51,4 +57,4 @@ def load(cls, filename):
raise SettingsError(str(e))

except KeyError as e:
raise SettingsError(f'Malformed settings: missing key {e}')
raise SettingsError(f'Malformed settings: missing key {e}')
1 change: 1 addition & 0 deletions benchmark/benchmark/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def __init__(self, json):
inputs += [json['consensus']['max_payload_size']]
inputs += [json['consensus']['min_block_delay']]
inputs += [json['mempool']['queue_capacity']]
inputs += [json['consensus']['sync_retry_delay']]
inputs += [json['mempool']['max_payload_size']]
inputs += [json['mempool']['min_block_delay']]
except KeyError as e:
Expand Down
8 changes: 5 additions & 3 deletions benchmark/fabfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def local(ctx):
},
'mempool': {
'queue_capacity': 10_000,
'sync_retry_delay': 100_000,
'max_payload_size': 15_000,
'min_block_delay': 0
}
Expand All @@ -39,7 +40,7 @@ def local(ctx):


@task
def create(ctx, nodes=4):
def create(ctx, nodes=2):
''' Create a testbed'''
try:
InstanceManager.make().create_instances(nodes)
Expand Down Expand Up @@ -96,8 +97,8 @@ def install(ctx):
def remote(ctx):
''' Run benchmarks on AWS '''
bench_params = {
'nodes': [30],
'rate': [60_000],
'nodes': [10],
'rate': [40_000],
'tx_size': 512,
'duration': 300,
'runs': 2,
Expand All @@ -111,6 +112,7 @@ def remote(ctx):
},
'mempool': {
'queue_capacity': 100_000,
'sync_retry_delay': 100_000,
'max_payload_size': 500_000,
'min_block_delay': 100
}
Expand Down
18 changes: 0 additions & 18 deletions benchmark/settings.json

This file was deleted.

3 changes: 1 addition & 2 deletions consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@ publish = false

[dependencies]
thiserror = "1.0.21"
tokio = { version = "1.1.0", features = ["rt", "time", "macros", "sync"] }
tokio = { version = "1.3.0", features = ["rt", "time", "macros", "sync"] }
ed25519-dalek = "1.0.1"
log = "0.4.0"
serde = { version = "1.0", features = ["derive"] }
bytes = "1.0.1"
bincode = "1.3.1"
futures = "0.3.8"
async-trait = "0.1.42"
async-recursion = "0.3.1"
base64 = "0.13.0"

Expand Down
2 changes: 1 addition & 1 deletion consensus/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl Aggregator {
}

pub fn add_vote(&mut self, vote: Vote) -> ConsensusResult<Option<QC>> {
// TODO: A bad node may make us run out of memory by sending many votes
// TODO [issue #7]: A bad node may make us run out of memory by sending many votes
// with different round numbers or different digests.

// Add the new vote to our aggregator and see if we have a QC.
Expand Down
22 changes: 12 additions & 10 deletions consensus/src/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use crate::config::{Committee, Parameters};
use crate::core::Core;
use crate::core::{ConsensusMessage, Core};
use crate::error::ConsensusResult;
use crate::leader::LeaderElector;
use crate::mempool::{MempoolDriver, NodeMempool};
use crate::mempool::{ConsensusMempoolMessage, MempoolDriver};
use crate::messages::Block;
use crate::synchronizer::Synchronizer;
use crypto::{PublicKey, SignatureService};
use log::info;
use network::{NetReceiver, NetSender};
use store::Store;
use tokio::sync::mpsc::{channel, Sender};
use tokio::sync::mpsc::{channel, Receiver, Sender};

#[cfg(test)]
#[path = "tests/consensus_tests.rs"]
Expand All @@ -18,14 +18,17 @@ pub mod consensus_tests;
pub struct Consensus;

impl Consensus {
pub async fn run<Mempool: 'static + NodeMempool>(
#[allow(clippy::too_many_arguments)]
pub async fn run(
name: PublicKey,
committee: Committee,
parameters: Parameters,
signature_service: SignatureService,
store: Store,
mempool: Mempool,
commit_channel: Sender<Block>,
signature_service: SignatureService,
tx_core: Sender<ConsensusMessage>,
rx_core: Receiver<ConsensusMessage>,
tx_consensus_mempool: Sender<ConsensusMempoolMessage>,
tx_commit: Sender<Block>,
) -> ConsensusResult<()> {
info!(
"Consensus timeout delay set to {} ms",
Expand All @@ -44,7 +47,6 @@ impl Consensus {
parameters.min_block_delay
);

let (tx_core, rx_core) = channel(1000);
let (tx_network, rx_network) = channel(1000);

// Make the network sender and receiver.
Expand All @@ -66,7 +68,7 @@ impl Consensus {
let leader_elector = LeaderElector::new(committee.clone());

// Make the mempool driver which will mediate our requests to the mempool.
let mempool_driver = MempoolDriver::new(mempool, tx_core.clone(), store.clone());
let mempool_driver = MempoolDriver::new(tx_consensus_mempool);

// Make the synchronizer. This instance runs in a background thread
// and asks other nodes for any block that we may be missing.
Expand All @@ -91,7 +93,7 @@ impl Consensus {
synchronizer,
/* core_channel */ rx_core,
/* network_channel */ tx_network,
commit_channel,
/* commit_channel */ tx_commit,
);
tokio::spawn(async move {
core.run().await;
Expand Down
Loading

0 comments on commit 6455c7b

Please sign in to comment.