From d198abe0ceca7a21a1f9d3d591129dc21a2b5be1 Mon Sep 17 00:00:00 2001 From: ymahajan Date: Sun, 20 May 2018 07:12:17 -0700 Subject: [PATCH] Addressing precheckin failures --- .../scala/org/apache/spark/sql/execution/ExistingPlans.scala | 4 ++-- .../org/apache/spark/sql/execution/NonRecursivePlans.scala | 2 +- .../main/scala/org/apache/spark/sql/execution/TableExec.scala | 2 +- .../spark/sql/execution/aggregate/CollectAggregateExec.scala | 2 +- .../spark/sql/execution/columnar/ColumnBatchCreator.scala | 4 ++-- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/sql/execution/ExistingPlans.scala b/core/src/main/scala/org/apache/spark/sql/execution/ExistingPlans.scala index f655456812..e7abbbe931 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/ExistingPlans.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/ExistingPlans.scala @@ -90,7 +90,7 @@ private[sql] abstract class PartitionedPhysicalScan( } protected override def doExecute(): RDD[InternalRow] = { - WholeStageCodegenExec(this)(codegenStageId = 0).execute() + WholeStageCodegenExec(this)(codegenStageId = 1).execute() } /** Specifies how data is partitioned across different nodes in the cluster. */ @@ -369,7 +369,7 @@ private[sql] final case class ZipPartitionScan(basePlan: CodegenSupport, } override protected def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { - WholeStageCodegenExec(this)(codegenStageId = 0).execute() + WholeStageCodegenExec(this)(codegenStageId = 1).execute() } override def output: Seq[Attribute] = basePlan.output diff --git a/core/src/main/scala/org/apache/spark/sql/execution/NonRecursivePlans.scala b/core/src/main/scala/org/apache/spark/sql/execution/NonRecursivePlans.scala index d038bbd6aa..34e690328f 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/NonRecursivePlans.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/NonRecursivePlans.scala @@ -38,7 +38,7 @@ abstract class NonRecursivePlans extends SparkPlan { throw new CodeGenerationException("Code generation failed for some of the child plans") } nonCodeGeneratedPlan = true - WholeStageCodegenExec(this)(codegenStageId = 0).execute() + WholeStageCodegenExec(this)(codegenStageId = 1).execute() } override def makeCopy(newArgs: Array[AnyRef]): NonRecursivePlans = { diff --git a/core/src/main/scala/org/apache/spark/sql/execution/TableExec.scala b/core/src/main/scala/org/apache/spark/sql/execution/TableExec.scala index af2f8229a4..6b2a9d9f4a 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/TableExec.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/TableExec.scala @@ -99,7 +99,7 @@ trait TableExec extends UnaryExecNode with CodegenSupportOnExecutor { override protected def doExecute(): RDD[InternalRow] = { // don't expect code generation to fail - WholeStageCodegenExec(this)(codegenStageId = 0).execute() + WholeStageCodegenExec(this)(codegenStageId = 1).execute() } override def inputRDDs(): Seq[RDD[InternalRow]] = { diff --git a/core/src/main/scala/org/apache/spark/sql/execution/aggregate/CollectAggregateExec.scala b/core/src/main/scala/org/apache/spark/sql/execution/aggregate/CollectAggregateExec.scala index 8c820b4d24..045af4ea9a 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/aggregate/CollectAggregateExec.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/aggregate/CollectAggregateExec.scala @@ -45,7 +45,7 @@ case class CollectAggregateExec( // temporarily switch producer to an InputAdapter for rows as normal // Iterator[UnsafeRow] which will be set explicitly in executeCollect() basePlan.childProducer = InputAdapter(child) - val (ctx, cleanedSource) = WholeStageCodegenExec(basePlan)(codegenStageId = 0).doCodeGen() + val (ctx, cleanedSource) = WholeStageCodegenExec(basePlan)(codegenStageId = 1).doCodeGen() basePlan.childProducer = child (cleanedSource, ctx.references.toArray) } diff --git a/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnBatchCreator.scala b/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnBatchCreator.scala index 6d65c339c4..bc8e5ae718 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnBatchCreator.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnBatchCreator.scala @@ -92,7 +92,7 @@ final class ColumnBatchCreator( // this is only used for local code generation while its RDD semantics // and related methods are all ignored val (ctx, code) = ExternalStoreUtils.codeGenOnExecutor( - WholeStageCodegenExec(insertPlan)(codegenStageId = 0), insertPlan) + WholeStageCodegenExec(insertPlan)(codegenStageId = 1), insertPlan) val references = ctx.references // also push the index of batchId reference at the end which can be // used by caller to update the reference objects before execution @@ -144,7 +144,7 @@ final class ColumnBatchCreator( // this is only used for local code generation while its RDD semantics // and related methods are all ignored val (ctx, code) = ExternalStoreUtils.codeGenOnExecutor( - WholeStageCodegenExec(insertPlan)(codegenStageId = 0), insertPlan) + WholeStageCodegenExec(insertPlan)(codegenStageId = 1), insertPlan) val references = ctx.references.toArray (code, references) })