Skip to content

Commit

Permalink
Add flag to register shutdown hooks for os.call and os.spawn APIs…
Browse files Browse the repository at this point in the history
…, overhaul `destroy` APIs (#324)

* Backports the functionality from Mill to allow us to register shutdown
hooks such that when the parent process terminates the subprocesses are
shut down as well. This allows the same logic to be used consistently
across all `.call` and `.spawn` invocations

* Consolidate `SubProcess#destroy` and `SubProcess#destroyForcibly` into
a single `SubProcess#destroy` method which takes some default
parameters, allowing the user to choose `async = true` or configure the
`shutdownGracePeriod: Long`.

* The default behavior of `SubProcess#destroy` has changed to block on
the subprocess actually exiting, which I think is more intuitive. The
old behavior is available under `destroy(async = true)`

Best reviewed with `Hide whitespace`. Added some simple unit tests to
assert the new functionality, existing unit tests should assert on
existing workflows
  • Loading branch information
lihaoyi authored Oct 21, 2024
1 parent 9e7efc3 commit 383964d
Show file tree
Hide file tree
Showing 8 changed files with 587 additions and 284 deletions.
8 changes: 6 additions & 2 deletions Readme.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1725,7 +1725,9 @@ os.call(cmd: os.Shellable,
mergeErrIntoOut: Boolean = false,
timeout: Long = Long.MaxValue,
check: Boolean = true,
propagateEnv: Boolean = true): os.CommandResult
propagateEnv: Boolean = true,
shutdownGracePeriod: Long = 100,
destroyOnExit: Boolean = true): os.CommandResult
----

_Also callable via `os.proc(cmd).call(...)`_
Expand Down Expand Up @@ -1853,7 +1855,9 @@ os.spawn(cmd: os.Shellable,
stdout: os.ProcessOutput = os.Pipe,
stderr: os.ProcessOutput = os.Pipe,
mergeErrIntoOut: Boolean = false,
propagateEnv: Boolean = true): os.SubProcess
propagateEnv: Boolean = true,
shutdownGracePeriod: Long = 100,
destroyOnExit: Boolean = true): os.SubProcess
----

_Also callable via `os.proc(cmd).spawn(...)`_
Expand Down
10 changes: 9 additions & 1 deletion build.sc
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,21 @@ object os extends Module {
def forkEnv = super.forkEnv() ++ Map(
"TEST_JAR_WRITER_ASSEMBLY" -> testJarWriter.assembly().path.toString,
"TEST_JAR_READER_ASSEMBLY" -> testJarReader.assembly().path.toString,
"TEST_JAR_EXIT_ASSEMBLY" -> testJarExit.assembly().path.toString
"TEST_JAR_EXIT_ASSEMBLY" -> testJarExit.assembly().path.toString,
"TEST_SPAWN_EXIT_HOOK_ASSEMBLY" -> testSpawnExitHook.assembly().path.toString,
"TEST_SPAWN_EXIT_HOOK_ASSEMBLY2" -> testSpawnExitHook2.assembly().path.toString
)

object testJarWriter extends JavaModule
object testJarReader extends JavaModule
object testJarExit extends JavaModule
object testSpawnExitHook extends ScalaModule{
def scalaVersion = OsJvmModule.this.scalaVersion()
def moduleDeps = Seq(OsJvmModule.this)
}
object testSpawnExitHook2 extends JavaModule
}

object nohometest extends ScalaTests with OsLibTestModule
}

Expand Down
168 changes: 151 additions & 17 deletions os/src/ProcessOps.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
package os

import java.util.concurrent.{ArrayBlockingQueue, Semaphore, TimeUnit}
import collection.JavaConverters._
import scala.annotation.tailrec
import java.lang.ProcessBuilder.Redirect
import os.SubProcess.InputStream
import java.io.IOException
import java.util.concurrent.LinkedBlockingQueue
import ProcessOps._
import scala.util.Try

