From 4522b72e7e37d56e445efd02c9e374f66d33f3fe Mon Sep 17 00:00:00 2001 From: Brandon Holt Date: Tue, 5 Aug 2014 16:56:12 -0700 Subject: [PATCH 1/4] working on using config to get hadoop.tmp.dir for snapshot --- .../com/twitter/scalding/ReplImplicits.scala | 54 +++++++++---------- .../com/twitter/scalding/ShellPipe.scala | 7 ++- 2 files changed, 32 insertions(+), 29 deletions(-) diff --git a/scalding-repl/src/main/scala/com/twitter/scalding/ReplImplicits.scala b/scalding-repl/src/main/scala/com/twitter/scalding/ReplImplicits.scala index 37f524fcaa..7915fa46f6 100644 --- a/scalding-repl/src/main/scala/com/twitter/scalding/ReplImplicits.scala +++ b/scalding-repl/src/main/scala/com/twitter/scalding/ReplImplicits.scala @@ -49,45 +49,43 @@ object ReplImplicits extends FieldConversions { fd } + def replConfig = { + val conf = Config.default + + // Create a jar to hold compiled code for this REPL session in addition to + // "tempjars" which can be passed in from the command line, allowing code + // in the repl to be distributed for the Hadoop job to run. + val replCodeJar = ScaldingShell.createReplCodeJar() + val tmpJarsConfig: Map[String, String] = + replCodeJar match { + case Some(jar) => + Map("tmpjars" -> { + // Use tmpjars already in the configuration. + conf.get("tmpjars").map(_ + ",").getOrElse("") + // And a jar of code compiled by the REPL. + .concat("file://" + jar.getAbsolutePath) + }) + case None => + // No need to add the tmpjars to the configuration + Map() + } + + conf ++ tmpJarsConfig + } + /** * Runs this pipe as a Scalding job. * * Automatically cleans up the flowDef to include only sources upstream from tails. */ - def run(implicit fd: FlowDef, md: Mode): Option[JobStats] = { - - def config = { - val conf = Config.default - - // Create a jar to hold compiled code for this REPL session in addition to - // "tempjars" which can be passed in from the command line, allowing code - // in the repl to be distributed for the Hadoop job to run. - val replCodeJar = ScaldingShell.createReplCodeJar() - val tmpJarsConfig: Map[String, String] = - replCodeJar match { - case Some(jar) => - Map("tmpjars" -> { - // Use tmpjars already in the configuration. - conf.get("tmpjars").map(_ + ",").getOrElse("") - // And a jar of code compiled by the REPL. - .concat("file://" + jar.getAbsolutePath) - }) - case None => - // No need to add the tmpjars to the configuration - Map() - } - - conf ++ tmpJarsConfig - } - - ExecutionContext.newContext(config)(fd, md).waitFor match { + def run(implicit fd: FlowDef, md: Mode): Option[JobStats] = + ExecutionContext.newContext(replConfig)(fd, md).waitFor match { case Success(stats) => Some(stats) case Failure(e) => println("Flow execution failed!") e.printStackTrace() None } - } /** * Converts a Cascading Pipe to a Scalding RichPipe. This method permits implicit conversions from diff --git a/scalding-repl/src/main/scala/com/twitter/scalding/ShellPipe.scala b/scalding-repl/src/main/scala/com/twitter/scalding/ShellPipe.scala index 12305e168c..ca21f8d076 100644 --- a/scalding-repl/src/main/scala/com/twitter/scalding/ShellPipe.scala +++ b/scalding-repl/src/main/scala/com/twitter/scalding/ShellPipe.scala @@ -61,7 +61,12 @@ class ShellTypedPipe[T](pipe: TypedPipe[T]) { case _: HadoopMode => // come up with unique temporary filename // TODO: refactor into TemporarySequenceFile class - val tmpSeq = "/tmp/scalding-repl/snapshot-" + UUID.randomUUID + ".seq" + val conf = replConfig + val tmpDir = conf.get("hadoop.tmp.dir") + .orElse(conf.get("cascading.tmp.dir")) + .getOrElse("/tmp") + println("@> tmpDir = " + tmpDir) + val tmpSeq = tmpDir + "/scalding-repl/snapshot-" + java.util.UUID.randomUUID + ".seq" val dest = TypedSequenceFile[T](tmpSeq) dest.writeFrom(p)(localFlow, md) run(localFlow, md) From 928bfa25200eb1a1df61928efe206e624dcb3fe9 Mon Sep 17 00:00:00 2001 From: Brandon Holt Date: Tue, 5 Aug 2014 17:21:24 -0700 Subject: [PATCH 2/4] replConfig: load hadoop config --- .../main/scala/com/twitter/scalding/ReplImplicits.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/scalding-repl/src/main/scala/com/twitter/scalding/ReplImplicits.scala b/scalding-repl/src/main/scala/com/twitter/scalding/ReplImplicits.scala index 7915fa46f6..d3a46e8e41 100644 --- a/scalding-repl/src/main/scala/com/twitter/scalding/ReplImplicits.scala +++ b/scalding-repl/src/main/scala/com/twitter/scalding/ReplImplicits.scala @@ -50,8 +50,12 @@ object ReplImplicits extends FieldConversions { } def replConfig = { - val conf = Config.default - + val conf = Config.default ++ { + mode match { + case h: HadoopMode => Config.fromHadoop(h.jobConf) + case _ => Config.empty + } + } // Create a jar to hold compiled code for this REPL session in addition to // "tempjars" which can be passed in from the command line, allowing code // in the repl to be distributed for the Hadoop job to run. From 00c9448b23dce526361f80022ab9560d95c4dc8c Mon Sep 17 00:00:00 2001 From: Brandon Holt Date: Tue, 5 Aug 2014 17:55:42 -0700 Subject: [PATCH 3/4] delete debug print --- .../src/main/scala/com/twitter/scalding/ShellPipe.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scalding-repl/src/main/scala/com/twitter/scalding/ShellPipe.scala b/scalding-repl/src/main/scala/com/twitter/scalding/ShellPipe.scala index ca21f8d076..b9171835d6 100644 --- a/scalding-repl/src/main/scala/com/twitter/scalding/ShellPipe.scala +++ b/scalding-repl/src/main/scala/com/twitter/scalding/ShellPipe.scala @@ -65,7 +65,7 @@ class ShellTypedPipe[T](pipe: TypedPipe[T]) { val tmpDir = conf.get("hadoop.tmp.dir") .orElse(conf.get("cascading.tmp.dir")) .getOrElse("/tmp") - println("@> tmpDir = " + tmpDir) + val tmpSeq = tmpDir + "/scalding-repl/snapshot-" + java.util.UUID.randomUUID + ".seq" val dest = TypedSequenceFile[T](tmpSeq) dest.writeFrom(p)(localFlow, md) From 3132309afd7f30b2b29c8884d71e09a42c01f2ba Mon Sep 17 00:00:00 2001 From: Katya Gonina Date: Wed, 6 Aug 2014 14:55:52 -0700 Subject: [PATCH 4/4] bumping version number --- CHANGES.md | 3 +++ scalding-core/src/main/scala/com/twitter/package.scala | 2 +- version.sbt | 2 +- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index fd151b14d8..009c314d30 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,8 @@ # Scalding # +### Version 0.11.2 ### +* hadoop.tmp.dir for snapshot in config + ### Version 0.11.1 ### * Fixes bad release portion where code wasn't updated for new scalding version number. * use cascading-jdbc 2.5.3 for table exists fix and cascading 2.5.5: https://github.com/twitter/scalding/pull/951 diff --git a/scalding-core/src/main/scala/com/twitter/package.scala b/scalding-core/src/main/scala/com/twitter/package.scala index 2caf86eb8b..817fd1b999 100644 --- a/scalding-core/src/main/scala/com/twitter/package.scala +++ b/scalding-core/src/main/scala/com/twitter/package.scala @@ -33,7 +33,7 @@ package object scalding { /** * Make sure this is in sync with version.sbt */ - val scaldingVersion: String = "0.11.1" + val scaldingVersion: String = "0.11.2" object RichPathFilter { implicit def toRichPathFilter(f: PathFilter) = new RichPathFilter(f) diff --git a/version.sbt b/version.sbt index 70a4e0c55f..798d1a1e01 100644 --- a/version.sbt +++ b/version.sbt @@ -1,2 +1,2 @@ -version in ThisBuild := "0.11.1" +version in ThisBuild := "0.11.2"