Bump spark version to 2.4.0 (#670)
dieu authored and johnynek committed Nov 8, 2018
1 parent 5fdb079 commit fae1c33
Showing 6 changed files with 267 additions and 210 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
@@ -6,13 +6,13 @@ matrix:
script: sbt ++$TRAVIS_SCALA_VERSION clean test mimaReportBinaryIssues
#script: ./sbt ++$TRAVIS_SCALA_VERSION clean test

-  - scala: 2.11.11
+  - scala: 2.11.12
script: sbt ++$TRAVIS_SCALA_VERSION clean coverage test coverageReport mimaReportBinaryIssues
#script: ./sbt ++$TRAVIS_SCALA_VERSION clean coverage test coverageReport
after_success:
- bash <(curl -s https://codecov.io/bash)

-  - scala: 2.12.6
+  - scala: 2.12.7
jdk: oraclejdk8
script: sbt "++$TRAVIS_SCALA_VERSION clean" "++$TRAVIS_SCALA_VERSION test" "scalafmt::test" "test:scalafmt::test" "++$TRAVIS_SCALA_VERSION mimaReportBinaryIssues" "++$TRAVIS_SCALA_VERSION docs/makeMicrosite"
#script: ./sbt "+++$TRAVIS_SCALA_VERSION clean" "+++$TRAVIS_SCALA_VERSION test" "++$TRAVIS_SCALA_VERSION docs/makeMicrosite"
@@ -1,9 +1,10 @@
package com.twitter.algebird

import com.twitter.algebird._
-import org.apache.spark.rdd.{ RDD, PairRDDFunctions }
+import org.apache.spark.rdd.{PairRDDFunctions, RDD}
import org.apache.spark.Partitioner
import scala.reflect.ClassTag

/**
* import com.twitter.algebird.spark.ToAlgebird
* to get the enrichment to do:
@@ -12,6 +13,7 @@ import scala.reflect.ClassTag
* This adds methods to Spark RDDs to use Algebird
*/
class AlgebirdRDD[T](val rdd: RDD[T]) extends AnyVal {

/**
* Apply an Aggregator to return a single value for the whole RDD.
* If the RDD is empty, None is returned
@@ -20,47 +22,53 @@ class AlgebirdRDD[T](val rdd: RDD[T]) extends AnyVal {
*/
def aggregateOption[B: ClassTag, C](agg: Aggregator[T, B, C]): Option[C] = {
val pr = rdd.mapPartitions({ data =>
-      if (data.isEmpty) Iterator.empty else {
+      if (data.isEmpty) Iterator.empty
+      else {
val b = agg.prepare(data.next)
Iterator(agg.appendAll(b, data))
}
}, preservesPartitioning = true)
pr.repartition(1)
.mapPartitions(pr => Iterator(agg.semigroup.sumOption(pr)))
-      .collect.head.map(agg.present)
+      .collect
+      .head
+      .map(agg.present)
}
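  // Usage sketch (illustrative names, not from the diff; assumes the
  // `rdd.algebird` enrichment from `import com.twitter.algebird.spark._`
  // and `Aggregator.max` from algebird-core):
  //   def maxValue(xs: org.apache.spark.rdd.RDD[Double]): Option[Double] =
  //     xs.algebird.aggregateOption(com.twitter.algebird.Aggregator.max[Double])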

/**
* This will throw if you use a non-MonoidAggregator with an empty RDD
* requires a commutative Semigroup. To generalize to non-commutative, we need a sorted partition for
* T.
*/
-  def aggregate[B: ClassTag, C](agg: Aggregator[T, B, C]): C = (aggregateOption[B, C](agg), agg.semigroup) match {
-    case (Some(c), _) => c
-    case (None, m: Monoid[B]) => agg.present(m.zero)
-    case (None, _) => None.get // no such element
-  }
+  def aggregate[B: ClassTag, C](agg: Aggregator[T, B, C]): C =
+    (aggregateOption[B, C](agg), agg.semigroup) match {
+      case (Some(c), _)         => c
+      case (None, m: Monoid[B]) => agg.present(m.zero)
+      case (None, _)            => None.get // no such element
+    }
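  // Usage sketch (illustrative; same `import com.twitter.algebird.spark._`
  // assumption). Aggregator.count is a MonoidAggregator, so on an empty RDD
  // the Monoid branch above returns the zero instead of throwing:
  //   def evens(xs: org.apache.spark.rdd.RDD[Int]): Long =
  //     xs.algebird.aggregate(com.twitter.algebird.Aggregator.count[Int](_ % 2 == 0))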

/**
* Apply an Aggregator to the values for each key.
* requires a commutative Semigroup. To generalize to non-commutative, we need a sorted partition for
* T.
*/
-  def aggregateByKey[K: ClassTag, V1, U: ClassTag, V2](
-    agg: Aggregator[V1, U, V2])(implicit ev: T <:< (K, V1), ordK: Priority[Ordering[K], DummyImplicit]): RDD[(K, V2)] =
+  def aggregateByKey[K: ClassTag, V1, U: ClassTag, V2](agg: Aggregator[V1, U, V2])(
+      implicit ev: T <:< (K, V1),
+      ordK: Priority[Ordering[K], DummyImplicit]): RDD[(K, V2)] =
aggregateByKey(Partitioner.defaultPartitioner(rdd), agg)

-  private def toPair[K: ClassTag, V: ClassTag](kv: RDD[(K, V)])(implicit ordK: Priority[Ordering[K], DummyImplicit]) =
+  private def toPair[K: ClassTag, V: ClassTag](kv: RDD[(K, V)])(
+      implicit ordK: Priority[Ordering[K], DummyImplicit]) =
new PairRDDFunctions(kv)(implicitly, implicitly, ordK.getPreferred.orNull)

/**
* Apply an Aggregator to the values for each key with a custom Partitioner.
* requires a commutative Semigroup. To generalize to non-commutative, we need a sorted partition for
* T.
*/
-  def aggregateByKey[K: ClassTag, V1, U: ClassTag, V2](
-    part: Partitioner,
-    agg: Aggregator[V1, U, V2])(implicit ev: T <:< (K, V1), ordK: Priority[Ordering[K], DummyImplicit]): RDD[(K, V2)] = {
+  def aggregateByKey[K: ClassTag, V1, U: ClassTag, V2](part: Partitioner, agg: Aggregator[V1, U, V2])(
+      implicit ev: T <:< (K, V1),
+      ordK: Priority[Ordering[K], DummyImplicit]): RDD[(K, V2)] = {
/*
* This mapValues implementation allows us to avoid needing the V1 ClassTag, which would
* be required to use the implementation in PairRDDFunctions
@@ -69,8 +77,9 @@ class AlgebirdRDD[T](val rdd: RDD[T]) extends AnyVal {
it.map { case (k, v) => (k, agg.prepare(v)) }
}, preservesPartitioning = true)

-    toPair(toPair(prepared)
-      .reduceByKey(part, agg.reduce(_, _)))
+    toPair(
+      toPair(prepared)
+        .reduceByKey(part, agg.reduce(_, _)))
.mapValues(agg.present)
}
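  // Usage sketch (illustrative; the explicit type parameters mirror the call
  // style used in AlgebirdRDDTest below):
  //   def maxPerKey(xs: org.apache.spark.rdd.RDD[(String, Int)]) =
  //     xs.algebird.aggregateByKey[String, Int, Int, Int](com.twitter.algebird.Aggregator.max[Int])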

@@ -81,7 +90,8 @@ class AlgebirdRDD[T](val rdd: RDD[T]) extends AnyVal {
* requires a commutative Semigroup. To generalize to non-commutative, we need a sorted partition for
* T.
*/
-  def sumByKey[K: ClassTag, V: ClassTag: Semigroup](implicit ev: T <:< (K, V), ord: Priority[Ordering[K], DummyImplicit]): RDD[(K, V)] =
+  def sumByKey[K: ClassTag, V: ClassTag: Semigroup](implicit ev: T <:< (K, V),
+                                                    ord: Priority[Ordering[K], DummyImplicit]): RDD[(K, V)] =
sumByKey(Partitioner.defaultPartitioner(rdd))

/**
@@ -91,7 +101,7 @@ class AlgebirdRDD[T](val rdd: RDD[T]) extends AnyVal {
* Unfortunately we need to use a different name than sumByKey in scala 2.11
*/
def sumByKey[K: ClassTag, V: ClassTag: Semigroup](
-    part: Partitioner)(implicit ev: T <:< (K, V), ord: Priority[Ordering[K], DummyImplicit]): RDD[(K, V)] =
+      part: Partitioner)(implicit ev: T <:< (K, V), ord: Priority[Ordering[K], DummyImplicit]): RDD[(K, V)] =
toPair(keyed).reduceByKey(part, implicitly[Semigroup[V]].plus _)
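  // Usage sketch (illustrative): with algebird's implicit Semigroup[Int]
  // adding the values, sumByKey gives a word count.
  //   def wordCount(ws: org.apache.spark.rdd.RDD[String]) =
  //     ws.map(w => (w, 1)).algebird.sumByKey[String, Int]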

/**
@@ -108,15 +118,17 @@ class AlgebirdRDD[T](val rdd: RDD[T]) extends AnyVal {
* T.
*/
def sumOption(implicit sg: Semigroup[T], ct: ClassTag[T]): Option[T] = {
-    val partialReduce: RDD[T] = rdd.mapPartitions(
-      { itT => sg.sumOption(itT).toIterator },
-      preservesPartitioning = true)
+    val partialReduce: RDD[T] = rdd.mapPartitions({ itT =>
+      sg.sumOption(itT).toIterator
+    }, preservesPartitioning = true)

    // my reading of the docs is that we do want a shuffle at this stage
    // to make sure the upstream work is done in parallel.
-    val results = partialReduce.repartition(1).mapPartitions({ it =>
-      Iterator(sg.sumOption(it))
-    }, preservesPartitioning = true)
+    val results = partialReduce
+      .repartition(1)
+      .mapPartitions({ it =>
+        Iterator(sg.sumOption(it))
+      }, preservesPartitioning = true)
.collect

assert(results.size == 1, s"Should only be 1 item: ${results.toList}")
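A minimal sketch of the sumOption flow above (hypothetical names; assumes import com.twitter.algebird.spark._ and algebird's implicit Semigroup[Long]):

import com.twitter.algebird.spark._
import org.apache.spark.rdd.RDD

// Each partition is summed locally with sumOption; only the per-partition
// partial sums are shuffled into the final single-partition pass.
def total(xs: RDD[Long]): Option[Long] =
  xs.algebird.sumOption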
@@ -1,6 +1,5 @@
package com.twitter.algebird

-import com.twitter.algebird._
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
@@ -10,6 +9,7 @@ import scala.reflect.ClassTag
* import com.twitter.algebird.spark._
*/
package object spark {

/**
* spark exposes an Aggregator type, so this is here to avoid shadowing
*/
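The alias this comment refers to is the AlgebirdAggregator used by the test below; a sketch (assuming spark-core on the classpath) of the two same-named types coexisting:

import com.twitter.algebird.spark.AlgebirdAggregator // algebird's Aggregator, aliased
import org.apache.spark.Aggregator                   // spark-core's Aggregator keeps the plain name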
@@ -1,6 +1,6 @@
package com.twitter.algebird.spark

-import com.twitter.algebird.{ MapAlgebra, Semigroup, Monoid, Min }
+import com.twitter.algebird.{MapAlgebra, Min, Monoid, Semigroup}
import org.apache.spark._
import org.apache.spark.rdd._
import org.scalatest._
@@ -14,6 +14,7 @@ package test {
def sum[T: Monoid: ClassTag](r: RDD[T]) = r.algebird.sum
}
}

/**
* This test almost always times out on travis.
* Leaving at least a compilation test of using it with spark
@@ -43,8 +44,8 @@ class AlgebirdRDDTest extends FunSuite with BeforeAndAfter {
implicit def optEq[V](implicit eq: Equiv[V]): Equiv[Option[V]] = Equiv.fromFunction[Option[V]] { (o1, o2) =>
(o1, o2) match {
case (Some(v1), Some(v2)) => eq.equiv(v1, v2)
-      case (None, None) => true
-      case _ => false
+      case (None, None)         => true
+      case _                    => false
}
}

@@ -55,7 +56,8 @@ class AlgebirdRDDTest extends FunSuite with BeforeAndAfter {
assertEq(sc.makeRDD(s).algebird.aggregate(agg), agg(s))
}

-  def aggregateByKey[K: ClassTag, T: ClassTag, U: ClassTag, V: Equiv](s: Seq[(K, T)], agg: AlgebirdAggregator[T, U, V]) {
+  def aggregateByKey[K: ClassTag, T: ClassTag, U: ClassTag, V: Equiv](s: Seq[(K, T)],
+                                                                      agg: AlgebirdAggregator[T, U, V]) {
val resMap = sc.makeRDD(s).algebird.aggregateByKey[K, T, U, V](agg).collect.toMap
implicit val sg = agg.semigroup
val algMap = MapAlgebra.sumByKey(s.map { case (k, t) => k -> agg.prepare(t) }).mapValues(agg.present)
38 changes: 20 additions & 18 deletions build.sbt
@@ -12,6 +12,7 @@ val scalaTestVersion = "3.0.1"
val scalacheckVersion = "1.13.4"
val utilVersion = "6.20.0"
val utilVersion212 = "6.39.0"
+val sparkVersion = "2.4.0"

def scalaBinaryVersion(scalaVersion: String) = scalaVersion match {
case version if version startsWith "2.10" => "2.10"
@@ -28,8 +29,6 @@ def isScala212x(scalaVersion: String) = scalaBinaryVersion(scalaVersion) == "2.12"
* cause problems generating the documentation on scala 2.10. As the APIs for 2.10
* and 2.11 are the same this has no effect on the resultant documentation, though
* it does mean that the scaladocs cannot be generated when the build is in 2.10 mode.
- *
- * TODO: enable algebirdSpark when we turn on the algebirdSpark 2.12 build.
*/
def docsSourcesAndProjects(sv: String): (Boolean, Seq[ProjectReference]) =
CrossVersion.partialVersion(sv) match {
@@ -38,15 +37,15 @@ def docsSourcesAndProjects(sv: String): (Boolean, Seq[ProjectReference]) =
algebirdTest,
algebirdCore,
algebirdUtil,
-          algebirdBijection
-          // algebirdSpark
+          algebirdBijection,
+          algebirdSpark
))
}

val sharedSettings = Seq(
organization := "com.twitter",
scalaVersion := "2.11.11",
crossScalaVersions := Seq("2.10.6", "2.11.11", "2.12.6"),
scalaVersion := "2.11.12",
crossScalaVersions := Seq("2.10.6", "2.11.12", "2.12.7"),

resolvers ++= Seq(
Opts.resolver.sonatypeSnapshots,
@@ -155,7 +154,7 @@ lazy val noPublishSettings = Seq(
* This returns the previous jar we released that is compatible with
* the current.
*/
-val noBinaryCompatCheck = Set[String]("benchmark", "caliper", "generic")
+val noBinaryCompatCheck = Set[String]("benchmark", "caliper", "generic", "spark")

def previousVersion(subProj: String) =
Some(subProj)
@@ -169,14 +168,16 @@ lazy val algebird = Project(
.settings(noPublishSettings)
.settings(coverageExcludedPackages := "<empty>;.*\\.benchmark\\..*")
.aggregate(
-    algebirdTest,
-    algebirdCore,
-    algebirdUtil,
-    algebirdBijection,
-    algebirdBenchmark,
-    algebirdGeneric
-    //algebirdSpark
-  )
+    {
+      // workaround: https://github.com/sbt/sbt/issues/4181, simple workaround .settings(crossScalaVersions := List()) doesn't work
+      val projects = if (Option(sys.props("TRAVIS_SCALA_VERSION")).forall(_.startsWith("2.10.")))
+        Seq(algebirdCore, algebirdTest, algebirdUtil, algebirdBijection, algebirdBenchmark, algebirdGeneric)
+      else
+        Seq(algebirdCore, algebirdTest, algebirdUtil, algebirdBijection, algebirdBenchmark, algebirdGeneric, algebirdSpark)
+
+      projects.map(p => LocalProject(p.id))
+    }: _*
+  )

def module(name: String) = {
val id = "algebird-%s".format(name)
@@ -242,9 +243,10 @@ lazy val algebirdBijection = module("bijection").settings(
).dependsOn(algebirdCore, algebirdTest % "test->test")

lazy val algebirdSpark = module("spark").settings(
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided",
crossScalaVersions := crossScalaVersions.value.filterNot(_.startsWith("2.12"))
).dependsOn(algebirdCore, algebirdTest % "test->test")
libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
crossScalaVersions := crossScalaVersions.value.filterNot(isScala210x),
scalacOptions := scalacOptions.value.filterNot(_.contains("inline")) // Disable optimizations for now: https://github.com/scala/bug/issues/11247
).dependsOn(algebirdCore, algebirdTest % "test->test")

lazy val algebirdGeneric = module("generic").settings(
addCompilerPlugin("org.scalamacros" % "paradise" % paradiseVersion cross CrossVersion.full),
