Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sdk 142 - Dns client #1

Open
wants to merge 9 commits into
base: dev
Choose a base branch
from
3 changes: 3 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ scorex {
# List of IP addresses of well known nodes.
knownPeers = []

# List of known DNS seeder.
knownSeeders = []

# Interval between GetPeers messages to be send by our node to a random one
getPeersInterval = 2m

Expand Down
14 changes: 9 additions & 5 deletions src/main/scala/scorex/core/app/Application.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package scorex.core.app

import java.net.InetSocketAddress

import akka.actor.{ActorRef, ActorSystem}
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.{ExceptionHandler, RejectionHandler, Route}
import scorex.core.api.http.{ApiErrorHandler, ApiRejectionHandler, ApiRoute, CompositeHttpService}
import scorex.core.network._
import scorex.core.network.dns.DnsClientRef
import scorex.core.network.dns.model.DnsClientInput
import scorex.core.network.message._
import scorex.core.network.peer.PeerManagerRef
import scorex.core.settings.ScorexSettings
Expand All @@ -15,6 +15,7 @@ import scorex.core.utils.NetworkTimeProvider
import scorex.core.{NodeViewHolder, PersistentNodeViewModifier}
import scorex.util.ScorexLogging

import java.net.InetSocketAddress
import scala.concurrent.ExecutionContext

trait Application extends ScorexLogging {
Expand Down Expand Up @@ -75,21 +76,24 @@ trait Application extends ScorexLogging {
}
}

val scorexContext = ScorexContext(
val scorexContext: ScorexContext = ScorexContext(
messageSpecs = basicSpecs ++ additionalMessageSpecs,
features = features,
upnpGateway = upnpGateway,
timeProvider = timeProvider,
externalNodeAddress = externalSocketAddress
)

val peerManagerRef = PeerManagerRef(settings, scorexContext)
val peerManagerRef: ActorRef = PeerManagerRef(settings, scorexContext)

val networkControllerRef: ActorRef = NetworkControllerRef(
"networkController", settings.network, peerManagerRef, scorexContext)

val dnsClientParams = new DnsClientInput(settings.network.knownSeeders)
val dnsClient: ActorRef = DnsClientRef(dnsClientParams)

val peerSynchronizer: ActorRef = PeerSynchronizerRef("PeerSynchronizer",
networkControllerRef, peerManagerRef, settings.network, featureSerializers)
networkControllerRef, peerManagerRef, dnsClient, settings.network, featureSerializers)

lazy val combinedRoute: Route = CompositeHttpService(actorSystem, apiRoutes, settings.restApi, swaggerConfig).compositeRoute

Expand Down
11 changes: 8 additions & 3 deletions src/main/scala/scorex/core/network/NetworkController.scala
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
package scorex.core.network

import java.net._

import akka.actor._
import akka.io.Tcp._
import akka.io.{IO, Tcp}
import akka.pattern.ask
import akka.util.Timeout
import scorex.core.app.{ScorexContext, Version}
import scorex.core.network.NodeViewSynchronizer.ReceivableMessages.{DisconnectedPeer, HandshakedPeer}
import scorex.core.network.PeerSynchronizer.ReceivableMessages.GetNewPeers
import scorex.core.network.message.Message.MessageCode
import scorex.core.network.message.{Message, MessageSpec}
import scorex.core.network.peer.PeerManager.ReceivableMessages._
import scorex.core.network.peer.{LocalAddressPeerFeature, PeerInfo, PeerManager, PeersStatus, PenaltyType, SessionIdPeerFeature}
import scorex.core.network.peer._
import scorex.core.settings.NetworkSettings
import scorex.core.utils.TimeProvider.Time
import scorex.core.utils.{NetworkUtils, TimeProvider}
import scorex.util.ScorexLogging

import java.net._
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.language.{existentials, postfixOps}
Expand Down Expand Up @@ -139,6 +139,9 @@ class NetworkController(settings: NetworkSettings,

case Blacklisted(peerAddress) =>
closeConnection(peerAddress)

case EmptyPeerDatabase() =>
i-Alex marked this conversation as resolved.
Show resolved Hide resolved
context.system.eventStream.publish(GetNewPeers())
}

private def connectionEvents: Receive = {
Expand Down Expand Up @@ -501,6 +504,8 @@ object NetworkController {

case object GetConnectedPeers

case class EmptyPeerDatabase()

/**
* Get p2p network status
*/
Expand Down
46 changes: 39 additions & 7 deletions src/main/scala/scorex/core/network/PeerSynchronizer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@ import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scorex.core.network.NetworkController.ReceivableMessages.{PenalizePeer, RegisterMessageSpecs, SendToNetwork}
import scorex.core.network.PeerSynchronizer.ReceivableMessages.{GetNewPeers, LookupResponse}
import scorex.core.network.dns.DnsClient.ReceivableMessages.LookupRequest
import scorex.core.network.dns.strategy.Strategy.LeastNodeQuantity
import scorex.core.network.message.{GetPeersSpec, Message, MessageSpec, PeersSpec}
import scorex.core.network.peer.PeerManager.ReceivableMessages.{AddOrUpdatePeer, AddPeerIfEmpty, SeenPeers}
import scorex.core.network.peer.{PeerInfo, PenaltyType}
import scorex.core.network.peer.PeerManager.ReceivableMessages.{AddPeerIfEmpty, SeenPeers}
import scorex.core.settings.NetworkSettings
import scorex.util.ScorexLogging
import shapeless.syntax.typeable._

import java.net.{InetAddress, InetSocketAddress}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

Expand All @@ -19,6 +23,7 @@ import scala.concurrent.duration._
*/
class PeerSynchronizer(val networkControllerRef: ActorRef,
peerManager: ActorRef,
dnsClientRef: ActorRef,
settings: NetworkSettings,
featureSerializers: PeerFeature.Serializers)
(implicit ec: ExecutionContext) extends Actor with Synchronizer with ScorexLogging {
Expand All @@ -38,16 +43,35 @@ class PeerSynchronizer(val networkControllerRef: ActorRef,

networkControllerRef ! RegisterMessageSpecs(Seq(GetPeersSpec, peersSpec), self)

context.system.eventStream.subscribe(self, classOf[GetNewPeers])

val msg = Message[Unit](GetPeersSpec, Right(Unit), None)
val stn = SendToNetwork(msg, SendToRandom)
context.system.scheduler.scheduleWithFixedDelay(2.seconds, settings.getPeersInterval, networkControllerRef, stn)

performFirstDnsSeedersLookup()
}

private def performFirstDnsSeedersLookup(): Unit = {
self ! GetNewPeers()
}

override def receive: Receive = {

// data received from a remote peer
case Message(spec, Left(msgBytes), Some(source)) => parseAndHandle(spec, msgBytes, source)

// TODO: it could be a good idea to pass the peers left in the db or some other criteria
// TODO: to decide the lookup strategy
case GetNewPeers() =>
dnsClientRef ! LookupRequest(LeastNodeQuantity())

case LookupResponse(ipv4Addresses, _) =>
paologalligit marked this conversation as resolved.
Show resolved Hide resolved
val defaultPort = settings.bindAddress.getPort
ipv4Addresses.foreach(
address => peerManager ! AddOrUpdatePeer(PeerInfo.fromAddress(new InetSocketAddress(address, defaultPort)))
)

// fall-through method for reporting unhandled messages
case nonsense: Any => log.warn(s"PeerSynchronizer: got unexpected input $nonsense from ${sender()}")
}
Expand Down Expand Up @@ -82,16 +106,24 @@ class PeerSynchronizer(val networkControllerRef: ActorRef,
}
}

object PeerSynchronizer {
object ReceivableMessages {
case class GetNewPeers()

case class LookupResponse(ipv4Addresses: Seq[InetAddress], ipv6Addresses: Seq[InetAddress])
}
}

object PeerSynchronizerRef {
def props(networkControllerRef: ActorRef, peerManager: ActorRef, settings: NetworkSettings,
def props(networkControllerRef: ActorRef, peerManager: ActorRef, dnsClientRef: ActorRef, settings: NetworkSettings,
featureSerializers: PeerFeature.Serializers)(implicit ec: ExecutionContext): Props =
Props(new PeerSynchronizer(networkControllerRef, peerManager, settings, featureSerializers))
Props(new PeerSynchronizer(networkControllerRef, peerManager, dnsClientRef, settings, featureSerializers))

def apply(networkControllerRef: ActorRef, peerManager: ActorRef, settings: NetworkSettings,
def apply(networkControllerRef: ActorRef, peerManager: ActorRef, dnsClientRef: ActorRef, settings: NetworkSettings,
featureSerializers: PeerFeature.Serializers)(implicit system: ActorSystem, ec: ExecutionContext): ActorRef =
system.actorOf(props(networkControllerRef, peerManager, settings, featureSerializers))
system.actorOf(props(networkControllerRef, peerManager, dnsClientRef, settings, featureSerializers))

def apply(name: String, networkControllerRef: ActorRef, peerManager: ActorRef, settings: NetworkSettings,
def apply(name: String, networkControllerRef: ActorRef, peerManager: ActorRef, dnsClientRef: ActorRef, settings: NetworkSettings,
featureSerializers: PeerFeature.Serializers)(implicit system: ActorSystem, ec: ExecutionContext): ActorRef =
system.actorOf(props(networkControllerRef, peerManager, settings, featureSerializers), name)
system.actorOf(props(networkControllerRef, peerManager, dnsClientRef, settings, featureSerializers), name)
}
50 changes: 50 additions & 0 deletions src/main/scala/scorex/core/network/dns/DnsClient.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package scorex.core.network.dns

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import scorex.core.network.dns.model.DnsClientInput
import scorex.core.network.dns.strategy.LookupStrategy
import scorex.util.ScorexLogging

import scala.util.{Failure, Success}

class DnsClient(dnsClientParams: DnsClientInput) extends Actor with ScorexLogging {

import scorex.core.network.dns.DnsClient.ReceivableMessages.LookupRequest

override def receive: Receive =
dnsLookup orElse nonsense

private def dnsLookup: Receive = {
case LookupRequest(strategy: LookupStrategy) =>
strategy.apply(dnsClientParams) match {
case Success(response) => sender() ! response
case Failure(exception) => log.error("Failed to perform a dns lookup: " + exception.getMessage)
}
}

private def nonsense: Receive = {
case nonsense: Any =>
log.warn(s"DnsClient: got unexpected input $nonsense")
}
}

object DnsClient {
object ReceivableMessages {
case class LookupRequest(s: LookupStrategy)
}

object Exception {
final case class DnsLookupException(private val message: String = "")
extends Exception(message)
}
}

object DnsClientRef {
def props(params: DnsClientInput): Props = {
Props(new DnsClient(params))
}

def apply(dnsClientParams: DnsClientInput)(implicit system: ActorSystem): ActorRef = {
system.actorOf(props(dnsClientParams))
}
}
14 changes: 14 additions & 0 deletions src/main/scala/scorex/core/network/dns/model/DnsClientInput.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package scorex.core.network.dns.model

import scorex.core.network.dns.model.DnsClientInput.defaultLookupFunction

import java.net.InetAddress
import scala.util.Try

case class DnsClientInput(dnsSeeders: Seq[DnsSeederDomain], lookupFunction: DnsSeederDomain => Try[Seq[InetAddress]] = defaultLookupFunction)

object DnsClientInput {
def defaultLookupFunction(url: DnsSeederDomain): Try[Seq[InetAddress]] = {
Try(InetAddress.getAllByName(url.domainName))
}
}
10 changes: 10 additions & 0 deletions src/main/scala/scorex/core/network/dns/model/DnsSeederDomain.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package scorex.core.network.dns.model

class DnsSeederDomain(val domainName: String) {
private val WEB_DOMAIN_REGEX = "^(((?!\\-))(xn\\-\\-)?[a-z0-9\\-_]{0,61}[a-z0-9]{1,1}\\.)*(xn\\-\\-)?([a-z0-9\\-]{1,61}|[a-z0-9\\-]{1,30})\\.[a-z]{2,}$".r

domainName match {
case e if e.trim.isEmpty || WEB_DOMAIN_REGEX.findFirstMatchIn(e).isEmpty => throw new IllegalArgumentException("Domain name is not well formed")
case _ =>
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package scorex.core.network.dns.strategy

import scorex.core.network.PeerSynchronizer.ReceivableMessages.LookupResponse
import scorex.core.network.dns.DnsClient.Exception.DnsLookupException
import scorex.core.network.dns.model.DnsClientInput

import java.net.{Inet4Address, Inet6Address, InetAddress}
import scala.util.{Failure, Success, Try}

trait LookupStrategy {
def apply(dnsClientParams: DnsClientInput): Try[LookupResponse]
}

object Strategy {
case class LeastNodeQuantity() extends LookupStrategy {
private val LEAST_NODE_QUANTITY = 1

override def apply(dnsClientParams: DnsClientInput): Try[LookupResponse] = thresholdNodeQuantity(dnsClientParams, LEAST_NODE_QUANTITY)
}

case class MaxNodeQuantity() extends LookupStrategy {
private val MAX_NODE_QUANTITY = Int.MaxValue

override def apply(dnsClientParams: DnsClientInput): Try[LookupResponse] = {
thresholdNodeQuantity(dnsClientParams, MAX_NODE_QUANTITY)
}
}

case class ThresholdNodeQuantity(nodeThreshold: Int) extends LookupStrategy {
override def apply(dnsClientParams: DnsClientInput): Try[LookupResponse] = thresholdNodeQuantity(dnsClientParams, nodeThreshold)
}

private def thresholdNodeQuantity(dnsClientParams: DnsClientInput, nodesThreshold: Int): Try[LookupResponse] = {
if (nodesThreshold <= 0) {
Failure(new IllegalArgumentException("The nodes threshold must be greater than 0"))
} else {
val seeders = dnsClientParams.dnsSeeders
val lookupFunction = dnsClientParams.lookupFunction

val exceptionMessages = Seq[String]()

val (ipv4Addresses, ipv6Addresses) = seeders.foldLeft(Seq[InetAddress]()) { (acc, curr) =>
val newElements = if (acc.size < nodesThreshold) {
lookupFunction(curr) match {
case Success(value) => value
case Failure(exception) =>
exceptionMessages ++ exception.getMessage
Seq[InetAddress]()
}
} else {
Seq[InetAddress]()
}

acc ++ newElements
}.partition {
case _: Inet4Address => true
case _: Inet6Address => false
}

if (ipv4Addresses.isEmpty && ipv6Addresses.isEmpty) {
Failure(DnsLookupException("All dns lookups failed: " + exceptionMessages.mkString(", ")))
} else {
Success(LookupResponse(ipv4Addresses, ipv6Addresses))
}
}
}
}
13 changes: 11 additions & 2 deletions src/main/scala/scorex/core/network/peer/PeerManager.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package scorex.core.network.peer

import java.net.{InetAddress, InetSocketAddress}

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import scorex.core.app.ScorexContext
import scorex.core.network.NetworkController.ReceivableMessages.EmptyPeerDatabase
import scorex.core.network._
import scorex.core.settings.ScorexSettings
import scorex.core.utils.NetworkUtils
Expand All @@ -23,7 +23,11 @@ class PeerManager(settings: ScorexSettings, scorexContext: ScorexContext) extend

if (peerDatabase.isEmpty) {
// fill database with peers from config file if empty
settings.network.knownPeers.foreach { address =>
fillPeerDatabase(settings.network.knownPeers)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should also fire a GetNewPeers here , to first start the process at bootstrap


private def fillPeerDatabase(addresses: Seq[InetSocketAddress]): Unit = {
addresses.foreach { address =>
if (!isSelf(address)) {
peerDatabase.addOrUpdateKnownPeer(PeerInfo.fromAddress(address))
}
Expand Down Expand Up @@ -65,6 +69,9 @@ class PeerManager(settings: ScorexSettings, scorexContext: ScorexContext) extend
case RemovePeer(address) =>
log.info(s"$address removed from peers database")
peerDatabase.remove(address)
if (peerDatabaseHasOnlyKnownPeersFromConfig()) {
sender() ! EmptyPeerDatabase
Copy link
Collaborator

@paolocappelletti paolocappelletti Jul 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is really needed to fire this EmptyPeerDatabase message?
Maybe is enough to fire direclty GetNewPeers, unless there is a reason to go always through the NetworkController
also: instead of firing the update when the new peers is empty maybe is worth to "ping" the seeders every a reasonable amount of time (like 30 minutes?). in this way we don't have to restart a node if the list returned by any of the seeds is changed. Is it possible to set Akka to fire a message every x minutes?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds like one of the options that we took into consideration: if we don't receive a new block in 15 minutes, then we need new peers because we're not connected to any forger node. The solution to check the internal database is simpler and we don't have to schedule any jobs.
Regarding passing through the NetworkController, it's more a pattern that is implementing scorex. So the peer manager, which is just a storage, detects that we don't have any more peer and tells it to the network controller, who's responsible to coordinate the different actors.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after today discussion let'ls discard my comment (in reality the new peer will be broadcasted to the others anyway when it first tries to connect to another one).
Just one final note: I would change the message name from EmptyPeerDatabase to PeerDatabaseIsEmpty (more clear it notifies a state)

}

case get: GetPeers[_] =>
sender() ! get.choose(peerDatabase.knownPeers, peerDatabase.blacklistedPeers, scorexContext)
Expand All @@ -91,6 +98,8 @@ class PeerManager(settings: ScorexSettings, scorexContext: ScorexContext) extend
peerSpec.declaredAddress.exists(isSelf) || peerSpec.localAddressOpt.exists(isSelf)
}

private def peerDatabaseHasOnlyKnownPeersFromConfig(): Boolean = peerDatabase.knownPeers.size <= settings.network.knownPeers.size

Copy link
Collaborator

@paolocappelletti paolocappelletti Jun 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should compare also the single element's value, not only the size

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're comparing the size because it's considering a new feature we'll add. We're going to keep in the peer database all the addresses of the knownPeers, instead of deleting them after the first time we cannot connect. So, it's implicit that, at most, the internal peer database will contain a number of peers equal to the number of known peers

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok got it, but let's add a comment

}

object PeerManager {
Expand Down
Loading