Skip to content

Commit

Permalink
add peer blocklist
Browse files Browse the repository at this point in the history
  • Loading branch information
denisu committed Nov 5, 2024
1 parent f85827c commit b65007c
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 16 deletions.
1 change: 1 addition & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ pub const DNS_RECORD_TTL: u32 = 300;
pub const MAX_CONCURRENT_TASKS: usize = 100;
pub const MAX_RECORDS_TO_RETURN: usize = 32;
pub const RECHECK_INTERVAL: Duration = Duration::from_secs(1800);
pub const PEER_BLOCKLIST_TTL: u64 = 86400;
59 changes: 43 additions & 16 deletions src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@ use futures_util::stream::{FuturesUnordered, StreamExt};
use std::{
collections::{HashSet, VecDeque},
net::SocketAddr,
sync::Arc,
time::Duration,
sync::{Arc, Mutex},
time::{Duration, Instant},
};
use tokio::time::timeout;
use tracing::info;

struct BlockedPeer {
addr: SocketAddr,
expires_at: Instant,
}

pub fn start_peer_crawler(
initial_peers: Vec<SocketAddr>,
tls: Connector,
Expand All @@ -23,6 +28,7 @@ pub fn start_peer_crawler(
let mut seen_peers = HashSet::new();
let mut peers_to_process = VecDeque::new();
let mut active_tasks = FuturesUnordered::new();
let blocked_peers = Arc::new(Mutex::new(Vec::<BlockedPeer>::new()));

for peer in initial_peers {
if seen_peers.insert(peer) {
Expand All @@ -31,11 +37,35 @@ pub fn start_peer_crawler(
}

loop {
// Clean expired entries
{
let mut blocklist = blocked_peers.lock().unwrap();
blocklist.retain(|peer| peer.expires_at > Instant::now());
}

while active_tasks.len() < MAX_CONCURRENT_TASKS {
if let Some(peer) = peers_to_process.pop_front() {
// Check if peer is blocked before processing
let is_blocked = {
let blocklist = blocked_peers.lock().unwrap();
blocklist.iter().any(|bp| bp.addr == peer)
};

if is_blocked {
info!("Skipping blocked peer: {:?}", peer);
continue;
}

let tls = Arc::clone(&tls);
let authority = Arc::clone(&authority);
active_tasks.push(process_peer(peer, tls, authority, network_id.clone()));
let blocklist = Arc::clone(&blocked_peers);
active_tasks.push(process_peer(
peer,
tls,
authority,
network_id.clone(),
blocklist,
));
} else {
break;
}
Expand All @@ -61,6 +91,7 @@ async fn process_peer(
tls: Arc<Connector>,
authority: Arc<RandomizedAuthority>,
network_id: String,
blocked_peers: Arc<Mutex<Vec<BlockedPeer>>>,
) -> Vec<SocketAddr> {
let mut new_peers = Vec::new();

Expand All @@ -71,31 +102,28 @@ async fn process_peer(
.await
{
Ok(Ok((peer, ws_stream))) => {
// Get more peers and collect them, only consider a peer up if we can get more peers from it
if let Ok(response) = peer.request_peers().await {
// Add to DNS
authority.add_peer(peer.socket_addr()).await;

for peer_info in response.peer_list {
if let Ok(ip) = peer_info.host.parse() {
let addr = SocketAddr::new(ip, peer_info.port);
// Only add peers that are on the same port as the peer we connected to, since we only provide ips over dns
if addr.port() == peer_info.port {
new_peers.push(addr);
// Only add peers that are on the same port as the peer we connect to, since we only provide ips over dns
if peer_addr.port() == peer_info.port {
new_peers.push(SocketAddr::new(ip, peer_info.port));
}
}
}
}

// Clean up
drop(ws_stream);
drop(peer);
}
Ok(Err(e)) => {
info!("Failed to connect to peer {}: {}", peer_addr, e);
}
Err(_) => {
info!("Timeout connecting to peer {}", peer_addr);
_ => {
info!("Failed to connect to peer: {}", peer_addr);
blocked_peers.lock().unwrap().push(BlockedPeer {
addr: peer_addr,
expires_at: Instant::now() + Duration::from_secs(PEER_BLOCKLIST_TTL),
});
}
}

Expand Down Expand Up @@ -144,7 +172,6 @@ pub async fn start_peer_rechecker(
}
}

// Wait for at least one task to complete
if !tasks.is_empty() {
tasks.next().await;
}
Expand Down

0 comments on commit b65007c

Please sign in to comment.