Update apache-spark benchmarks to use Spark 3.2.0 and Scala 2.13.7 #327

Merged · 2 commits · Nov 22, 2021

benchmarks/apache-spark/build.sbt (9 changes: 4 additions & 5 deletions)
@@ -1,19 +1,18 @@
 lazy val renaissanceCore = RootProject(uri("../../renaissance-core"))
 
-val sparkVersion = "3.1.2"
+val sparkVersion = "3.2.0"
 
 lazy val apacheSpark = (project in file("."))
   .settings(
     name := "apache-spark",
     version := (renaissanceCore / version).value,
     organization := (renaissanceCore / organization).value,
-    scalaVersion := "2.12.15",
+    scalaVersion := "2.13.7",
     scalacOptions ++= Seq("-deprecation"),
     libraryDependencies ++= Seq(
       "org.apache.spark" %% "spark-core" % sparkVersion,
       "org.apache.spark" %% "spark-sql" % sparkVersion,
-      "org.apache.spark" %% "spark-mllib" % sparkVersion,
-      // Not directly required, forces the use of newer version
-      "commons-io" % "commons-io" % "2.7"
+      "org.apache.spark" %% "spark-mllib" % sparkVersion
     )
   )
   .dependsOn(
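
Note on the dropped commons-io pin: that dependency was only there to force a newer version onto the classpath, and it disappears with this bump, presumably because Spark 3.2.0 already pulls in a recent enough commons-io transitively. If such a pin were ever needed again, sbt's dependencyOverrides would be a more precise tool than declaring a direct dependency; a minimal sketch (the version shown is illustrative, not taken from this PR):

// build.sbt sketch: pin a transitive dependency without making it a direct one.
// The artifact stays out of libraryDependencies, so nothing compiles against it.
dependencyOverrides += "commons-io" % "commons-io" % "2.11.0"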

Als.scala

@@ -13,7 +13,6 @@ import org.renaissance.License
 
 import java.nio.file.Files
 import java.nio.file.Path
-import scala.jdk.CollectionConverters.asJavaCollectionConverter
 import scala.util.Random
 
 @Name("als")
@@ -66,14 +65,16 @@ final class Als extends Benchmark with SparkUtil
 
     for (userId <- 0 until userCount; itemId <- 0 until productCount) yield {
       val rating = 1 + rand.nextInt(3) + rand.nextInt(3)
-      Rating[Int](userId, itemId, rating)
+      Rating[Int](userId, itemId, rating.toFloat)
     }
   }
 
   private def storeRatings(ratings: Seq[Rating], file: Path): Path = {
+    import scala.jdk.CollectionConverters._
+
     val header = Seq(Seq("user", "item", "rating").mkString(","))
     val lines = header ++ ratings.map(r => Rating.unapply(r).get.productIterator.mkString(","))
-    Files.write(file, lines.asJavaCollection)
+    Files.write(file, lines.asJava)
   }
 
   private def loadRatings(file: Path) = {
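
Two Scala 2.13 migration details show up in this file. First, the real scala.jdk.CollectionConverters object introduced in 2.13 provides conversions as extension methods (.asJava, .asScala and friends); the named implicit converters such as asJavaCollectionConverter, which the 2.12 compatibility shim still exposed, are gone, hence the wildcard import and lines.asJava. Second, 2.13 deprecates the lossy implicit widening from Int to Float, so the Int rating must be converted explicitly before it is passed to Rating[Int], whose rating field is a Float. A minimal, self-contained sketch of both changes (file name and values are illustrative):

import java.nio.file.{Files, Paths}
import scala.jdk.CollectionConverters._ // 2.13: brings .asJava / .asScala into scope

object ConvertersSketch {
  def main(args: Array[String]): Unit = {
    val lines = Seq("user,item,rating", "0,0,3")
    // Seq#asJava yields a java.util.List, which Files.write accepts as an Iterable.
    Files.write(Paths.get("ratings.csv"), lines.asJava)

    // Int -> Float used to widen implicitly; 2.13 deprecates that because it can
    // lose precision, so the conversion is now spelled out.
    val rating = 1 + 2 + 2
    val asFloat: Float = rating.toFloat
    println(asFloat)
  }
}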

ChiSquare.scala

@@ -14,7 +14,6 @@ import org.renaissance.License
 
 import java.nio.file.Files
 import java.nio.file.Path
-import scala.jdk.CollectionConverters.asJavaCollectionConverter
 import scala.util.Random
 
 @Name("chi-square")
@@ -51,6 +50,8 @@ final class ChiSquare extends Benchmark with SparkUtil
   private var outputTestResults: Array[ChiSqTestResult] = _
 
   private def prepareInput(pointCount: Int, componentCount: Int, outputFile: Path): Path = {
+    import scala.jdk.CollectionConverters._
+
     // TODO: Use a Renaissance-provided random generator.
     val rand = new Random(0L)
 
@@ -69,7 +70,7 @@
     }
 
     // Write output using UTF-8 encoding.
-    Files.write(outputFile, lines.asJavaCollection)
+    Files.write(outputFile, lines.asJava)
   }
 
   private def loadData(inputFile: Path) = {

GaussMix.scala

@@ -13,7 +13,6 @@ import org.renaissance.License
 
 import java.nio.file.Files
 import java.nio.file.Path
-import scala.jdk.CollectionConverters.asJavaCollectionConverter
 import scala.util.Random
 
 @Name("gauss-mix")
@@ -62,6 +61,8 @@ final class GaussMix extends Benchmark with SparkUtil
   private var outputGaussianMixture: GaussianMixtureModel = _
 
   private def prepareInput(pointCount: Int, componentCount: Int, outputFile: Path): Path = {
+    import scala.jdk.CollectionConverters._
+
     // TODO: Use a Renaissance-provided random generator.
     val rand = new Random(0L)
 
@@ -74,7 +75,7 @@
     }
 
     // Write output using UTF-8 encoding.
-    Files.write(outputFile, lines.asJavaCollection)
+    Files.write(outputFile, lines.asJava)
   }
 
   private def loadData(inputFile: Path) = {

MovieLens.scala

@@ -17,7 +17,6 @@ import org.renaissance.apache.spark.ResourceUtil.writeResourceToFile
 import java.net.URL
 import java.nio.file.Path
 import scala.collection.Map
-import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter
 
 @Name("movie-lens")
 @Group("apache-spark")
@@ -110,7 +109,7 @@ final class MovieLens extends Benchmark with SparkUtil
     val lines = sparkContext.parallelize(linesFromUrl(url))
     val ratings = parseRatingsCsvLines(lines).values.filter { _.rating > 0.0 }
 
-    if (ratings.isEmpty) {
+    if (ratings.isEmpty()) {
       // TODO Fail the benchmark here.
       sys.error("No ratings provided.")
     } else {
@@ -206,9 +205,9 @@
 
     // Create a naive baseline and compare it with the best model.
 
-    val meanRating = training.union(validation).map(_.rating).mean
+    val meanRating = training.union(validation).map(_.rating).mean()
     val baselineRmse = math.sqrt(
-      test.map(x => (meanRating - x.rating) * (meanRating - x.rating)).mean
+      test.map(x => (meanRating - x.rating) * (meanRating - x.rating)).mean()
     )
 
     val improvement = (baselineRmse - testRmse) / baselineRmse * 100
@@ -248,6 +247,8 @@
   }
 
   override def setUpBeforeAll(bc: BenchmarkContext): Unit = {
+    import scala.jdk.CollectionConverters._
+
     //
     // Without a checkpoint directory set, JMH runs of this
     // benchmark in Travis CI tend to crash with stack overflow.
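
The added parentheses on isEmpty() and mean() are not cosmetic: Scala 2.13 deprecates "auto-application", i.e. calling a method that is declared with an empty parameter list (as RDD.isEmpty() and mean() are) without writing the parentheses, and this build compiles with -deprecation. A tiny sketch of the rule (class and method names are made up):

object AutoApplySketch {
  class Box {
    def isEmpty(): Boolean = true // declared with an empty parameter list
  }

  def main(args: Array[String]): Unit = {
    val b = new Box
    // b.isEmpty         // 2.13: deprecation warning (auto-application)
    println(b.isEmpty()) // explicit parentheses match the declaration
  }
}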

PageRank.scala

@@ -13,7 +13,6 @@ import org.renaissance.apache.spark.ResourceUtil.sourceFromZipResource
 import java.nio.file.Files
 import java.nio.file.Path
 import java.util.regex.Pattern
-import scala.collection.JavaConverters._
 import scala.io.BufferedSource
 
 @Name("page-rank")
@@ -76,14 +75,16 @@ final class PageRank extends Benchmark with SparkUtil
       lineCount: Int,
       outputFile: Path
   ): Path = {
+    import scala.jdk.CollectionConverters._
+
     try {
       var lines = source.getLines().dropWhile(_.startsWith("#"))
       if (lineCount >= 0) {
         lines = lines.take(lineCount)
       }
 
       // Write output using UTF-8 encoding.
-      Files.write(outputFile, lines.toSeq.asJavaCollection)
+      Files.write(outputFile, lines.toSeq.asJava)
     } finally {
       source.close()
     }
@@ -142,10 +143,12 @@
   // by rank, and the ordering is then relaxed so that neighboring ranks that
   // differ only slightly are considered equal.
   //
-  override def validate() = {
+  override def validate(): Unit = {
+    import scala.jdk.CollectionConverters._
+
     Assert.assertEquals(expectedRankCountParam, rankCount, "ranks count")
 
-    val preSortedEntries = ranks.sortBy { case (_, rank) => rank }.collect
+    val preSortedEntries = ranks.sortBy { case (_, rank) => rank }.collect()
     val sortedEntries = relaxedRanks(preSortedEntries, rankDifferenceLimit).sortBy {
       case (url, rank) => (rank, url)
     }
@@ -154,7 +157,7 @@
     Validators.hashing(expectedRankHashParam, rankLines.asJava).validate()
   }
 
-  private def relaxedRanks(entries: Seq[(Int, Double)], diffLimit: Double) = {
+  private def relaxedRanks(entries: Array[(Int, Double)], diffLimit: Double) = {
     var prevRank = entries.head._2
     var groupRank = prevRank
 
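
The signature change of relaxedRanks from Seq[(Int, Double)] to Array[(Int, Double)] matches what collect() actually returns. Under 2.12 the Array was silently adapted to a Seq; in 2.13 the implicit conversion from Array to an immutable Seq is deprecated because it copies the array, so accepting the Array directly is both quieter and cheaper. (The explicit ": Unit" on validate() is a related cleanup: the override's result type is spelled out rather than inferred.) A sketch of the Array-parameter point (names and data are illustrative):

object ArrayParamSketch {
  // Accepting Array directly avoids the deprecated, copying implicit
  // conversion from Array to immutable Seq that 2.13 would otherwise insert.
  def headRank(entries: Array[(Int, Double)]): Double = entries.head._2

  def main(args: Array[String]): Unit = {
    val collected = Array((1, 0.25), (2, 0.75)) // what RDD#collect() would return
    println(headRank(collected))
  }
}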

ResourceUtil.scala

@@ -11,7 +11,6 @@ import java.nio.file.{StandardOpenOption => OpenOption}
 import java.util.zip.ZipInputStream
 import scala.io.BufferedSource
 import scala.io.Source
-import scala.jdk.CollectionConverters.asJavaCollectionConverter
 
 private object ResourceUtil {

@@ -72,7 +71,9 @@
    * @return [[Path]] to the output file
    */
   def duplicateLinesFromUrl(url: URL, copyCount: Int, outputFile: Path): Path = {
-    val lines = linesFromUrl(url).asJavaCollection
+    import scala.jdk.CollectionConverters._
+
+    val lines = linesFromUrl(url).asJava
 
     for (_ <- 0 until copyCount) {
       Files.write(outputFile, lines, OpenOption.CREATE, OpenOption.APPEND)
@@ -90,7 +91,7 @@
   def linesFromUrl(url: URL): Seq[String] = {
     val source = Source.fromURL(url)
     try {
-      source.getLines().to[Seq]
+      source.getLines().toSeq
     } finally {
       source.close()
     }
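
The to[Seq] call had to go because the 2.13 collections overhaul removed the CanBuildFrom-based to[C[_]] method; conversions are now written .to(Seq), or with dedicated shorthands such as .toSeq. One subtlety worth noting: on 2.12, Iterator.toSeq produced a lazy Stream, which is probably why to[Seq] was used here in the first place, to force reading before source.close(); on 2.13, Iterator.toSeq is strict, so the shorthand is safe in this position. A sketch, assuming Scala 2.13:

import scala.io.Source

object ToSeqSketch {
  def main(args: Array[String]): Unit = {
    val source = Source.fromString("a\nb\nc")
    try {
      // 2.12 style was source.getLines().to[Seq]; on 2.13 use .toSeq or .to(Seq).
      // Both are strict here, so all lines are read before the source is closed.
      val lines: Seq[String] = source.getLines().toSeq
      println(lines)
    } finally {
      source.close()
    }
  }
}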

SparkUtil.scala

@@ -59,11 +59,10 @@ trait SparkUtil {
       // "(via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN)."
       "org.apache.spark.SparkConf" -> Level.ERROR,
       // Masks the following warnings:
-      // "Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS"
-      // "Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS"
-      // "Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK"
-      // "Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK"
-      "com.github.fommil.netlib" -> Level.ERROR,
+      // "Failed to load implementation from: dev.ludovic.netlib.blas.JNIBLAS"
+      // "Failed to load implementation from: dev.ludovic.netlib.blas.ForeignLinkerBLAS"
+      // "Failed to load implementation from: dev.ludovic.netlib.lapack.JNILAPACK"
+      "dev.ludovic.netlib" -> Level.ERROR,
       // Masks the following warning:
       // "URL.setURLStreamHandlerFactory failed to set FsUrlStreamHandlerFactory"
       "org.apache.spark.sql.internal.SharedState" -> Level.ERROR
@@ -126,7 +125,7 @@
   }
 
   private def dropFirstLine(lines: RDD[String]): RDD[String] = {
-    val first = lines.first
+    val first = lines.first()
     lines.filter { line => line != first }
   }

@@ -136,7 +135,7 @@
 
   // Used to find sources of log messages.
   private def printCurrentLoggers() = {
-    import scala.jdk.CollectionConverters.enumerationAsScalaIteratorConverter
+    import scala.jdk.CollectionConverters._
 
     println(
       LogManager.getCurrentLoggers.asScala
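
The logger rename tracks Spark's own change of linear-algebra backends: 3.2.x loads native BLAS/LAPACK through dev.ludovic.netlib rather than com.github.fommil.netlib, so the noise suppression has to target the new package (and the masked messages change accordingly). The masking mechanism itself is plain log4j 1.x, which Spark 3.2.x still bundles; a minimal sketch of the technique (the logger list is abbreviated):

import org.apache.log4j.{Level, LogManager}

object LogMaskSketch {
  // Raise the threshold of known-noisy loggers so only errors get through.
  def maskNoisyLoggers(): Unit =
    Seq(
      "dev.ludovic.netlib", // "Failed to load implementation from: ..." warnings
      "org.apache.spark.sql.internal.SharedState"
    ).foreach { name =>
      LogManager.getLogger(name).setLevel(Level.ERROR)
    }
}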