Move listing files from HadoopFSUtils to java.io.File
EnricoMi committed Dec 2, 2024
1 parent 0a611fc commit 214af10
Showing 1 changed file with 13 additions and 8 deletions.
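The change swaps Hadoop's recursive listing (HadoopFSUtils.listFiles with a PathFilter), which flattened all leaf paths into a single set, for the plain JDK java.io.File.listFiles(FileFilter) API, which lets the test assert on each directory level explicitly. Below is a minimal standalone sketch of that JDK pattern; the object name and the "/tmp/fallback" path are illustrative, not part of the commit:

import java.io.{File, FileFilter}

object ListDirsSketch {
  // Accept only directories, mirroring the dirsOnly filter in the test.
  private val dirsOnly = new FileFilter {
    override def accept(file: File): Boolean = file.isDirectory
  }

  def main(args: Array[String]): Unit = {
    // listFiles returns null when the path does not exist or is not a
    // directory, so wrap it in an Option before iterating.
    val root = new File("/tmp/fallback")
    val subDirs = Option(root.listFiles(dirsOnly)).getOrElse(Array.empty[File])
    subDirs.foreach(dir => println(dir.getName))
  }
}

One plausible motivation for the move: the fallback storage path in this test is a local directory, so the JDK call avoids going through a Hadoop FileSystem just to enumerate subdirectories, and level-by-level listing makes the app-dir / shuffle-id / bucket-dir assertions direct.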
@@ -16,14 +16,14 @@
  */
 package org.apache.spark.storage
 
-import java.io.{DataOutputStream, File, FileOutputStream, InputStream, IOException}
+import java.io.{DataOutputStream, File, FileFilter, FileOutputStream, InputStream, IOException}
 import java.nio.file.Files
 
 import scala.concurrent.duration._
 import scala.util.Random
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FSDataInputStream, LocalFileSystem, Path, PathFilter, PositionedReadable, Seekable}
+import org.apache.hadoop.fs.{FSDataInputStream, LocalFileSystem, Path, PositionedReadable, Seekable}
 import org.mockito.{ArgumentMatchers => mc}
 import org.mockito.Mockito.{mock, never, verify, when}
 import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
@@ -39,7 +39,6 @@ import org.apache.spark.scheduler.ExecutorDecommissionInfo
 import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
 import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
 import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
-import org.apache.spark.util.HadoopFSUtils
 import org.apache.spark.util.Utils.tryWithResource
 
 class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
@@ -340,12 +339,18 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
 
       // Check number of subdirectories
       val path = conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).get
-      val noopFilter = new PathFilter {
-        override def accept(path: Path): Boolean = true
+      val dirsOnly = new FileFilter {
+        override def accept(file: File): Boolean = file.isDirectory
       }
-      val subDirs = HadoopFSUtils.listFiles(new Path(path), sc.hadoopConfiguration, noopFilter)
-        .flatMap(_._2.map(_.getPath.getParent)).toSet.toList
-      assert(subDirs.length == Math.max(1, Math.min(subPaths, 20)), subDirs.mkString(", "))
+      // first level is app dir
+      val appDirs = new File(path).listFiles(dirsOnly)
+      assert(appDirs.length == 1)
+      // second level is shuffle id
+      val shuffleDirs = appDirs(0).listFiles(dirsOnly)
+      assert(shuffleDirs.length == 1)
+      // third level is controlled number of hash / bucket dirs
+      val subDirs = shuffleDirs(0).listFiles(dirsOnly)
+      assert(subDirs.length == Math.min(subPaths, 20), subDirs.mkString(", "))
     }
   }
 }
