diff --git a/src/config.rs b/src/config.rs index 2cf6f83..7995da2 100644 --- a/src/config.rs +++ b/src/config.rs @@ -5,3 +5,4 @@ pub const DNS_RECORD_TTL: u32 = 300; pub const MAX_CONCURRENT_TASKS: usize = 100; pub const MAX_RECORDS_TO_RETURN: usize = 32; pub const RECHECK_INTERVAL: Duration = Duration::from_secs(1800); +pub const PEER_BLOCKLIST_TTL: u64 = 86400; diff --git a/src/peer.rs b/src/peer.rs index daca9ce..2e25d27 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -6,12 +6,17 @@ use futures_util::stream::{FuturesUnordered, StreamExt}; use std::{ collections::{HashSet, VecDeque}, net::SocketAddr, - sync::Arc, - time::Duration, + sync::{Arc, Mutex}, + time::{Duration, Instant}, }; use tokio::time::timeout; use tracing::info; +struct BlockedPeer { + addr: SocketAddr, + expires_at: Instant, +} + pub fn start_peer_crawler( initial_peers: Vec, tls: Connector, @@ -23,6 +28,7 @@ pub fn start_peer_crawler( let mut seen_peers = HashSet::new(); let mut peers_to_process = VecDeque::new(); let mut active_tasks = FuturesUnordered::new(); + let blocked_peers = Arc::new(Mutex::new(Vec::::new())); for peer in initial_peers { if seen_peers.insert(peer) { @@ -31,11 +37,35 @@ pub fn start_peer_crawler( } loop { + // Clean expired entries + { + let mut blocklist = blocked_peers.lock().unwrap(); + blocklist.retain(|peer| peer.expires_at > Instant::now()); + } + while active_tasks.len() < MAX_CONCURRENT_TASKS { if let Some(peer) = peers_to_process.pop_front() { + // Check if peer is blocked before processing + let is_blocked = { + let blocklist = blocked_peers.lock().unwrap(); + blocklist.iter().any(|bp| bp.addr == peer) + }; + + if is_blocked { + info!("Skipping blocked peer: {:?}", peer); + continue; + } + let tls = Arc::clone(&tls); let authority = Arc::clone(&authority); - active_tasks.push(process_peer(peer, tls, authority, network_id.clone())); + let blocklist = Arc::clone(&blocked_peers); + active_tasks.push(process_peer( + peer, + tls, + authority, + network_id.clone(), + blocklist, + )); } else { break; } @@ -61,6 +91,7 @@ async fn process_peer( tls: Arc, authority: Arc, network_id: String, + blocked_peers: Arc>>, ) -> Vec { let mut new_peers = Vec::new(); @@ -71,31 +102,28 @@ async fn process_peer( .await { Ok(Ok((peer, ws_stream))) => { - // Get more peers and collect them, only consider a peer up if we can get more peers from it if let Ok(response) = peer.request_peers().await { - // Add to DNS authority.add_peer(peer.socket_addr()).await; for peer_info in response.peer_list { if let Ok(ip) = peer_info.host.parse() { - let addr = SocketAddr::new(ip, peer_info.port); - // Only add peers that are on the same port as the peer we connected to, since we only provide ips over dns - if addr.port() == peer_info.port { - new_peers.push(addr); + // Only add peers that are on the same port as the peer we connect to, since we only provide ips over dns + if peer_addr.port() == peer_info.port { + new_peers.push(SocketAddr::new(ip, peer_info.port)); } } } } - // Clean up drop(ws_stream); drop(peer); } - Ok(Err(e)) => { - info!("Failed to connect to peer {}: {}", peer_addr, e); - } - Err(_) => { - info!("Timeout connecting to peer {}", peer_addr); + _ => { + info!("Failed to connect to peer: {}", peer_addr); + blocked_peers.lock().unwrap().push(BlockedPeer { + addr: peer_addr, + expires_at: Instant::now() + Duration::from_secs(PEER_BLOCKLIST_TTL), + }); } } @@ -144,7 +172,6 @@ pub async fn start_peer_rechecker( } } - // Wait for at least one task to complete if !tasks.is_empty() { tasks.next().await; }