object call {

Expand All @@ -28,7 +25,8 @@ object call {
timeout: Long = -1,
check: Boolean = true,
propagateEnv: Boolean = true,
timeoutGracePeriod: Long = 100
shutdownGracePeriod: Long = 100,
destroyOnExit: Boolean = true
): CommandResult = {
os.proc(cmd).call(
cwd = cwd,
Expand All @@ -40,7 +38,40 @@ object call {
timeout = timeout,
check = check,
propagateEnv = propagateEnv,
timeoutGracePeriod = timeoutGracePeriod
shutdownGracePeriod = shutdownGracePeriod,
destroyOnExit = destroyOnExit
)
}

// Bincompat Forwarder
def apply(
cmd: Shellable,
env: Map[String, String],
// Make sure `cwd` only comes after `env`, so `os.call("foo", path)` is a compile error
// since the correct syntax is `os.call(("foo", path))`
cwd: Path,
stdin: ProcessInput,
stdout: ProcessOutput,
stderr: ProcessOutput,
mergeErrIntoOut: Boolean,
timeout: Long,
check: Boolean,
propagateEnv: Boolean,
timeoutGracePeriod: Long
): CommandResult = {
call(
cmd = cmd,
cwd = cwd,
env = env,
stdin = stdin,
stdout = stdout,
stderr = stderr,
mergeErrIntoOut = mergeErrIntoOut,
timeout = timeout,
check = check,
propagateEnv = propagateEnv,
shutdownGracePeriod = timeoutGracePeriod,
destroyOnExit = false
)
}
}
Expand All @@ -59,7 +90,9 @@ object spawn {
stdout: ProcessOutput = Pipe,
stderr: ProcessOutput = os.Inherit,
mergeErrIntoOut: Boolean = false,
propagateEnv: Boolean = true
propagateEnv: Boolean = true,
shutdownGracePeriod: Long = 100,
destroyOnExit: Boolean = true
): SubProcess = {
os.proc(cmd).spawn(
cwd = cwd,
Expand All @@ -68,7 +101,36 @@ object spawn {
stdout = stdout,
stderr = stderr,
mergeErrIntoOut = mergeErrIntoOut,
propagateEnv = propagateEnv
propagateEnv = propagateEnv,
shutdownGracePeriod = shutdownGracePeriod,
destroyOnExit = destroyOnExit
)
}

// Bincompat Forwarder
def apply(
cmd: Shellable,
// Make sure `cwd` only comes after `env`, so `os.spawn("foo", path)` is a compile error
// since the correct syntax is `os.spawn(("foo", path))`
env: Map[String, String],
cwd: Path,
stdin: ProcessInput,
stdout: ProcessOutput,
stderr: ProcessOutput,
mergeErrIntoOut: Boolean,
propagateEnv: Boolean
): SubProcess = {
spawn(
cmd = cmd,
cwd = cwd,
env = env,
stdin = stdin,
stdout = stdout,
stderr = stderr,
mergeErrIntoOut = mergeErrIntoOut,
propagateEnv = propagateEnv,
shutdownGracePeriod = 100,
destroyOnExit = false
)
}
}
Expand Down Expand Up @@ -119,7 +181,7 @@ case class proc(command: Shellable*) {
* fails with a non-zero exit code
* @param propagateEnv disable this to avoid passing in this parent process's
* environment variables to the subprocess
* @param timeoutGracePeriod if the timeout is enabled, how long in milliseconds for the
* @param shutdownGracePeriod if the timeout is enabled, how long in milliseconds for the
* subprocess to gracefully terminate before attempting to
* forcibly kill it
* (-1 for no kill, 0 for always kill immediately)
Expand All @@ -138,7 +200,8 @@ case class proc(command: Shellable*) {
check: Boolean = true,
propagateEnv: Boolean = true,
// this cannot be next to `timeout` as this will introduce a bin-compat break (default arguments are numbered in the bytecode)
timeoutGracePeriod: Long = 100
shutdownGracePeriod: Long = 100,
destroyOnExit: Boolean = true
): CommandResult = {

val chunks = new java.util.concurrent.ConcurrentLinkedQueue[Either[geny.Bytes, geny.Bytes]]
Expand All @@ -159,7 +222,7 @@ case class proc(command: Shellable*) {
propagateEnv
)

sub.join(timeout, timeoutGracePeriod)
sub.join(timeout, shutdownGracePeriod)

val chunksSeq = chunks.iterator.asScala.toIndexedSeq
val res = CommandResult(commandChunks, sub.exitCode(), chunksSeq)
Expand Down Expand Up @@ -188,7 +251,33 @@ case class proc(command: Shellable*) {
timeout,
check,
propagateEnv,
timeoutGracePeriod = 100
shutdownGracePeriod = 100
)

// Bincompat Forwarder
private[os] def call(
cwd: Path,
env: Map[String, String],
stdin: ProcessInput,
stdout: ProcessOutput,
stderr: ProcessOutput,
mergeErrIntoOut: Boolean,
timeout: Long,
check: Boolean,
propagateEnv: Boolean,
timeoutGracePeriod: Long
): CommandResult = call(
cwd,
env,
stdin,
stdout,
stderr,
mergeErrIntoOut,
timeout,
check,
propagateEnv,
timeoutGracePeriod,
destroyOnExit = false
)

/**
Expand All @@ -208,7 +297,9 @@ case class proc(command: Shellable*) {
stdout: ProcessOutput = Pipe,
stderr: ProcessOutput = os.Inherit,
mergeErrIntoOut: Boolean = false,
propagateEnv: Boolean = true
propagateEnv: Boolean = true,
shutdownGracePeriod: Long = 100,
destroyOnExit: Boolean = true
): SubProcess = {

val cmdChunks = commandChunks
Expand All @@ -230,19 +321,62 @@ case class proc(command: Shellable*) {
propagateEnv
)

lazy val shutdownHookThread =
if (!destroyOnExit) None
else Some(new Thread("subprocess-shutdown-hook") {
override def run(): Unit = proc.destroy(shutdownGracePeriod)
})

lazy val shutdownHookMonitorThread = shutdownHookThread.map(t =>
new Thread("subprocess-shutdown-hook-monitor") {
override def run(): Unit = {
while (proc.wrapped.isAlive) Thread.sleep(1)
try Runtime.getRuntime().removeShutdownHook(t)
catch { case e: Throwable => /*do nothing*/ }
}
}
)

