-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #31 from akash-akya/dev
Handle abrupt VM termination
- Loading branch information
Showing
5 changed files
with
108 additions
and
8 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,12 +3,15 @@ package main | |
import ( | ||
"os" | ||
"os/exec" | ||
"os/signal" | ||
"syscall" | ||
"time" | ||
) | ||
|
||
func execute(workdir string, args []string) error { | ||
done := make(chan struct{}) | ||
|
||
sigs := make(chan os.Signal, 1) | ||
input := make(chan Packet, 1) | ||
outputDemand := make(chan Packet) | ||
inputDemand := make(chan Packet) | ||
|
@@ -20,8 +23,17 @@ func execute(workdir string, args []string) error { | |
logger.Printf("Command path: %v\n", proc.Path) | ||
|
||
output := startCommandPipeline(proc, input, inputDemand, outputDemand) | ||
|
||
// Capture common signals. | ||
// Setting notify for SIGPIPE is important to capture and without that | ||
// we won't be able to handle abrupt beam vm terminations | ||
// Also, SIGPIPE behaviour in golang is bit complex, | ||
// see: https://pkg.go.dev/os/[email protected]#hdr-SIGPIPE | ||
signal.Notify(sigs, os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGPIPE) | ||
|
||
// go handleSignals(input, outputDemand, done) | ||
go dispatchStdin(input, outputDemand, done) | ||
go collectStdout(proc.Process.Pid, output, inputDemand, done) | ||
go collectStdout(proc.Process.Pid, output, inputDemand, sigs, done) | ||
|
||
// wait for pipline to exit | ||
<-done | ||
|
@@ -33,8 +45,8 @@ func execute(workdir string, args []string) error { | |
logger.Printf("Command exited with error: %v\n", e) | ||
os.Exit(3) | ||
} | ||
// TODO: return Stderr and exit stauts to beam process | ||
logger.Printf("Command exited: %#v\n", err) | ||
// TODO: return Stderr and exit status to beam process | ||
logger.Printf("Command exited\n") | ||
return err | ||
} | ||
|
||
|
@@ -57,15 +69,20 @@ func dispatchStdin(input chan<- Packet, outputDemand chan<- Packet, done chan st | |
stdinReader(dispatch, done) | ||
} | ||
|
||
func collectStdout(pid int, output <-chan Packet, inputDemand <-chan Packet, done chan struct{}) { | ||
func collectStdout(pid int, output <-chan Packet, inputDemand <-chan Packet, sigs <-chan os.Signal, done chan struct{}) { | ||
defer func() { | ||
close(done) | ||
}() | ||
|
||
merged := func() (Packet, bool) { | ||
select { | ||
case sig := <-sigs: | ||
logger.Printf("Received OS Signal: ", sig) | ||
return Packet{}, false | ||
|
||
case v, ok := <-inputDemand: | ||
return v, ok | ||
|
||
case v, ok := <-output: | ||
return v, ok | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
defmodule ExCmdExitTest do | ||
use ExUnit.Case, async: false | ||
|
||
# currently running `elixir` command is not working in Windows | ||
@tag os: :unix | ||
test "if it kills external command on abnormal vm exit" do | ||
ex_cmd_expr = ~S{ExCmd.stream!(["cat"]) |> Stream.run()} | ||
|
||
port = | ||
Port.open( | ||
{:spawn, "elixir -S mix run -e '#{ex_cmd_expr}'"}, | ||
[:stderr_to_stdout, :use_stdio, :exit_status, :binary, :hide] | ||
) | ||
|
||
port_info = Port.info(port) | ||
os_pid = port_info[:os_pid] | ||
|
||
on_exit(fn -> | ||
os_process_alive?(os_pid) && os_process_kill(os_pid) | ||
end) | ||
|
||
assert os_process_alive?(os_pid) | ||
|
||
[_, cmd_pid] = capture_output!(port, ~r/os pid: ([0-9]+)/) | ||
|
||
cmd_pid = String.to_integer(cmd_pid) | ||
assert os_process_alive?(cmd_pid) | ||
|
||
assert {:ok, _msg} = os_process_kill(os_pid) | ||
|
||
# wait for the cleanup | ||
:timer.sleep(5000) | ||
|
||
refute os_process_alive?(os_pid) | ||
refute os_process_alive?(cmd_pid) | ||
end | ||
|
||
defp os_process_alive?(pid) do | ||
if windows?() do | ||
case cmd(["tasklist", "/fi", "pid eq #{pid}"]) do | ||
{"INFO: No tasks are running which match the specified criteria.\r\n", 0} -> false | ||
{_, 0} -> true | ||
end | ||
else | ||
match?({_, 0}, cmd(["ps", "-p", to_string(pid)])) | ||
end | ||
end | ||
|
||
defp os_process_kill(pid) do | ||
if windows?() do | ||
cmd(["taskkill", "/pid", "#{pid}", "/f"]) | ||
else | ||
cmd(["kill", "-SIGKILL", "#{pid}"]) | ||
end | ||
|> case do | ||
{msg, 0} -> {:ok, msg} | ||
{msg, status} -> {:error, status, msg} | ||
end | ||
end | ||
|
||
defp windows?, do: :os.type() == {:win32, :nt} | ||
|
||
def cmd([cmd | args]), do: System.cmd(cmd, args, stderr_to_stdout: true) | ||
|
||
defp capture_output!(port, regexp, acc \\ "") do | ||
receive do | ||
{^port, {:data, bin}} -> | ||
output = acc <> bin | ||
|
||
if match = Regex.run(regexp, output) do | ||
match | ||
else | ||
capture_output!(port, regexp, output) | ||
end | ||
after | ||
5000 -> | ||
raise "timeout while waiting for the iex prompt, acc: #{acc}" | ||
end | ||
end | ||
end |