Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sdk 142 - Dns client #1

Open
wants to merge 9 commits into
base: dev
Choose a base branch
from
2 changes: 2 additions & 0 deletions src/main/scala/scorex/core/network/NetworkController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,8 @@ object NetworkController {

case object GetConnectedPeers

case class EmptyPeerDatabase()

/**
* Get p2p network status
*/
Expand Down
13 changes: 10 additions & 3 deletions src/main/scala/scorex/core/network/PeerSynchronizer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scorex.core.network.NetworkController.ReceivableMessages.{PenalizePeer, RegisterMessageSpecs, SendToNetwork}
import scorex.core.network.PeerSynchronizer.ReceivableMessages.GetNewPeers
import scorex.core.network.PeerSynchronizer.ReceivableMessages.{GetNewPeers, LookupResponse}
import scorex.core.network.dns.DnsClient.ReceivableMessages.LookupRequest
import scorex.core.network.dns.strategy.Response.LookupResponse
import scorex.core.network.dns.strategy.Strategy.LeastNodeQuantity
import scorex.core.network.message.{GetPeersSpec, Message, MessageSpec, PeersSpec}
import scorex.core.network.peer.PeerManager.ReceivableMessages.{AddOrUpdatePeer, AddPeerIfEmpty, SeenPeers}
Expand All @@ -15,7 +14,7 @@ import scorex.core.settings.NetworkSettings
import scorex.util.ScorexLogging
import shapeless.syntax.typeable._

import java.net.InetSocketAddress
import java.net.{InetAddress, InetSocketAddress}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

Expand Down Expand Up @@ -49,6 +48,12 @@ class PeerSynchronizer(val networkControllerRef: ActorRef,
val msg = Message[Unit](GetPeersSpec, Right(Unit), None)
val stn = SendToNetwork(msg, SendToRandom)
context.system.scheduler.scheduleWithFixedDelay(2.seconds, settings.getPeersInterval, networkControllerRef, stn)

performFirstDnsSeedersLookup()
}

private def performFirstDnsSeedersLookup(): Unit = {
self ! GetNewPeers()
}

override def receive: Receive = {
Expand Down Expand Up @@ -104,6 +109,8 @@ class PeerSynchronizer(val networkControllerRef: ActorRef,
object PeerSynchronizer {
object ReceivableMessages {
case class GetNewPeers()

case class LookupResponse(ipv4Addresses: Seq[InetAddress], ipv6Addresses: Seq[InetAddress])
}
}

Expand Down
7 changes: 7 additions & 0 deletions src/main/scala/scorex/core/network/dns/DnsClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import scorex.core.network.dns.strategy.LookupStrategy
import scorex.util.ScorexLogging

class DnsClient(dnsClientParams: DnsClientInput) extends Actor with ScorexLogging {

import scorex.core.network.dns.DnsClient.ReceivableMessages.LookupRequest

override def receive: Receive =
Expand All @@ -25,12 +26,18 @@ object DnsClient {
object ReceivableMessages {
case class LookupRequest(s: LookupStrategy)
}

object Exception {
final case class DnsLookupException(private val message: String = "")
extends Exception(message)
}
}

object DnsClientRef {
def props(params: DnsClientInput): Props = {
Props(new DnsClient(params))
}

def apply(dnsClientParams: DnsClientInput)(implicit system: ActorSystem): ActorRef = {
system.actorOf(props(dnsClientParams))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package scorex.core.network.dns.model
import scorex.core.network.dns.model.DnsClientInput.defaultLookupFunction

import java.net.InetAddress
import scala.util.Try

case class DnsClientInput(dnsSeeders: Seq[DnsSeederDomain], lookupFunction: DnsSeederDomain => Seq[InetAddress] = defaultLookupFunction)
case class DnsClientInput(dnsSeeders: Seq[DnsSeederDomain], lookupFunction: DnsSeederDomain => Try[Seq[InetAddress]] = defaultLookupFunction)

object DnsClientInput {
def defaultLookupFunction(url: DnsSeederDomain): Seq[InetAddress] = {
InetAddress.getAllByName(url.domainName)
def defaultLookupFunction(url: DnsSeederDomain): Try[Seq[InetAddress]] = {
Try(InetAddress.getAllByName(url.domainName))
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package scorex.core.network.dns.strategy

import scorex.core.network.PeerSynchronizer.ReceivableMessages.LookupResponse
import scorex.core.network.dns.DnsClient.Exception.DnsLookupException
import scorex.core.network.dns.model.DnsClientInput
import scorex.core.network.dns.strategy.Response.LookupResponse

import java.net.{Inet4Address, Inet6Address, InetAddress}
import scala.util.{Failure, Success}

trait LookupStrategy {
def apply(dnsClientParams: DnsClientInput): LookupResponse
Expand Down Expand Up @@ -33,22 +35,28 @@ object Strategy {
val seeders = dnsClientParams.dnsSeeders
val lookupFunction = dnsClientParams.lookupFunction

val exceptionMessages = Seq[String]()

val (ipv4Addresses, ipv6Addresses) = seeders.foldLeft(Seq[InetAddress]()) { (acc, curr) =>
val newElements = if (acc.size < nodesThreshold) {
lookupFunction(curr)
lookupFunction(curr) match {
case Success(value) => value
case Failure(exception) =>
exceptionMessages ++ exception.getMessage
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log the exception also please

Seq[InetAddress]()
}
} else {
Seq[InetAddress]()
}

acc ++ newElements
}.partition {
case _: Inet4Address => true
case _: Inet6Address => false
}

if (ipv4Addresses.isEmpty && ipv6Addresses.isEmpty) throw DnsLookupException("All dns lookups failed: " + exceptionMessages.mkString(", "))

LookupResponse(ipv4Addresses, ipv6Addresses)
}
}

object Response {
case class LookupResponse(ipv4Addresses: Seq[InetAddress], ipv6Addresses: Seq[InetAddress])
}
4 changes: 1 addition & 3 deletions src/main/scala/scorex/core/network/peer/PeerManager.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package scorex.core.network.peer

import java.net.{InetAddress, InetSocketAddress}

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import scorex.core.app.ScorexContext
import scorex.core.network.NetworkController.ReceivableMessages.EmptyPeerDatabase
import scorex.core.network._
import scorex.core.settings.ScorexSettings
import scorex.core.utils.NetworkUtils
Expand Down Expand Up @@ -123,8 +123,6 @@ object PeerManager {

case class RemovePeer(address: InetSocketAddress)

case class EmptyPeerDatabase()

/**
* Message to get peers from known peers map filtered by `choose` function
*/
Expand Down
11 changes: 7 additions & 4 deletions src/test/scala/scorex/core/network/dns/DnsClientSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ import akka.util.Timeout
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.must.Matchers.be
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
import scorex.core.network.PeerSynchronizer.ReceivableMessages.LookupResponse
import scorex.core.network.dns.DnsClient.ReceivableMessages.LookupRequest
import scorex.core.network.dns.model.{DnsClientInput, DnsSeederDomain}
import scorex.core.network.dns.strategy.Response.LookupResponse
import scorex.core.network.dns.strategy.Strategy.{LeastNodeQuantity, MaxNodeQuantity, ThresholdNodeQuantity}

import java.net.InetAddress
import scala.concurrent.Await
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
import scala.util.Try

class DnsClientSpec extends AnyFlatSpec {
private val fakeURLOne = new DnsSeederDomain("fake-url.com")
Expand All @@ -28,10 +29,12 @@ class DnsClientSpec extends AnyFlatSpec {
private val fakeURLThree = new DnsSeederDomain("fake-url3.com")
private val fakeIpSeqThree = Seq(InetAddress.getByName("127.0.1.1"), InetAddress.getByName("127.0.1.2"))

private val badURL = new DnsSeederDomain("bad-url.com")

private val mockLookupFunction = (url: DnsSeederDomain) => url match {
case `fakeURLOne` => fakeIpSeqOne
case `fakeURLTwo` => fakeIpSeqTwo
case `fakeURLThree` => fakeIpSeqThree
case `fakeURLOne` => Try(fakeIpSeqOne)
case `fakeURLTwo` => Try(fakeIpSeqTwo)
case `fakeURLThree` => Try(fakeIpSeqThree)
case _ => throw new IllegalArgumentException("Unexpected url in test")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package scorex.core.network.peer
import akka.actor.{ActorRef, ActorSystem}
import akka.testkit.TestProbe
import scorex.core.app.ScorexContext
import scorex.core.network.peer.PeerManager.ReceivableMessages.{EmptyPeerDatabase, RemovePeer}
import scorex.core.network.NetworkController.ReceivableMessages.EmptyPeerDatabase
import scorex.core.network.peer.PeerManager.ReceivableMessages.RemovePeer
import scorex.network.NetworkTests

import java.net.InetSocketAddress
Expand Down
32 changes: 28 additions & 4 deletions src/test/scala/scorex/flow/DnsLookupFlow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@ import akka.actor.{ActorRef, ActorSystem}
import akka.io.Tcp.{Bind, Bound, Message => _}
import akka.testkit.TestProbe
import scorex.core.app.ScorexContext
import scorex.core.network.NetworkController.ReceivableMessages.EmptyPeerDatabase
import scorex.core.network._
import scorex.core.network.dns.DnsClientRef
import scorex.core.network.dns.model.{DnsClientInput, DnsSeederDomain}
import scorex.core.network.message._
import scorex.core.network.peer.PeerManager.ReceivableMessages.{EmptyPeerDatabase, GetAllPeers}
import scorex.core.network.peer.PeerManager.ReceivableMessages.GetAllPeers
import scorex.core.network.peer._
import scorex.core.settings.ScorexSettings
import scorex.core.utils.LocalTimeProvider
import scorex.network.NetworkTests

import java.net.{InetAddress, InetSocketAddress}
import scala.util.Try

class DnsLookupFlow extends NetworkTests {
type Data = Map[InetSocketAddress, PeerInfo]
Expand All @@ -33,9 +35,9 @@ class DnsLookupFlow extends NetworkTests {
private val fakeIpSeqThree = Seq(InetAddress.getByName("127.0.1.1"), InetAddress.getByName("127.0.1.2"))

private val mockLookupFunction = (url: DnsSeederDomain) => url match {
case `fakeURLOne` => fakeIpSeqOne
case `fakeURLTwo` => fakeIpSeqTwo
case `fakeURLThree` => fakeIpSeqThree
case `fakeURLOne` => Try(fakeIpSeqOne)
case `fakeURLTwo` => Try(fakeIpSeqTwo)
case `fakeURLThree` => Try(fakeIpSeqThree)
case _ => throw new IllegalArgumentException("Unexpected url in test")
}
private val featureSerializers = Map(LocalAddressPeerFeature.featureId -> LocalAddressPeerFeatureSerializer)
Expand Down Expand Up @@ -65,6 +67,28 @@ class DnsLookupFlow extends NetworkTests {
system.terminate()
}

it should "fill peer database with dns response when PeerSynchronizer is instantiated" in {
// Arrange
implicit val system: ActorSystem = ActorSystem()

val tcpManagerProbe = TestProbe()

val probe = TestProbe("probe")(system)
implicit val defaultSender: ActorRef = probe.testActor

val nodeAddr = new InetSocketAddress("88.77.66.55", 12345)
val scorexSettings = settings.copy(network = settings.network.copy(bindAddress = nodeAddr))
val (_, peerManager) = createNetworkController(scorexSettings, tcpManagerProbe)

// Assert
Thread.sleep(1000)
peerManager ! GetAllPeers
val data = probe.expectMsgClass(classOf[Data])
data.size should be(fakeIpSeqOne.size)

system.terminate()
}

private def createNetworkController(settings: ScorexSettings, tcpManagerProbe: TestProbe, upnp: Option[UPnPGateway] = None)(implicit system: ActorSystem) = {
val timeProvider = LocalTimeProvider
val externalAddr = settings.network.declaredAddress
Expand Down