Spark 2.0.0 support #115

Open: wants to merge 22 commits into master

Commits (22)
5f090fc
Spark 2.0.0 support
a-roberts Jun 20, 2016
a6c3403
Fix dependency for mllib
a-roberts Jun 20, 2016
27e640f
Add 1.6 target folder back in
a-roberts Jun 20, 2016
5473506
Add ML changes for Spark 2, note requires Spark in lib folder currently!
a-roberts Jun 30, 2016
b1d1947
Add config file template back in
a-roberts Jun 30, 2016
cd7eb12
Add ability to override scala version and cleanup in mllib project file
a-roberts Jun 30, 2016
95d31d3
Add QA 1.6 fixes code back in
a-roberts Jun 30, 2016
8cb9e62
Remove .rej file too
a-roberts Jun 30, 2016
9e26cfc
Comment dep on 2.0.0 preview
a-roberts Jun 30, 2016
9cc8cae
Use original config.py but with SPARK_HOME recognised, default local …
a-roberts Jul 1, 2016
44f7a5f
Back to 3.2.9 for json native
a-roberts Jul 1, 2016
cf6d671
Tidying up
a-roberts Jul 1, 2016
9e171d4
Add default to 2.0 for spark mllib version
a-roberts Jul 1, 2016
b35c294
Use GA Spark 2 not preview
a-roberts Aug 15, 2016
533f6a6
Ensure we can override Spark version for all projects
a-roberts Aug 15, 2016
1eb427c
Add a missing comma for ml project file
a-roberts Aug 15, 2016
70963cf
Just use mllib provided artifact and remove core part
a-roberts Aug 26, 2016
1d1441b
Use 2.0.0 to resolve artifacts for mllib
a-roberts Aug 26, 2016
c873532
Remove feature-noise from glm regression so we can build and run
a-roberts Aug 26, 2016
e15a40e
Remove sparkVersion statement in SparkTestsBuild.scala
a-roberts Aug 26, 2016
0ad07c7
Remove spark version from streaming project file also
a-roberts Aug 26, 2016
e6cbaf9
Lower defaults, feature-noise if Spark 1.x only
a-roberts Sep 27, 2016
34 changes: 24 additions & 10 deletions config/config.py.template
@@ -17,17 +17,21 @@ from sparkperf.config_utils import FlagSet, JavaOptionSet, OptionSet, ConstantOp
# Standard Configuration Options #
# ================================ #

DEFAULT_HOME=os.environ['HOME']

# Point to an installation of Spark on the cluster.
SPARK_HOME_DIR = "/root/spark"
SPARK_HOME_DIR = os.getenv('SPARK_HOME', DEFAULT_HOME)

# Use a custom configuration directory
SPARK_CONF_DIR = SPARK_HOME_DIR + "/conf"

# Master used when submitting Spark jobs.
# For local clusters: "spark://%s:7077" % socket.gethostname()
# for ec2 clusters: open("/root/spark-ec2/cluster-url", 'r').readline().strip()
# For local clusters (default): "spark://%s:7077" % socket.gethostname()
# For Yarn clusters: "yarn"
# Otherwise, the default uses the specified EC2 cluster
SPARK_CLUSTER_URL = open("/root/spark-ec2/cluster-url", 'r').readline().strip()

SPARK_CLUSTER_URL="spark://%s:7077" % socket.gethostname()

IS_YARN_MODE = "yarn" in SPARK_CLUSTER_URL
IS_MESOS_MODE = "mesos" in SPARK_CLUSTER_URL
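
For reference, a self-contained sketch of how the new defaults above resolve; it assumes only that SPARK_HOME may be unset and that a standalone master runs on the local host:

import os
import socket

DEFAULT_HOME = os.environ['HOME']
# The SPARK_HOME environment variable wins; otherwise fall back to the user's home directory.
SPARK_HOME_DIR = os.getenv('SPARK_HOME', DEFAULT_HOME)

# Standalone master on this host by default; set this to "yarn" for YARN clusters instead.
SPARK_CLUSTER_URL = "spark://%s:7077" % socket.gethostname()

IS_YARN_MODE = "yarn" in SPARK_CLUSTER_URL
IS_MESOS_MODE = "mesos" in SPARK_CLUSTER_URL
print("%s -> %s (yarn=%s, mesos=%s)" % (SPARK_HOME_DIR, SPARK_CLUSTER_URL, IS_YARN_MODE, IS_MESOS_MODE))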

@@ -124,7 +128,7 @@ PYTHON_MLLIB_OUTPUT_FILENAME = "results/python_mllib_perf_output_%s_%s" % (
# number of records in a generated dataset) if you are running the tests with more
# or fewer nodes. When developing new test suites, you might want to set this to a small
# value suitable for a single machine, such as 0.001.
SCALE_FACTOR = 1.0
SCALE_FACTOR = 0.01

assert SCALE_FACTOR > 0, "SCALE_FACTOR must be > 0."
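
As a rough illustration of the lower default, a dataset sized for one million records at SCALE_FACTOR = 1.0 shrinks to about ten thousand at 0.01 (the record count here is hypothetical, not taken from any particular test):

full_size_records = 1000000          # hypothetical dataset size at SCALE_FACTOR = 1.0
SCALE_FACTOR = 0.01                  # the new single-machine-friendly default
print(int(full_size_records * SCALE_FACTOR))   # 10000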

@@ -151,7 +155,8 @@ COMMON_JAVA_OPTS = [
JavaOptionSet("spark.locality.wait", [str(60 * 1000 * 1000)])
]
# Set driver memory here
SPARK_DRIVER_MEMORY = "20g"
SPARK_DRIVER_MEMORY = "1g"

# The following options value sets are shared among all tests.
COMMON_OPTS = [
# How many times to run each experiment - used to warm up system caches.
@@ -370,7 +375,8 @@ MLLIB_PERF_TEST_RUNNER = "mllib.perf.TestRunner"
# * Build Spark locally by running `build/sbt assembly; build/sbt publishLocal` in the Spark root directory
# * Set `USE_CLUSTER_SPARK = True` and `MLLIB_SPARK_VERSION = {desired Spark version, e.g. 1.5}`
# * Don't use PREP_MLLIB_TESTS = True; instead manually run `cd mllib-tests; sbt/sbt -Dspark.version=1.5.0-SNAPSHOT clean assembly` to build perf tests
MLLIB_SPARK_VERSION = 1.5

MLLIB_SPARK_VERSION = 2.0

MLLIB_JAVA_OPTS = COMMON_JAVA_OPTS
if MLLIB_SPARK_VERSION >= 1.1:
@@ -398,10 +404,8 @@ MLLIB_REGRESSION_CLASSIFICATION_TEST_OPTS = MLLIB_COMMON_OPTS + [
]

# Generalized Linear Model (GLM) Tests #

MLLIB_GLM_TEST_OPTS = MLLIB_REGRESSION_CLASSIFICATION_TEST_OPTS + [
# The scale factor for the noise in feature values.
# Currently ignored for regression.
OptionSet("feature-noise", [1.0]),
# The number of features per example
OptionSet("num-features", [10000], can_scale=False),
# The number of iterations for SGD
@@ -413,12 +417,22 @@ MLLIB_GLM_TEST_OPTS = MLLIB_REGRESSION_CLASSIFICATION_TEST_OPTS + [
# Regularization parameter
OptionSet("reg-param", [0.1])
]

if MLLIB_SPARK_VERSION >= 1.5:
MLLIB_GLM_TEST_OPTS += [
# Ignored, but required for config
OptionSet("elastic-net-param", [0.0])
]

if MLLIB_SPARK_VERSION < 2.0:
MLLIB_GLM_TEST_OPTS += [
# The scale factor for the noise in feature values.
# Currently ignored for regression.
# Only available in Spark 1.x
OptionSet("feature-noise", [1.0])
]
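
A quick sketch of what the gating above yields for a 1.6 run versus a 2.0 run; plain tuples stand in for the real OptionSet objects from sparkperf.config_utils:

def glm_extra_opts(mllib_spark_version):
    opts = []
    if mllib_spark_version >= 1.5:
        opts.append(("elastic-net-param", [0.0]))   # ignored, but required for config
    if mllib_spark_version < 2.0:
        opts.append(("feature-noise", [1.0]))       # Spark 1.x only; dropped for 2.0
    return opts

print(glm_extra_opts(1.6))  # [('elastic-net-param', [0.0]), ('feature-noise', [1.0])]
print(glm_extra_opts(2.0))  # [('elastic-net-param', [0.0])]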


# GLM Regression Tests #
MLLIB_GLM_REGRESSION_TEST_OPTS = MLLIB_GLM_TEST_OPTS + [
# Optimization algorithm: sgd
2 changes: 1 addition & 1 deletion lib/sparkperf/testsuites.py
@@ -252,7 +252,7 @@ class MLlibTests(JVMPerfTestSuite, MLlibTestHelper):

@classmethod
def build(cls, spark_version):
run_cmd("cd %s/mllib-tests; %s -Dspark.version=%s.0 clean assembly" % (PROJ_DIR, SBT_CMD, spark_version))
run_cmd("cd %s/mllib-tests; %s -Dspark.version=%s clean assembly" % (PROJ_DIR, SBT_CMD, spark_version))

@classmethod
def process_output(cls, config, short_name, opt_list, stdout_filename, stderr_filename):
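
With the defaults elsewhere in this pull request (MLLIB_SPARK_VERSION = 2.0), the build step now formats the sbt invocation as sketched below; the paths are placeholders, not values from the repo:

PROJ_DIR = "/path/to/spark-perf"   # placeholder
SBT_CMD = "sbt/sbt"                # placeholder
spark_version = 2.0                # MLLIB_SPARK_VERSION from the config template

cmd = "cd %s/mllib-tests; %s -Dspark.version=%s clean assembly" % (PROJ_DIR, SBT_CMD, spark_version)
print(cmd)   # ends in "-Dspark.version=2.0 clean assembly"; the old code appended an extra ".0"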
17 changes: 10 additions & 7 deletions mllib-tests/project/MLlibTestsBuild.scala
@@ -16,14 +16,17 @@ object MLlibTestsBuild extends Build {
lazy val commonSettings = Seq(
organization := "org.spark-project",
version := "0.1",
scalaVersion := "2.10.4",
sparkVersion := sys.props.getOrElse("spark.version", default="1.5.2"),
scalaVersion := sys.props.getOrElse("scala.version", default="2.11.8"),
sparkVersion := sys.props.getOrElse("spark.version", default="2.0.0"),
libraryDependencies ++= Seq(
"net.sf.jopt-simple" % "jopt-simple" % "4.6",
"org.scalatest" %% "scalatest" % "2.2.1" % "test",
"org.slf4j" % "slf4j-log4j12" % "1.7.2",
"org.json4s" %% "json4s-native" % "3.2.9",
"org.apache.spark" %% "spark-mllib" % sparkVersion.value % "provided"
// Allow the user to set the Spark version but default to look
// for the Spark 2.0.0 artifact. Uncomment below to use spark.version
// "org.apache.spark" %% "spark-mllib" % sparkVersion.value % "provided"
"org.apache.spark" %% "spark-mllib" % "2.0.0" % "provided"
)
)

@@ -34,10 +37,10 @@ object MLlibTestsBuild extends Build {
scalaSource in Compile := {
val targetFolder = sparkVersion.value match {
case v if v.startsWith("1.4.") => "v1p4"
case v if v.startsWith("1.5.") => "v1p5"
case v if v.startsWith("1.6.") =>
"v1p5" // acceptable for now, but change later when new algs are added
case _ => throw new IllegalArgumentException(s"Do not support Spark ${sparkVersion.value}.")
case v if v.startsWith("1.5.") => "v1p5" // acceptable for now, but change later when new algs are added
case v if v.startsWith("1.6.") => "v1p5"
case v if v.startsWith("2.0") => "v2p0"
case _ => throw new IllegalArgumentException(s"This Spark version isn't supported: ${sparkVersion.value}.")
}
baseDirectory.value / targetFolder / "src" / "main" / "scala"
},
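
The source-folder selection in the sbt build above amounts to the following mapping (a Python mirror for reference only; the build itself stays in Scala):

def target_folder(spark_version):
    if spark_version.startswith("1.4."):
        return "v1p4"
    if spark_version.startswith("1.5.") or spark_version.startswith("1.6."):
        return "v1p5"   # acceptable for now, but change later when new algs are added
    if spark_version.startswith("2.0"):
        return "v2p0"
    raise ValueError("This Spark version isn't supported: %s" % spark_version)

print(target_folder("2.0.0"))   # v2p0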
@@ -0,0 +1,68 @@
package mllib.perf

import org.json4s.JsonAST.JValue
import org.json4s.JsonDSL._

import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.distributed.RowMatrix

import mllib.perf.util.DataGenerator

/** Parent class for linear algebra tests which run on a large dataset.
* Generated this way so that SVD / PCA can be added easily
*/
abstract class LinearAlgebraTests(sc: SparkContext) extends PerfTest {

def runTest(rdd: RowMatrix, rank: Int)

val NUM_ROWS = ("num-rows", "number of rows of the matrix")
val NUM_COLS = ("num-cols", "number of columns of the matrix")
val RANK = ("rank", "number of leading singular values")

longOptions = Seq(NUM_ROWS)
intOptions = intOptions ++ Seq(RANK, NUM_COLS)

var rdd: RowMatrix = _

val options = intOptions ++ stringOptions ++ booleanOptions ++ doubleOptions ++ longOptions
addOptionsToParser()
override def createInputData(seed: Long) = {
val m: Long = longOptionValue(NUM_ROWS)
val n: Int = intOptionValue(NUM_COLS)
val numPartitions: Int = intOptionValue(NUM_PARTITIONS)

rdd = DataGenerator.generateDistributedSquareMatrix(sc, m, n, numPartitions, seed)
}

override def run(): JValue = {
val rank = intOptionValue(RANK)

val start = System.currentTimeMillis()
runTest(rdd, rank)
val end = System.currentTimeMillis()
val time = (end - start).toDouble / 1000.0

Map("time" -> time)
}
}


class SVDTest(sc: SparkContext) extends LinearAlgebraTests(sc) {
override def runTest(data: RowMatrix, rank: Int) {
data.computeSVD(rank, computeU = true)
}
}

class PCATest(sc: SparkContext) extends LinearAlgebraTests(sc) {
override def runTest(data: RowMatrix, rank: Int) {
val principal = data.computePrincipalComponents(rank)
sc.broadcast(principal)
data.multiply(principal)
}
}

class ColumnSummaryStatisticsTest(sc: SparkContext) extends LinearAlgebraTests(sc) {
override def runTest(data: RowMatrix, rank: Int) {
data.computeColumnSummaryStatistics()
}
}