Skip to content

Commit

Permalink
[GLUTEN-8326][CORE] Rename GlutenConfig.getConf to GlutenConfig.get (#…
Browse files (browse the repository at this point in the history)
  • Loading branch information
yikf authored Jan 3, 2025
1 parent f2fecae commit b956a72
Show file tree
Hide file tree
Showing 76 changed files with 176 additions and 180 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ private class CHCelebornColumnarBatchSerializerInstance(
with Logging {

private lazy val conf = SparkEnv.get.conf
private lazy val gluten_conf = GlutenConfig.getConf
private lazy val gluten_conf = GlutenConfig.get
private lazy val compressionCodec = GlutenShuffleUtils.getCompressionCodec(conf)
private lazy val capitalizedCompressionCodec = compressionCodec.toUpperCase(Locale.ROOT)
private lazy val compressionLevel =
GlutenShuffleUtils.getCompressionLevel(
conf,
compressionCodec,
GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)
GlutenConfig.get.columnarShuffleCodecBackend.orNull)

override def deserializeStream(in: InputStream): DeserializationStream = {
new DeserializationStream {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ class CHCelebornColumnarShuffleWriter[K, V](
nativeBufferSize,
capitalizedCompressionCodec,
compressionLevel,
GlutenConfig.getConf.chColumnarShuffleSpillThreshold,
GlutenConfig.get.chColumnarShuffleSpillThreshold,
CHBackendSettings.shuffleHashAlgorithm,
celebornPartitionPusher,
GlutenConfig.getConf.chColumnarForceMemorySortShuffle
GlutenConfig.get.chColumnarForceMemorySortShuffle
|| ShuffleMode.SORT.name.equalsIgnoreCase(shuffleWriterType)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class ClickhouseOptimisticTransaction(
writeOptions: Option[DeltaOptions],
isOptimize: Boolean,
additionalConstraints: Seq[Constraint]): Seq[FileAction] = {
val nativeWrite = GlutenConfig.getConf.enableNativeWriter.getOrElse(false)
val nativeWrite = GlutenConfig.get.enableNativeWriter.getOrElse(false)
if (writingMergeTree) {
// TODO: update FallbackByBackendSettings for mergetree always return true
val onePipeline = nativeWrite && CHConf.get.enableOnePipelineMergeTreeWrite
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,11 +282,11 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
}

override def supportSortExec(): Boolean = {
GlutenConfig.getConf.enableColumnarSort
GlutenConfig.get.enableColumnarSort
}

override def supportSortMergeJoinExec(): Boolean = {
GlutenConfig.getConf.enableColumnarSortMergeJoin
GlutenConfig.get.enableColumnarSortMergeJoin
}

override def supportWindowExec(windowFunctions: Seq[NamedExpression]): Boolean = {
Expand Down Expand Up @@ -391,7 +391,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
}

override def enableNativeWriteFiles(): Boolean = {
GlutenConfig.getConf.enableNativeWriter.getOrElse(false)
GlutenConfig.get.enableNativeWriter.getOrElse(false)
}

override def supportCartesianProductExec(): Boolean = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,12 +452,12 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
val readBatchNumRows = metrics("avgReadBatchNumRows")
val numOutputRows = metrics("numOutputRows")
val dataSize = metrics("dataSize")
if (GlutenConfig.getConf.isUseCelebornShuffleManager) {
if (GlutenConfig.get.isUseCelebornShuffleManager) {
val clazz = ClassUtils.getClass("org.apache.spark.shuffle.CHCelebornColumnarBatchSerializer")
val constructor =
clazz.getConstructor(classOf[SQLMetric], classOf[SQLMetric], classOf[SQLMetric])
constructor.newInstance(readBatchNumRows, numOutputRows, dataSize).asInstanceOf[Serializer]
} else if (GlutenConfig.getConf.isUseUniffleShuffleManager) {
} else if (GlutenConfig.get.isUseUniffleShuffleManager) {
throw new UnsupportedOperationException("temporarily uniffle not support ch ")
} else {
new CHColumnarBatchSerializer(readBatchNumRows, numOutputRows, dataSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class CHValidatorApi extends ValidatorApi with AdaptiveSparkPlanHelper with Logg
}
case rangePartitoning: RangePartitioning =>
if (
GlutenConfig.getConf.enableColumnarSort &&
GlutenConfig.get.enableColumnarSort &&
RangePartitionerBoundsGenerator.supportedOrderings(rangePartitoning, child)
) {
None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ class CommonSubexpressionEliminateRule(spark: SparkSession) extends Rule[Logical
override def apply(plan: LogicalPlan): LogicalPlan = {
val newPlan =
if (
plan.resolved && GlutenConfig.getConf.enableGluten
&& GlutenConfig.getConf.enableCommonSubexpressionEliminate && !plan.fastEquals(lastPlan)
plan.resolved && GlutenConfig.get.enableGluten
&& GlutenConfig.get.enableCommonSubexpressionEliminate && !plan.fastEquals(lastPlan)
) {
lastPlan = plan
visitPlan(plan)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.AGGREGATE_EXPRESSION
*/
object CountDistinctWithoutExpand extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = {
if (
GlutenConfig.getConf.enableGluten && GlutenConfig.getConf.enableCountDistinctWithoutExpand
) {
if (GlutenConfig.get.enableGluten && GlutenConfig.get.enableCountDistinctWithoutExpand) {
plan.transformAllExpressionsWithPruning(_.containsPattern(AGGREGATE_EXPRESSION)) {
case ae: AggregateExpression
if ae.isDistinct && ae.aggregateFunction.isInstanceOf[Count] &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ object ExtendedGeneratorNestedColumnAliasing {
def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match {
case pj @ Project(projectList, f @ Filter(condition, g: Generate))
if canPruneGenerator(g.generator) &&
GlutenConfig.getConf.enableExtendedColumnPruning &&
GlutenConfig.get.enableExtendedColumnPruning &&
(SQLConf.get.nestedPruningOnExpressions || SQLConf.get.nestedSchemaPruningEnabled) =>
val attrToExtractValues =
getAttributeToExtractValues(projectList ++ g.generator.children :+ condition, Seq.empty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import scala.util.control.Breaks.{break, breakable}
// queryStagePrepRules.
case class FallbackBroadcastHashJoinPrepQueryStage(session: SparkSession) extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = {
val glutenConf: GlutenConfig = GlutenConfig.getConf
val glutenConf: GlutenConfig = GlutenConfig.get
plan.foreach {
case bhj: BroadcastHashJoinExec =>
val buildSidePlan = bhj.buildSide match {
Expand Down Expand Up @@ -106,8 +106,8 @@ case class FallbackBroadcastHashJoinPrepQueryStage(session: SparkSession) extend
case Some(exchange @ BroadcastExchangeExec(mode, child)) =>
val isTransformable =
if (
!GlutenConfig.getConf.enableColumnarBroadcastExchange ||
!GlutenConfig.getConf.enableColumnarBroadcastJoin
!GlutenConfig.get.enableColumnarBroadcastExchange ||
!GlutenConfig.get.enableColumnarBroadcastJoin
) {
ValidationResult.failed(
"columnar broadcast exchange is disabled or " +
Expand Down Expand Up @@ -146,12 +146,12 @@ case class FallbackBroadcastHashJoinPrepQueryStage(session: SparkSession) extend
case class FallbackBroadcastHashJoin(session: SparkSession) extends Rule[SparkPlan] {

private val enableColumnarBroadcastJoin: Boolean =
GlutenConfig.getConf.enableColumnarBroadcastJoin &&
GlutenConfig.getConf.enableColumnarBroadcastExchange
GlutenConfig.get.enableColumnarBroadcastJoin &&
GlutenConfig.get.enableColumnarBroadcastExchange

private val enableColumnarBroadcastNestedLoopJoin: Boolean =
GlutenConfig.getConf.broadcastNestedLoopJoinTransformerTransformerEnabled &&
GlutenConfig.getConf.enableColumnarBroadcastExchange
GlutenConfig.get.broadcastNestedLoopJoinTransformerTransformerEnabled &&
GlutenConfig.get.enableColumnarBroadcastExchange

override def apply(plan: SparkPlan): SparkPlan = {
plan.foreachUp {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ class RewriteDateTimestampComparisonRule(spark: SparkSession)
override def apply(plan: LogicalPlan): LogicalPlan = {
if (
plan.resolved &&
GlutenConfig.getConf.enableGluten &&
GlutenConfig.getConf.enableRewriteDateTimestampComparison
GlutenConfig.get.enableGluten &&
GlutenConfig.get.enableRewriteDateTimestampComparison
) {
visitPlan(plan)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ class RewriteToDateExpresstionRule(spark: SparkSession) extends Rule[LogicalPlan
override def apply(plan: LogicalPlan): LogicalPlan = {
if (
plan.resolved &&
GlutenConfig.getConf.enableGluten &&
GlutenConfig.getConf.enableCHRewriteDateConversion
GlutenConfig.get.enableGluten &&
GlutenConfig.get.enableCHRewriteDateConversion
) {
visitPlan(plan)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ object WriteFilesWithBucketValue extends Rule[SparkPlan] {

override def apply(plan: SparkPlan): SparkPlan = {
if (
GlutenConfig.getConf.enableGluten
&& GlutenConfig.getConf.enableNativeWriter.getOrElse(false)
GlutenConfig.get.enableGluten
&& GlutenConfig.get.enableNativeWriter.getOrElse(false)
) {
plan.transformDown {
case writeFiles: WriteFilesExec if writeFiles.bucketSpec.isDefined =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ private class CHColumnarBatchSerializerInstance(
GlutenShuffleUtils.getCompressionLevel(
conf,
compressionCodec,
GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)
GlutenConfig.get.columnarShuffleCodecBackend.orNull)

private val useColumnarShuffle: Boolean = GlutenConfig.getConf.isUseColumnarShuffleManager
private val useColumnarShuffle: Boolean = GlutenConfig.get.isUseColumnarShuffleManager

override def deserializeStream(in: InputStream): DeserializationStream = {
// Don't use GlutenConfig in this method. It will execute in non task Thread.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,17 @@ class CHColumnarShuffleWriter[K, V](
.map(_.getAbsolutePath)
.mkString(",")
private val subDirsPerLocalDir = blockManager.diskBlockManager.subDirsPerLocalDir
private val splitSize = GlutenConfig.getConf.maxBatchSize
private val splitSize = GlutenConfig.get.maxBatchSize
private val compressionCodec = GlutenShuffleUtils.getCompressionCodec(conf)
private val capitalizedCompressionCodec = compressionCodec.toUpperCase(Locale.ROOT)
private val compressionLevel =
GlutenShuffleUtils.getCompressionLevel(
conf,
compressionCodec,
GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)
private val maxSortBufferSize = GlutenConfig.getConf.chColumnarMaxSortBufferSize
private val forceMemorySortShuffle = GlutenConfig.getConf.chColumnarForceMemorySortShuffle
private val spillThreshold = GlutenConfig.getConf.chColumnarShuffleSpillThreshold
GlutenConfig.get.columnarShuffleCodecBackend.orNull)
private val maxSortBufferSize = GlutenConfig.get.chColumnarMaxSortBufferSize
private val forceMemorySortShuffle = GlutenConfig.get.chColumnarForceMemorySortShuffle
private val spillThreshold = GlutenConfig.get.chColumnarShuffleSpillThreshold
private val jniWrapper = new CHShuffleSplitterJniWrapper
// Are we in the process of stopping? Because map tasks can call stop() with success = true
// and then call stop() with success = false if they get an exception, we want to make sure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ case class CHAggregateFunctionRewriteRule(spark: SparkSession) extends Rule[Logi
case a: Aggregate =>
a.transformExpressions {
case avgExpr @ AggregateExpression(avg: Average, _, _, _, _)
if GlutenConfig.getConf.enableCastAvgAggregateFunction &&
GlutenConfig.getConf.enableColumnarHashAgg &&
if GlutenConfig.get.enableCastAvgAggregateFunction &&
GlutenConfig.get.enableColumnarHashAgg &&
!avgExpr.isDistinct && isDataTypeNeedConvert(avg.child.dataType) =>
AggregateExpression(
avg.copy(child = Cast(avg.child, DoubleType)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ class CHParquetWriterInjects extends CHFormatWriterInjects {
sparkOptions.put(SQLConf.PARQUET_COMPRESSION.key, compressionCodec)
val blockSize = options.getOrElse(
GlutenConfig.PARQUET_BLOCK_SIZE,
GlutenConfig.getConf.columnarParquetWriteBlockSize.toString)
GlutenConfig.get.columnarParquetWriteBlockSize.toString)
sparkOptions.put(GlutenConfig.PARQUET_BLOCK_SIZE, blockSize)
val blockRows = options.getOrElse(
GlutenConfig.PARQUET_BLOCK_ROWS,
GlutenConfig.getConf.columnarParquetWriteBlockRows.toString)
GlutenConfig.get.columnarParquetWriteBlockRows.toString)
sparkOptions.put(GlutenConfig.PARQUET_BLOCK_ROWS, blockRows)
sparkOptions
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ object CHExecUtil extends Logging {

private def buildPartitioningOptions(nativePartitioning: NativePartitioning): IteratorOptions = {
val options = new IteratorOptions
options.setBufferSize(GlutenConfig.getConf.maxBatchSize)
options.setBufferSize(GlutenConfig.get.maxBatchSize)
options.setName(nativePartitioning.getShortName)
options.setPartitionNum(nativePartitioning.getNumPartitions)
options.setExpr(new String(nativePartitioning.getExprList))
Expand Down Expand Up @@ -345,8 +345,8 @@ object CHExecUtil extends Logging {

val rddWithPartitionKey: RDD[Product2[Int, ColumnarBatch]] =
if (
GlutenConfig.getConf.isUseColumnarShuffleManager
|| GlutenConfig.getConf.isUseCelebornShuffleManager
GlutenConfig.get.isUseColumnarShuffleManager
|| GlutenConfig.get.isUseCelebornShuffleManager
) {
newPartitioning match {
case _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,12 @@ private class CelebornColumnarBatchSerializerInstance(
null // uncompressed
}
val compressionCodecBackend =
GlutenConfig.getConf.columnarShuffleCodecBackend.orNull
val shuffleWriterType = GlutenConfig.getConf.celebornShuffleWriterType
GlutenConfig.get.columnarShuffleCodecBackend.orNull
val shuffleWriterType = GlutenConfig.get.celebornShuffleWriterType
.replace(GLUTEN_SORT_SHUFFLE_WRITER, GLUTEN_RSS_SORT_SHUFFLE_WRITER)
val jniWrapper = ShuffleReaderJniWrapper.create(runtime)
val batchSize = GlutenConfig.getConf.maxBatchSize
val bufferSize = GlutenConfig.getConf.columnarShuffleReaderBufferSize
val batchSize = GlutenConfig.get.maxBatchSize
val bufferSize = GlutenConfig.get.columnarShuffleReaderBufferSize
val handle = jniWrapper
.make(
cSchema.memoryAddress(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
compressionLevel,
compressionBufferSize,
bufferCompressThreshold,
GlutenConfig.getConf.columnarShuffleCompressionMode,
GlutenConfig.get.columnarShuffleCompressionMode,
conf.get(SHUFFLE_SORT_INIT_BUFFER_SIZE).toInt,
conf.get(SHUFFLE_SORT_USE_RADIXSORT),
clientPushBufferMaxSize,
Expand All @@ -138,7 +138,7 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
GlutenShuffleUtils.getStartPartitionId(dep.nativePartitioning, context.partitionId),
"celeborn",
shuffleWriterType,
GlutenConfig.getConf.columnarShuffleReallocThreshold
GlutenConfig.get.columnarShuffleReallocThreshold
)
runtime
.memoryManager()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K,

private boolean stopping = false;
private final int compressThreshold =
GlutenConfig.getConf().columnarShuffleCompressionThreshold();
private final double reallocThreshold = GlutenConfig.getConf().columnarShuffleReallocThreshold();
GlutenConfig.get().columnarShuffleCompressionThreshold();
private final double reallocThreshold = GlutenConfig.get().columnarShuffleReallocThreshold();
private String compressionCodec;
private int compressionLevel;
private int compressionBufferSize;
Expand All @@ -74,7 +74,7 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K,
private final Runtime runtime =
Runtimes.contextInstance(BackendsApiManager.getBackendName(), "UniffleShuffleWriter");
private final ShuffleWriterJniWrapper jniWrapper = ShuffleWriterJniWrapper.create(runtime);
private final int nativeBufferSize = GlutenConfig.getConf().maxBatchSize();
private final int nativeBufferSize = GlutenConfig.get().maxBatchSize();
private final int bufferSize;
private final Boolean isSort;

Expand Down Expand Up @@ -127,7 +127,7 @@ public VeloxUniffleColumnarShuffleWriter(
GlutenShuffleUtils.getCompressionLevel(
sparkConf,
compressionCodec,
GlutenConfig.getConf().columnarShuffleCodecBackend().getOrElse(() -> null));
GlutenConfig.get().columnarShuffleCodecBackend().getOrElse(() -> null));
compressionBufferSize =
GlutenShuffleUtils.getSortEvictBufferSize(sparkConf, compressionCodec);
}
Expand Down Expand Up @@ -158,7 +158,7 @@ protected void writeImpl(Iterator<Product2<K, V>> records) {
compressionLevel,
compressionBufferSize,
compressThreshold,
GlutenConfig.getConf().columnarShuffleCompressionMode(),
GlutenConfig.get().columnarShuffleCompressionMode(),
(int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
(boolean) sparkConf.get(package$.MODULE$.SHUFFLE_SORT_USE_RADIXSORT()),
bufferSize,
Expand Down
Loading

0 comments on commit b956a72

Please sign in to comment.