From 6ca7adc3664dd409110f327e2df1fb10d11903e4 Mon Sep 17 00:00:00 2001 From: akash-akya Date: Tue, 17 Sep 2024 23:17:31 +0530 Subject: [PATCH] Support redirecting stderr_to_stdout --- go_src/executor.go | 8 +-- go_src/main.go | 14 ++++- go_src/process.go | 33 +++-------- lib/ex_cmd/process.ex | 103 +---------------------------------- lib/ex_cmd/process/exec.ex | 28 +++++++++- lib/ex_cmd/process/proto.ex | 15 +++-- lib/ex_cmd/stream.ex | 9 +-- test/ex_cmd/process_test.exs | 54 +++++++----------- 8 files changed, 84 insertions(+), 180 deletions(-) diff --git a/go_src/executor.go b/go_src/executor.go index b9b9f23..9ddd12a 100644 --- a/go_src/executor.go +++ b/go_src/executor.go @@ -40,7 +40,7 @@ type InputDispatcher func(Packet) type OutPacket func() (Packet, bool) -func execute(workdir string, args []string) error { +func execute(workdir string, args []string, stderrConfig string) error { writerDone := make(chan struct{}) // must be buffered so that function can close without blocking stdinClose := make(chan struct{}, 1) @@ -59,7 +59,7 @@ func execute(workdir string, args []string) error { proc.Dir = workdir proc.Env = append(os.Environ(), readEnvFromStdin()...) - runPipeline(proc, writerDone, stdinClose, kill) + runPipeline(proc, writerDone, stdinClose, kill, stderrConfig) err := waitPipelineTermination(proc, sigs, stdinClose, writerDone, kill) if err == nil { @@ -75,12 +75,12 @@ func execute(workdir string, args []string) error { return err } -func runPipeline(proc *exec.Cmd, writerDone chan struct{}, stdinClose chan struct{}, kill chan<- bool){ +func runPipeline(proc *exec.Cmd, writerDone chan struct{}, stdinClose chan struct{}, kill chan<- bool, stderrConfig string){ cmdInput := make(chan []byte, 1) cmdOutputDemand := make(chan Packet) cmdInputDemand := make(chan Packet) - cmdOutput := startCommandPipeline(proc, cmdInput, cmdInputDemand, cmdOutputDemand) + cmdOutput := startCommandPipeline(proc, cmdInput, cmdInputDemand, cmdOutputDemand, stderrConfig) // go handleSignals(input, outputDemand, done) go stdinReader(cmdInput, cmdOutputDemand, writerDone, stdinClose, kill) diff --git a/go_src/main.go b/go_src/main.go index 560b1bf..1bf37b0 100644 --- a/go_src/main.go +++ b/go_src/main.go @@ -16,6 +16,7 @@ const ProtocolVersion = "1.0" const usage = "Usage: odu [options] -- [...]" var cdFlag = flag.String("cd", ".", "working directory for the spawned process") +var stderrFlag = flag.String("stderr", "console", "how to handle spawned process stderr stream") var logFlag = flag.String("log", "", "enable logging") var protocolVersionFlag = flag.String("protocol_version", "", "protocol version") var versionFlag = flag.Bool("v", false, "print version and exit") @@ -30,10 +31,19 @@ func main() { os.Exit(0) } + switch *stderrFlag { + case "console": + case "disable": + case "redirect_to_stdout": + default: + logger.Printf("invalid stderr flag") + os.Exit(3) + } + args := flag.Args() validateArgs(args) - err := execute(*cdFlag, args) + err := execute(*cdFlag, args, *stderrFlag) if err == nil { os.Exit(0) @@ -59,7 +69,7 @@ func validateArgs(args []string) { dieUsage(fmt.Sprintf("Invalid version specified: %v Supported version: %v", *protocolVersionFlag, ProtocolVersion)) } - logger.Printf("dir:%v, log:%v, protocol_version:%v, args:%v\n", *cdFlag, *logFlag, *protocolVersionFlag, args) + logger.Printf("dir:%v, log:%v, protocol_version:%v, stderr: %v, args:%v\n", *cdFlag, *logFlag, *protocolVersionFlag, *stderrFlag, args) } func notFifo(path string) bool { diff --git a/go_src/process.go b/go_src/process.go index d9761ac..7eb8f17 100644 --- a/go_src/process.go +++ b/go_src/process.go @@ -6,7 +6,7 @@ import ( "os/exec" ) -func startCommandPipeline(proc *exec.Cmd, input <-chan []byte, inputDemand chan<- Packet, outputDemand <-chan Packet) chan []byte { +func startCommandPipeline(proc *exec.Cmd, input <-chan []byte, inputDemand chan<- Packet, outputDemand <-chan Packet, stderrConfig string) chan []byte { logger.Printf("Command: %v\n", proc.String()) cmdInput, err := proc.StdinPipe() @@ -15,16 +15,20 @@ func startCommandPipeline(proc *exec.Cmd, input <-chan []byte, inputDemand chan< cmdOutput, err := proc.StdoutPipe() fatalIf(err) - cmdError, err := proc.StderrPipe() - fatalIf(err) + switch stderrConfig { + case "disable": + proc.Stderr = io.Discard + case "console": + proc.Stderr = os.Stderr + case "redirect_to_stdout": + proc.Stderr = proc.Stdout + } execErr := proc.Start() fatalIf(execErr) go writeToCommandStdin(cmdInput, input, inputDemand) - go printStderr(cmdError) - output := make(chan []byte) go readCommandStdout(cmdOutput, outputDemand, output) @@ -111,22 +115,3 @@ func readCommandStdout(cmdOutput io.ReadCloser, outputDemand <-chan Packet, outp } } } - -func printStderr(cmdError io.ReadCloser) { - var buf [BufferSize]byte - - defer func() { - cmdError.Close() - }() - - for { - bytesRead, readErr := cmdError.Read(buf[:]) - if bytesRead > 0 { - logger.Printf(string(buf[:bytesRead])) - } else if readErr == io.EOF || bytesRead == 0 { - return - } else { - fatal(readErr) - } - } -} diff --git a/lib/ex_cmd/process.ex b/lib/ex_cmd/process.ex index ec55a2d..c60924e 100644 --- a/lib/ex_cmd/process.ex +++ b/lib/ex_cmd/process.ex @@ -167,48 +167,6 @@ defmodule ExCmd.Process do > stream. - ### Using `consume` - - stderr data can be consumed separately using - `ExCmd.Process.read_stderr/2`. Special function - `ExCmd.Process.read_any/2` can be used to read from either stdout or - stderr whichever has the data available. See the examples for more - details. - - - > #### Unexpected Behaviors {: .warning} - > - > When set, the `stderr` output **MUST** be consumed to - > avoid blocking the external program when stderr buffer is full. - - Reading from stderr using `read_stderr` - - ``` - # write "Hello" to stdout and "World" to stderr - iex> script = Enum.join(["echo Hello", "echo World >&2"], "\n") - iex> {:ok, p} = Process.start_link(["sh", "-c", script], stderr: :consume) - iex> Process.read(p, 100) - {:ok, "Hello\n"} - iex> Process.read_stderr(p, 100) - {:ok, "World\n"} - iex> Process.await_exit(p) - {:ok, 0} - ``` - - Reading using `read_any` - - ``` - # write "Hello" to stdout and "World" to stderr - iex> script = Enum.join(["echo Hello", "echo World >&2"], "\n") - iex> {:ok, p} = Process.start_link(["sh", "-c", script], stderr: :consume) - iex> Process.read_any(p) - {:ok, {:stdout, "Hello\n"}} - iex> Process.read_any(p) - {:ok, {:stderr, "World\n"}} - iex> Process.await_exit(p) - {:ok, 0} - ``` - ### Process Termination When owner does (normally or abnormally) the ExCmd process always @@ -342,7 +300,7 @@ defmodule ExCmd.Process do @type caller :: GenServer.from() - @default_opts [env: [], stderr: :console] + @default_opts [env: [], stderr: :console, log: nil] @default_buffer_size 65_535 @doc false @@ -495,39 +453,6 @@ defmodule ExCmd.Process do GenServer.call(process.pid, {:read_stdout, max_size}, :infinity) end - @doc """ - Returns bytes from executed command's stderr with maximum size `max_size`. - Pipe must be enabled with `stderr: :consume` to read the data. - - Blocks if no bytes are written to stderr yet. And returns as soon as - bytes are available - - Note that `max_size` is the maximum size of the returned data. But - the returned data can be less than that depending on how the program - flush the data etc. - """ - @spec read_stderr(t, pos_integer()) :: {:ok, iodata} | :eof | {:error, any()} - def read_stderr(process, size \\ @default_buffer_size) when is_integer(size) and size > 0 do - GenServer.call(process.pid, {:read_stderr, size}, :infinity) - end - - @doc """ - Returns bytes from either stdout or stderr with maximum size - `max_size` whichever is available at that time. - - Blocks if no bytes are written to stdout or stderr yet. And returns - as soon as data is available. - - Note that `max_size` is the maximum size of the returned data. But - the returned data can be less than that depending on how the program - flush the data etc. - """ - @spec read_any(t, pos_integer()) :: - {:ok, {:stdout, iodata}} | {:ok, {:stderr, iodata}} | :eof | {:error, any()} - def read_any(process, size \\ @default_buffer_size) when is_integer(size) and size > 0 do - GenServer.call(process.pid, {:read_stdout_or_stderr, size}, :infinity) - end - @doc """ Changes the Pipe owner of the pipe to specified pid. @@ -710,30 +635,6 @@ defmodule ExCmd.Process do end end - def handle_call({:read_stderr, size}, from, state) do - IO.inspect({:read_stderr, size}) - - case Operations.read(state, {:read_stderr, from, size}) do - {:noreply, state} -> - {:noreply, state} - - ret -> - {:reply, ret, state} - end - end - - def handle_call({:read_stdout_or_stderr, size}, from, state) do - IO.inspect({:read_stdout_or_stderr, size}) - - case Operations.read_any(state, {:read_stdout_or_stderr, from, size}) do - {:noreply, state} -> - {:noreply, state} - - ret -> - {:reply, ret, state} - end - end - def handle_call({:write_stdin, binary}, from, state) do IO.inspect({:write_stdin, byte_size(binary)}) @@ -885,7 +786,7 @@ defmodule ExCmd.Process do Process.flag(:trap_exit, true) %{cmd_with_args: cmd_with_args, env: env} = state.args - {os_pid, port} = Proto.start(cmd_with_args, env, Map.take(state.args, [:stderr, :cd])) + {os_pid, port} = Proto.start(cmd_with_args, env, Map.take(state.args, [:log, :stderr, :cd])) stderr = if state.stderr == :consume do diff --git a/lib/ex_cmd/process/exec.ex b/lib/ex_cmd/process/exec.ex index e1fce91..f7c9de6 100644 --- a/lib/ex_cmd/process/exec.ex +++ b/lib/ex_cmd/process/exec.ex @@ -22,8 +22,9 @@ defmodule ExCmd.Process.Exec do :ok <- validate_opts_fields(opts), {:ok, cd} <- normalize_cd(opts[:cd]), {:ok, stderr} <- normalize_stderr(opts[:stderr]), + {:ok, log} <- normalize_log(opts[:log]), {:ok, env} <- normalize_env(opts[:env]) do - {:ok, %{cmd_with_args: [cmd | args], cd: cd, env: env, stderr: stderr}} + {:ok, %{cmd_with_args: [cmd | args], cd: cd, env: env, stderr: stderr, log: log}} end end @@ -98,7 +99,7 @@ defmodule ExCmd.Process.Exec do nil -> {:ok, :console} - stderr when stderr in [:console, :disable] -> + stderr when stderr in [:console, :disable, :redirect_to_stdout] -> {:ok, stderr} _ -> @@ -107,9 +108,30 @@ defmodule ExCmd.Process.Exec do end end + @spec normalize_log(log :: nil | :stdout | :stderr | String.t()) :: + {:ok, nil | String.t()} | {:error, String.t()} + defp normalize_log(log) do + case log do + nil -> + {:ok, nil} + + :stdout -> + {:ok, "|1"} + + :stderr -> + {:ok, "|2"} + + file when is_binary(file) -> + {:ok, file} + + _ -> + {:error, ":log must be an atom and one of nil, :stdout, :stderr, or path"} + end + end + @spec validate_opts_fields(keyword) :: :ok | {:error, String.t()} defp validate_opts_fields(opts) do - {_, additional_opts} = Keyword.split(opts, [:cd, :env, :stderr]) + {_, additional_opts} = Keyword.split(opts, [:cd, :env, :stderr, :log]) if Enum.empty?(additional_opts) do :ok diff --git a/lib/ex_cmd/process/proto.ex b/lib/ex_cmd/process/proto.ex index 5042247..3f65710 100644 --- a/lib/ex_cmd/process/proto.ex +++ b/lib/ex_cmd/process/proto.ex @@ -174,10 +174,17 @@ defmodule ExCmd.Process.Proto do raise ArgumentError, message: ":cd is not a valid path" end - params = ["-cd", cd, "-protocol_version", @odu_protocol_version] - - if opts[:stderr] == :console do - params ++ ["-log", "|2"] + params = [ + "-cd", + cd, + "-stderr", + to_string(opts[:stderr]), + "-protocol_version", + @odu_protocol_version + ] + + if opts[:log] do + params ++ ["-log", opts[:log]] else params end diff --git a/lib/ex_cmd/stream.ex b/lib/ex_cmd/stream.ex index bb3b5d3..a0faa2a 100644 --- a/lib/ex_cmd/stream.ex +++ b/lib/ex_cmd/stream.ex @@ -108,13 +108,12 @@ defmodule ExCmd.Stream do %{ process: process, stream_opts: %{ - stderr: stderr, stream_exit_status: stream_exit_status, max_chunk_size: max_chunk_size } } = state - case Process.read_any(process, max_chunk_size) do + case Process.read(process, max_chunk_size) do :eof when stream_exit_status == false -> {:halt, {state, :eof}} @@ -122,14 +121,10 @@ defmodule ExCmd.Stream do elem = [await_exit(state, :eof)] {elem, {state, :exited}} - {:ok, {:stdout, x}} when stderr != :consume -> + {:ok, x} -> elem = [IO.iodata_to_binary(x)] {elem, {state, exit_state}} - {:ok, {io_stream, x}} when stderr == :consume -> - elem = [{io_stream, IO.iodata_to_binary(x)}] - {elem, {state, exit_state}} - {:error, errno} -> raise Error, "failed to read from the external process. errno: #{inspect(errno)}" end diff --git a/test/ex_cmd/process_test.exs b/test/ex_cmd/process_test.exs index 0cb6519..61559f2 100644 --- a/test/ex_cmd/process_test.exs +++ b/test/ex_cmd/process_test.exs @@ -78,48 +78,32 @@ defmodule ExCmd.ProcessTest do ] == get_events(logger) end - # TODO: stderr is not supported - # test "reading from stderr" do - # {:ok, s} = Process.start_link(["sh", "-c", "echo foo >>/dev/stderr"], stderr: :consume) - # # TODO: stderr is not supported - # assert {:ok, "foo\n"} = Process.read_stderr(s, 100) - # end - - # test "reading from stdout or stderr using read_any" do - # script = """ - # echo "foo" - # echo "bar" >&2 - # """ - - # {:ok, s} = Process.start_link(["sh", "-c", script], stderr: :consume) - - # {:ok, ret1} = Process.read_any(s, 100) - # {:ok, ret2} = Process.read_any(s, 100) - - # assert {:stderr, "bar\n"} in [ret1, ret2] - # assert {:stdout, "foo\n"} in [ret1, ret2] - - # assert :eof = Process.read_any(s, 100) - # end + test "stderr disabled" do + script = """ + echo "==foo==" + echo "==bar==" >&2 + """ - # test "reading from stderr_read when stderr disabled" do - # {:ok, s} = Process.start_link(["sh", "-c", "echo foo >>/dev/stderr"], stderr: :console) + {:ok, s} = Process.start_link(["sh", "-c", script], stderr: :disable) - # assert {:error, :pipe_closed_or_invalid_caller} = Process.read_stderr(s, 100) - # end + assert {:ok, "==foo==\n"} = Process.read(s, 100) + assert :eof = Process.read(s, 100) + assert {:ok, 0} = Process.await_exit(s, 100) + end - test "read_any with stderr disabled" do + test "stderr redirect_to_stdout" do script = """ - echo "foo" - echo "bar" >&2 + echo "==foo==" + echo "==bar==" >&2 """ - {:ok, s} = Process.start_link(["sh", "-c", script], stderr: :console) - {:ok, ret} = Process.read_any(s, 100) + {:ok, s} = Process.start_link(["sh", "-c", script], stderr: :redirect_to_stdout) + # wait for the the both output to merge + :timer.sleep(500) - # we can still read from stdout even if stderr is disabled - assert ret == {:stdout, "foo\n"} - assert :eof = Process.read_any(s, 100) + assert {:ok, "==foo==\n==bar==\n"} = Process.read(s, 100) + assert :eof = Process.read(s, 100) + assert {:ok, 0} = Process.await_exit(s, 100) end test "if pipe gets closed on pipe owner exit normally" do