diff --git a/README.md b/README.md index ffd9fcf..aa6c54d 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,12 @@ -# solrs - async solr client for scala +# solrs - async solr client for java/scala [![Build Status](https://travis-ci.org/inoio/solrs.png?branch=master)](https://travis-ci.org/inoio/solrs) [![Maven Central](https://maven-badges.herokuapp.com/maven-central/io.ino/solrs_2.11/badge.svg)](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22io.ino%22%20AND%20a%3Asolrs*_2.11) [![Join the chat at https://gitter.im/inoio/solrs](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/inoio/solrs) -This is a solr client for scala providing a query interface like SolrJ, just asynchronously / non-blocking. +This is a java/scala solr client providing a query interface like SolrJ, just asynchronously / non-blocking +(built on top of [async-http-client](https://github.com/AsyncHttpClient/async-http-client) / [netty](https://github.com/netty/netty)). +For java it supports `CompletableFuture`, for scala you can choose between twitter's `Future` or the standard/SDK `Future`. ## Contents @@ -24,44 +26,101 @@ You must add the library to the dependencies of the build file, e.g. add to `bui libraryDependencies += "io.ino" %% "solrs" % "1.5.0" +or for java projects to pom.xml: + + + io.ino + solrs_2.11 + 1.5.0 + + solrs is published to maven central for both scala 2.10 and 2.11. ## Usage -At first an instance of `AsyncSolrClient` must be created with the url to the Solr server. -This client can then be used to query solr and process future responses. +Solrs supports different `Future` implementations, which affects the result type of `AsyncSolrClient.query`. +For scala there's support for the standard `scala.concurrent.Future` and for twitters `com.twitter.util.Future`. +Which one is chosen is defined by the `io.ino.solrs.future.FutureFactory` that's in scope when building the `AsyncSolrClient` (as shown +below in the code samples). + +For java there's support for `CompletableFuture`. Because java does not support higher kinded types there's +a separate class `JavaAsyncSolrClient` that allows to create new instances and to perform a `query`. -A complete example: +In the following it's shown how to use `JavaAsyncSolrClient`/`AsyncSolrClient`: +[java] +```java +import io.ino.solrs.JavaAsyncSolrClient; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.response.QueryResponse; +import java.util.concurrent.CompletableFuture; + +JavaAsyncSolrClient solr = JavaAsyncSolrClient.create("http://localhost:8983/solr/collection1"); +CompletableFuture response = solr.query(new SolrQuery("java")); +response.thenAccept(r -> System.out.println("found " + r.getResults().getNumFound() + " docs")); + +// At EOL... +solr.shutdown(); +``` + +[scala/SDK] ```scala import io.ino.solrs.AsyncSolrClient -import io.ino.solrs.future.ScalaFutureFactory.Implicit +import io.ino.solrs.future.ScalaFutureFactory.Implicit // or TwitterFutureFactory import org.apache.solr.client.solrj.response.QueryResponse import org.apache.solr.client.solrj.SolrQuery import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global val solr = AsyncSolrClient("http://localhost:8983/solr/collection1") +val response: Future[QueryResponse] = solr.query(new SolrQuery("scala")) +response.onSuccess { + case qr => println(s"found ${qr.getResults.getNumFound} docs") +} -val query = new SolrQuery("scala") -val response: Future[QueryResponse] = solr.query(query) +// Don't forget... +solr.shutdown() +``` + +[scala/twitter] +```scala +import io.ino.solrs.AsyncSolrClient +import io.ino.solrs.future.TwitterFutureFactory.Implicit +import org.apache.solr.client.solrj.SolrQuery +import org.apache.solr.client.solrj.response.QueryResponse +import com.twitter.util.Future +val solr = AsyncSolrClient("http://localhost:8983/solr") +val response: Future[QueryResponse] = solr.query(new SolrQuery("scala")) response.onSuccess { - case qr => println(s"found ${qr.getResults.getNumFound} docs") + qr => println(s"found ${qr.getResults.getNumFound} docs") } -// Just included to present the 'shutdown'... -solr.shutdown +// Finally... +solr.shutdown() ``` The `AsyncSolrClient` can further be configured with an `AsyncHttpClient` instance and the response parser via the `AsyncSolrClient.Builder` (other configuration properties are described in greater detail below): +[java] +```java +import io.ino.solrs.JavaAsyncSolrClient; +import org.apache.solr.client.solrj.impl.XMLResponseParser; +import org.asynchttpclient.DefaultAsyncHttpClient; + +JavaAsyncSolrClient solr = JavaAsyncSolrClient.builder("http://localhost:8983/solr/collection1") + .withHttpClient(new DefaultAsyncHttpClient()) + .withResponseParser(new XMLResponseParser()) + .build(); +``` + +[scala] ```scala -import org.asynchttpclient.DefaultAsyncHttpClient import io.ino.solrs.AsyncSolrClient import io.ino.solrs.future.ScalaFutureFactory.Implicit import org.apache.solr.client.solrj.impl.XMLResponseParser +import org.asynchttpclient.DefaultAsyncHttpClient val solr = AsyncSolrClient.Builder("http://localhost:8983/solr/collection1") .withHttpClient(new DefaultAsyncHttpClient()) @@ -69,18 +128,6 @@ val solr = AsyncSolrClient.Builder("http://localhost:8983/solr/collection1") .build ``` -### Several Future Implementations - -Solrs supports different future implementations, which affects the result type of `AsyncSolrClient.query`. -Perhaps you've already noticed the `import io.ino.solrs.future.ScalaFutureFactory.Implicit` in -the examples above - this brings the factory for standard scala futures in scope (the various `AsyncSolrClient` -builder methods expect a `io.ino.solrs.future.FutureFactory` to be implicitely available). - -Out of the box standard scala futures and twitter futures (`i.i.s.future.TwitterFutureFactory`) are supported. - -Want to roll your own? Just implement `io.ino.solrs.future.FutureFactory` (and the related `Promise` and `Future` traits) -and bring an instance of your future factory in scope. - ### Load Balancing Solrs supports load balancing of queries over multiple solr servers. There are 2 load balancers provided out of the box, @@ -95,6 +142,16 @@ reading them from ZooKeeper (see [Solr Cloud Support](#solr-cloud-support) for m To run solrs with a `RoundRobinLB` you have to pass it to the `Builder` +[java] +```java +import io.ino.solrs.*; +import java.util.Arrays; + +RoundRobinLB lb = RoundRobinLB.create(Arrays.asList("http://localhost:8983/solr/collection1", "http://localhost:8984/solr/collection1")); +JavaAsyncSolrClient solr = JavaAsyncSolrClient.builder(lb).build(); +``` + +[scala] ```scala import io.ino.solrs._ import io.ino.solrs.future.ScalaFutureFactory.Implicit @@ -136,6 +193,25 @@ This can be overridden with `initialTestRuns`. Here's a code sample of the `FastestServerLB`: +[java] +```java +import io.ino.solrs.*; +import java.util.Arrays; +import scala.Tuple2; +import static java.util.concurrent.TimeUnit.*; + +StaticSolrServers servers = StaticSolrServers.create(Arrays.asList("http://localhost:8983/solr/collection1", "http://localhost:8984/solr/collection1")); +Tuple2 col1TestQuery = new Tuple2<>("collection1", new SolrQuery("*:*").setRows(0)); +Function> collectionAndTestQuery = server -> col1TestQuery; +FastestServerLB lb = FastestServerLB.builder(servers, collectionAndTestQuery) + .withMinDelay(50, MILLISECONDS) + .withMaxDelay(5, SECONDS) + .withInitialTestRuns(50) + .build(); +JavaAsyncSolrClient solr = JavaAsyncSolrClient.builder(lb).build(); +``` + +[scala] ```scala import io.ino.solrs._ import scala.concurrent.duration._ @@ -176,6 +252,15 @@ Solr Cloud is supported with the following properties / restrictions: To run solrs connected to SolrCloud / ZooKeeper, you pass an instance of `CloudSolrServers` to `RoundRobinLB`/`FastestServerLB`. The simplest case looks like this: +[java] +```java +import io.ino.solrs.*; + +CloudSolrServers servers = CloudSolrServers.builder("localhost:2181").build(); +JavaAsyncSolrClient solr = JavaAsyncSolrClient.builder(new RoundRobinLB(servers)).build(); +``` + +[scala] ```scala import io.ino.solrs._ import io.ino.solrs.future.ScalaFutureFactory.Implicit @@ -186,22 +271,41 @@ val solr = AsyncSolrClient.Builder(RoundRobinLB(servers)).build Here's an example that shows all configuration properties in use: +[java] +```java +import io.ino.solrs.*; +import java.util.Collections; +import static java.util.concurrent.TimeUnit.*; + +CloudSolrServers servers = CloudSolrServers.builder("host1:2181,host2:2181") + .withZkClientTimeout(15, SECONDS) + .withZkConnectTimeout(10, SECONDS) + .withClusterStateUpdateInterval(1, SECONDS) + .withDefaultCollection("collection1") + .withWarmupQueries((collection) -> Collections.singletonList(new SolrQuery("*:*")), 10) + .build(); +JavaAsyncSolrClient solr = JavaAsyncSolrClient.builder(new RoundRobinLB(servers)).build(); +``` + +[scala] ```scala import io.ino.solrs._ import io.ino.solrs.future.ScalaFutureFactory.Implicit import scala.concurrent.duration._ -val servers = new CloudSolrServers(zkHost = "host1:2181,host2:2181", - zkClientTimeout = 15 seconds, - zkConnectTimeout = 10 seconds, - clusterStateUpdateInterval = 1 second, - defaultCollection = Some("collection1"), - warmupQueries = WarmupQueries("collection1" => Seq(new SolrQuery("*:*")), count = 10)) +val servers = new CloudSolrServers( + zkHost = "host1:2181,host2:2181", + zkClientTimeout = 15 seconds, + zkConnectTimeout = 10 seconds, + clusterStateUpdateInterval = 1 second, + defaultCollection = Some("collection1"), + warmupQueries = WarmupQueries("collection1" => Seq(new SolrQuery("*:*")), count = 10)) val solr = AsyncSolrClient.Builder(RoundRobinLB(servers)).build ``` -Remember to either specify a default collection or set the collection to use per query: +Remember to either specify a default collection (as shown above) or set the collection to use per query: +[scala] ```scala import org.apache.solr.client.solrj.SolrQuery @@ -226,13 +330,23 @@ currently provided (you can implement your own of course): The retry policy can be configured via the `Builder`, like this: +[java] +```java +import io.ino.solrs.*; + +JavaAsyncSolrClient solr = JavaAsyncSolrClient.builder(new RoundRobinLB(CloudSolrServers.builder("host1:2181,host2:2181").build())) + .withRetryPolicy(RetryPolicy.TryAvailableServers()) + .build(); +``` + +[scala] ```scala import io.ino.solrs._ import io.ino.solrs.future.ScalaFutureFactory.Implicit val solr = AsyncSolrClient.Builder(RoundRobinLB(new CloudSolrServers("host1:2181,host2:2181"))) - .withRetryPolicy(RetryPolicy.TryAvailableServers) - .build + .withRetryPolicy(RetryPolicy.TryAvailableServers) + .build ``` There's not yet support for delaying retries, raise an issue or submit a pull request for this if you need it. @@ -241,6 +355,7 @@ There's not yet support for delaying retries, raise an issue or submit a pull re Solrs allows to intercept queries sent to Solr, here's an example that shows how to log details about each request: +[scala] ```scala import io.ino.solrs._ import io.ino.solrs.future.ScalaFutureFactory.Implicit @@ -269,6 +384,7 @@ happy with this great [metrics library](http://metrics.codahale.com/) :-) To configure solrs with the `Metrics` implementation just pass an initialized instance like this: +[scala] ```scala import io.ino.solrs._ import io.ino.solrs.future.ScalaFutureFactory.Implicit diff --git a/build.sbt b/build.sbt index b03e0c2..42d16f2 100644 --- a/build.sbt +++ b/build.sbt @@ -19,7 +19,7 @@ javacOptions ++= Seq("-source", "1.8", "-target", "1.8", "-Xlint") initialize := { val _ = initialize.value if (sys.props("java.specification.version") != "1.8") - sys.error("Java 8 is required for this project.") + sys.error(s"Java 8 is required for this project. Running: ${sys.props("java.specification.version")}") } resolvers ++= Seq( @@ -51,6 +51,7 @@ libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-actor" % "2.3.15", "org.slf4j" % "slf4j-simple" % slf4jVersion % "test", "org.scalatest" %% "scalatest" % "2.2.6" % "test", + "com.novocode" % "junit-interface" % "0.11" % "test", "org.mockito" % "mockito-core" % "1.10.19" % "test", "org.clapper" %% "grizzled-scala" % "2.5.0" % "test", // Cloud testing, solr-core for ZkController (upconfig), curator-test for ZK TestingServer diff --git a/src/main/scala/io/ino/solrs/AsyncSolrClient.scala b/src/main/scala/io/ino/solrs/AsyncSolrClient.scala index 4c7496e..68b30af 100644 --- a/src/main/scala/io/ino/solrs/AsyncSolrClient.scala +++ b/src/main/scala/io/ino/solrs/AsyncSolrClient.scala @@ -4,6 +4,7 @@ import java.io.IOException import java.util.Locale import akka.actor.ActorSystem +import io.ino.solrs.AsyncSolrClient.Builder import io.ino.solrs.HttpUtils._ import io.ino.solrs.RetryDecision.Result import io.ino.solrs.future.Future @@ -32,43 +33,67 @@ import scala.util.control.NonFatal object AsyncSolrClient { + /* The function that creates that actual instance of AsyncSolrClient */ + private[solrs] type ASCFactory[F[_], ASC <: AsyncSolrClient[F]] = (LoadBalancer, AsyncHttpClient, /*shutdownHttpClient*/ Boolean, + Option[RequestInterceptor], ResponseParser, Metrics, Option[ServerStateObservation[F]], RetryPolicy) => ASC + def apply[F[_]](baseUrl: String)(implicit futureFactory: FutureFactory[F] = ScalaFutureFactory) = - new Builder(new SingleServerLB(baseUrl)).build + new Builder(new SingleServerLB(baseUrl), ascFactory[F] _).build def apply[F[_]](loadBalancer: LoadBalancer)(implicit futureFactory: FutureFactory[F]) = - new Builder(loadBalancer).build + new Builder(loadBalancer, ascFactory[F] _).build object Builder { def apply[F[_]](baseUrl: String)(implicit futureFactory: FutureFactory[F]) = - new Builder(baseUrl) + new Builder(baseUrl, ascFactory[F] _) def apply[F[_]](loadBalancer: LoadBalancer)(implicit futureFactory: FutureFactory[F]) = - new Builder(loadBalancer) + new Builder(loadBalancer, ascFactory[F] _) } - case class Builder[F[_]] private (loadBalancer: LoadBalancer, + private[solrs] def ascFactory[F[_]](loadBalancer: LoadBalancer, + httpClient: AsyncHttpClient, + shutdownHttpClient: Boolean, + requestInterceptor: Option[RequestInterceptor], + responseParser: ResponseParser, + metrics: Metrics, + serverStateObservation: Option[ServerStateObservation[F]], + retryPolicy: RetryPolicy)(implicit futureFactory: FutureFactory[F]): AsyncSolrClient[F] = + new AsyncSolrClient[F]( + loadBalancer, + httpClient, + shutdownHttpClient, + requestInterceptor, + responseParser, + metrics, + serverStateObservation, + retryPolicy + ) + + case class Builder[F[_], ASC <: AsyncSolrClient[F]] protected (loadBalancer: LoadBalancer, httpClient: Option[AsyncHttpClient], shutdownHttpClient: Boolean, requestInterceptor: Option[RequestInterceptor] = None, responseParser: Option[ResponseParser] = None, metrics: Option[Metrics] = None, serverStateObservation: Option[ServerStateObservation[F]] = None, - retryPolicy: RetryPolicy = RetryPolicy.TryOnce)(implicit futureFactory: FutureFactory[F]) { + retryPolicy: RetryPolicy = RetryPolicy.TryOnce, + factory: ASCFactory[F, ASC])(implicit futureFactory: FutureFactory[F]) { - def this(loadBalancer: LoadBalancer)(implicit futureFactory: FutureFactory[F]) = this(loadBalancer, None, true) - def this(baseUrl: String)(implicit futureFactory: FutureFactory[F]) = this(new SingleServerLB(baseUrl)) + def this(loadBalancer: LoadBalancer, factory: ASCFactory[F, ASC])(implicit futureFactory: FutureFactory[F]) = this(loadBalancer, None, true, factory = factory) + def this(baseUrl: String, factory: ASCFactory[F, ASC])(implicit futureFactory: FutureFactory[F]) = this(new SingleServerLB(baseUrl), factory = factory) - def withHttpClient(httpClient: AsyncHttpClient): Builder[F] = { + def withHttpClient(httpClient: AsyncHttpClient): Builder[F, ASC] = { copy(httpClient = Some(httpClient), shutdownHttpClient = false) } - def withRequestInterceptor(requestInterceptor: RequestInterceptor): Builder[F] = { + def withRequestInterceptor(requestInterceptor: RequestInterceptor): Builder[F, ASC] = { copy(requestInterceptor = Some(requestInterceptor)) } - def withResponseParser(responseParser: ResponseParser): Builder[F] = { + def withResponseParser(responseParser: ResponseParser): Builder[F, ASC] = { copy(responseParser = Some(responseParser)) } - def withMetrics(metrics: Metrics): Builder[F] = { + def withMetrics(metrics: Metrics): Builder[F, ASC] = { copy(metrics = Some(metrics)) } @@ -77,14 +102,14 @@ object AsyncSolrClient { */ def withServerStateObservation(serverStateObserver: ServerStateObserver[F], checkInterval: FiniteDuration, - actorSystem: ActorSystem): Builder[F] = { + actorSystem: ActorSystem): Builder[F, ASC] = { copy(serverStateObservation = Some(ServerStateObservation[F](serverStateObserver, checkInterval, actorSystem, futureFactory))) } /** * Configure the retry policy to apply for failed requests. */ - def withRetryPolicy(retryPolicy: RetryPolicy): Builder[F] = { + def withRetryPolicy(retryPolicy: RetryPolicy): Builder[F, ASC] = { copy(retryPolicy = retryPolicy) } @@ -109,8 +134,8 @@ object AsyncSolrClient { set(loadBalancer, solr) } - def build: AsyncSolrClient[F] = { - val res = new AsyncSolrClient[F]( + def build: ASC = { + val res = factory( loadBalancer, httpClient.getOrElse(createHttpClient), shutdownHttpClient, @@ -134,14 +159,14 @@ object AsyncSolrClient { * * @author Martin Grotzke */ -class AsyncSolrClient[F[_]] private (val loadBalancer: LoadBalancer, - val httpClient: AsyncHttpClient, - shutdownHttpClient: Boolean, - requestInterceptor: Option[RequestInterceptor] = None, - responseParser: ResponseParser = new BinaryResponseParser, - val metrics: Metrics = NoopMetrics, - serverStateObservation: Option[ServerStateObservation[F]] = None, - retryPolicy: RetryPolicy = RetryPolicy.TryOnce)(implicit futureFactory: FutureFactory[F]) { +class AsyncSolrClient[F[_]] protected (private[solrs] val loadBalancer: LoadBalancer, + httpClient: AsyncHttpClient, + shutdownHttpClient: Boolean, + requestInterceptor: Option[RequestInterceptor] = None, + responseParser: ResponseParser = new BinaryResponseParser, + metrics: Metrics = NoopMetrics, + serverStateObservation: Option[ServerStateObservation[F]] = None, + retryPolicy: RetryPolicy = RetryPolicy.TryOnce)(implicit futureFactory: FutureFactory[F]) { private val UTF_8 = "UTF-8" private val DEFAULT_PATH = "/select" @@ -383,6 +408,49 @@ class AsyncSolrClient[F[_]] private (val loadBalancer: LoadBalancer, } +trait TypedAsyncSolrClient[F[_], ASC <: AsyncSolrClient[F]] { + + protected implicit def futureFactory: FutureFactory[F] + + protected def build(loadBalancer: LoadBalancer, + httpClient: AsyncHttpClient, + shutdownHttpClient: Boolean, + requestInterceptor: Option[RequestInterceptor], + responseParser: ResponseParser, + metrics: Metrics, + serverStateObservation: Option[ServerStateObservation[F]], + retryPolicy: RetryPolicy): ASC + + def builder(baseUrl: String) = new Builder[F, ASC](new SingleServerLB(baseUrl), build _) + def builder(loadBalancer: LoadBalancer) = new Builder[F, ASC](loadBalancer, build _) + +} + +import scala.concurrent.{Future => SFuture} + +// TODO: can be removed? +class ScalaAsyncSolrClient(override private[solrs] val loadBalancer: LoadBalancer, + httpClient: AsyncHttpClient, + shutdownHttpClient: Boolean, + requestInterceptor: Option[RequestInterceptor] = None, + responseParser: ResponseParser = new BinaryResponseParser, + metrics: Metrics = NoopMetrics, + serverStateObservation: Option[ServerStateObservation[SFuture]] = None, + retryPolicy: RetryPolicy = RetryPolicy.TryOnce) + extends AsyncSolrClient[SFuture](loadBalancer, httpClient, shutdownHttpClient, requestInterceptor, responseParser, metrics, serverStateObservation, retryPolicy)(ScalaFutureFactory) { + + /** + * @inheritdoc + */ + override def query(q: SolrQuery): SFuture[QueryResponse] = super.query(q) + + /** + * @inheritdoc + */ + override def queryPreferred(q: SolrQuery, preferred: Option[SolrServer]): SFuture[(QueryResponse, SolrServer)] = super.queryPreferred(q, preferred) + +} + trait AsyncSolrClientAware[F[_]] { /** diff --git a/src/main/scala/io/ino/solrs/JavaAsyncSolrClient.scala b/src/main/scala/io/ino/solrs/JavaAsyncSolrClient.scala new file mode 100644 index 0000000..ad07b5c --- /dev/null +++ b/src/main/scala/io/ino/solrs/JavaAsyncSolrClient.scala @@ -0,0 +1,77 @@ +package io.ino.solrs + +import java.util.concurrent.CompletableFuture + +import org.asynchttpclient.AsyncHttpClient +import io.ino.solrs.AsyncSolrClient.Builder +import io.ino.solrs.future.{JavaFutureFactory, FutureFactory} +import org.apache.solr.client.solrj.response.QueryResponse +import org.apache.solr.client.solrj.{SolrQuery, ResponseParser} +import org.apache.solr.client.solrj.impl.BinaryResponseParser + +import scala.language.higherKinds + +/** + * Java API: Async, non-blocking Solr Server that just allows to `query(SolrQuery)`. + * The usage shall be similar to the solrj SolrServer, + * so query returns a [[java.util.concurrent.CompletableFuture CompletableFuture]] of a + * [[org.apache.solr.client.solrj.response.QueryResponse QueryResponse]]. + * + * Example usage: + * {{{ + * JavaAsyncSolrClient solr = JavaAsyncSolrClient.create("http://localhost:" + solrRunner.port + "/solr/collection1"); + * CompletableFuture response = solr.query(new SolrQuery("*:*")); + * response.thenAccept(r -> System.out.println("found "+ r.getResults().getNumFound() +" docs")); + * }}} + */ +class JavaAsyncSolrClient(override private[solrs] val loadBalancer: LoadBalancer, + httpClient: AsyncHttpClient, + shutdownHttpClient: Boolean, + requestInterceptor: Option[RequestInterceptor] = None, + responseParser: ResponseParser = new BinaryResponseParser, + metrics: Metrics = NoopMetrics, + serverStateObservation: Option[ServerStateObservation[CompletableFuture]] = None, + retryPolicy: RetryPolicy = RetryPolicy.TryOnce) + extends AsyncSolrClient[CompletableFuture](loadBalancer, httpClient, shutdownHttpClient, requestInterceptor, responseParser, metrics, serverStateObservation, retryPolicy)(JavaFutureFactory) { + + /** + * @inheritdoc + */ + override def query(q: SolrQuery): CompletableFuture[QueryResponse] = super.query(q) + + /** + * @inheritdoc + */ + override def queryPreferred(q: SolrQuery, preferred: Option[SolrServer]): CompletableFuture[(QueryResponse, SolrServer)] = super.queryPreferred(q, preferred) + +} + +object JavaAsyncSolrClient extends TypedAsyncSolrClient[CompletableFuture, JavaAsyncSolrClient] { + + private implicit val ff = JavaFutureFactory + + override protected def futureFactory: FutureFactory[CompletableFuture] = JavaFutureFactory + + def create(url: String): JavaAsyncSolrClient = builder(url).build + + override def builder(url: String): Builder[CompletableFuture, JavaAsyncSolrClient] = new Builder(url, build _) + override def builder(loadBalancer: LoadBalancer): Builder[CompletableFuture, JavaAsyncSolrClient] = new Builder(loadBalancer, build _) + + override protected def build(loadBalancer: LoadBalancer, + httpClient: AsyncHttpClient, + shutdownHttpClient: Boolean, + requestInterceptor: Option[RequestInterceptor], + responseParser: ResponseParser, + metrics: Metrics, + serverStateObservation: Option[ServerStateObservation[CompletableFuture]], + retryPolicy: RetryPolicy): JavaAsyncSolrClient = + new JavaAsyncSolrClient( + loadBalancer, + httpClient, + shutdownHttpClient, + requestInterceptor, + responseParser, + metrics, + serverStateObservation, + retryPolicy) +} \ No newline at end of file diff --git a/src/main/scala/io/ino/solrs/LoadBalancer.scala b/src/main/scala/io/ino/solrs/LoadBalancer.scala index 2ef3117..9688e0e 100644 --- a/src/main/scala/io/ino/solrs/LoadBalancer.scala +++ b/src/main/scala/io/ino/solrs/LoadBalancer.scala @@ -1,9 +1,13 @@ package io.ino.solrs +import java.util.concurrent.CompletableFuture import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{Executors, TimeUnit} +import io.ino.solrs.CloudSolrServers.Builder +import io.ino.solrs.CloudSolrServers.WarmupQueries import io.ino.solrs.ServerStateChangeObservable.{Removed, StateChange, StateChanged} +import io.ino.solrs.future.JavaFutureFactory import io.ino.solrs.future.{Future, FutureFactory} import io.ino.time.Clock import io.ino.time.Units.Millisecond @@ -90,6 +94,10 @@ class RoundRobinLB(override val solrServers: SolrServers) extends LoadBalancer { object RoundRobinLB { def apply(solrServers: SolrServers): RoundRobinLB = new RoundRobinLB(solrServers) def apply(baseUrls: IndexedSeq[String]): RoundRobinLB = new RoundRobinLB(StaticSolrServers(baseUrls)) + + /* Java API */ + import scala.collection.JavaConversions._ + def create(baseUrls: java.lang.Iterable[String]): RoundRobinLB = apply(baseUrls.toIndexedSeq) } /** @@ -376,6 +384,78 @@ object FastestServerLB { val TestQueryClass = "testQuery" + /* Java API */ + import java.lang.{Boolean => JBoolean} + import java.lang.{Long => JLong} + import java.util.function.{Function => JFunction} + import java.util.function.BiFunction + import java.util.function.LongFunction + case class Builder(solrServers: SolrServers, + collectionAndTestQuery: SolrServer => (String, SolrQuery), + minDelay: Duration = 100 millis, + maxDelay: Duration = 10 seconds, + initialTestRuns: Int = 10, + filterFastServers: Long => ((SolrServer, Long)) => Boolean = + average => { + case (_, duration) => duration <= average * 1.1 + 5 + }, + mapPredictedResponseTime: Long => Long = identity, + clock: Clock = Clock.systemDefault) { + + /** The minimum delay between the response of a test and the start of the next test (to limit test frequency) */ + def withMinDelay(value: Long, unit: TimeUnit): Builder = copy(minDelay = FiniteDuration(value, unit)) + + /** The delay between tests for slow servers (or all servers if there are no real requests) */ + def withMaxDelay(value: Long, unit: TimeUnit): Builder = copy(maxDelay = FiniteDuration(value, unit)) + + /** On start each active server is tested the given number of times to gather initial stats and determine fast/slow servers. */ + def withInitialTestRuns(count: Int): Builder = copy(initialTestRuns = count) + + /** A function to filter fast / preferred servers. The function takes the calculated average duration of all servers + * of a collection, and returns a function for a SolrServer->Duration tuple that returns true/false to indicate if + * a server should be considered "fast".
+ * The default value for filterFastServers uses `duration <= average * 1.1 + 5` (use 1.1 as multiplier to accept + * some deviation, for smaller values like 1 or 2 millis also add some fix value to allow normal deviation). */ + def withFilterFastServers(filter: LongFunction[BiFunction[SolrServer, JLong, JBoolean]]): Builder = copy( + filterFastServers = average => { + case (server, duration) => filter(average)(server, duration) + } + ) + + /** A function that's applied to the predicted response time. This can e.g. be used to quantize the time so that minor differences are ignored. */ + def withMapPredictedResponseTime(mapPredictedResponseTime: JFunction[JLong, JLong]): Builder = copy(mapPredictedResponseTime = input => mapPredictedResponseTime(input)) + + /** The clock to get the current time from. */ + def withClock(clock: Clock): Builder = copy(clock = clock) + + def build(): FastestServerLB[CompletableFuture] = build(JavaFutureFactory) + + def build[F[_]](implicit futureFactory: FutureFactory[F]) = new FastestServerLB[F]( + solrServers, + collectionAndTestQuery, + minDelay, + maxDelay, + initialTestRuns, + filterFastServers, + mapPredictedResponseTime, + clock + ) + + } + + /** + * + * @param solrServers solr servers to load balance, those are regularly tested. + * @param collectionAndTestQuery a function that returns the collection name and a testQuery for the given server. + * The collection is used to partition server when classifying "fast"/"slow" servers, + * because for different collections response times will be different. + * It's somehow similar with the testQuery: it might be different per server, e.g. + * some server might only provide a /suggest handler while others provide /select + * (which can be specified via the "qt" query param in the test query). + */ + def builder(solrServers: SolrServers, collectionAndTestQuery: JFunction[SolrServer, (String, SolrQuery)]): Builder = + Builder(solrServers, server => collectionAndTestQuery(server)) + } import javax.management.openmbean.{CompositeData, TabularData} diff --git a/src/main/scala/io/ino/solrs/RetryPolicy.scala b/src/main/scala/io/ino/solrs/RetryPolicy.scala index 0eb6bcf..b70e991 100644 --- a/src/main/scala/io/ino/solrs/RetryPolicy.scala +++ b/src/main/scala/io/ino/solrs/RetryPolicy.scala @@ -5,7 +5,7 @@ import scala.annotation.tailrec /** * Specifies a policy for retrying query failures. */ -trait RetryPolicy { +abstract class RetryPolicy { /** * Determines whether the framework should retry a query for the given * exception, the failed server and the query context (provides information about previously failed diff --git a/src/main/scala/io/ino/solrs/SolrServers.scala b/src/main/scala/io/ino/solrs/SolrServers.scala index d7363f8..4d12aff 100644 --- a/src/main/scala/io/ino/solrs/SolrServers.scala +++ b/src/main/scala/io/ino/solrs/SolrServers.scala @@ -1,18 +1,21 @@ package io.ino.solrs +import java.util.concurrent.CompletableFuture import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory} import org.asynchttpclient.{AsyncCompletionHandler, AsyncHttpClient, Response} import io.ino.solrs.CloudSolrServers.WarmupQueries import io.ino.solrs.ServerStateChangeObservable.StateChange +import io.ino.solrs.future.JavaFutureFactory import org.apache.solr.client.solrj.response.QueryResponse import org.apache.solr.client.solrj.{SolrQuery, SolrServerException} import org.apache.solr.common.cloud._ import org.slf4j.LoggerFactory - import io.ino.solrs.future.{Future, FutureFactory, ScalaFutureFactory} + import scala.collection.mutable -import scala.language.{postfixOps, higherKinds} +import scala.language.postfixOps +import scala.language.{higherKinds, postfixOps} import scala.util.control.NonFatal import scala.util.{Failure, Success, Try} @@ -36,6 +39,10 @@ class StaticSolrServers(override val all: IndexedSeq[SolrServer]) extends SolrSe } object StaticSolrServers { def apply(baseUrls: IndexedSeq[String]): StaticSolrServers = new StaticSolrServers(baseUrls.map(SolrServer(_))) + + /* Java API */ + import scala.collection.JavaConversions._ + def create(baseUrls: java.lang.Iterable[String]): StaticSolrServers = apply(baseUrls.toIndexedSeq) } import java.util.concurrent.TimeUnit @@ -259,6 +266,37 @@ object CloudSolrServers { private val logger = LoggerFactory.getLogger(getClass) + /* Java API */ + case class Builder(zkHost: String, + zkClientTimeout: Duration = 15 seconds, /* default from Solr Core, see also SOLR-5221*/ + zkConnectTimeout: Duration = 10 seconds, /* default from solrj CloudSolrServer*/ + clusterStateUpdateInterval: Duration = 1 second, + defaultCollection: Option[String] = None, + warmupQueries: Option[WarmupQueries] = None) { + def withZkClientTimeout(value: Long, unit: TimeUnit): Builder = copy(zkClientTimeout = FiniteDuration(value, unit)) + def withZkConnectTimeout(value: Long, unit: TimeUnit): Builder = copy(zkConnectTimeout = FiniteDuration(value, unit)) + def withClusterStateUpdateInterval(value: Long, unit: TimeUnit): Builder = copy(clusterStateUpdateInterval = FiniteDuration(value, unit)) + def withDefaultCollection(collection: String): Builder = copy(defaultCollection = Some(collection)) + import java.util.function.{Function => JFunction} + import java.lang.{Iterable => JIterable} + def withWarmupQueries(queriesByCollection: JFunction[String, JIterable[SolrQuery]], count: Int): Builder = { + def delegate(collection: String): Seq[SolrQuery] = { + val res = queriesByCollection(collection) + import scala.collection.convert.WrapAsScala._ + res.toList + } + copy(warmupQueries = Some(WarmupQueries(delegate, count))) + } + + def build(): CloudSolrServers[CompletableFuture] = new CloudSolrServers(zkHost, zkConnectTimeout, zkConnectTimeout, clusterStateUpdateInterval, defaultCollection, warmupQueries)(JavaFutureFactory) + + def build[F[_]](implicit futureFactory: FutureFactory[F]): CloudSolrServers[F] = + new CloudSolrServers(zkHost, zkConnectTimeout, zkConnectTimeout, clusterStateUpdateInterval, defaultCollection, warmupQueries) + } + + /* Java API */ + def builder(zkHost: String): Builder = Builder(zkHost) + private[solrs] def getCollectionToServers(clusterState: ClusterState): Map[String, IndexedSeq[SolrServer]] = { import scala.collection.JavaConversions._ diff --git a/src/main/scala/io/ino/solrs/future/JavaFutureFactory.scala b/src/main/scala/io/ino/solrs/future/JavaFutureFactory.scala index 63b5c31..7ffe3a9 100644 --- a/src/main/scala/io/ino/solrs/future/JavaFutureFactory.scala +++ b/src/main/scala/io/ino/solrs/future/JavaFutureFactory.scala @@ -96,3 +96,5 @@ class JavaFutureFactory extends FutureFactory[CompletableFuture] { def newPromise[T] = new JavaPromise[T] } + +object JavaFutureFactory extends JavaFutureFactory \ No newline at end of file diff --git a/src/test/java/io/ino/solrs/JavaAPITest.java b/src/test/java/io/ino/solrs/JavaAPITest.java new file mode 100644 index 0000000..fc205ab --- /dev/null +++ b/src/test/java/io/ino/solrs/JavaAPITest.java @@ -0,0 +1,131 @@ +package io.ino.solrs; + +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.impl.XMLResponseParser; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.asynchttpclient.DefaultAsyncHttpClient; +import org.junit.BeforeClass; +import org.junit.Test; +import org.scalatest.junit.JUnitSuite; +import scala.Option; +import scala.Tuple2; +import scala.concurrent.duration.FiniteDuration; + +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class JavaAPITest extends JUnitSuite { + + private static final long serialVersionUID = 1; + + private static SolrRunner solrRunner; + + @BeforeClass + public static void beforeClass() { + solrRunner = SolrRunner.startOnce(8888, Option.empty()).awaitReady(10, SECONDS); + } + + @Test + public void testAsyncSolrClient() throws ExecutionException, InterruptedException { + JavaAsyncSolrClient solr = JavaAsyncSolrClient.create("http://localhost:" + solrRunner.port() + "/solr/collection1"); + CompletableFuture response = solr.query(new SolrQuery("*:*")); + response.thenAccept(r -> System.out.println("found "+ r.getResults().getNumFound() +" docs")); + assertNotNull(response.get().getResults()); + } + + @Test + public void testAsyncSolrClientBuilderUrl() throws ExecutionException, InterruptedException { + JavaAsyncSolrClient solr = JavaAsyncSolrClient.builder("http://localhost:" + solrRunner.port() + "/solr/collection1") + .withHttpClient(new DefaultAsyncHttpClient()) + .withResponseParser(new XMLResponseParser()) + .build(); + CompletableFuture response = solr.query(new SolrQuery("*:*")); + response.thenAccept(r -> System.out.println("found "+ r.getResults().getNumFound() +" docs")); + assertNotNull(response.get().getResults()); + } + + @Test + public void testRoundRobinLB() { + RoundRobinLB lb = RoundRobinLB.create(Arrays.asList("http://localhost:8983/solr/collection1", "http://localhost:8984/solr/collection1")); + assertNotNull(lb); + } + + @Test + public void testFastestServerLB() { + StaticSolrServers servers = StaticSolrServers.create(Arrays.asList("http://localhost:8983/solr/collection1", "http://localhost:8984/solr/collection1")); + // TODO: api feels unnatural + Tuple2 col1TestQuery = new Tuple2<>("collection1", new SolrQuery("*:*").setRows(0)); + Function> collectionAndTestQuery = server -> col1TestQuery; + FastestServerLB lb = null; + try { + lb = FastestServerLB.builder(servers, collectionAndTestQuery) + .withMinDelay(50, TimeUnit.MILLISECONDS) + .withMaxDelay(5, TimeUnit.SECONDS) + .withFilterFastServers(averageDuration -> (server, duration) -> duration <= averageDuration) + .withMapPredictedResponseTime(prediction -> prediction * 10) + .withInitialTestRuns(50) + .build(); + + assertNotNull(lb); + + JavaAsyncSolrClient solrs = JavaAsyncSolrClient.builder(lb).build(); + assertNotNull(solrs.loadBalancer()); + } finally { + if(lb != null) lb.shutdown(); + } + } + + @Test + public void testAsyncSolrClientBuilderLB() throws ExecutionException, InterruptedException { + JavaAsyncSolrClient solr = JavaAsyncSolrClient.builder(new SingleServerLB("http://localhost:" + solrRunner.port() + "/solr/collection1")) + .withHttpClient(new DefaultAsyncHttpClient()) + .withResponseParser(new XMLResponseParser()) + .build(); + CompletableFuture response = solr.query(new SolrQuery("*:*")); + response.thenAccept(r -> System.out.println("found "+ r.getResults().getNumFound() +" docs")); + assertNotNull(response.get().getResults()); + } + + @Test + public void testStaticSolrServers() { + StaticSolrServers servers = StaticSolrServers.create(Arrays.asList("http://localhost:8983/solr/collection1", "http://localhost:8984/solr/collection1")); + assertNotNull(servers); + } + + @Test + public void testCloudSolrServers() { + CloudSolrServers.Builder builder = CloudSolrServers.builder("host1:2181,host2:2181") + .withZkClientTimeout(15, SECONDS) + .withZkConnectTimeout(10, SECONDS) + .withClusterStateUpdateInterval(1, SECONDS) + .withDefaultCollection("collection1") + .withWarmupQueries((collection) -> Collections.singletonList(new SolrQuery("*:*")), 10); + assertNotNull(builder); + assertEquals("host1:2181,host2:2181", builder.zkHost()); + assertEquals(FiniteDuration.apply(15, SECONDS), builder.zkClientTimeout()); + assertEquals(FiniteDuration.apply(10, SECONDS), builder.zkConnectTimeout()); + assertEquals(FiniteDuration.apply(1, SECONDS), builder.clusterStateUpdateInterval()); + assertEquals(Option.apply("collection1"), builder.defaultCollection()); + assertTrue(builder.warmupQueries().isDefined()); + assertEquals(10, builder.warmupQueries().get().count()); + + CloudSolrServers build = builder.build(); + } + + @Test + public void testRetryPolicy() { + assertNotNull(RetryPolicy.AtMost(2)); + assertNotNull(RetryPolicy.TryAvailableServers()); + assertNotNull(RetryPolicy.TryOnce()); + } + +} diff --git a/src/test/java/io/ino/solrs/usage/Usage.java b/src/test/java/io/ino/solrs/usage/Usage.java new file mode 100644 index 0000000..b9dc35a --- /dev/null +++ b/src/test/java/io/ino/solrs/usage/Usage.java @@ -0,0 +1,93 @@ +package io.ino.solrs.usage; + +import io.ino.solrs.AsyncSolrClient; +import io.ino.solrs.CloudSolrServers; +import io.ino.solrs.FastestServerLB; +import io.ino.solrs.JavaAsyncSolrClient; +import io.ino.solrs.RequestInterceptor; +import io.ino.solrs.RetryPolicy; +import io.ino.solrs.RoundRobinLB; +import io.ino.solrs.SolrServer; +import io.ino.solrs.StaticSolrServers; +import io.ino.solrs.future.Future; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.impl.XMLResponseParser; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.asynchttpclient.DefaultAsyncHttpClient; +import scala.Function1; +import scala.Function2; +import scala.Tuple2; + +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import static java.util.concurrent.TimeUnit.*; + +public class Usage { + + { + JavaAsyncSolrClient solr = JavaAsyncSolrClient.create("http://localhost:8983/solr/collection1"); + CompletableFuture response = solr.query(new SolrQuery("java")); + response.thenAccept(r -> System.out.println("found " + r.getResults().getNumFound() + " docs")); + } + + { + JavaAsyncSolrClient solr = JavaAsyncSolrClient.builder("http://localhost:8983/solr/collection1") + .withHttpClient(new DefaultAsyncHttpClient()) + .withResponseParser(new XMLResponseParser()) + .build(); + } + + { + RoundRobinLB lb = RoundRobinLB.create(Arrays.asList("http://localhost:8983/solr/collection1", "http://localhost:8984/solr/collection1")); + JavaAsyncSolrClient solr = JavaAsyncSolrClient.builder(lb).build(); + } + + { + StaticSolrServers servers = StaticSolrServers.create(Arrays.asList("http://localhost:8983/solr/collection1", "http://localhost:8984/solr/collection1")); + // TODO: api feels unnatural + Tuple2 col1TestQuery = new Tuple2<>("collection1", new SolrQuery("*:*").setRows(0)); + Function> collectionAndTestQuery = server -> col1TestQuery; + FastestServerLB lb = FastestServerLB.builder(servers, collectionAndTestQuery) + .withMinDelay(50, MILLISECONDS) + .withMaxDelay(5, SECONDS) + .withInitialTestRuns(50) + .build(); + JavaAsyncSolrClient solr = JavaAsyncSolrClient.builder(lb).build(); + + // finally also shutdown the lb... + lb.shutdown(); + } + + { + CloudSolrServers servers = CloudSolrServers.builder("localhost:2181").build(); + JavaAsyncSolrClient solr = JavaAsyncSolrClient.builder(new RoundRobinLB(servers)).build(); + } + + { + CloudSolrServers servers = CloudSolrServers.builder("host1:2181,host2:2181") + .withZkClientTimeout(15, SECONDS) + .withZkConnectTimeout(10, SECONDS) + .withClusterStateUpdateInterval(1, SECONDS) + .withDefaultCollection("collection1") + .withWarmupQueries((collection) -> Collections.singletonList(new SolrQuery("*:*")), 10) + .build(); + JavaAsyncSolrClient solr = JavaAsyncSolrClient.builder(new RoundRobinLB(servers)).build(); + } + + { + JavaAsyncSolrClient solr = JavaAsyncSolrClient + .builder(new RoundRobinLB(CloudSolrServers.builder("localhost:2181").build())) + .withRetryPolicy(RetryPolicy.TryAvailableServers()) + .build(); + + + } + + { + } + +} \ No newline at end of file diff --git a/src/test/scala/io/ino/solrs/AsyncSolrClientSpec.scala b/src/test/scala/io/ino/solrs/AsyncSolrClientSpec.scala index 9dd9020..7f80098 100644 --- a/src/test/scala/io/ino/solrs/AsyncSolrClientSpec.scala +++ b/src/test/scala/io/ino/solrs/AsyncSolrClientSpec.scala @@ -49,7 +49,7 @@ class AsyncSolrClientSpec extends StandardFunSpec { it("should shutdown http client if it was not provided") { val ahcMock = mock[AsyncHttpClient] - val solr = new AsyncSolrClient.Builder("http://localhost:12345/solr") { + val solr = new AsyncSolrClient.Builder("http://localhost:12345/solr", ascFactory) { override def createHttpClient = ahcMock }.build diff --git a/src/test/scala/io/ino/solrs/CloudSolrServersIntegrationSpec.scala b/src/test/scala/io/ino/solrs/CloudSolrServersIntegrationSpec.scala index ce3ebdf..4f871f3 100644 --- a/src/test/scala/io/ino/solrs/CloudSolrServersIntegrationSpec.scala +++ b/src/test/scala/io/ino/solrs/CloudSolrServersIntegrationSpec.scala @@ -14,7 +14,7 @@ import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.time.{Millis, Span} import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.{Future, Promise} +import scala.concurrent.Future import scala.concurrent.duration._ import scala.language.postfixOps diff --git a/src/test/scala/io/ino/solrs/FastestServerLBSpec.scala b/src/test/scala/io/ino/solrs/FastestServerLBSpec.scala index 351c43d..6b775a5 100644 --- a/src/test/scala/io/ino/solrs/FastestServerLBSpec.scala +++ b/src/test/scala/io/ino/solrs/FastestServerLBSpec.scala @@ -274,7 +274,7 @@ class FastestServerLBSpec extends StandardFunSpec { cut = new FastestServerLB(solrServers, _ => ("collection1", testQuery), minDelay, maxDelay, clock = clock) // we use a spy to have a real async solr client for that we can verify interactions var spyClient: AsyncSolrClient = null - val realClient: AsyncSolrClient = new AsyncSolrClient.Builder(cut) { + val realClient: AsyncSolrClient = new AsyncSolrClient.Builder(cut, ascFactory) { override protected def setOnAsyncSolrClientAwares(solr: AsyncSolrClient): Unit = { spyClient = spy(solr) mockQueries(spyClient) diff --git a/src/test/scala/io/ino/solrs/SolrRunner.scala b/src/test/scala/io/ino/solrs/SolrRunner.scala index d77d980..26dbb3a 100644 --- a/src/test/scala/io/ino/solrs/SolrRunner.scala +++ b/src/test/scala/io/ino/solrs/SolrRunner.scala @@ -1,9 +1,15 @@ package io.ino.solrs import java.io.File -import java.net.{URL, URLDecoder} -import java.nio.file.{Files, Path, Paths} -import java.util.jar.{Attributes, JarFile} +import java.net.URL +import java.net.URLDecoder +import java.nio.file.Files +import java.nio.file.Path +import java.nio.file.Paths +import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException +import java.util.jar.Attributes +import java.util.jar.JarFile import java.util.logging.Level import org.apache.catalina.Context @@ -11,8 +17,14 @@ import org.apache.catalina.LifecycleState import org.apache.catalina.startup.Tomcat import org.apache.commons.io.FileUtils import org.apache.curator.test.TestingServer +import org.apache.solr.client.solrj.SolrQuery +import org.apache.solr.client.solrj.impl.HttpSolrClient import org.apache.solr.common.cloud.ZkStateReader -import org.slf4j.{Logger, LoggerFactory} +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +import scala.concurrent.duration._ +import scala.util.control.NonFatal case class ZooKeeperOptions(zkHost: String, bootstrapConfig: Option[String] = None) @@ -69,6 +81,30 @@ class SolrRunner(val port: Int, this } + def awaitReady(value: Long, unit: TimeUnit): SolrRunner = { + awaitReady(Duration(value, unit)) + } + + def awaitReady(timeout: Duration): SolrRunner = { + val solrClient = new HttpSolrClient.Builder(s"http://localhost:$port/solr/collection1").build() + + def await(left: Duration): SolrRunner = { + if(left.toMillis <= 0) { + throw new TimeoutException(s"Solr not available after $timeout") + } + try { + solrClient.query(new SolrQuery("*:*")) + this + } catch { + case NonFatal(e) => + Thread.sleep(50) + await(left - 50.millis) + } + } + + await(timeout) + } + protected def processZkOptionsBeforeStart(): Unit = { // ZooKeeper / SolrCloud support maybeZkOptions.foreach { zkOptions => @@ -227,18 +263,18 @@ object SolrRunner { * Also registers a shutdown hook to shutdown tomcat when the jvm exits. */ def startOnce(port: Int, zkOptions: Option[ZooKeeperOptions] = None): SolrRunner = { - solrRunners.get(port).getOrElse { + solrRunners.getOrElse(port, { val solrRunner = start(port, zkOptions) solrRunners += port -> solrRunner - Runtime.getRuntime().addShutdownHook(new Thread() { + Runtime.getRuntime.addShutdownHook(new Thread() { override def run() { solrRunner.stop() } }) solrRunner - } + }) } private def initSolrHome(port: Int): File = { diff --git a/src/test/scala/io/ino/solrs/StandardFunSpec.scala b/src/test/scala/io/ino/solrs/StandardFunSpec.scala index 4f6cb97..7824d70 100644 --- a/src/test/scala/io/ino/solrs/StandardFunSpec.scala +++ b/src/test/scala/io/ino/solrs/StandardFunSpec.scala @@ -3,6 +3,8 @@ package io.ino.solrs import io.ino.solrs.future.ScalaFutureFactory import org.scalatest.{Matchers, BeforeAndAfterEach, BeforeAndAfterAll, FunSpec} +import scala.concurrent.Future + /** * Default FunSpec mixing in various standard traits, and also ScalaFutureFactory. */ @@ -10,4 +12,6 @@ abstract class StandardFunSpec extends FunSpec with BeforeAndAfterAll with Befor protected implicit val futureFactory = ScalaFutureFactory + protected val ascFactory = AsyncSolrClient.ascFactory[Future] _ + } diff --git a/src/test/scala/io/ino/solrs/usage/UsageScala.scala b/src/test/scala/io/ino/solrs/usage/UsageScala.scala new file mode 100644 index 0000000..c934277 --- /dev/null +++ b/src/test/scala/io/ino/solrs/usage/UsageScala.scala @@ -0,0 +1,81 @@ +package io.ino.solrs.usage + +import org.slf4j.LoggerFactory + +class UsageScala1 { + + { + import io.ino.solrs.AsyncSolrClient + import io.ino.solrs.future.ScalaFutureFactory.Implicit + import org.apache.solr.client.solrj.SolrQuery + import org.apache.solr.client.solrj.response.QueryResponse + import scala.concurrent.ExecutionContext.Implicits.global + import scala.concurrent.Future + + val solr = AsyncSolrClient("http://localhost:8983/solr") + val response: Future[QueryResponse] = solr.query(new SolrQuery("scala")) + response.onSuccess { + case qr => println(s"found ${qr.getResults.getNumFound} docs") + } + + // Just included to present the 'shutdown'... + solr.shutdown() + } + + { + import io.ino.solrs._ + import io.ino.solrs.future.Future + import io.ino.solrs.future.ScalaFutureFactory.Implicit + import org.apache.solr.client.solrj.SolrQuery + import org.apache.solr.client.solrj.response.QueryResponse + + val logger = LoggerFactory.getLogger(getClass) + + val loggingInterceptor = new RequestInterceptor { + override def interceptQuery(f: (SolrServer, SolrQuery) => Future[QueryResponse]) + (solrServer: SolrServer, q: SolrQuery): Future[QueryResponse] = { + val start = System.currentTimeMillis() + f(solrServer, q).map { qr => + val requestTime = System.currentTimeMillis() - start + logger.info(s"Query $q to $solrServer took $requestTime ms (query time in solr: ${qr.getQTime} ms).") + qr + } + } + } + + val solr = AsyncSolrClient.Builder("http://localhost:8983/solr/collection1") + .withRequestInterceptor(loggingInterceptor).build + } + +} + +class UsageScalaTwitter1 { + + import io.ino.solrs.AsyncSolrClient + import io.ino.solrs.future.TwitterFutureFactory.Implicit + import org.apache.solr.client.solrj.SolrQuery + import org.apache.solr.client.solrj.response.QueryResponse + import com.twitter.util.Future + + val solr = AsyncSolrClient("http://localhost:8983/solr") + val response: Future[QueryResponse] = solr.query(new SolrQuery("scala")) + response.onSuccess { + qr => println(s"found ${qr.getResults.getNumFound} docs") + } + + // Just included to present the 'shutdown'... + solr.shutdown() + +} + +class UsageScala2 { + import io.ino.solrs.AsyncSolrClient + import io.ino.solrs.future.ScalaFutureFactory.Implicit + import org.apache.solr.client.solrj.impl.XMLResponseParser + import org.asynchttpclient.DefaultAsyncHttpClient + + val solr = AsyncSolrClient.Builder("http://localhost:8983/solr") + .withHttpClient(new DefaultAsyncHttpClient()) + .withResponseParser(new XMLResponseParser()) + .build +} \ No newline at end of file