shutdownHookThread.foreach(Runtime.getRuntime().addShutdownHook)

lazy val proc: SubProcess = new SubProcess(
builder.start(),
resolvedStdin.processInput(proc.stdin).map(new Thread(_, commandStr + " stdin thread")),
resolvedStdout.processOutput(proc.stdout).map(new Thread(_, commandStr + " stdout thread")),
resolvedStderr.processOutput(proc.stderr).map(new Thread(_, commandStr + " stderr thread"))
resolvedStderr.processOutput(proc.stderr).map(new Thread(_, commandStr + " stderr thread")),
shutdownGracePeriod = shutdownGracePeriod,
shutdownHookMonitorThread = shutdownHookMonitorThread
)

shutdownHookMonitorThread.foreach(_.start())

proc.inputPumperThread.foreach(_.start())
proc.outputPumperThread.foreach(_.start())
proc.errorPumperThread.foreach(_.start())
proc
}

// Bincompat Forwarder
def spawn(
cwd: Path,
env: Map[String, String],
stdin: ProcessInput,
stdout: ProcessOutput,
stderr: ProcessOutput,
mergeErrIntoOut: Boolean,
propagateEnv: Boolean
): SubProcess = spawn(
cwd = cwd,
env = env,
stdin = stdin,
stdout = stdout,
stderr = stderr,
mergeErrIntoOut = mergeErrIntoOut,
propagateEnv = propagateEnv,
shutdownGracePeriod = 100,
destroyOnExit = false
)

/**
* Pipes the output of this process into the input of the [[next]] process. Returns a
* [[ProcGroup]] containing both processes, which you can then either execute or
Expand Down Expand Up @@ -295,7 +429,7 @@ case class ProcGroup private[os] (commands: Seq[proc]) {
* will be caught and handled by killing the writing process. This behaviour
* is consistent with handlers of SIGPIPE signals in most programs
* supporting interruptable piping. Disabled by default on Windows.
* @param timeoutGracePeriod if the timeout is enabled, how long in milliseconds for the
* @param shutdownGracePeriod if the timeout is enabled, how long in milliseconds for the
* subprocess to gracefully terminate before attempting to
* forcibly kill it
* (-1 for no kill, 0 for always kill immediately)
Expand All @@ -316,7 +450,7 @@ case class ProcGroup private[os] (commands: Seq[proc]) {
pipefail: Boolean = true,
handleBrokenPipe: Boolean = !isWindows,
// this cannot be next to `timeout` as this will introduce a bin-compat break (default arguments are numbered in the bytecode)
timeoutGracePeriod: Long = 100
shutdownGracePeriod: Long = 100
): CommandResult = {
val chunks = new java.util.concurrent.ConcurrentLinkedQueue[Either[geny.Bytes, geny.Bytes]]

Expand All @@ -337,7 +471,7 @@ case class ProcGroup private[os] (commands: Seq[proc]) {
pipefail
)

sub.join(timeout, timeoutGracePeriod)
sub.join(timeout, shutdownGracePeriod)

val chunksSeq = chunks.iterator.asScala.toIndexedSeq
val res =
Expand Down Expand Up @@ -370,7 +504,7 @@ case class ProcGroup private[os] (commands: Seq[proc]) {
propagateEnv,
pipefail,
handleBrokenPipe,
timeoutGracePeriod = 100
shutdownGracePeriod = 100
)

/**
Expand Down
Loading

0 comments on commit 383964d

Please sign in to comment.