diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 1a18b5089af90..9d18bcc2d1308 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -521,6 +521,24 @@ package object config {
       .checkValue(_ >= 0, "The number of subdirectories must be 0 or larger.")
       .createWithDefault(Int.MaxValue)
 
+  private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_DELAY =
+    ConfigBuilder("spark.storage.decommission.fallbackStorage.replicationDelay")
+      .doc("The maximum expected delay for files written by one executor to become " +
+        "available to other executors.")
+      .version("4.0.0")
+      .timeConf(TimeUnit.SECONDS)
+      .checkValue(_ > 0, "Value must be positive.")
+      .createOptional
+
+  private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_WAIT =
+    ConfigBuilder("spark.storage.decommission.fallbackStorage.replicationWait")
+      .doc("When an executor cannot find a file in the fallback storage, it waits " +
+        "this amount of time before attempting to open the file again, " +
+        s"while not exceeding ${STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_DELAY.key} in total.")
+      .version("4.0.0")
+      .timeConf(TimeUnit.SECONDS)
+      .checkValue(_ > 0, "Value must be positive.")
+      .createWithDefaultString("1s")
 
   private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP =
     ConfigBuilder("spark.storage.decommission.fallbackStorage.cleanUp")
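For illustration, the two new settings combine with the existing fallback storage path roughly as follows. This is a minimal sketch, not part of the patch, and the bucket path is a placeholder:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.storage.decommission.fallbackStorage.path", "s3a://my-bucket/spark-fallback/") // placeholder
      .set("spark.storage.decommission.fallbackStorage.replicationDelay", "30s") // give up after 30s
      .set("spark.storage.decommission.fallbackStorage.replicationWait", "2s")   // retry every 2s

Note that replicationDelay is declared with createOptional: if it is not set, readers open files without retrying, and replicationWait (default 1s) has no effect on its own.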
diff --git a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
index 6939bf836602c..9847c27ed2d50 100644
--- a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
+++ b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
@@ -17,26 +17,27 @@
 package org.apache.spark.storage
 
-import java.io.DataInputStream
+import java.io.{DataInputStream, FileNotFoundException}
 import java.nio.ByteBuffer
 
+import scala.annotation.tailrec
 import scala.concurrent.Future
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path}
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH, STORAGE_DECOMMISSION_FALLBACK_STORAGE_SUBPATHS}
+import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH, STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_DELAY, STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_WAIT, STORAGE_DECOMMISSION_FALLBACK_STORAGE_SUBPATHS}
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
 import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
 import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
 import org.apache.spark.storage.FallbackStorage.getPath
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Clock, SystemClock, Utils}
 
 /**
  * A fallback storage used by storage decommissioners.
@@ -135,6 +136,7 @@ private[spark] object FallbackStorage extends Logging {
     val fallbackFileSystem = FileSystem.get(fallbackUri, hadoopConf)
     // The fallback directory for this app may not be created yet.
     if (fallbackFileSystem.exists(fallbackPath)) {
+      logInfo(s"Attempting to clean up: $fallbackUri")
       if (fallbackFileSystem.delete(fallbackPath, true)) {
         logInfo(s"Succeed to clean up: $fallbackUri")
       } else {
@@ -169,6 +171,59 @@ private[spark] object FallbackStorage extends Logging {
     }
   }
 
+  /**
+   * Open the file; on FileNotFoundException, wait waitMs milliseconds and retry,
+   * unless waiting would exceed the deadline. In that case, rethrow the exception.
+   */
+  @tailrec
+  private def open(filesystem: FileSystem,
+      path: Path,
+      deadlineMs: Long,
+      waitMs: Long,
+      clock: Clock): FSDataInputStream = {
+    try {
+      filesystem.open(path)
+    } catch {
+      case fnf: FileNotFoundException =>
+        val waitTillMs = clock.getTimeMillis() + waitMs
+        if (waitTillMs <= deadlineMs) {
+          logInfo(s"File not found, waiting ${waitMs / 1000}s: $path")
+          clock.waitTillTime(waitTillMs)
+          open(filesystem, path, deadlineMs, waitMs, clock)
+        } else {
+          throw fnf
+        }
+    }
+  }
+
+  /**
+   * Open the file, retrying FileNotFoundException according to
+   * STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_DELAY and
+   * STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_WAIT.
+   */
+  // Visible for testing
+  private[spark] def open(conf: SparkConf,
+      filesystem: FileSystem,
+      path: Path,
+      clock: Clock = new SystemClock()): FSDataInputStream = {
+    val replicationDelay = conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_DELAY)
+    if (replicationDelay.isDefined) {
+      val replicationDeadline = clock.getTimeMillis() + replicationDelay.get * 1000
+      val replicationWait = conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_WAIT)
+      val replicationWaitMs = replicationWait * 1000
+      try {
+        open(filesystem, path, replicationDeadline, replicationWaitMs, clock)
+      } catch {
+        case fnf: FileNotFoundException =>
+          logInfo(s"File not found, exceeded expected replication delay " +
+            s"of ${replicationDelay.get}s: $path")
+          throw fnf
+      }
+    } else {
+      filesystem.open(path)
+    }
+  }
+
   /**
    * Read a ManagedBuffer.
   */
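To make the deadline arithmetic concrete: with replicationDelay = 5s and replicationWait = 2s, the retrying open above attempts at t = 0s, 2s and 4s, then rethrows, because the next wait would end at t = 6s, past the 5s deadline. A self-contained sketch of that schedule (illustrative only, not part of the patch):

    val deadlineMs = 5000L // replicationDelay
    val waitMs = 2000L     // replicationWait
    // attempt times: each retry is scheduled only if it would not pass the deadline
    val attemptsMs = Iterator.iterate(0L)(_ + waitMs).takeWhile(_ <= deadlineMs).toList
    println(attemptsMs) // List(0, 2000, 4000)

Note that the deadline is computed once, when the public open is entered, so time spent inside filesystem.open itself also counts against the replication delay.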
@@ -193,7 +248,7 @@ private[spark] object FallbackStorage extends Logging {
     val indexFile = getPath(conf, appId, shuffleId, name)
     val start = startReduceId * 8L
     val end = endReduceId * 8L
-    Utils.tryWithResource(fallbackFileSystem.open(indexFile)) { inputStream =>
+    Utils.tryWithResource(open(conf, fallbackFileSystem, indexFile)) { inputStream =>
       Utils.tryWithResource(new DataInputStream(inputStream)) { index =>
         index.skip(start)
         val offset = index.readLong()
@@ -205,7 +260,7 @@ private[spark] object FallbackStorage extends Logging {
         logDebug(s"To byte array $size")
         val array = new Array[Byte](size.toInt)
         val startTimeNs = System.nanoTime()
-        Utils.tryWithResource(fallbackFileSystem.open(dataFile)) { f =>
+        Utils.tryWithResource(open(conf, fallbackFileSystem, dataFile)) { f =>
           f.seek(offset)
           f.readFully(array)
           logDebug(s"Took ${(System.nanoTime() - startTimeNs) / (1000 * 1000)}ms")
diff --git a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
index cafb76def4692..4ca9a5f59b75b 100644
--- a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
@@ -16,20 +16,21 @@
  */
 package org.apache.spark.storage
 
-import java.io.{DataOutputStream, File, FileFilter, FileOutputStream, InputStream, IOException}
+import java.io.{DataOutputStream, File, FileFilter, FileNotFoundException, FileOutputStream, InputStream, IOException}
 import java.nio.file.Files
 
 import scala.concurrent.duration._
 import scala.util.Random
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FSDataInputStream, LocalFileSystem, Path, PositionedReadable, Seekable}
+import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, LocalFileSystem, Path, PositionedReadable, Seekable}
 import org.mockito.{ArgumentMatchers => mc}
 import org.mockito.Mockito.{mock, never, verify, when}
 import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TestUtils}
 import org.apache.spark.LocalSparkContext.withSpark
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.config._
 import org.apache.spark.launcher.SparkLauncher.{EXECUTOR_MEMORY, SPARK_MASTER}
 import org.apache.spark.network.BlockTransferService
@@ -38,11 +39,12 @@ import org.apache.spark.scheduler.ExecutorDecommissionInfo
 import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
 import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
 import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
+import org.apache.spark.util.Clock
 import org.apache.spark.util.Utils.tryWithResource
 
 class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
-  def getSparkConf(initialExecutor: Int = 1, minExecutor: Int = 1): SparkConf = {
+  def getSparkConf(initialExecutor: Int = 1, minExecutor: Int = 1): SparkConf = {
     new SparkConf(false)
       .setAppName(getClass.getName)
       .set(SPARK_MASTER, s"local-cluster[$initialExecutor,1,1024]")
@@ -395,6 +397,41 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
       }
     }
   }
+
+  Seq(0, 1000, 3000, 6000).foreach { replicationMs =>
+    test(s"Consider replication delay - ${replicationMs}ms") {
+      val delay = 5000 // maximum allowed replication delay, in ms
+      val wait = 2000 // time between open attempts, in ms
+      val conf = getSparkConf()
+        .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_DELAY.key, s"${delay}ms")
+        .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_WAIT.key, s"${wait}ms")
+
+      val filesystem = FileSystem.get(SparkHadoopUtil.get.newConfiguration(conf))
+      val path = new Path(conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).get, "file")
+      val startMs = 123000000L * 1000L // arbitrary system time
+      val clock = new DelayedActionClock(replicationMs, startMs)(filesystem.create(path).close())
+
+      if (replicationMs <= delay) {
+        // expect open to succeed
+        val in = FallbackStorage.open(conf, filesystem, path, clock)
+        assert(in != null)
+
+        // how many waits are needed to observe the replicated file
+        val expectedWaits = Math.ceil(replicationMs.toFloat / wait).toInt
+        assert(clock.timeMs == startMs + expectedWaits * wait)
+        assert(clock.waited == expectedWaits)
+        in.close()
+      } else {
+        // expect open to fail
+        assertThrows[FileNotFoundException](FallbackStorage.open(conf, filesystem, path, clock))
+
+        // how many waits fit into the allowed replication delay
+        val expectedWaits = delay / wait
+        assert(clock.timeMs == startMs + expectedWaits * wait)
+        assert(clock.waited == expectedWaits)
+      }
+    }
+  }
 }
 
 class ReadPartialInputStream(val in: FSDataInputStream) extends InputStream
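For the parameters above (delay = 5000ms, wait = 2000ms), the assertions encode these expected outcomes; a tiny sketch reproducing the arithmetic (illustrative, not part of the patch):

    val (delay, wait) = (5000, 2000)
    Seq(0, 1000, 3000, 6000).foreach { replicationMs =>
      if (replicationMs <= delay) {
        // 0 -> 0 waits, 1000 -> 1 wait, 3000 -> 2 waits
        println(s"$replicationMs ms: opens after ${math.ceil(replicationMs.toDouble / wait).toInt} wait(s)")
      } else {
        // 6000 -> FileNotFoundException after 5000 / 2000 = 2 waits
        println(s"$replicationMs ms: fails after ${delay / wait} wait(s)")
      }
    }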
@@ -440,3 +477,33 @@
       new FSDataInputStream(new ReadPartialInputStream(stream))
   }
 }
+
+// A manual clock that performs `action` the first time the simulated time
+// reaches startTimeMs + delayMs, emulating delayed replication without sleeping.
+class DelayedActionClock(delayMs: Long, startTimeMs: Long)(action: => Unit)
+  extends Clock {
+  var timeMs: Long = startTimeMs
+  var waited: Int = 0
+  var triggered: Boolean = false
+
+  if (delayMs == 0) trigger()
+
+  private def trigger(): Unit = {
+    if (!triggered) {
+      triggered = true
+      action
+    }
+  }
+
+  override def getTimeMillis(): Long = timeMs
+  override def nanoTime(): Long = timeMs * 1000000
+  override def waitTillTime(targetTime: Long): Long = {
+    waited += 1
+    if (targetTime >= startTimeMs + delayMs) {
+      // the file "appears" once the simulated clock passes the replication delay
+      trigger()
+    }
+    timeMs = targetTime
+    targetTime
+  }
+}
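The injected Clock keeps the test fast and deterministic: no real time passes, and the file "appears" on the first wait whose target time reaches startTimeMs + delayMs. A minimal usage sketch (values arbitrary):

    val clock = new DelayedActionClock(delayMs = 3000, startTimeMs = 0L)(println("file created"))
    clock.waitTillTime(2000) // waited = 1; 2000 < 3000, nothing happens
    clock.waitTillTime(4000) // waited = 2; 4000 >= 3000, the action runs exactly once
    assert(clock.timeMs == 4000 && clock.waited == 2)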