diff --git a/config/config.py.template b/config/config.py.template index a348b6f..aeb39ee 100755 --- a/config/config.py.template +++ b/config/config.py.template @@ -17,17 +17,21 @@ from sparkperf.config_utils import FlagSet, JavaOptionSet, OptionSet, ConstantOp # Standard Configuration Options # # ================================ # +DEFAULT_HOME=os.environ['HOME'] + # Point to an installation of Spark on the cluster. -SPARK_HOME_DIR = "/root/spark" +SPARK_HOME_DIR = os.getenv('SPARK_HOME', DEFAULT_HOME) # Use a custom configuration directory SPARK_CONF_DIR = SPARK_HOME_DIR + "/conf" # Master used when submitting Spark jobs. -# For local clusters: "spark://%s:7077" % socket.gethostname() +# for ec2 clusters: open("/root/spark-ec2/cluster-url", 'r').readline().strip() +# For local clusters (default): "spark://%s:7077" % socket.gethostname() # For Yarn clusters: "yarn" -# Otherwise, the default uses the specified EC2 cluster -SPARK_CLUSTER_URL = open("/root/spark-ec2/cluster-url", 'r').readline().strip() + +SPARK_CLUSTER_URL="spark://%s:7077" % socket.gethostname() + IS_YARN_MODE = "yarn" in SPARK_CLUSTER_URL IS_MESOS_MODE = "mesos" in SPARK_CLUSTER_URL @@ -124,7 +128,7 @@ PYTHON_MLLIB_OUTPUT_FILENAME = "results/python_mllib_perf_output_%s_%s" % ( # number of records in a generated dataset) if you are running the tests with more # or fewer nodes. When developing new test suites, you might want to set this to a small # value suitable for a single machine, such as 0.001. -SCALE_FACTOR = 1.0 +SCALE_FACTOR = 0.01 assert SCALE_FACTOR > 0, "SCALE_FACTOR must be > 0." @@ -151,7 +155,8 @@ COMMON_JAVA_OPTS = [ JavaOptionSet("spark.locality.wait", [str(60 * 1000 * 1000)]) ] # Set driver memory here -SPARK_DRIVER_MEMORY = "20g" +SPARK_DRIVER_MEMORY = "1g" + # The following options value sets are shared among all tests. COMMON_OPTS = [ # How many times to run each experiment - used to warm up system caches. @@ -370,7 +375,8 @@ MLLIB_PERF_TEST_RUNNER = "mllib.perf.TestRunner" # * Build Spark locally by running `build/sbt assembly; build/sbt publishLocal` in the Spark root directory # * Set `USE_CLUSTER_SPARK = True` and `MLLIB_SPARK_VERSION = {desired Spark version, e.g. 1.5}` # * Don't use PREP_MLLIB_TESTS = True; instead manually run `cd mllib-tests; sbt/sbt -Dspark.version=1.5.0-SNAPSHOT clean assembly` to build perf tests -MLLIB_SPARK_VERSION = 1.5 + +MLLIB_SPARK_VERSION = 2.0 MLLIB_JAVA_OPTS = COMMON_JAVA_OPTS if MLLIB_SPARK_VERSION >= 1.1: @@ -398,10 +404,8 @@ MLLIB_REGRESSION_CLASSIFICATION_TEST_OPTS = MLLIB_COMMON_OPTS + [ ] # Generalized Linear Model (GLM) Tests # + MLLIB_GLM_TEST_OPTS = MLLIB_REGRESSION_CLASSIFICATION_TEST_OPTS + [ - # The scale factor for the noise in feature values. - # Currently ignored for regression. - OptionSet("feature-noise", [1.0]), # The number of features per example OptionSet("num-features", [10000], can_scale=False), # The number of iterations for SGD @@ -413,12 +417,22 @@ MLLIB_GLM_TEST_OPTS = MLLIB_REGRESSION_CLASSIFICATION_TEST_OPTS + [ # Regularization parameter OptionSet("reg-param", [0.1]) ] + if MLLIB_SPARK_VERSION >= 1.5: MLLIB_GLM_TEST_OPTS += [ # Ignored, but required for config OptionSet("elastic-net-param", [0.0]) ] +if MLLIB_SPARK_VERSION < 2.0: + MLLIB_GLM_TEST_OPTS += [ + # The scale factor for the noise in feature values. + # Currently ignored for regression. 
+ # Only available in Spark 1.x + OptionSet("feature-noise", [1.0]) + ] + + # GLM Regression Tests # MLLIB_GLM_REGRESSION_TEST_OPTS = MLLIB_GLM_TEST_OPTS + [ # Optimization algorithm: sgd diff --git a/lib/sparkperf/testsuites.py b/lib/sparkperf/testsuites.py index 573ecf5..46ab349 100644 --- a/lib/sparkperf/testsuites.py +++ b/lib/sparkperf/testsuites.py @@ -252,7 +252,7 @@ class MLlibTests(JVMPerfTestSuite, MLlibTestHelper): @classmethod def build(cls, spark_version): - run_cmd("cd %s/mllib-tests; %s -Dspark.version=%s.0 clean assembly" % (PROJ_DIR, SBT_CMD, spark_version)) + run_cmd("cd %s/mllib-tests; %s -Dspark.version=%s clean assembly" % (PROJ_DIR, SBT_CMD, spark_version)) @classmethod def process_output(cls, config, short_name, opt_list, stdout_filename, stderr_filename): diff --git a/mllib-tests/project/MLlibTestsBuild.scala b/mllib-tests/project/MLlibTestsBuild.scala index 3347db4..3622734 100644 --- a/mllib-tests/project/MLlibTestsBuild.scala +++ b/mllib-tests/project/MLlibTestsBuild.scala @@ -16,14 +16,17 @@ object MLlibTestsBuild extends Build { lazy val commonSettings = Seq( organization := "org.spark-project", version := "0.1", - scalaVersion := "2.10.4", - sparkVersion := sys.props.getOrElse("spark.version", default="1.5.2"), + scalaVersion := sys.props.getOrElse("scala.version", default="2.11.8"), + sparkVersion := sys.props.getOrElse("spark.version", default="2.0.0"), libraryDependencies ++= Seq( "net.sf.jopt-simple" % "jopt-simple" % "4.6", "org.scalatest" %% "scalatest" % "2.2.1" % "test", "org.slf4j" % "slf4j-log4j12" % "1.7.2", "org.json4s" %% "json4s-native" % "3.2.9", - "org.apache.spark" %% "spark-mllib" % sparkVersion.value % "provided" + // Allow the user to set the Spark version but default to look + // for the Spark 2.0.0 artifact. Uncomment below to use spark.version + // "org.apache.spark" %% "spark-mllib" % sparkVersion.value % "provided" + "org.apache.spark" %% "spark-mllib" % "2.0.0" % "provided" ) ) @@ -34,10 +37,10 @@ object MLlibTestsBuild extends Build { scalaSource in Compile := { val targetFolder = sparkVersion.value match { case v if v.startsWith("1.4.") => "v1p4" - case v if v.startsWith("1.5.") => "v1p5" - case v if v.startsWith("1.6.") => - "v1p5" // acceptable for now, but change later when new algs are added - case _ => throw new IllegalArgumentException(s"Do not support Spark ${sparkVersion.value}.") + case v if v.startsWith("1.5.") => "v1p5" // acceptable for now, but change later when new algs are added + case v if v.startsWith("1.6.") => "v1p5" + case v if v.startsWith("2.0") => "v2p0" + case _ => throw new IllegalArgumentException(s"This Spark version isn't supported: ${sparkVersion.value}.") } baseDirectory.value / targetFolder / "src" / "main" / "scala" }, diff --git a/mllib-tests/v2p0/src/main/scala/mllib/perf/LinearAlgebraTests.scala b/mllib-tests/v2p0/src/main/scala/mllib/perf/LinearAlgebraTests.scala new file mode 100644 index 0000000..b992173 --- /dev/null +++ b/mllib-tests/v2p0/src/main/scala/mllib/perf/LinearAlgebraTests.scala @@ -0,0 +1,68 @@ +package mllib.perf + +import org.json4s.JsonAST.JValue +import org.json4s.JsonDSL._ + +import org.apache.spark.SparkContext +import org.apache.spark.mllib.linalg.distributed.RowMatrix + +import mllib.perf.util.DataGenerator + +/** Parent class for linear algebra tests which run on a large dataset. 
+ * Generated this way so that SVD / PCA can be added easily + */ +abstract class LinearAlgebraTests(sc: SparkContext) extends PerfTest { + + def runTest(rdd: RowMatrix, rank: Int) + + val NUM_ROWS = ("num-rows", "number of rows of the matrix") + val NUM_COLS = ("num-cols", "number of columns of the matrix") + val RANK = ("rank", "number of leading singular values") + + longOptions = Seq(NUM_ROWS) + intOptions = intOptions ++ Seq(RANK, NUM_COLS) + + var rdd: RowMatrix = _ + + val options = intOptions ++ stringOptions ++ booleanOptions ++ doubleOptions ++ longOptions + addOptionsToParser() + override def createInputData(seed: Long) = { + val m: Long = longOptionValue(NUM_ROWS) + val n: Int = intOptionValue(NUM_COLS) + val numPartitions: Int = intOptionValue(NUM_PARTITIONS) + + rdd = DataGenerator.generateDistributedSquareMatrix(sc, m, n, numPartitions, seed) + } + + override def run(): JValue = { + val rank = intOptionValue(RANK) + + val start = System.currentTimeMillis() + runTest(rdd, rank) + val end = System.currentTimeMillis() + val time = (end - start).toDouble / 1000.0 + + Map("time" -> time) + } +} + + +class SVDTest(sc: SparkContext) extends LinearAlgebraTests(sc) { + override def runTest(data: RowMatrix, rank: Int) { + data.computeSVD(rank, computeU = true) + } +} + +class PCATest(sc: SparkContext) extends LinearAlgebraTests(sc) { + override def runTest(data: RowMatrix, rank: Int) { + val principal = data.computePrincipalComponents(rank) + sc.broadcast(principal) + data.multiply(principal) + } +} + +class ColumnSummaryStatisticsTest(sc: SparkContext) extends LinearAlgebraTests(sc) { + override def runTest(data: RowMatrix, rank: Int) { + data.computeColumnSummaryStatistics() + } +} diff --git a/mllib-tests/v2p0/src/main/scala/mllib/perf/MLAlgorithmTests.scala b/mllib-tests/v2p0/src/main/scala/mllib/perf/MLAlgorithmTests.scala new file mode 100644 index 0000000..37409b7 --- /dev/null +++ b/mllib-tests/v2p0/src/main/scala/mllib/perf/MLAlgorithmTests.scala @@ -0,0 +1,775 @@ +package mllib.perf + +import org.json4s.JsonAST._ +import org.json4s.JsonDSL._ + +import org.apache.spark.SparkContext +import org.apache.spark.ml.PredictionModel +import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier, RandomForestClassificationModel, RandomForestClassifier, LogisticRegression} +import org.apache.spark.ml.regression.{GBTRegressionModel, GBTRegressor, RandomForestRegressionModel, RandomForestRegressor, LinearRegression} +import org.apache.spark.mllib.classification._ +import org.apache.spark.mllib.clustering.{KMeans, KMeansModel} +import org.apache.spark.mllib.linalg.{Vector, Vectors} +import org.apache.spark.mllib.optimization.{SquaredL2Updater, L1Updater, SimpleUpdater} +import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating} +import org.apache.spark.mllib.regression._ +import org.apache.spark.mllib.tree.{GradientBoostedTrees, RandomForest} +import org.apache.spark.mllib.tree.configuration.{Algo, BoostingStrategy, QuantileStrategy, Strategy} +import org.apache.spark.mllib.tree.impurity.Variance +import org.apache.spark.mllib.tree.loss.{LogLoss, SquaredError} +import org.apache.spark.mllib.tree.model.{GradientBoostedTreesModel, RandomForestModel} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.Row +import org.apache.spark.sql.SQLContext + +import mllib.perf.util.{DataGenerator, DataLoader} + +/** Parent class for tests which run on a large dataset. 
*/ +abstract class RegressionAndClassificationTests[M](sc: SparkContext) extends PerfTest { + + def runTest(rdd: RDD[LabeledPoint]): M + + def validate(model: M, rdd: RDD[LabeledPoint]): Double + + val NUM_EXAMPLES = ("num-examples", "number of examples for regression tests") + val NUM_FEATURES = ("num-features", "number of features of each example for regression tests") + + intOptions = intOptions ++ Seq(NUM_FEATURES) + longOptions = Seq(NUM_EXAMPLES) + + var rdd: RDD[LabeledPoint] = _ + var testRdd: RDD[LabeledPoint] = _ + + override def run(): JValue = { + var start = System.currentTimeMillis() + val model = runTest(rdd) + val trainingTime = (System.currentTimeMillis() - start).toDouble / 1000.0 + + start = System.currentTimeMillis() + val trainingMetric = validate(model, rdd) + val testTime = (System.currentTimeMillis() - start).toDouble / 1000.0 + + val testMetric = validate(model, testRdd) + Map("trainingTime" -> trainingTime, "testTime" -> testTime, + "trainingMetric" -> trainingMetric, "testMetric" -> testMetric) + } + + /** + * For classification + * @param predictions RDD over (prediction, truth) for each instance + * @return Percent correctly classified + */ + def calculateAccuracy(predictions: RDD[(Double, Double)], numExamples: Long): Double = { + predictions.map{case (pred, label) => + if (pred == label) 1.0 else 0.0 + }.sum() * 100.0 / numExamples + } + + /** + * For regression + * @param predictions RDD over (prediction, truth) for each instance + * @return Root mean squared error (RMSE) + */ + def calculateRMSE(predictions: RDD[(Double, Double)], numExamples: Long): Double = { + val error = predictions.map{ case (pred, label) => + (pred - label) * (pred - label) + }.sum() + math.sqrt(error / numExamples) + } +} + +/** Parent class for Generalized Linear Model (GLM) tests */ +abstract class GLMTests(sc: SparkContext) + extends RegressionAndClassificationTests[GeneralizedLinearModel](sc) { + + val STEP_SIZE = ("step-size", "step size for SGD") + val NUM_ITERATIONS = ("num-iterations", "number of iterations for the algorithm") + val REG_TYPE = ("reg-type", "type of regularization: none, l1, l2, elastic-net") + val ELASTIC_NET_PARAM = ("elastic-net-param", "elastic-net param, 0.0 for L2, and 1.0 for L1") + val REG_PARAM = ("reg-param", "the regularization parameter against overfitting") + val OPTIMIZER = ("optimizer", "optimization algorithm (elastic-net only supports l-bfgs): sgd, l-bfgs") + + intOptions = intOptions ++ Seq(NUM_ITERATIONS) + doubleOptions = doubleOptions ++ Seq(ELASTIC_NET_PARAM, STEP_SIZE, REG_PARAM) + stringOptions = stringOptions ++ Seq(REG_TYPE, OPTIMIZER) +} + +class GLMRegressionTest(sc: SparkContext) extends GLMTests(sc) { + + val INTERCEPT = ("intercept", "intercept for random data generation") + val LABEL_NOISE = ("label-noise", "scale factor for the noise during label generation") + val LOSS = ("loss", "loss to minimize. 
Supported: l2 (squared error).") + + doubleOptions = doubleOptions ++ Seq(INTERCEPT, LABEL_NOISE) + stringOptions = stringOptions ++ Seq(LOSS) + + val options = intOptions ++ stringOptions ++ booleanOptions ++ doubleOptions ++ longOptions + addOptionsToParser() + + override def createInputData(seed: Long) = { + val numExamples: Long = longOptionValue(NUM_EXAMPLES) + val numFeatures: Int = intOptionValue(NUM_FEATURES) + val numPartitions: Int = intOptionValue(NUM_PARTITIONS) + + val intercept: Double = doubleOptionValue(INTERCEPT) + val labelNoise: Double = doubleOptionValue(LABEL_NOISE) + + val data = DataGenerator.generateLabeledPoints(sc, math.ceil(numExamples * 1.25).toLong, + numFeatures, intercept, labelNoise, numPartitions, seed) + + val split = data.randomSplit(Array(0.8, 0.2), seed) + + rdd = split(0).cache() + testRdd = split(1) + + // Materialize rdd + println("Num Examples: " + rdd.count()) + } + + override def validate(model: GeneralizedLinearModel, rdd: RDD[LabeledPoint]): Double = { + val numExamples = rdd.count() + val predictions: RDD[(Double, Double)] = rdd.map { example => + (model.predict(example.features), example.label) + } + calculateRMSE(predictions, numExamples) + } + + override def runTest(rdd: RDD[LabeledPoint]): GeneralizedLinearModel = { + val stepSize = doubleOptionValue(STEP_SIZE) + val loss = stringOptionValue(LOSS) + val regType = stringOptionValue(REG_TYPE) + val regParam = doubleOptionValue(REG_PARAM) + val elasticNetParam = doubleOptionValue(ELASTIC_NET_PARAM) + val numIterations = intOptionValue(NUM_ITERATIONS) + val optimizer = stringOptionValue(OPTIMIZER) + + // Linear Regression only supports squared loss for now. + if (!Array("l2").contains(loss)) { + throw new IllegalArgumentException( + s"GLMRegressionTest run with unknown loss ($loss). Supported values: l2.") + } + + if (regType == "elastic-net") { // use spark.ml + assert(optimizer == "auto" || optimizer == "l-bfgs", "GLMClassificationTest with" + + s" regType=elastic-net expects optimizer to be in {auto, l-bfgs}, but found: $optimizer") + println("WARNING: Linear Regression with elastic-net in ML package uses LBFGS/OWLQN for" + + " optimization which ignores stepSize in Spark 1.5.") + val rr = new LinearRegression() + .setElasticNetParam(elasticNetParam) + .setRegParam(regParam) + .setMaxIter(numIterations) + val sqlContext = new SQLContext(rdd.context) + import sqlContext.implicits._ + val mlModel = rr.fit(rdd.toDF()) + + new LinearRegressionModel(Vectors.fromML(mlModel.coefficients), + mlModel.intercept) + + } else { + assert(optimizer == "sgd", "GLMClassificationTest with" + + s" regType!=elastic-net expects optimizer to be sgd, but found: $optimizer") + (loss, regType) match { + case ("l2", "none") => + val lr = new LinearRegressionWithSGD().setIntercept(addIntercept = true) + lr.optimizer + .setNumIterations(numIterations) + .setStepSize(stepSize) + .setConvergenceTol(0.0) + lr.run(rdd) + case ("l2", "l1") => + val lasso = new LassoWithSGD().setIntercept(addIntercept = true) + lasso.optimizer + .setNumIterations(numIterations) + .setStepSize(stepSize) + .setRegParam(regParam) + .setConvergenceTol(0.0) + lasso.run(rdd) + case ("l2", "l2") => + val rr = new RidgeRegressionWithSGD().setIntercept(addIntercept = true) + rr.optimizer + .setNumIterations(numIterations) + .setStepSize(stepSize) + .setRegParam(regParam) + .setConvergenceTol(0.0) + rr.run(rdd) + case _ => + throw new IllegalArgumentException( + s"GLMRegressionTest given incompatible (loss, regType) = ($loss, $regType)." 
+ + s" Note the set of supported combinations increases in later Spark versions.") + } + } + } +} + +class GLMClassificationTest(sc: SparkContext) extends GLMTests(sc) { + + val THRESHOLD = ("per-negative", "probability for a negative label during data generation") + val LOSS = ("loss", "loss to minimize. Supported: logistic, hinge (SVM).") + + doubleOptions = doubleOptions ++ Seq(THRESHOLD) + stringOptions = stringOptions ++ Seq(LOSS) + + val options = intOptions ++ stringOptions ++ booleanOptions ++ doubleOptions ++ longOptions + addOptionsToParser() + + override def validate(model: GeneralizedLinearModel, rdd: RDD[LabeledPoint]): Double = { + val numExamples = rdd.count() + val predictions: RDD[(Double, Double)] = rdd.map { example => + (model.predict(example.features), example.label) + } + calculateAccuracy(predictions, numExamples) + } + + override def createInputData(seed: Long) = { + val numExamples: Long = longOptionValue(NUM_EXAMPLES) + val numFeatures: Int = intOptionValue(NUM_FEATURES) + val numPartitions: Int = intOptionValue(NUM_PARTITIONS) + + val threshold: Double = doubleOptionValue(THRESHOLD) + + val data = DataGenerator.generateClassificationLabeledPoints(sc, + math.ceil(numExamples * 1.25).toLong, numFeatures, threshold, numPartitions, + seed) + + val split = data.randomSplit(Array(0.8, 0.2), seed) + + rdd = split(0).cache() + testRdd = split(1) + + // Materialize rdd + println("Num Examples: " + rdd.count()) + } + + override def runTest(rdd: RDD[LabeledPoint]): GeneralizedLinearModel = { + val stepSize = doubleOptionValue(STEP_SIZE) + val loss = stringOptionValue(LOSS) + val regType = stringOptionValue(REG_TYPE) + val regParam = doubleOptionValue(REG_PARAM) + val elasticNetParam = doubleOptionValue(ELASTIC_NET_PARAM) + val numIterations = intOptionValue(NUM_ITERATIONS) + val optimizer = stringOptionValue(OPTIMIZER) + + // For classification problem in GLM, we currently support logistic loss and hinge loss. + if (!Array("logistic", "hinge").contains(loss)) { + throw new IllegalArgumentException( + s"GLMClassificationTest run with unknown loss ($loss). Supported values: logistic, hinge.") + } + + if (regType == "elastic-net") { // use spark.ml + assert(optimizer == "auto" || optimizer == "l-bfgs", "GLMClassificationTest with" + + " regType=elastic-net expects optimizer to be in {auto, l-bfgs}") + loss match { + case "logistic" => + println("WARNING: Logistic Regression with elastic-net in ML package uses LBFGS/OWLQN" + + " for optimization which ignores stepSize in Spark 1.5.") + val lor = new LogisticRegression() + .setElasticNetParam(elasticNetParam) + .setRegParam(regParam) + .setMaxIter(numIterations) + val sqlContext = new SQLContext(rdd.context) + import sqlContext.implicits._ + val mlModel = lor.fit(rdd.toDF()) + new LogisticRegressionModel(Vectors.fromML(mlModel.coefficients), mlModel.intercept) + case _ => + throw new IllegalArgumentException( + s"GLMClassificationTest given unsupported loss = $loss." 
+ + s" Note the set of supported combinations increases in later Spark versions.") + } + } else { + val updater = regType match { + case "none" => new SimpleUpdater + case "l1" => new L1Updater + case "l2" => new SquaredL2Updater + } + (loss, optimizer) match { + case ("logistic", "sgd") => + val lr = new LogisticRegressionWithSGD() + lr.optimizer + .setStepSize(stepSize) + .setNumIterations(numIterations) + .setConvergenceTol(0.0) + .setUpdater(updater) + lr.run(rdd) + case ("logistic", "l-bfgs") => + println("WARNING: LogisticRegressionWithLBFGS ignores stepSize in this Spark version.") + val lr = new LogisticRegressionWithLBFGS() + lr.optimizer + .setNumIterations(numIterations) + .setConvergenceTol(0.0) + .setUpdater(updater) + lr.run(rdd) + case ("hinge", "sgd") => + val svm = new SVMWithSGD() + svm.optimizer + .setNumIterations(numIterations) + .setStepSize(stepSize) + .setRegParam(regParam) + .setConvergenceTol(0.0) + .setUpdater(updater) + svm.run(rdd) + case _ => + throw new IllegalArgumentException( + s"GLMClassificationTest given incompatible (loss, regType) = ($loss, $regType)." + + s" Supported combinations include: (elastic-net, _), (logistic, sgd), (logistic, l-bfgs), (hinge, sgd)." + + s" Note the set of supported combinations increases in later Spark versions.") + } + } + } +} + +abstract class RecommendationTests(sc: SparkContext) extends PerfTest { + + def runTest(rdd: RDD[Rating]): MatrixFactorizationModel + + val NUM_USERS = ("num-users", "number of users for recommendation tests") + val NUM_PRODUCTS = ("num-products", "number of features of each example for recommendation tests") + val NUM_RATINGS = ("num-ratings", "number of ratings for recommendation tests") + val RANK = ("rank", "rank of factorized matrices for recommendation tests") + val IMPLICIT = ("implicit-prefs", "use implicit ratings") + val NUM_ITERATIONS = ("num-iterations", "number of iterations for the algorithm") + val REG_PARAM = ("reg-param", "the regularization parameter against overfitting") + + intOptions = intOptions ++ Seq(NUM_USERS, NUM_PRODUCTS, RANK, NUM_ITERATIONS) + longOptions = longOptions ++ Seq(NUM_RATINGS) + booleanOptions = booleanOptions ++ Seq(IMPLICIT) + doubleOptions = doubleOptions ++ Seq(REG_PARAM) + val options = intOptions ++ stringOptions ++ booleanOptions ++ longOptions ++ doubleOptions + addOptionsToParser() + + var rdd: RDD[Rating] = _ + var testRdd: RDD[Rating] = _ + + override def createInputData(seed: Long) = { + val numPartitions: Int = intOptionValue(NUM_PARTITIONS) + + val numUsers: Int = intOptionValue(NUM_USERS) + val numProducts: Int = intOptionValue(NUM_PRODUCTS) + val numRatings: Long = longOptionValue(NUM_RATINGS) + val implicitRatings: Boolean = booleanOptionValue(IMPLICIT) + + val data = DataGenerator.generateRatings(sc, numUsers, numProducts, + numRatings, implicitRatings, numPartitions, seed) + + rdd = data._1.cache() + testRdd = data._2 + + // Materialize rdd + println("Num Examples: " + rdd.count()) + } + + def validate(model: MatrixFactorizationModel, + data: RDD[Rating]): Double = { + val implicitPrefs: Boolean = booleanOptionValue(IMPLICIT) + val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product))) + val predictionsAndRatings: RDD[(Double, Double)] = predictions.map{ x => + def mapPredictedRating(r: Double) = if (implicitPrefs) math.max(math.min(r, 1.0), 0.0) else r + ((x.user, x.product), mapPredictedRating(x.rating)) + }.join(data.map(x => ((x.user, x.product), x.rating))).values + + math.sqrt(predictionsAndRatings.map(x => 
(x._1 - x._2) * (x._1 - x._2)).mean()) + } + + override def run(): JValue = { + var start = System.currentTimeMillis() + val model = runTest(rdd) + val trainingTime = (System.currentTimeMillis() - start).toDouble / 1000.0 + + start = System.currentTimeMillis() + val trainingMetric = validate(model, rdd) + val testTime = (System.currentTimeMillis() - start).toDouble / 1000.0 + + val testMetric = validate(model, testRdd) + + val numThingsToRecommend = 10 + start = System.currentTimeMillis() + model.recommendProductsForUsers(numThingsToRecommend).count() + val recommendProductsForUsersTime = (System.currentTimeMillis() - start).toDouble / 1000.0 + start = System.currentTimeMillis() + model.recommendUsersForProducts(numThingsToRecommend).count() + val recommendUsersForProductsTime = (System.currentTimeMillis() - start).toDouble / 1000.0 + + Map("trainingTime" -> trainingTime, "testTime" -> testTime, + "trainingMetric" -> trainingMetric, "testMetric" -> testMetric, + "recommendProductsForUsersTime" -> recommendProductsForUsersTime, + "recommendUsersForProductsTime" -> recommendUsersForProductsTime) + } +} + +abstract class ClusteringTests(sc: SparkContext) extends PerfTest { + + def runTest(rdd: RDD[Vector]): KMeansModel + + val NUM_POINTS = ("num-points", "number of points for clustering tests") + val NUM_COLUMNS = ("num-columns", "number of columns for each point for clustering tests") + val NUM_CENTERS = ("num-centers", "number of centers for clustering tests") + val NUM_ITERATIONS = ("num-iterations", "number of iterations for the algorithm") + + intOptions = intOptions ++ Seq(NUM_CENTERS, NUM_COLUMNS, NUM_ITERATIONS) + longOptions = longOptions ++ Seq(NUM_POINTS) + val options = intOptions ++ stringOptions ++ booleanOptions ++ longOptions ++ doubleOptions + addOptionsToParser() + + var rdd: RDD[Vector] = _ + var testRdd: RDD[Vector] = _ + + def validate(model: KMeansModel, rdd: RDD[Vector]): Double = { + val numPoints = rdd.cache().count() + + val error = model.computeCost(rdd) + + math.sqrt(error/numPoints) + } + + override def createInputData(seed: Long) = { + val numPartitions: Int = intOptionValue(NUM_PARTITIONS) + + val numPoints: Long = longOptionValue(NUM_POINTS) + val numColumns: Int = intOptionValue(NUM_COLUMNS) + val numCenters: Int = intOptionValue(NUM_CENTERS) + + val data = DataGenerator.generateKMeansVectors(sc, math.ceil(numPoints*1.25).toLong, numColumns, + numCenters, numPartitions, seed) + + val split = data.randomSplit(Array(0.8, 0.2), seed) + + rdd = split(0).cache() + testRdd = split(1) + + // Materialize rdd + println("Num Examples: " + rdd.count()) + } + + override def run(): JValue = { + var start = System.currentTimeMillis() + val model = runTest(rdd) + val trainingTime = (System.currentTimeMillis() - start).toDouble / 1000.0 + + start = System.currentTimeMillis() + val trainingMetric = validate(model, rdd) + val testTime = (System.currentTimeMillis() - start).toDouble / 1000.0 + + val testMetric = validate(model, testRdd) + Map("trainingTime" -> trainingTime, "testTime" -> testTime, + "trainingMetric" -> trainingMetric, "testMetric" -> testMetric) + } +} + +// Classification Algorithms + +class NaiveBayesTest(sc: SparkContext) + extends RegressionAndClassificationTests[NaiveBayesModel](sc) { + + val THRESHOLD = ("per-negative", "probability for a negative label during data generation") + val SMOOTHING = ("nb-lambda", "the smoothing parameter lambda for Naive Bayes") + val MODEL_TYPE = ("model-type", "either multinomial (default) or bernoulli") + + doubleOptions = 
doubleOptions ++ Seq(THRESHOLD, SMOOTHING) + stringOptions = stringOptions ++ Seq(MODEL_TYPE) + val options = intOptions ++ stringOptions ++ booleanOptions ++ doubleOptions ++ longOptions + addOptionsToParser() + + /** Note: using same data generation as for GLMClassificationTest, but should change later */ + override def createInputData(seed: Long) = { + val numExamples: Long = longOptionValue(NUM_EXAMPLES) + val numFeatures: Int = intOptionValue(NUM_FEATURES) + val numPartitions: Int = intOptionValue(NUM_PARTITIONS) + + val threshold: Double = doubleOptionValue(THRESHOLD) + val modelType = stringOptionValue(MODEL_TYPE) + + val data = if (modelType == "bernoulli") { + DataGenerator.generateBinaryLabeledPoints(sc, + math.ceil(numExamples * 1.25).toLong, numFeatures, threshold, numPartitions, seed) + } else { + val negdata = DataGenerator.generateClassificationLabeledPoints(sc, + math.ceil(numExamples * 1.25).toLong, numFeatures, threshold, numPartitions, + seed) + val dataNonneg = negdata.map { lp => + LabeledPoint(lp.label, Vectors.dense(lp.features.toArray.map(math.abs))) + } + dataNonneg + } + + val split = data.randomSplit(Array(0.8, 0.2), seed) + + rdd = split(0).cache() + testRdd = split(1) + + // Materialize rdd + println("Num Examples: " + rdd.count()) + } + + override def validate(model: NaiveBayesModel, rdd: RDD[LabeledPoint]): Double = { + val numExamples = rdd.count() + val predictions: RDD[(Double, Double)] = rdd.map { example => + (model.predict(example.features), example.label) + } + calculateAccuracy(predictions, numExamples) + } + + override def runTest(rdd: RDD[LabeledPoint]): NaiveBayesModel = { + val lambda = doubleOptionValue(SMOOTHING) + + val modelType = stringOptionValue(MODEL_TYPE) + NaiveBayes.train(rdd, lambda, modelType) + } +} + + +// Recommendation +class ALSTest(sc: SparkContext) extends RecommendationTests(sc) { + override def runTest(rdd: RDD[Rating]): MatrixFactorizationModel = { + val numIterations: Int = intOptionValue(NUM_ITERATIONS) + val rank: Int = intOptionValue(RANK) + val regParam = doubleOptionValue(REG_PARAM) + val seed = intOptionValue(RANDOM_SEED) + 12 + + new ALS().setIterations(numIterations).setRank(rank).setSeed(seed).setLambda(regParam) + .setBlocks(rdd.partitions.length).run(rdd) + } +} + +// Clustering +// TODO: refactor into mllib.perf.clustering like the other clustering tests +class KMeansTest(sc: SparkContext) extends ClusteringTests(sc) { + override def runTest(rdd: RDD[Vector]): KMeansModel = { + val numIterations: Int = intOptionValue(NUM_ITERATIONS) + val k: Int = intOptionValue(NUM_CENTERS) + KMeans.train(rdd, k, numIterations) + } +} + +// Decision-tree +sealed trait TreeBasedModel +case class MLlibRFModel(model: RandomForestModel) extends TreeBasedModel +case class MLlibGBTModel(model: GradientBoostedTreesModel) extends TreeBasedModel +case class MLRFRegressionModel(model: RandomForestRegressionModel) extends TreeBasedModel +case class MLRFClassificationModel(model: RandomForestClassificationModel) extends TreeBasedModel +case class MLGBTRegressionModel(model: GBTRegressionModel) extends TreeBasedModel +case class MLGBTClassificationModel(model: GBTClassificationModel) extends TreeBasedModel + +/** + * Parent class for DecisionTree-based tests which run on a large dataset. 
+ */ +abstract class DecisionTreeTests(sc: SparkContext) + extends RegressionAndClassificationTests[TreeBasedModel](sc) { + + val TEST_DATA_FRACTION = + ("test-data-fraction", "fraction of data to hold out for testing (ignored if given training and test dataset)") + val LABEL_TYPE = + ("label-type", "Type of label: 0 indicates regression, 2+ indicates " + + "classification with this many classes") + val FRAC_CATEGORICAL_FEATURES = ("frac-categorical-features", + "Fraction of features which are categorical") + val FRAC_BINARY_FEATURES = + ("frac-binary-features", "Fraction of categorical features which are binary. " + + "Others have 20 categories.") + val TREE_DEPTH = ("tree-depth", "Depth of true decision tree model used to label examples.") + val MAX_BINS = ("max-bins", "Maximum number of bins for the decision tree learning algorithm.") + val NUM_TREES = ("num-trees", "Number of trees to train. If 1, run DecisionTree. If >1, run an ensemble method (RandomForest).") + val FEATURE_SUBSET_STRATEGY = + ("feature-subset-strategy", "Strategy for feature subset sampling. Supported: auto, all, sqrt, log2, onethird.") + + intOptions = intOptions ++ Seq(LABEL_TYPE, TREE_DEPTH, MAX_BINS, NUM_TREES) + doubleOptions = doubleOptions ++ Seq(TEST_DATA_FRACTION, FRAC_CATEGORICAL_FEATURES, FRAC_BINARY_FEATURES) + stringOptions = stringOptions ++ Seq(FEATURE_SUBSET_STRATEGY) + + addOptionalOptionToParser("training-data", "path to training dataset (if not given, use random data)", "", classOf[String]) + addOptionalOptionToParser("test-data", "path to test dataset (only used if training dataset given)" + + " (if not given, hold out part of training data for validation)", "", classOf[String]) + + var categoricalFeaturesInfo: Map[Int, Int] = Map.empty + + protected var labelType = -1 + + def validate(model: TreeBasedModel, rdd: RDD[LabeledPoint]): Double = { + val numExamples = rdd.count() + val predictions: RDD[(Double, Double)] = model match { + case MLlibRFModel(rfModel) => rfModel.predict(rdd.map(_.features)).zip(rdd.map(_.label)) + case MLlibGBTModel(gbtModel) => gbtModel.predict(rdd.map(_.features)).zip(rdd.map(_.label)) + case MLRFRegressionModel(rfModel) => makePredictions(rfModel, rdd) + case MLRFClassificationModel(rfModel) => makePredictions(rfModel, rdd) + case MLGBTRegressionModel(gbtModel) => makePredictions(gbtModel, rdd) + case MLGBTClassificationModel(gbtModel) => makePredictions(gbtModel, rdd) + } + val labelType: Int = intOptionValue(LABEL_TYPE) + if (labelType == 0) { + calculateRMSE(predictions, numExamples) + } else { + calculateAccuracy(predictions, numExamples) + } + } + + // TODO: generate DataFrame outside of `runTest` so it is not included in timing results + private def makePredictions( + model: PredictionModel[org.apache.spark.ml.linalg.Vector, _], rdd: RDD[LabeledPoint]): RDD[(Double, Double)] = { + val labelType: Int = intOptionValue(LABEL_TYPE) + val dataFrame = DataGenerator.setMetadata(rdd, categoricalFeaturesInfo, labelType) + val results = model.transform(dataFrame) + results + .select(model.getPredictionCol, model.getLabelCol) + .rdd + .map { case Row(prediction: Double, label: Double) => (prediction, label) } + } +} + +class DecisionTreeTest(sc: SparkContext) extends DecisionTreeTests(sc) { + val supportedTreeTypes = Array("RandomForest", "GradientBoostedTrees", + "ml.RandomForest", "ml.GradientBoostedTrees") + + val ENSEMBLE_TYPE = ("ensemble-type", "Type of ensemble algorithm: " + supportedTreeTypes.mkString(" ")) + + stringOptions = stringOptions ++ Seq(ENSEMBLE_TYPE) + 
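  // Illustrative only (not produced by the patch itself): with the options registered in
  // PerfTest, RegressionAndClassificationTests, DecisionTreeTests and this class, a concrete
  // run of this test would receive the test name plus one value per required option, e.g.
  //   decision-tree --num-trials=1 --inter-trial-wait=3 --num-partitions=4 --random-seed=5
  //     --num-examples=100000 --num-features=500 --label-type=2 --test-data-fraction=0.2
  //     --frac-categorical-features=0.5 --frac-binary-features=0.5 --tree-depth=5 --max-bins=32
  //     --num-trees=10 --feature-subset-strategy=auto --ensemble-type=RandomForest
  // (values chosen for illustration; training-data and test-data are optional and default to "",
  // which selects the synthetic-data path in createInputData below)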
+ val options = intOptions ++ stringOptions ++ booleanOptions ++ doubleOptions ++ longOptions + addOptionsToParser() + + private def getTestDataFraction: Double = { + val testDataFraction: Double = doubleOptionValue(TEST_DATA_FRACTION) + assert(testDataFraction >= 0 && testDataFraction <= 1, s"Bad testDataFraction: $testDataFraction") + testDataFraction + } + + override def createInputData(seed: Long) = { + val trainingDataPath: String = optionValue[String]("training-data") + val (rdds, categoricalFeaturesInfo_, numClasses) = if (trainingDataPath != "") { + println(s"LOADING FILE: $trainingDataPath") + val numPartitions: Int = intOptionValue(NUM_PARTITIONS) + val testDataPath: String = optionValue[String]("test-data") + val testDataFraction: Double = getTestDataFraction + DataLoader.loadLibSVMFiles(sc, numPartitions, trainingDataPath, testDataPath, + testDataFraction, seed) + } else { + createSyntheticInputData(seed) + } + assert(rdds.length == 2) + rdd = rdds(0).cache() + testRdd = rdds(1) + categoricalFeaturesInfo = categoricalFeaturesInfo_ + this.labelType = numClasses + + // Materialize rdd + println("Num Examples: " + rdd.count()) + } + + /** + * Create synthetic training and test datasets. + * @return (trainTestDatasets, categoricalFeaturesInfo, numClasses) where + * trainTestDatasets = Array(trainingData, testData), + * categoricalFeaturesInfo is a map of categorical feature arities, and + * numClasses = number of classes label can take. + */ + private def createSyntheticInputData( + seed: Long): (Array[RDD[LabeledPoint]], Map[Int, Int], Int) = { + // Generic test options + val numPartitions: Int = intOptionValue(NUM_PARTITIONS) + // Data dimensions and type + val numExamples: Long = longOptionValue(NUM_EXAMPLES) + val numFeatures: Int = intOptionValue(NUM_FEATURES) + val labelType: Int = intOptionValue(LABEL_TYPE) + val fracCategoricalFeatures: Double = doubleOptionValue(FRAC_CATEGORICAL_FEATURES) + val fracBinaryFeatures: Double = doubleOptionValue(FRAC_BINARY_FEATURES) + // Model specification + val treeDepth: Int = intOptionValue(TREE_DEPTH) + + val (rdd_, categoricalFeaturesInfo_) = + DataGenerator.generateDecisionTreeLabeledPoints(sc, math.ceil(numExamples * 1.25).toLong, + numFeatures, numPartitions, labelType, + fracCategoricalFeatures, fracBinaryFeatures, treeDepth, seed) + + val splits = rdd_.randomSplit(Array(0.8, 0.2), seed) + (splits, categoricalFeaturesInfo_, labelType) + } + + // TODO: generate DataFrame outside of `runTest` so it is not included in timing results + override def runTest(rdd: RDD[LabeledPoint]): TreeBasedModel = { + val treeDepth: Int = intOptionValue(TREE_DEPTH) + val maxBins: Int = intOptionValue(MAX_BINS) + val numTrees: Int = intOptionValue(NUM_TREES) + val featureSubsetStrategy: String = stringOptionValue(FEATURE_SUBSET_STRATEGY) + val ensembleType: String = stringOptionValue(ENSEMBLE_TYPE) + if (!supportedTreeTypes.contains(ensembleType)) { + throw new IllegalArgumentException( + s"DecisionTreeTest given unknown ensembleType param: $ensembleType." 
+ + " Supported values: " + supportedTreeTypes.mkString(" ")) + } + if (labelType == 0) { + // Regression + ensembleType match { + case "RandomForest" => + MLlibRFModel(RandomForest.trainRegressor(rdd, categoricalFeaturesInfo, numTrees, + featureSubsetStrategy, "variance", treeDepth, maxBins, this.getRandomSeed)) + case "ml.RandomForest" => + val labelType: Int = intOptionValue(LABEL_TYPE) + val dataset = DataGenerator.setMetadata(rdd, categoricalFeaturesInfo, labelType) + val model = new RandomForestRegressor() + .setImpurity("variance") + .setMaxDepth(treeDepth) + .setMaxBins(maxBins) + .setNumTrees(numTrees) + .setFeatureSubsetStrategy(featureSubsetStrategy) + .setSeed(this.getRandomSeed) + .fit(dataset) + MLRFRegressionModel(model) + case "GradientBoostedTrees" => + val treeStrategy = new Strategy(Algo.Regression, Variance, treeDepth, + labelType, maxBins, QuantileStrategy.Sort, categoricalFeaturesInfo) + val boostingStrategy = BoostingStrategy(treeStrategy, SquaredError, numTrees, + learningRate = 0.1) + MLlibGBTModel(GradientBoostedTrees.train(rdd, boostingStrategy)) + case "ml.GradientBoostedTrees" => + val labelType: Int = intOptionValue(LABEL_TYPE) + val dataset = DataGenerator.setMetadata(rdd, categoricalFeaturesInfo, labelType) + val model = new GBTRegressor() + .setLossType("squared") + .setMaxBins(maxBins) + .setMaxDepth(treeDepth) + .setMaxIter(numTrees) + .setStepSize(0.1) + .setSeed(this.getRandomSeed) + .fit(dataset) + MLGBTRegressionModel(model) + } + } else if (labelType >= 2) { + // Classification + ensembleType match { + case "RandomForest" => + MLlibRFModel(RandomForest.trainClassifier(rdd, labelType, categoricalFeaturesInfo, numTrees, + featureSubsetStrategy, "gini", treeDepth, maxBins, this.getRandomSeed)) + case "ml.RandomForest" => + val labelType: Int = intOptionValue(LABEL_TYPE) + val dataset = DataGenerator.setMetadata(rdd, categoricalFeaturesInfo, labelType) + val model = new RandomForestClassifier() + .setImpurity("gini") + .setMaxDepth(treeDepth) + .setMaxBins(maxBins) + .setNumTrees(numTrees) + .setFeatureSubsetStrategy(featureSubsetStrategy) + .setSeed(this.getRandomSeed) + .fit(dataset) + MLRFClassificationModel(model) + case "GradientBoostedTrees" => + val treeStrategy = new Strategy(Algo.Classification, Variance, treeDepth, + labelType, maxBins, QuantileStrategy.Sort, categoricalFeaturesInfo) + val boostingStrategy = BoostingStrategy(treeStrategy, LogLoss, numTrees, + learningRate = 0.1) + MLlibGBTModel(GradientBoostedTrees.train(rdd, boostingStrategy)) + case "ml.GradientBoostedTrees" => + val labelType: Int = intOptionValue(LABEL_TYPE) + val dataset = DataGenerator.setMetadata(rdd, categoricalFeaturesInfo, labelType) + val model = new GBTClassifier() + .setLossType("logistic") + .setMaxBins(maxBins) + .setMaxDepth(treeDepth) + .setMaxIter(numTrees) + .setStepSize(0.1) + .setSeed(this.getRandomSeed) + .fit(dataset) + MLGBTClassificationModel(model) + } + } else { + throw new IllegalArgumentException(s"Bad label-type parameter " + + s"given to DecisionTreeTest: $labelType") + } + } +} diff --git a/mllib-tests/v2p0/src/main/scala/mllib/perf/PerfTest.scala b/mllib-tests/v2p0/src/main/scala/mllib/perf/PerfTest.scala new file mode 100644 index 0000000..bf51482 --- /dev/null +++ b/mllib-tests/v2p0/src/main/scala/mllib/perf/PerfTest.scala @@ -0,0 +1,134 @@ +package mllib.perf + +import scala.collection.JavaConverters._ + +import joptsimple.{OptionSet, OptionParser} + +import org.json4s._ + +import org.slf4j._ + +abstract class PerfTest { + + val NUM_TRIALS = 
("num-trials", "number of trials to run") + val INTER_TRIAL_WAIT = ("inter-trial-wait", "seconds to sleep between trials") + val NUM_PARTITIONS = ("num-partitions", "number of input partitions") + val RANDOM_SEED = ("random-seed", "seed for random number generator") + + val log = LoggerFactory.getLogger("PerfTest") + def logInfo(msg: String) { + if (log.isInfoEnabled) { + log.info(msg) + } + } + + /** Initialize internal state based on arguments */ + def initialize(testName_ : String, otherArgs: Array[String]) { + testName = testName_ + optionSet = parser.parse(otherArgs:_*) + } + + def getRandomSeed: Int = { + intOptionValue(RANDOM_SEED) + } + + def getNumTrials: Int = { + intOptionValue(NUM_TRIALS) + } + + def getWait: Int = { + intOptionValue(INTER_TRIAL_WAIT) * 1000 + } + + def createInputData(seed: Long) + + /** + * Runs the test and returns a JSON object that captures performance metrics, such as time taken, + * and values of any parameters. + * + * The rendered JSON will look like this (except it will be minified): + * + * { + * "options": { + * "num-partitions": "10", + * "unique-values": "10", + * ... + * }, + * "results": [ + * { + * "trainingTime": 0.211, + * "trainingMetric": 98.1, + * ... + * }, + * ... + * ] + * } + * + * @return metrics from run (e.g. ("time" -> time) + * */ + def run(): JValue + + val parser = new OptionParser() + var optionSet: OptionSet = _ + var testName: String = _ + + var intOptions: Seq[(String, String)] = Seq(NUM_TRIALS, INTER_TRIAL_WAIT, NUM_PARTITIONS, + RANDOM_SEED) + + var doubleOptions: Seq[(String, String)] = Seq() + var longOptions: Seq[(String, String)] = Seq() + + var stringOptions: Seq[(String, String)] = Seq() + var booleanOptions: Seq[(String, String)] = Seq() + + def addOptionsToParser() { + // add all the options to parser + stringOptions.map{case (opt, desc) => + parser.accepts(opt, desc).withRequiredArg().ofType(classOf[String]).required() + } + booleanOptions.map{case (opt, desc) => + parser.accepts(opt, desc) + } + intOptions.map{case (opt, desc) => + parser.accepts(opt, desc).withRequiredArg().ofType(classOf[Int]).required() + } + doubleOptions.map{case (opt, desc) => + parser.accepts(opt, desc).withRequiredArg().ofType(classOf[Double]).required() + } + longOptions.map{case (opt, desc) => + parser.accepts(opt, desc).withRequiredArg().ofType(classOf[Long]).required() + } + } + + def addOptionalOptionToParser[T](opt: String, desc: String, default: T, clazz: Class[T]): Unit = { + parser.accepts(opt, desc).withOptionalArg().ofType(clazz).defaultsTo(default) + } + + def intOptionValue(option: (String, String)) = + optionSet.valueOf(option._1).asInstanceOf[Int] + + def stringOptionValue(option: (String, String)) = + optionSet.valueOf(option._1).asInstanceOf[String] + + def booleanOptionValue(option: (String, String)) = + optionSet.has(option._1) + + def doubleOptionValue(option: (String, String)) = + optionSet.valueOf(option._1).asInstanceOf[Double] + + def longOptionValue(option: (String, String)) = + optionSet.valueOf(option._1).asInstanceOf[Long] + + def optionValue[T](option: String) = + optionSet.valueOf(option).asInstanceOf[T] + + def getOptions: Map[String, String] = { + optionSet.asMap().asScala.flatMap { case (spec, values) => + if (spec.options().size() == 1 && values.size() == 1) { + Some((spec.options().iterator().next(), values.iterator().next().toString)) + } else { + None + } + }.toMap + } +} diff --git a/mllib-tests/v2p0/src/main/scala/mllib/perf/StatTests.scala b/mllib-tests/v2p0/src/main/scala/mllib/perf/StatTests.scala 
new file mode 100644 index 0000000..2e84629 --- /dev/null +++ b/mllib-tests/v2p0/src/main/scala/mllib/perf/StatTests.scala @@ -0,0 +1,109 @@ +package mllib.perf + +import org.json4s.JsonDSL._ +import org.json4s.JsonAST._ + +import scala.util.Random + +import org.apache.spark.mllib.linalg.{Matrices, Vectors, Matrix, Vector} +import org.apache.spark.SparkContext +import org.apache.spark.mllib.random.RandomRDDs +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.mllib.stat.Statistics +import org.apache.spark.rdd.RDD + +import mllib.perf.util.DataGenerator + + +/** + * Parent class for the tests for the statistics toolbox + */ +abstract class StatTests[T](sc: SparkContext) extends PerfTest { + + def runTest(rdd: T) + + val NUM_ROWS = ("num-rows", "number of rows of the matrix") + val NUM_COLS = ("num-cols", "number of columns of the matrix") + + longOptions = Seq(NUM_ROWS) + intOptions = intOptions ++ Seq(NUM_COLS) + + var rdd: T = _ + + val options = intOptions ++ stringOptions ++ booleanOptions ++ doubleOptions ++ longOptions + addOptionsToParser() + + override def run(): JValue = { + val start = System.currentTimeMillis() + runTest(rdd) + val end = System.currentTimeMillis() + val time = (end - start).toDouble / 1000.0 + Map("time" -> time) + } +} + +abstract class CorrelationTests(sc: SparkContext) extends StatTests[RDD[Vector]](sc){ + override def createInputData(seed: Long) = { + val m: Long = longOptionValue(NUM_ROWS) + val n: Int = intOptionValue(NUM_COLS) + val numPartitions: Int = intOptionValue(NUM_PARTITIONS) + + rdd = RandomRDDs.normalVectorRDD(sc, m, n, numPartitions, seed).cache() + + // Materialize rdd + println("Num Examples: " + rdd.count()) + } +} + +class PearsonCorrelationTest(sc: SparkContext) extends CorrelationTests(sc) { + override def runTest(data: RDD[Vector]) { + Statistics.corr(data) + } +} + +class SpearmanCorrelationTest(sc: SparkContext) extends CorrelationTests(sc) { + override def runTest(data: RDD[Vector]) { + Statistics.corr(data, "spearman") + } +} + +class ChiSquaredFeatureTest(sc: SparkContext) extends StatTests[RDD[LabeledPoint]](sc) { + override def createInputData(seed: Long) = { + val m: Long = longOptionValue(NUM_ROWS) + val n: Int = intOptionValue(NUM_COLS) + val numPartitions: Int = intOptionValue(NUM_PARTITIONS) + + rdd = DataGenerator.generateClassificationLabeledPoints(sc, m, n, 0.5, numPartitions, + seed, chiSq = true).cache() + + // Materialize rdd + println("Num Examples: " + rdd.count()) + } + override def runTest(data: RDD[LabeledPoint]) { + Statistics.chiSqTest(data) + } +} + +class ChiSquaredGoFTest(sc: SparkContext) extends StatTests[Vector](sc) { + override def createInputData(seed: Long) = { + val m: Long = longOptionValue(NUM_ROWS) + val rng = new Random(seed) + + rdd = Vectors.dense(Array.fill(m.toInt)(rng.nextDouble())) + } + override def runTest(data: Vector) { + Statistics.chiSqTest(data) + } +} + +class ChiSquaredMatTest(sc: SparkContext) extends StatTests[Matrix](sc) { + override def createInputData(seed: Long) = { + val m: Long = longOptionValue(NUM_ROWS) + val rng = new Random(seed) + + rdd = Matrices.dense(m.toInt, m.toInt, Array.fill(m.toInt * m.toInt)(rng.nextDouble())) + } + override def runTest(data: Matrix) { + Statistics.chiSqTest(data) + } +} diff --git a/mllib-tests/v2p0/src/main/scala/mllib/perf/TestRunner.scala b/mllib-tests/v2p0/src/main/scala/mllib/perf/TestRunner.scala new file mode 100644 index 0000000..421b62f --- /dev/null +++ 
b/mllib-tests/v2p0/src/main/scala/mllib/perf/TestRunner.scala @@ -0,0 +1,87 @@ +package mllib.perf + +import scala.collection.JavaConverters._ + +import org.json4s.JsonAST._ +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.{SparkConf, SparkContext} + +import mllib.perf.clustering.{GaussianMixtureTest, LDATest, PICTest} +import mllib.perf.feature.Word2VecTest +import mllib.perf.fpm.{FPGrowthTest, PrefixSpanTest} +import mllib.perf.linalg.BlockMatrixMultTest + +object TestRunner { + def main(args: Array[String]) { + if (args.length < 1) { + println( + "mllib.perf.TestRunner requires 1 or more args, you gave %s, exiting".format(args.length)) + System.exit(1) + } + val testName = args(0) + val perfTestArgs = args.slice(1, args.length) + val sc = new SparkContext(new SparkConf().setAppName("TestRunner: " + testName)) + + // Unfortunate copy of code because there are Perf Tests in both projects and the compiler doesn't like it + val test: PerfTest = testName match { + case "glm-regression" => new GLMRegressionTest(sc) + case "glm-classification" => new GLMClassificationTest(sc) + case "naive-bayes" => new NaiveBayesTest(sc) + // recommendation + case "als" => new ALSTest(sc) + // clustering + case "gmm" => new GaussianMixtureTest(sc) + case "kmeans" => new KMeansTest(sc) + case "lda" => new LDATest(sc) + case "pic" => new PICTest(sc) + // trees + case "decision-tree" => new DecisionTreeTest(sc) + // linalg + case "svd" => new SVDTest(sc) + case "pca" => new PCATest(sc) + case "block-matrix-mult" => new BlockMatrixMultTest(sc) + // stats + case "summary-statistics" => new ColumnSummaryStatisticsTest(sc) + case "pearson" => new PearsonCorrelationTest(sc) + case "spearman" => new SpearmanCorrelationTest(sc) + case "chi-sq-feature" => new ChiSquaredFeatureTest(sc) + case "chi-sq-gof" => new ChiSquaredGoFTest(sc) + case "chi-sq-mat" => new ChiSquaredMatTest(sc) + // feature + case "word2vec" => new Word2VecTest(sc) + // frequent pattern mining + case "fp-growth" => new FPGrowthTest(sc) + case "prefix-span" => new PrefixSpanTest(sc) + } + test.initialize(testName, perfTestArgs) + // Generate a new dataset for each test + val rand = new java.util.Random(test.getRandomSeed) + + val numTrials = test.getNumTrials + val interTrialWait = test.getWait + + var testOptions: JValue = test.getOptions + val results: Seq[JValue] = (1 to numTrials).map { i => + test.createInputData(rand.nextLong()) + val res: JValue = test.run() + System.gc() + Thread.sleep(interTrialWait) + res + } + // Report the test results as a JSON object describing the test options, Spark + // configuration, Java system properties, as well as the per-test results. + // This extra information helps to ensure reproducibility and makes automatic analysis easier. 
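      // As a sketch (field names come from the JSON assembled below; the values are illustrative,
      // assuming a "gmm" run with two trials), the minified record looks roughly like:
      //   {"testName":"gmm",
      //    "options":{"num-trials":"2","num-points":"1000000","num-centers":"5", ...},
      //    "sparkConf":{...},
      //    "sparkVersion":"2.0.0",
      //    "systemProperties":{...},
      //    "results":[{"time":12.7},{"time":11.9}]}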
+ val json: JValue = + ("testName" -> testName) ~ + ("options" -> testOptions) ~ + ("sparkConf" -> sc.getConf.getAll.toMap) ~ + ("sparkVersion" -> sc.version) ~ + ("systemProperties" -> System.getProperties.asScala.toMap) ~ + ("results" -> results) + println("results: " + compact(render(json))) + + sc.stop() + } +} diff --git a/mllib-tests/v2p0/src/main/scala/mllib/perf/clustering/GaussianMixtureTest.scala b/mllib-tests/v2p0/src/main/scala/mllib/perf/clustering/GaussianMixtureTest.scala new file mode 100644 index 0000000..95ce9c6 --- /dev/null +++ b/mllib-tests/v2p0/src/main/scala/mllib/perf/clustering/GaussianMixtureTest.scala @@ -0,0 +1,63 @@ +package mllib.perf.clustering + +import java.util.Random + +import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV} +import org.json4s.JValue +import org.json4s.JsonDSL._ + +import org.apache.spark.SparkContext +import org.apache.spark.mllib.clustering.GaussianMixture +import org.apache.spark.mllib.linalg.{Vector, Vectors} +import org.apache.spark.rdd.RDD + +import mllib.perf.PerfTest + +class GaussianMixtureTest(sc: SparkContext) extends PerfTest { + + val NUM_POINTS = ("num-points", "number of points for clustering tests") + val NUM_COLUMNS = ("num-columns", "number of columns for each point for clustering tests") + val NUM_CENTERS = ("num-centers", "number of centers for clustering tests") + val NUM_ITERATIONS = ("num-iterations", "number of iterations for the algorithm") + + intOptions ++= Seq(NUM_CENTERS, NUM_COLUMNS, NUM_ITERATIONS) + longOptions ++= Seq(NUM_POINTS) + val options = intOptions ++ stringOptions ++ booleanOptions ++ longOptions ++ doubleOptions + addOptionsToParser() + + var data: RDD[Vector] = _ + + override def createInputData(seed: Long): Unit = { + val m = longOptionValue(NUM_POINTS) + val n = intOptionValue(NUM_COLUMNS) + val k = intOptionValue(NUM_CENTERS) + val p = intOptionValue(NUM_PARTITIONS) + + val random = new Random(seed ^ 8793480384L) + val mu = Array.fill(k)(new BDV[Double](Array.fill(n)(random.nextGaussian()))) + val f = Array.fill(k)(new BDM[Double](n, n, Array.fill(n * n)(random.nextGaussian()))) + data = sc.parallelize(0L until m, p) + .mapPartitionsWithIndex { (idx, part) => + val rng = new Random(seed & idx) + part.map { _ => + val i = (rng.nextDouble() * k).toInt + val x = new BDV[Double](Array.fill(n)(rng.nextGaussian())) + val y = f(i) * x + mu(i) + Vectors.dense(y.data) + } + }.cache() + logInfo(s"Generated ${data.count()} points.") + } + + override def run(): JValue = { + val numIterations = intOptionValue(NUM_ITERATIONS) + val k = intOptionValue(NUM_CENTERS) + val start = System.currentTimeMillis() + val gmm = new GaussianMixture() + .setK(k) + .setMaxIterations(numIterations) + val model = gmm.run(data) + val duration = (System.currentTimeMillis() - start) / 1e3 + "time" -> duration + } +} diff --git a/mllib-tests/v2p0/src/main/scala/mllib/perf/clustering/LDATest.scala b/mllib-tests/v2p0/src/main/scala/mllib/perf/clustering/LDATest.scala new file mode 100644 index 0000000..812f2da --- /dev/null +++ b/mllib-tests/v2p0/src/main/scala/mllib/perf/clustering/LDATest.scala @@ -0,0 +1,73 @@ +package mllib.perf.clustering + +import mllib.perf.PerfTest + +import org.json4s.JValue +import org.json4s.JsonDSL._ + +import scala.collection.mutable.{HashMap => MHashMap} + +import org.apache.commons.math3.random.Well19937c +import org.apache.spark.SparkContext +import org.apache.spark.mllib.clustering.LDA +import org.apache.spark.mllib.linalg.{Vector, Vectors} +import org.apache.spark.rdd.RDD + +class 
LDATest(sc: SparkContext) extends PerfTest { + + val NUM_DOCUMENTS = ("num-documents", "number of documents in corpus") + val NUM_VOCABULARY = ("num-vocab", "number of terms in vocabulary") + val NUM_TOPICS = ("num-topics", "number of topics to infer") + val NUM_ITERATIONS = ("num-iterations", "number of iterations for the algorithm") + val DOCUMENT_LENGTH = ("document-length", "number of words per document for the algorithm") + val OPTIMIZER = ("optimizer", "optimization algorithm: em or online") + + intOptions ++= Seq(NUM_VOCABULARY, NUM_TOPICS, NUM_ITERATIONS, DOCUMENT_LENGTH) + longOptions ++= Seq(NUM_DOCUMENTS) + stringOptions ++= Seq(OPTIMIZER) + val options = intOptions ++ stringOptions ++ booleanOptions ++ longOptions ++ doubleOptions + addOptionsToParser() + + var data: RDD[(Long, Vector)] = _ + + override def createInputData(seed: Long): Unit = { + val numDocs = longOptionValue(NUM_DOCUMENTS) + val numVocab = intOptionValue(NUM_VOCABULARY) + val k = intOptionValue(NUM_TOPICS) + + val numPartitions = intOptionValue(NUM_PARTITIONS) + val docLength = intOptionValue(DOCUMENT_LENGTH) + + data = sc.parallelize(0L until numDocs, numPartitions) + .mapPartitionsWithIndex { (idx, part) => + val rng = new Well19937c(seed ^ idx) + part.map { case docIndex => + var currentSize = 0 + val entries = MHashMap[Int, Int]() + while (currentSize < docLength) { + val index = rng.nextInt(numVocab) + entries(index) = entries.getOrElse(index, 0) + 1 + currentSize += 1 + } + + val iter = entries.toSeq.map(v => (v._1, v._2.toDouble)) + (docIndex, Vectors.sparse(numVocab, iter)) + } + }.cache() + logInfo(s"Number of documents = ${data.count()}.") + } + + override def run(): JValue = { + val k = intOptionValue(NUM_TOPICS) + val numIterations = intOptionValue(NUM_ITERATIONS) + val optimizer = stringOptionValue(OPTIMIZER) + val start = System.currentTimeMillis() + val lda = new LDA() + .setK(k) + .setMaxIterations(numIterations) + .setOptimizer(optimizer) + val model = lda.run(data) + val duration = (System.currentTimeMillis() - start) / 1e3 + "time" -> duration + } +} diff --git a/mllib-tests/v2p0/src/main/scala/mllib/perf/clustering/PICTest.scala b/mllib-tests/v2p0/src/main/scala/mllib/perf/clustering/PICTest.scala new file mode 100644 index 0000000..6832ffa --- /dev/null +++ b/mllib-tests/v2p0/src/main/scala/mllib/perf/clustering/PICTest.scala @@ -0,0 +1,53 @@ +package mllib.perf.clustering + +import org.json4s.JValue +import org.json4s.JsonDSL._ + +import org.apache.spark.SparkContext +import org.apache.spark.mllib.clustering.PowerIterationClustering +import org.apache.spark.rdd.RDD + +import mllib.perf.PerfTest + +class PICTest(sc: SparkContext) extends PerfTest { + + val NUM_POINTS = ("num-points", "number of points") + val NODE_DEGREE = ("node-degree", "number of neighbors each node is connected to") + val NUM_CENTERS = ("num-centers", "number of centers for clustering tests") + val NUM_ITERATIONS = ("num-iterations", "number of iterations for the algorithm") + + intOptions ++= Seq(NODE_DEGREE, NUM_CENTERS, NUM_ITERATIONS) + longOptions ++= Seq(NUM_POINTS) + val options = intOptions ++ stringOptions ++ booleanOptions ++ longOptions ++ doubleOptions + addOptionsToParser() + + var data: RDD[(Long, Long, Double)] = _ + + override def createInputData(seed: Long): Unit = { + val numPoints = longOptionValue(NUM_POINTS) + val nodeDegree = intOptionValue(NODE_DEGREE) + val numPartitions = intOptionValue(NUM_PARTITIONS) + + // Generates a periodic banded matrix with bandwidth = nodeDegree (assign the class member so run() can use it) + data =
sc.parallelize(0L to numPoints, numPartitions) + .flatMap { id => + (((id - nodeDegree / 2) % numPoints) until id).map { nbr => + (id, (nbr + numPoints) % numPoints, 1D) + } + } + logInfo(s"Generated ${data.count()} pairwise similarities.") + } + + override def run(): JValue = { + val numIterations = intOptionValue(NUM_ITERATIONS) + val k = intOptionValue(NUM_CENTERS) + val start = System.currentTimeMillis() + val pic = new PowerIterationClustering() + .setK(k) + .setMaxIterations(numIterations) + val model = pic.run(data) + val duration = (System.currentTimeMillis() - start) / 1e3 + "time" -> duration + } +} + diff --git a/mllib-tests/v2p0/src/main/scala/mllib/perf/feature/Word2VecTest.scala b/mllib-tests/v2p0/src/main/scala/mllib/perf/feature/Word2VecTest.scala new file mode 100644 index 0000000..389d094 --- /dev/null +++ b/mllib-tests/v2p0/src/main/scala/mllib/perf/feature/Word2VecTest.scala @@ -0,0 +1,69 @@ +package mllib.perf.feature + +import scala.collection.mutable + +import org.apache.commons.math3.random.Well19937c +import org.json4s.JValue +import org.json4s.JsonDSL._ + +import org.apache.spark.SparkContext +import org.apache.spark.mllib.feature.Word2Vec +import org.apache.spark.rdd.RDD + +import mllib.perf.PerfTest + +class Word2VecTest(sc: SparkContext) extends PerfTest { + + val NUM_SENTENCES = ("num-sentences", "number of sentences") + val NUM_WORDS = ("num-words", "vocabulary size") + val VECTOR_SIZE = ("vector-size", "vector size") + val NUM_ITERATIONS = ("num-iterations", "number of iterations") + val MIN_COUNT = ("min-count", "minimum count for a word to be included") + + intOptions ++= Seq(NUM_SENTENCES, NUM_WORDS, VECTOR_SIZE, NUM_ITERATIONS, MIN_COUNT) + + val options = intOptions ++ stringOptions ++ booleanOptions ++ doubleOptions ++ longOptions + addOptionsToParser() + + private val avgSentenceLength = 16 + private var sentences: RDD[Seq[String]] = _ + + override def createInputData(seed: Long): Unit = { + val numSentences = intOptionValue(NUM_SENTENCES) + val numPartitions = intOptionValue(NUM_PARTITIONS) + val numWords = intOptionValue(NUM_WORDS) + val p = 1.0 / avgSentenceLength + sentences = sc.parallelize(0 until numSentences, numPartitions) + .mapPartitionsWithIndex { (idx, part) => + val rng = new Well19937c(seed ^ idx) + part.map { case i => + var cur = rng.nextInt(numWords) + val sentence = mutable.ArrayBuilder.make[Int] + while (rng.nextDouble() > p) { + cur = (cur + rng.nextGaussian() * 10).toInt % numWords + if (cur < 0) { + cur += numWords + } + sentence += cur + } + sentence.result().map(_.toString).toSeq + } + }.cache() + logInfo(s"Number of sentences = ${sentences.count()}.") + } + + override def run(): JValue = { + val start = System.currentTimeMillis() + val numIterations = intOptionValue(NUM_ITERATIONS) + val numPartitions = math.ceil(math.pow(numIterations, 1.5)).toInt + val w2v = new Word2Vec() + .setNumPartitions(numPartitions) + .setNumIterations(numIterations) + .setVectorSize(intOptionValue(VECTOR_SIZE)) + .setMinCount(intOptionValue(MIN_COUNT)) + .setSeed(0L) + val model = w2v.fit(sentences) + val duration = (System.currentTimeMillis() - start) / 1e3 + "time" -> duration + } +} diff --git a/mllib-tests/v2p0/src/main/scala/mllib/perf/fpm/FPGrowthTest.scala b/mllib-tests/v2p0/src/main/scala/mllib/perf/fpm/FPGrowthTest.scala new file mode 100644 index 0000000..ddc19d0 --- /dev/null +++ b/mllib-tests/v2p0/src/main/scala/mllib/perf/fpm/FPGrowthTest.scala @@ -0,0 +1,65 @@ +package mllib.perf.fpm + +import 
org.apache.commons.math3.distribution.BinomialDistribution +import org.apache.commons.math3.random.Well19937c +import org.json4s.JValue +import org.json4s.JsonDSL._ + +import org.apache.spark.SparkContext +import org.apache.spark.mllib.fpm.FPGrowth +import org.apache.spark.rdd.RDD + +import mllib.perf.PerfTest + +class FPGrowthTest(sc: SparkContext) extends PerfTest { + + val NUM_BASKETS = ("num-baskets", "number of baskets") + val AVG_BASKET_SIZE = ("avg-basket-size", "average basket size. " + + "The distribution of basket sizes follows binomial distribution with B(10n,1/10).") + val NUM_ITEMS = ("num-items", "number of distinct items") + val MIN_SUPPORT = ("min-support", "minimum support level") + + intOptions = intOptions ++ Seq(NUM_BASKETS, AVG_BASKET_SIZE, NUM_ITEMS) + doubleOptions = doubleOptions ++ Seq(MIN_SUPPORT) + + val options = intOptions ++ stringOptions ++ booleanOptions ++ doubleOptions ++ longOptions + addOptionsToParser() + + private var baskets: RDD[Array[Int]] = _ + + override def createInputData(seed: Long): Unit = { + val numPartitions = intOptionValue(NUM_PARTITIONS) + val numBaskets = intOptionValue(NUM_BASKETS) + val numItems = intOptionValue(NUM_ITEMS) + val avgBasketSize = intOptionValue(AVG_BASKET_SIZE) + val maxRatio = 10 + baskets = sc.parallelize(0 until numBaskets, numPartitions) + .mapPartitionsWithIndex { (idx, part) => + val rng = new Well19937c(seed ^ idx) + val binom = new BinomialDistribution(rng, maxRatio * avgBasketSize, 1.0 / maxRatio) + part.map { i => + val basketSize = binom.sample() + // Use math.pow to create a skewed item distribution. + val items = Array.fill(basketSize)((numItems * math.pow(rng.nextDouble(), 0.1)).toInt) + items.toSet[Int].toArray // dedup + }.filter(_.nonEmpty) + }.cache() + val exactNumBaskets = baskets.count() + logInfo(s"Number of baskets: $exactNumBaskets.") + val totalNumItems = baskets.map(_.length.toLong).reduce(_ + _) + logInfo(s"Total number of items: $totalNumItems.") + logInfo(s"Average basket size: ${totalNumItems.toDouble/exactNumBaskets}.") + } + + override def run(): JValue = { + val start = System.currentTimeMillis() + val model = new FPGrowth() + .setMinSupport(doubleOptionValue(MIN_SUPPORT)) + .setNumPartitions(baskets.partitions.length * 8) + .run(baskets) + val numFreqItemsets = model.freqItemsets.count() + val duration = (System.currentTimeMillis() - start) / 1000.0 + logInfo(s"Number of frequent itemsets: $numFreqItemsets.") + "time" -> duration + } +} diff --git a/mllib-tests/v2p0/src/main/scala/mllib/perf/fpm/PrefixSpanTest.scala b/mllib-tests/v2p0/src/main/scala/mllib/perf/fpm/PrefixSpanTest.scala new file mode 100644 index 0000000..b8fc590 --- /dev/null +++ b/mllib-tests/v2p0/src/main/scala/mllib/perf/fpm/PrefixSpanTest.scala @@ -0,0 +1,82 @@ +package mllib.perf.fpm + +import org.apache.commons.math3.distribution.BinomialDistribution +import org.apache.commons.math3.random.Well19937c +import org.json4s.JValue +import org.json4s.JsonDSL._ + +import org.apache.spark.SparkContext +import org.apache.spark.mllib.fpm.PrefixSpan +import org.apache.spark.rdd.RDD + +import mllib.perf.PerfTest + +class PrefixSpanTest(sc: SparkContext) extends PerfTest { + + val NUM_SEQUENCES = ("num-sequences", "number of itemset sequences") + val AVG_SEQUENCE_SIZE = ("avg-sequence-size", "average number of itemsets in a sequence. " + + "The distribution of itemset sequence sizes follows binomial distribution with B(10n,1/10).") + val AVG_ITEMSET_SIZE = ("avg-itemset-size", "average number of items in a itemset. 
" + + "The distribution of itemset sizes follows binomial distribution with B(10n,1/10).") + val NUM_ITEMS = ("num-items", "number of distinct items") + val MIN_SUPPORT = ("min-support", "minimum support level") + val MAX_PATTERN_LEN = ("max-pattern-len", "maximum length of frequent itemset sequences") + val MAX_LOCAL_PROJ_DB_SIZE = ("max-local-proj-db-size", "maximum number of items allowed in a " + + "locally processed projected database") + + intOptions ++= Seq(NUM_SEQUENCES, AVG_SEQUENCE_SIZE, AVG_ITEMSET_SIZE, NUM_ITEMS, + MAX_PATTERN_LEN, MAX_LOCAL_PROJ_DB_SIZE) + doubleOptions ++= Seq(MIN_SUPPORT) + longOptions ++= Seq(MAX_LOCAL_PROJ_DB_SIZE) + + + val options = intOptions ++ stringOptions ++ booleanOptions ++ doubleOptions ++ longOptions + addOptionsToParser() + + private var sequences: RDD[Array[Array[Int]]] = _ + + override def createInputData(seed: Long): Unit = { + val numPartitions = intOptionValue(NUM_PARTITIONS) + val numSequences = intOptionValue(NUM_SEQUENCES) + val numItems = intOptionValue(NUM_ITEMS) + val avgSequenceSize = intOptionValue(AVG_SEQUENCE_SIZE) + val avgItemsetSize = intOptionValue(AVG_ITEMSET_SIZE) + val maxRatio = 10 + sequences = sc.parallelize(0 until numSequences, numPartitions) + .mapPartitionsWithIndex { (idx, part) => + val rng = new Well19937c(seed ^ idx) + val binomSeq = new BinomialDistribution(rng, maxRatio * avgSequenceSize, 1.0 / maxRatio) + val binomItemset = new BinomialDistribution(rng, maxRatio * avgItemsetSize, 1.0 / maxRatio) + part.map { i => + val seqSize = binomSeq.sample() + // Use math.pow to create a skewed item distribution. + val items = Array.fill(seqSize)( + Array.fill(binomItemset.sample())((numItems * math.pow(rng.nextDouble(), 0.1)).toInt) + ) + items.map(_.toSet[Int].toArray) // dedup + }.filter(_.nonEmpty) + }.cache() + val exactNumSeqs = sequences.count() + logInfo(s"Number of sequences: $exactNumSeqs.") + val totalNumItems = sequences.map(_.flatten.length.toLong).reduce(_ + _) + val totalNumItemsets = sequences.map(_.length.toLong).reduce(_ + _) + logInfo(s"Total number of items: $totalNumItems.") + logInfo(s"Total number of itemsets: $totalNumItemsets.") + logInfo(s"Average num itemsets per sequence: ${totalNumItemsets.toDouble/exactNumSeqs}.") + logInfo(s"Average num items per itemset: ${totalNumItems.toDouble/totalNumItemsets}.") + } + + override def run(): JValue = { + val start = System.currentTimeMillis() + val model = new PrefixSpan() + .setMinSupport(doubleOptionValue(MIN_SUPPORT)) + .setMaxPatternLength(intOptionValue(MAX_PATTERN_LEN)) + .setMaxLocalProjDBSize(longOptionValue(MAX_LOCAL_PROJ_DB_SIZE)) + .run(sequences) + val numFreqItemsets = model.freqSequences.count() + val duration = (System.currentTimeMillis() - start) / 1000.0 + logInfo(s"Number of frequent sequences: $numFreqItemsets.") + "time" -> duration + } +} + diff --git a/mllib-tests/v2p0/src/main/scala/mllib/perf/linalg/BlockMatrixMultTest.scala b/mllib-tests/v2p0/src/main/scala/mllib/perf/linalg/BlockMatrixMultTest.scala new file mode 100644 index 0000000..123368a --- /dev/null +++ b/mllib-tests/v2p0/src/main/scala/mllib/perf/linalg/BlockMatrixMultTest.scala @@ -0,0 +1,74 @@ +package mllib.perf.linalg + +import java.util.Random + +import org.json4s.JValue +import org.json4s.JsonDSL._ + +import org.apache.spark.SparkContext +import org.apache.spark.mllib.linalg.Matrices +import org.apache.spark.mllib.linalg.distributed.BlockMatrix + +import mllib.perf.PerfTest + +class BlockMatrixMultTest(sc: SparkContext) extends PerfTest { + + val M = ("m", "number 
of rows of A") + val K = ("k", "number of columns of A, the same as number of rows of B") + val N = ("n", "number of columns of B") + val BLOCK_SIZE = ("block-size", "block size") + + intOptions ++= Seq(BLOCK_SIZE) + longOptions ++= Seq(M, K, N) + + val options = intOptions ++ stringOptions ++ booleanOptions ++ doubleOptions ++ longOptions + addOptionsToParser() + + private var A: BlockMatrix = _ + private var B: BlockMatrix = _ + + override def createInputData(seed: Long): Unit = { + val m = longOptionValue(M) + val k = longOptionValue(K) + val n = longOptionValue(N) + val blockSize = intOptionValue(BLOCK_SIZE) + val numPartitions = intOptionValue(NUM_PARTITIONS) + + val random = new Random(seed) + + A = randn(m, k, blockSize, numPartitions, seed ^ random.nextLong()) + B = randn(k, n, blockSize, numPartitions, seed ^ random.nextLong()) + } + + def randn( + m: Long, + n: Long, + blockSize: Int, + numPartitions: Int, + seed: Long): BlockMatrix = { + val numRowBlocks = math.ceil(m / blockSize).toInt + val numColBlocks = math.ceil(n / blockSize).toInt + val sqrtParts = math.ceil(math.sqrt(numPartitions)).toInt + val rowBlockIds = sc.parallelize(0 until numRowBlocks, sqrtParts) + val colBlockIds = sc.parallelize(0 until numColBlocks, sqrtParts) + val blockIds = rowBlockIds.cartesian(colBlockIds) + val blocks = blockIds.mapPartitionsWithIndex { (idx, ids) => + val random = new Random(idx ^ seed) + ids.map { case (rowBlockId, colBlockId) => + val mi = math.min(m - rowBlockId * blockSize, blockSize).toInt + val ni = math.min(n - colBlockId * blockSize, blockSize).toInt + ((rowBlockId, colBlockId), Matrices.randn(mi, ni, random)) + } + }.cache() + logInfo(s"Generated ${blocks.count()} blocks.") + new BlockMatrix(blocks, blockSize, blockSize, m, n) + } + + override def run(): JValue = { + val start = System.currentTimeMillis() + val C = A.multiply(B) + C.blocks.count() + val duration = (System.currentTimeMillis() - start) / 1e3 + "time" -> duration + } +} diff --git a/mllib-tests/v2p0/src/main/scala/mllib/perf/util/DataGenerator.scala b/mllib-tests/v2p0/src/main/scala/mllib/perf/util/DataGenerator.scala new file mode 100644 index 0000000..ff3fd00 --- /dev/null +++ b/mllib-tests/v2p0/src/main/scala/mllib/perf/util/DataGenerator.scala @@ -0,0 +1,584 @@ +package mllib.perf.util + +import org.apache.spark.ml.attribute.{AttributeGroup, NumericAttribute, NominalAttribute} +import org.apache.spark.sql.{SQLContext, DataFrame} + +import scala.collection.mutable + +import org.apache.spark.mllib.linalg.{Vectors, Vector} +import org.apache.spark.mllib.linalg.distributed.RowMatrix +import org.apache.spark.mllib.random._ +import org.apache.spark.mllib.recommendation.Rating +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.mllib.tree.configuration.{Algo, FeatureType} +import org.apache.spark.mllib.tree.model.{Split, DecisionTreeModel, Node, Predict} +import org.apache.spark.rdd.{PairRDDFunctions, RDD} +import org.apache.spark.SparkContext + +object DataGenerator { + + def generateLabeledPoints( + sc: SparkContext, + numRows: Long, + numCols: Int, + intercept: Double, + labelNoise: Double, + numPartitions: Int, + seed: Long = System.currentTimeMillis(), + problem: String = ""): RDD[LabeledPoint] = { + + RandomRDDs.randomRDD(sc, new LinearDataGenerator(numCols,intercept, seed, labelNoise, problem), + numRows, numPartitions, seed) + + } + + def generateDistributedSquareMatrix( + sc: SparkContext, + m: Long, + n: Int, + numPartitions: Int, + seed: Long = System.currentTimeMillis()): 
RowMatrix = { + + val data: RDD[Vector] = RandomRDDs.normalVectorRDD(sc, m, n, numPartitions, seed) + + new RowMatrix(data,m,n) + } + + def generateClassificationLabeledPoints( + sc: SparkContext, + numRows: Long, + numCols: Int, + threshold: Double, + numPartitions: Int, + seed: Long = System.currentTimeMillis(), + chiSq: Boolean = false): RDD[LabeledPoint] = { + + RandomRDDs.randomRDD(sc, new ClassLabelGenerator(numCols,threshold, chiSq), + numRows, numPartitions, seed) + } + + def generateBinaryLabeledPoints( + sc: SparkContext, + numRows: Long, + numCols: Int, + threshold: Double, + numPartitions: Int, + seed: Long = System.currentTimeMillis()): RDD[LabeledPoint] = { + + RandomRDDs.randomRDD(sc, new BinaryLabeledDataGenerator(numCols,threshold), + numRows, numPartitions, seed) + } + + /** + * @param labelType 0 = regression with labels in [0,1]. Values >= 2 indicate classification. + * @param fracCategorical Fraction of columns/features to be categorical. + * @param fracBinary Fraction of categorical features to be binary. Others are high-arity (20). + * @param treeDepth Depth of "true" tree used to label points. + * @return (data, categoricalFeaturesInfo) + * data is an RDD of data points. + * categoricalFeaturesInfo is a map storing the arity of categorical features. + * E.g., an entry (n -> k) indicates that feature n is categorical + * with k categories indexed from 0: {0, 1, ..., k-1}. + */ + def generateDecisionTreeLabeledPoints( + sc: SparkContext, + numRows: Long, + numCols: Int, + numPartitions: Int, + labelType: Int, + fracCategorical: Double, + fracBinary: Double, + treeDepth: Int, + seed: Long = System.currentTimeMillis()): (RDD[LabeledPoint], Map[Int, Int]) = { + + val highArity = 20 + + require(fracCategorical >= 0 && fracCategorical <= 1, + s"fracCategorical must be in [0,1], but it is $fracCategorical") + require(fracBinary >= 0 && fracBinary <= 1, + s"fracBinary must be in [0,1], but it is $fracBinary") + + val isRegression = labelType == 0 + if (!isRegression) { + require(labelType >= 2, s"labelType must be >= 2 for classification. 0 indicates regression.") + } + val numCategorical = (numCols * fracCategorical).toInt + val numContinuous = numCols - numCategorical + val numBinary = (numCategorical * fracBinary).toInt + val numHighArity = numCategorical - numBinary + val categoricalArities = Array.concat(Array.fill(numBinary)(2), + Array.fill(numHighArity)(highArity)) + + val featuresGenerator = new FeaturesGenerator(categoricalArities, numContinuous) + val featureMatrix = RandomRDDs.randomRDD(sc, featuresGenerator, + numRows, numPartitions, seed) + + // Create random DecisionTree. + val featureArity = Array.concat(categoricalArities, Array.fill(numContinuous)(0)) + val trueModel = randomBalancedDecisionTree(treeDepth, labelType, featureArity, seed) + println(trueModel) + + // Label points using tree. + val labelVector = featureMatrix.map(trueModel.predict) + + val data = labelVector.zip(featureMatrix).map(pair => new LabeledPoint(pair._1, pair._2)) + val categoricalFeaturesInfo = featuresGenerator.getCategoricalFeaturesInfo + (data, categoricalFeaturesInfo) + } + + /** + * From spark.ml.impl.TreeTests + * + * Convert the given data to a DataFrame, and set the features and label metadata. + * @param data Dataset. Categorical features and labels must already have 0-based indices. + * This must be non-empty. + * @param categoricalFeatures Map: categorical feature index -> number of distinct values + * @param numClasses Number of classes label can take. 
If 0, mark as continuous. + * @return DataFrame with metadata + */ + def setMetadata( + data: RDD[LabeledPoint], + categoricalFeatures: Map[Int, Int], + numClasses: Int): DataFrame = { + val sqlContext = SQLContext.getOrCreate(data.sparkContext) + import sqlContext.implicits._ + val df = data.toDF() + val numFeatures = data.first().features.size + val featuresAttributes = Range(0, numFeatures).map { feature => + if (categoricalFeatures.contains(feature)) { + NominalAttribute.defaultAttr.withIndex(feature).withNumValues(categoricalFeatures(feature)) + } else { + NumericAttribute.defaultAttr.withIndex(feature) + } + }.toArray + val featuresMetadata = new AttributeGroup("features", featuresAttributes).toMetadata() + val labelAttribute = if (numClasses == 0) { + NumericAttribute.defaultAttr.withName("label") + } else { + NominalAttribute.defaultAttr.withName("label").withNumValues(numClasses) + } + val labelMetadata = labelAttribute.toMetadata() + df.select(df("features").as("features", featuresMetadata), + df("label").as("label", labelMetadata)) + } + + + def randomBalancedDecisionTree( + depth: Int, + labelType: Int, + featureArity: Array[Int], + seed: Long = System.currentTimeMillis()): DecisionTreeModel = { + + require(depth >= 0, s"randomBalancedDecisionTree given depth < 0.") + require(depth <= featureArity.size, + s"randomBalancedDecisionTree requires depth <= featureArity.size," + + s" but depth = $depth and featureArity.size = ${featureArity.size}") + val isRegression = labelType == 0 + if (!isRegression) { + require(labelType >= 2, s"labelType must be >= 2 for classification. 0 indicates regression.") + } + + val rng = new scala.util.Random() + rng.setSeed(seed) + + val labelGenerator = if (isRegression) { + new RealLabelPairGenerator() + } else { + new ClassLabelPairGenerator(labelType) + } + + val topNode = randomBalancedDecisionTreeHelper(0, depth, featureArity, labelGenerator, + Set.empty, rng) + if (isRegression) { + new DecisionTreeModel(topNode, Algo.Regression) + } else { + new DecisionTreeModel(topNode, Algo.Classification) + } + } + + /** + * Create an internal node. Either create the leaf nodes beneath it, or recurse as needed. + * @param nodeIndex Index of node. + * @param subtreeDepth Depth of subtree to build. Depth 0 means this is a leaf node. + * @param featureArity Indicates feature type. Value 0 indicates continuous feature. + * Other values >= 2 indicate a categorical feature, + * where the value is the number of categories. + * @param usedFeatures Features appearing in the path from the tree root to the node + * being constructed. + * @param labelGenerator Generates pairs of distinct labels. + * @return + */ + def randomBalancedDecisionTreeHelper( + nodeIndex: Int, + subtreeDepth: Int, + featureArity: Array[Int], + labelGenerator: RandomDataGenerator[Pair[Double, Double]], + usedFeatures: Set[Int], + rng: scala.util.Random): Node = { + + if (subtreeDepth == 0) { + // This case only happens for a depth 0 tree. + return new Node(id = nodeIndex, predict = new Predict(0), impurity = 0, isLeaf = true, + split = None, leftNode = None, rightNode = None, stats = None) + } + + val numFeatures = featureArity.size + if (usedFeatures.size >= numFeatures) { + // Should not happen. + throw new RuntimeException(s"randomBalancedDecisionTreeSplitNode ran out of " + + s"features for splits.") + } + + // Make node internal. 
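+    // Pick a split feature that has not already been used on the path from the root;
+    // featureArity(feature) == 0 marks a continuous feature, values >= 2 a categorical one.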
+ var feature: Int = rng.nextInt(numFeatures) + while (usedFeatures.contains(feature)) { + feature = rng.nextInt(numFeatures) + } + val split: Split = if (featureArity(feature) == 0) { + // continuous feature + new Split(feature = feature, threshold = rng.nextDouble(), + featureType = FeatureType.Continuous, categories = List()) + } else { + // categorical feature + // Put nCatsSplit categories on left, and the rest on the right. + // nCatsSplit is in {1,...,arity-1}. + val nCatsSplit = rng.nextInt(featureArity(feature) - 1) + 1 + val splitCategories = rng.shuffle(Range(0,featureArity(feature)).toList).take(nCatsSplit) + new Split(feature = feature, threshold = 0, + featureType = FeatureType.Categorical, categories = + splitCategories.asInstanceOf[List[Double]]) + } + + val leftChildIndex = nodeIndex * 2 + 1 + val rightChildIndex = nodeIndex * 2 + 2 + if (subtreeDepth == 1) { + // Add leaf nodes. + val predictions = labelGenerator.nextValue() + new Node(id = nodeIndex, predict = new Predict(0), impurity = 0, isLeaf = false, split = Some(split), + leftNode = Some(new Node(id = leftChildIndex, predict = new Predict(predictions._1), impurity = 0, isLeaf = true, + split = None, leftNode = None, rightNode = None, stats = None)), + rightNode = Some(new Node(id = rightChildIndex, predict = new Predict(predictions._2), impurity = 0, isLeaf = true, + split = None, leftNode = None, rightNode = None, stats = None)), stats = None) + } else { + new Node(id = nodeIndex, predict = new Predict(0), impurity = 0, isLeaf = false, split = Some(split), + leftNode = Some(randomBalancedDecisionTreeHelper(leftChildIndex, subtreeDepth - 1, + featureArity, labelGenerator, usedFeatures + feature, rng)), + rightNode = Some(randomBalancedDecisionTreeHelper(rightChildIndex, subtreeDepth - 1, + featureArity, labelGenerator, usedFeatures + feature, rng)), stats = None) + } + } + + def generateKMeansVectors( + sc: SparkContext, + numRows: Long, + numCols: Int, + numCenters: Int, + numPartitions: Int, + seed: Long = System.currentTimeMillis()): RDD[Vector] = { + + RandomRDDs.randomRDD(sc, new KMeansDataGenerator(numCenters, numCols, seed), + numRows, numPartitions, seed) + } + + + // Problems with having a userID or productID in the test set but not training set + // leads to a lot of work... 
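+  // generateRatings therefore removes (user, product) pairs that occur in both sets from
+  // the training set, and then restricts the test set to users and products that still
+  // appear in the pruned training set.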
+ def generateRatings( + sc: SparkContext, + numUsers: Int, + numProducts: Int, + numRatings: Long, + implicitPrefs: Boolean, + numPartitions: Int, + seed: Long = System.currentTimeMillis()): (RDD[Rating],RDD[Rating]) = { + + val train = RandomRDDs.randomRDD(sc, + new RatingGenerator(numUsers, numProducts,implicitPrefs), + numRatings, numPartitions, seed).cache() + + val test = RandomRDDs.randomRDD(sc, + new RatingGenerator(numUsers, numProducts,implicitPrefs), + math.ceil(numRatings * 0.25).toLong, numPartitions, seed + 24) + + // Now get rid of duplicate ratings and remove non-existant userID's + // and prodID's from the test set + val commons: PairRDDFunctions[(Int,Int),Rating] = + new PairRDDFunctions(train.keyBy(rating => (rating.user, rating.product)).cache()) + + val exact = commons.join(test.keyBy(rating => (rating.user, rating.product))) + + val trainPruned = commons.subtractByKey(exact).map(_._2).cache() + + // Now get rid of users that don't exist in the train set + val trainUsers: RDD[(Int,Rating)] = trainPruned.keyBy(rating => rating.user) + val testUsers: PairRDDFunctions[Int,Rating] = + new PairRDDFunctions(test.keyBy(rating => rating.user)) + val testWithAdditionalUsers = testUsers.subtractByKey(trainUsers) + + val userPrunedTestProds: RDD[(Int,Rating)] = + testUsers.subtractByKey(testWithAdditionalUsers).map(_._2).keyBy(rating => rating.product) + + val trainProds: RDD[(Int,Rating)] = trainPruned.keyBy(rating => rating.product) + + val testWithAdditionalProds = + new PairRDDFunctions[Int, Rating](userPrunedTestProds).subtractByKey(trainProds) + val finalTest = + new PairRDDFunctions[Int, Rating](userPrunedTestProds).subtractByKey(testWithAdditionalProds) + .map(_._2) + + (trainPruned, finalTest) + } + +} + +class RatingGenerator( + private val numUsers: Int, + private val numProducts: Int, + private val implicitPrefs: Boolean) extends RandomDataGenerator[Rating] { + + private val rng = new java.util.Random() + + private val observed = new mutable.HashMap[(Int, Int), Boolean]() + + override def nextValue(): Rating = { + var tuple = (rng.nextInt(numUsers),rng.nextInt(numProducts)) + while (observed.getOrElse(tuple,false)){ + tuple = (rng.nextInt(numUsers),rng.nextInt(numProducts)) + } + observed += (tuple -> true) + + val rating = if (implicitPrefs) rng.nextInt(2)*1.0 else rng.nextDouble()*5 + + new Rating(tuple._1, tuple._2, rating) + } + + override def setSeed(seed: Long) { + rng.setSeed(seed) + } + + override def copy(): RatingGenerator = new RatingGenerator(numUsers, numProducts, implicitPrefs) +} + +// For general classification +class ClassLabelGenerator( + private val numFeatures: Int, + private val threshold: Double, + private val chiSq: Boolean) extends RandomDataGenerator[LabeledPoint] { + + private val rng = new java.util.Random() + + override def nextValue(): LabeledPoint = { + val y = if (rng.nextDouble() < threshold) 0.0 else 1.0 + val x = Array.fill[Double](numFeatures) { + if (!chiSq) rng.nextGaussian() + y else rng.nextInt(6) * 1.0 + } + + LabeledPoint(y, Vectors.dense(x)) + } + + override def setSeed(seed: Long) { + rng.setSeed(seed) + } + + override def copy(): ClassLabelGenerator = + new ClassLabelGenerator(numFeatures, threshold, chiSq) +} + +class BinaryLabeledDataGenerator( + private val numFeatures: Int, + private val threshold: Double) extends RandomDataGenerator[LabeledPoint] { + + private val rng = new java.util.Random() + + override def nextValue(): LabeledPoint = { + val y = if (rng.nextDouble() < threshold) 0.0 else 1.0 + val x = 
Array.fill[Double](numFeatures) { + if (rng.nextDouble() < threshold) 0.0 else 1.0 + } + LabeledPoint(y, Vectors.dense(x)) + } + + override def setSeed(seed: Long) { + rng.setSeed(seed) + } + + override def copy(): BinaryLabeledDataGenerator = + new BinaryLabeledDataGenerator(numFeatures, threshold) + +} + +class LinearDataGenerator( + val numFeatures: Int, + val intercept: Double, + val seed: Long, + val labelNoise: Double, + val problem: String = "", + val sparsity: Double = 1.0) extends RandomDataGenerator[LabeledPoint] { + + private val rng = new java.util.Random(seed) + + private val weights = Array.fill(numFeatures)(rng.nextDouble()) + private val nnz: Int = math.ceil(numFeatures*sparsity).toInt + + override def nextValue(): LabeledPoint = { + val x = Array.fill[Double](nnz)(2*rng.nextDouble()-1) + + val y = weights.zip(x).map(p => p._1 * p._2).sum + intercept + labelNoise*rng.nextGaussian() + val yD = + if (problem == "SVM"){ + if (y < 0.0) 0.0 else 1.0 + } else{ + y + } + + LabeledPoint(yD, Vectors.dense(x)) + } + + override def setSeed(seed: Long) { + rng.setSeed(seed) + } + + override def copy(): LinearDataGenerator = + new LinearDataGenerator(numFeatures, intercept, seed, labelNoise, problem, sparsity) +} + + +/** + * Generator for a pair of distinct class labels from the set {0,...,numClasses-1}. + * @param numClasses Number of classes. + */ +class ClassLabelPairGenerator(val numClasses: Int) + extends RandomDataGenerator[Pair[Double, Double]] { + + require(numClasses >= 2, + s"ClassLabelPairGenerator given label numClasses = $numClasses, but numClasses should be >= 2.") + + private val rng = new java.util.Random() + + override def nextValue(): Pair[Double, Double] = { + val left = rng.nextInt(numClasses) + var right = rng.nextInt(numClasses) + while (right == left) { + right = rng.nextInt(numClasses) + } + new Pair[Double, Double](left, right) + } + + override def setSeed(seed: Long) { + rng.setSeed(seed) + } + + override def copy(): ClassLabelPairGenerator = new ClassLabelPairGenerator(numClasses) +} + + +/** + * Generator for a pair of real-valued labels. + */ +class RealLabelPairGenerator() extends RandomDataGenerator[Pair[Double, Double]] { + + private val rng = new java.util.Random() + + override def nextValue(): Pair[Double, Double] = + new Pair[Double, Double](rng.nextDouble(), rng.nextDouble()) + + override def setSeed(seed: Long) { + rng.setSeed(seed) + } + + override def copy(): RealLabelPairGenerator = new RealLabelPairGenerator() +} + + +/** + * Generator for a feature vector which can include a mix of categorical and continuous features. + * @param categoricalArities Specifies the number of categories for each categorical feature. + * @param numContinuous Number of continuous features. Feature values are in range [0,1]. + */ +class FeaturesGenerator(val categoricalArities: Array[Int], val numContinuous: Int) + extends RandomDataGenerator[Vector] { + + categoricalArities.foreach { arity => + require(arity >= 2, s"FeaturesGenerator given categorical arity = $arity, " + + s"but arity should be >= 2.") + } + + val numFeatures = categoricalArities.size + numContinuous + + private val rng = new java.util.Random() + + /** + * Generates vector with categorical features first, and continuous features in [0,1] second. + */ + override def nextValue(): Vector = { + // Feature ordering matches getCategoricalFeaturesInfo. 
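+    // Indices 0 until categoricalArities.size hold categorical values drawn from
+    // {0, ..., arity - 1}; the remaining indices hold continuous values in [0, 1).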
+ val arr = new Array[Double](numFeatures) + var j = 0 + while (j < categoricalArities.size) { + arr(j) = rng.nextInt(categoricalArities(j)) + j += 1 + } + while (j < numFeatures) { + arr(j) = rng.nextDouble() + j += 1 + } + Vectors.dense(arr) + } + + override def setSeed(seed: Long) { + rng.setSeed(seed) + } + + override def copy(): FeaturesGenerator = new FeaturesGenerator(categoricalArities, numContinuous) + + /** + * @return categoricalFeaturesInfo Map storing arity of categorical features. + * E.g., an entry (n -> k) indicates that feature n is categorical + * with k categories indexed from 0: {0, 1, ..., k-1}. + */ + def getCategoricalFeaturesInfo: Map[Int, Int] = { + // Categorical features are indexed from 0 because of the implementation of nextValue(). + categoricalArities.zipWithIndex.map(_.swap).toMap + } + +} + + +class KMeansDataGenerator( + val numCenters: Int, + val numColumns: Int, + val seed: Long) extends RandomDataGenerator[Vector] { + + private val rng = new java.util.Random(seed) + private val rng2 = new java.util.Random(seed + 24) + private val scale_factors = Array.fill(numCenters)(rng.nextInt(20) - 10) + + // Have a random number of points around a cluster + private val concentrations: Seq[Double] = { + val rand = Array.fill(numCenters)(rng.nextDouble()) + val randSum = rand.sum + val scaled = rand.map(x => x / randSum) + + (1 to numCenters).map{i => + scaled.slice(0, i).sum + } + } + + private val centers = (0 until numCenters).map{i => + Array.fill(numColumns)((2 * rng.nextDouble() - 1)*scale_factors(i)) + } + + override def nextValue(): Vector = { + val pick_center_rand = rng2.nextDouble() + + val centerToAddTo = centers(concentrations.indexWhere(p => pick_center_rand <= p)) + + Vectors.dense(Array.tabulate(numColumns)(i => centerToAddTo(i) + rng2.nextGaussian())) + } + + override def setSeed(seed: Long) { + rng.setSeed(seed) + } + + override def copy(): KMeansDataGenerator = new KMeansDataGenerator(numCenters, numColumns, seed) +} diff --git a/mllib-tests/v2p0/src/main/scala/mllib/perf/util/DataLoader.scala b/mllib-tests/v2p0/src/main/scala/mllib/perf/util/DataLoader.scala new file mode 100644 index 0000000..f0bd48c --- /dev/null +++ b/mllib-tests/v2p0/src/main/scala/mllib/perf/util/DataLoader.scala @@ -0,0 +1,143 @@ +package mllib.perf.util + +import org.apache.spark.SparkContext +import org.apache.spark.mllib.regression._ +import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.rdd.RDD + +object DataLoader { + + // For DecisionTreeTest: PartitionLabelStats tracks the stats for each partition. 
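+  // It records the min/max label, a distinct-label count (capped at MAX_CATEGORIES) and
+  // whether any label is non-integer; isClassification below uses these to infer whether
+  // a loaded dataset is a classification or a regression problem.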
+  class PartitionLabelStats(
+      var min: Double,
+      var max: Double,
+      var distinct: Long,
+      var nonInteger: Boolean)
+    extends Serializable
+
+  object PartitionLabelStats extends Serializable {
+    /** Max categories allowed for categorical label (for inferring labelType) */
+    val MAX_CATEGORIES = 1000
+
+    def labelSeqOp(lps: Iterator[LabeledPoint]): Iterator[PartitionLabelStats] = {
+      val stats = new PartitionLabelStats(Double.MaxValue, Double.MinValue, 0, false)
+      val labelSet = new scala.collection.mutable.HashSet[Double]()
+      lps.foreach { lp =>
+        if (lp.label.toInt != lp.label) {
+          stats.nonInteger = true
+        }
+        stats.min = Math.min(lp.label, stats.min)
+        stats.max = Math.max(lp.label, stats.max)
+        if (labelSet.size <= MAX_CATEGORIES) {
+          labelSet.add(lp.label)
+        }
+      }
+      stats.distinct = labelSet.size
+      Iterator(stats)
+    }
+
+    def labelCombOp(
+        labelStatsA: PartitionLabelStats,
+        labelStatsB: PartitionLabelStats): PartitionLabelStats = {
+      labelStatsA.min = Math.min(labelStatsA.min, labelStatsB.min)
+      labelStatsA.max = Math.max(labelStatsA.max, labelStatsB.max)
+      labelStatsA.distinct = Math.max(labelStatsA.distinct, labelStatsB.distinct)
+      labelStatsA
+    }
+  }
+
+  /** Infer label type from data */
+  private def isClassification(data: RDD[LabeledPoint]): Boolean = {
+    val labelStats =
+      data.mapPartitions(PartitionLabelStats.labelSeqOp)
+        .fold(new PartitionLabelStats(Double.MaxValue, Double.MinValue, 0, false))(
+          PartitionLabelStats.labelCombOp)
+    labelStats.distinct <= PartitionLabelStats.MAX_CATEGORIES && !labelStats.nonInteger
+  }
+
+  /**
+   * Load training and test LibSVM-format data files.
+   * @return (trainTestDatasets, categoricalFeaturesInfo, numClasses) where
+   *         trainTestDatasets = Array(trainingData, testData),
+   *         categoricalFeaturesInfo is a map of categorical feature arities, and
+   *         numClasses = number of classes label can take.
+   */
+  private[perf] def loadLibSVMFiles(
+      sc: SparkContext,
+      numPartitions: Int,
+      trainingDataPath: String,
+      testDataPath: String,
+      testDataFraction: Double,
+      seed: Long): (Array[RDD[LabeledPoint]], Map[Int, Int], Int) = {
+
+    val trainingData = MLUtils.loadLibSVMFile(sc, trainingDataPath, -1, numPartitions)
+
+    val (rdds, categoricalFeaturesInfo_) = if (testDataPath == "") {
+      // randomly split trainingData into train, test
+      val splits = trainingData.randomSplit(Array(1.0 - testDataFraction, testDataFraction), seed)
+      (splits, Map.empty[Int, Int])
+    } else {
+      // load test data
+      val numFeatures = trainingData.take(1)(0).features.size
+      val testData = MLUtils.loadLibSVMFile(sc, testDataPath, numFeatures, numPartitions)
+      (Array(trainingData, testData), Map.empty[Int, Int])
+    }
+
+    // For classification, re-index classes if needed.
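+    // Unless the labels are already exactly {0.0, 1.0}, they are remapped to indices
+    // 0, ..., numClasses - 1 in sorted order; e.g. labels {1.0, 3.0, 7.0} become {0.0, 1.0, 2.0}.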
+ val (finalDatasets, classIndexMap, numClasses) = { + if (isClassification(rdds(0)) && isClassification(rdds(1))) { + // classCounts: class --> # examples in class + val classCounts: Map[Double, Long] = { + val trainClassCounts = rdds(0).map(_.label).countByValue() + val testClassCounts = rdds(1).map(_.label).countByValue() + val mutableClassCounts = new scala.collection.mutable.HashMap[Double, Long]() + trainClassCounts.foreach { case (label, cnt) => + mutableClassCounts(label) = mutableClassCounts.getOrElseUpdate(label, 0) + cnt + } + testClassCounts.foreach { case (label, cnt) => + mutableClassCounts(label) = mutableClassCounts.getOrElseUpdate(label, 0) + cnt + } + mutableClassCounts.toMap + } + val sortedClasses = classCounts.keys.toList.sorted + val numClasses = classCounts.size + // classIndexMap: class --> index in 0,...,numClasses-1 + val classIndexMap = { + if (classCounts.keySet != Set(0.0, 1.0)) { + sortedClasses.zipWithIndex.toMap + } else { + Map[Double, Int]() + } + } + val indexedRdds = { + if (classIndexMap.isEmpty) { + rdds + } else { + rdds.map { rdd => + rdd.map(lp => LabeledPoint(classIndexMap(lp.label), lp.features)) + } + } + } + val numTrain = indexedRdds(0).count() + val numTest = indexedRdds(1).count() + val numTotalInstances = numTrain + numTest + println(s"numTrain: $numTrain") + println(s"numTest: $numTest") + println(s"numClasses: $numClasses") + println(s"Per-class example fractions, counts:") + println(s"Class\tFrac\tCount") + sortedClasses.foreach { c => + val frac = classCounts(c) / numTotalInstances.toDouble + println(s"$c\t$frac\t${classCounts(c)}") + } + (indexedRdds, classIndexMap, numClasses) + } else { + (rdds, null, 0) + } + } + + (finalDatasets, categoricalFeaturesInfo_, numClasses) + } + +} diff --git a/spark-tests/project/SparkTestsBuild.scala b/spark-tests/project/SparkTestsBuild.scala index 4116326..f77c39d 100644 --- a/spark-tests/project/SparkTestsBuild.scala +++ b/spark-tests/project/SparkTestsBuild.scala @@ -10,12 +10,12 @@ object SparkTestsBuild extends Build { settings = assemblySettings ++ Seq( organization := "org.spark-project", version := "0.1", - scalaVersion := "2.10.4", + scalaVersion := sys.props.getOrElse("scala.version", default="2.11.8"), libraryDependencies ++= Seq( "net.sf.jopt-simple" % "jopt-simple" % "4.6", "org.scalatest" %% "scalatest" % "2.2.1" % "test", "com.google.guava" % "guava" % "14.0.1", - "org.apache.spark" %% "spark-core" % "1.0.0" % "provided", + "org.apache.spark" %% "spark-core" % "2.0.0" % "provided", "org.json4s" %% "json4s-native" % "3.2.9" ), test in assembly := {}, @@ -36,4 +36,4 @@ object SparkTestsBuild extends Build { case _ => MergeStrategy.first } )) -} \ No newline at end of file +} diff --git a/spark-tests/src/main/scala/spark/perf/TestRunner.scala b/spark-tests/src/main/scala/spark/perf/TestRunner.scala index cbfcb0a..6c21f33 100644 --- a/spark-tests/src/main/scala/spark/perf/TestRunner.scala +++ b/spark-tests/src/main/scala/spark/perf/TestRunner.scala @@ -44,7 +44,7 @@ object TestRunner { ("sparkVersion" -> sc.version) ~ ("systemProperties" -> System.getProperties.asScala.toMap) ~ ("results" -> results) - println("results: " + compact(render(json))) + println("results: " + compact(json)) // Gracefully stop the SparkContext so that the application web UI can be preserved // and viewed using the HistoryServer. 
diff --git a/streaming-tests/project/StreamingTestsBuild.scala b/streaming-tests/project/StreamingTestsBuild.scala index 7c8c903..39784f3 100644 --- a/streaming-tests/project/StreamingTestsBuild.scala +++ b/streaming-tests/project/StreamingTestsBuild.scala @@ -10,14 +10,18 @@ object StreamingTestsBuild extends Build { settings = assemblySettings ++ Seq( organization := "org.spark-project", version := "0.1", - scalaVersion := "2.10.4", + scalaVersion := sys.props.getOrElse("scala.version", default="2.11.8"), libraryDependencies ++= Seq( "net.sf.jopt-simple" % "jopt-simple" % "4.5", "org.scalatest" %% "scalatest" % "2.2.1" % "test", "com.google.guava" % "guava" % "14.0.1", + "com.typesafe.akka" %% "akka-actor" % "2.3.11", + "com.typesafe.akka" %% "akka-slf4j" % "2.3.11", + "com.typesafe.akka" %% "akka-remote" % "2.3.11", + "com.typesafe.akka" %% "akka-agent" % "2.3.11", "org.slf4j" % "slf4j-log4j12" % "1.7.2", - "org.apache.spark" %% "spark-core" % "1.0.0" % "provided", - "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided" + "org.apache.spark" %% "spark-core" % "2.0.0" % "provided", + "org.apache.spark" %% "spark-streaming" % "2.0.0" % "provided" ), test in assembly := {}, outputPath in assembly := file("target/streaming-perf-tests-assembly.jar"), diff --git a/streaming-tests/src/main/scala/streaming/perf/HdfsRecoveryTest.scala b/streaming-tests/src/main/scala/streaming/perf/HdfsRecoveryTest.scala index 23e8b80..841ae5e 100644 --- a/streaming-tests/src/main/scala/streaming/perf/HdfsRecoveryTest.scala +++ b/streaming-tests/src/main/scala/streaming/perf/HdfsRecoveryTest.scala @@ -39,7 +39,7 @@ class HdfsRecoveryTest extends PerfTest { // Verify the running counts. For any key the running count should be in the sequence // 1, 3, 6, 10, 15, 21, ... (i.e., nth number is sum of 1..n) val expectedCounts = (1L to maxRecordsPerFile).map(x => (1L to x).reduce(_ + _)).toSet - wordStream.foreach((rdd: RDD[(String, Long)], time: Time) => { + wordStream.foreachRDD((rdd: RDD[(String, Long)], time: Time) => { val partitionCounts = rdd.sparkContext.runJob(rdd.mapPartitions(iter => iter.toSeq.groupBy(_._1).toSeq.map(x => (x._1, x._2.map(_._2).sum)).toIterator ), (iter: Iterator[(String, Long)]) => iter.toArray) @@ -48,7 +48,7 @@ class HdfsRecoveryTest extends PerfTest { val counts = rdd.reduceByKey(_ + _, 1).collect() println(s"New total count at $time = " + counts.mkString("[", ", ", "]")) }) - runningCountStream.foreach((rdd: RDD[(String, Long)], time: Time) => { + runningCountStream.foreachRDD((rdd: RDD[(String, Long)], time: Time) => { val counts = rdd.collect() val possibleCounts = expectedCounts val expected = counts.forall { case (word, count) => possibleCounts.contains(count) } diff --git a/streaming-tests/src/main/scala/streaming/perf/KVDataTest.scala b/streaming-tests/src/main/scala/streaming/perf/KVDataTest.scala index 628376d..9aeb989 100644 --- a/streaming-tests/src/main/scala/streaming/perf/KVDataTest.scala +++ b/streaming-tests/src/main/scala/streaming/perf/KVDataTest.scala @@ -70,7 +70,7 @@ abstract class KVDataTest extends PerfTest { // run test ssc.start() val startTime = System.currentTimeMillis - ssc.awaitTermination(totalDurationSec * 1000) + ssc.awaitTerminationOrTimeout(totalDurationSec * 1000) ssc.stop() processResults(statsReportListener) }