Skip to content

Commit

Permalink
Support redirecting stderr_to_stdout
Browse files Browse the repository at this point in the history
  • Loading branch information
akash-akya committed Sep 17, 2024
1 parent 2e1d839 commit 6ca7adc
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 180 deletions.
8 changes: 4 additions & 4 deletions go_src/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type InputDispatcher func(Packet)

type OutPacket func() (Packet, bool)

func execute(workdir string, args []string) error {
func execute(workdir string, args []string, stderrConfig string) error {
writerDone := make(chan struct{})
// must be buffered so that function can close without blocking
stdinClose := make(chan struct{}, 1)
Expand All @@ -59,7 +59,7 @@ func execute(workdir string, args []string) error {
proc.Dir = workdir
proc.Env = append(os.Environ(), readEnvFromStdin()...)

runPipeline(proc, writerDone, stdinClose, kill)
runPipeline(proc, writerDone, stdinClose, kill, stderrConfig)
err := waitPipelineTermination(proc, sigs, stdinClose, writerDone, kill)

if err == nil {
Expand All @@ -75,12 +75,12 @@ func execute(workdir string, args []string) error {
return err
}

func runPipeline(proc *exec.Cmd, writerDone chan struct{}, stdinClose chan struct{}, kill chan<- bool){
func runPipeline(proc *exec.Cmd, writerDone chan struct{}, stdinClose chan struct{}, kill chan<- bool, stderrConfig string){
cmdInput := make(chan []byte, 1)
cmdOutputDemand := make(chan Packet)
cmdInputDemand := make(chan Packet)

cmdOutput := startCommandPipeline(proc, cmdInput, cmdInputDemand, cmdOutputDemand)
cmdOutput := startCommandPipeline(proc, cmdInput, cmdInputDemand, cmdOutputDemand, stderrConfig)

// go handleSignals(input, outputDemand, done)
go stdinReader(cmdInput, cmdOutputDemand, writerDone, stdinClose, kill)
Expand Down
14 changes: 12 additions & 2 deletions go_src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const ProtocolVersion = "1.0"
const usage = "Usage: odu [options] -- <program> [<arg>...]"

var cdFlag = flag.String("cd", ".", "working directory for the spawned process")
var stderrFlag = flag.String("stderr", "console", "how to handle spawned process stderr stream")
var logFlag = flag.String("log", "", "enable logging")
var protocolVersionFlag = flag.String("protocol_version", "", "protocol version")
var versionFlag = flag.Bool("v", false, "print version and exit")
Expand All @@ -30,10 +31,19 @@ func main() {
os.Exit(0)
}

switch *stderrFlag {
case "console":
case "disable":
case "redirect_to_stdout":
default:
logger.Printf("invalid stderr flag")
os.Exit(3)
}

args := flag.Args()
validateArgs(args)

err := execute(*cdFlag, args)
err := execute(*cdFlag, args, *stderrFlag)

if err == nil {
os.Exit(0)
Expand All @@ -59,7 +69,7 @@ func validateArgs(args []string) {
dieUsage(fmt.Sprintf("Invalid version specified: %v Supported version: %v", *protocolVersionFlag, ProtocolVersion))
}

logger.Printf("dir:%v, log:%v, protocol_version:%v, args:%v\n", *cdFlag, *logFlag, *protocolVersionFlag, args)
logger.Printf("dir:%v, log:%v, protocol_version:%v, stderr: %v, args:%v\n", *cdFlag, *logFlag, *protocolVersionFlag, *stderrFlag, args)
}

func notFifo(path string) bool {
Expand Down
33 changes: 9 additions & 24 deletions go_src/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"os/exec"
)

func startCommandPipeline(proc *exec.Cmd, input <-chan []byte, inputDemand chan<- Packet, outputDemand <-chan Packet) chan []byte {
func startCommandPipeline(proc *exec.Cmd, input <-chan []byte, inputDemand chan<- Packet, outputDemand <-chan Packet, stderrConfig string) chan []byte {
logger.Printf("Command: %v\n", proc.String())

cmdInput, err := proc.StdinPipe()
Expand All @@ -15,16 +15,20 @@ func startCommandPipeline(proc *exec.Cmd, input <-chan []byte, inputDemand chan<
cmdOutput, err := proc.StdoutPipe()
fatalIf(err)

cmdError, err := proc.StderrPipe()
fatalIf(err)
switch stderrConfig {
case "disable":
proc.Stderr = io.Discard
case "console":
proc.Stderr = os.Stderr
case "redirect_to_stdout":
proc.Stderr = proc.Stdout
}

execErr := proc.Start()
fatalIf(execErr)

go writeToCommandStdin(cmdInput, input, inputDemand)

go printStderr(cmdError)

output := make(chan []byte)
go readCommandStdout(cmdOutput, outputDemand, output)

Expand Down Expand Up @@ -111,22 +115,3 @@ func readCommandStdout(cmdOutput io.ReadCloser, outputDemand <-chan Packet, outp
}
}
}

func printStderr(cmdError io.ReadCloser) {
var buf [BufferSize]byte

defer func() {
cmdError.Close()
}()

for {
bytesRead, readErr := cmdError.Read(buf[:])
if bytesRead > 0 {
logger.Printf(string(buf[:bytesRead]))
} else if readErr == io.EOF || bytesRead == 0 {
return
} else {
fatal(readErr)
}
}
}
103 changes: 2 additions & 101 deletions lib/ex_cmd/process.ex
Original file line number Diff line number Diff line change
Expand Up @@ -167,48 +167,6 @@ defmodule ExCmd.Process do
> stream.
### Using `consume`
stderr data can be consumed separately using
`ExCmd.Process.read_stderr/2`. Special function
`ExCmd.Process.read_any/2` can be used to read from either stdout or
stderr whichever has the data available. See the examples for more
details.
> #### Unexpected Behaviors {: .warning}
>
> When set, the `stderr` output **MUST** be consumed to
> avoid blocking the external program when stderr buffer is full.
Reading from stderr using `read_stderr`
```
# write "Hello" to stdout and "World" to stderr
iex> script = Enum.join(["echo Hello", "echo World >&2"], "\n")
iex> {:ok, p} = Process.start_link(["sh", "-c", script], stderr: :consume)
iex> Process.read(p, 100)
{:ok, "Hello\n"}
iex> Process.read_stderr(p, 100)
{:ok, "World\n"}
iex> Process.await_exit(p)
{:ok, 0}
```
Reading using `read_any`
```
# write "Hello" to stdout and "World" to stderr
iex> script = Enum.join(["echo Hello", "echo World >&2"], "\n")
iex> {:ok, p} = Process.start_link(["sh", "-c", script], stderr: :consume)
iex> Process.read_any(p)
{:ok, {:stdout, "Hello\n"}}
iex> Process.read_any(p)
{:ok, {:stderr, "World\n"}}
iex> Process.await_exit(p)
{:ok, 0}
```
### Process Termination
When owner does (normally or abnormally) the ExCmd process always
Expand Down Expand Up @@ -342,7 +300,7 @@ defmodule ExCmd.Process do

@type caller :: GenServer.from()

@default_opts [env: [], stderr: :console]
@default_opts [env: [], stderr: :console, log: nil]
@default_buffer_size 65_535

@doc false
Expand Down Expand Up @@ -495,39 +453,6 @@ defmodule ExCmd.Process do
GenServer.call(process.pid, {:read_stdout, max_size}, :infinity)
end

@doc """
Returns bytes from executed command's stderr with maximum size `max_size`.
Pipe must be enabled with `stderr: :consume` to read the data.
Blocks if no bytes are written to stderr yet. And returns as soon as
bytes are available
Note that `max_size` is the maximum size of the returned data. But
the returned data can be less than that depending on how the program
flush the data etc.
"""
@spec read_stderr(t, pos_integer()) :: {:ok, iodata} | :eof | {:error, any()}
def read_stderr(process, size \\ @default_buffer_size) when is_integer(size) and size > 0 do
GenServer.call(process.pid, {:read_stderr, size}, :infinity)
end

@doc """
Returns bytes from either stdout or stderr with maximum size
`max_size` whichever is available at that time.
Blocks if no bytes are written to stdout or stderr yet. And returns
as soon as data is available.
Note that `max_size` is the maximum size of the returned data. But
the returned data can be less than that depending on how the program
flush the data etc.
"""
@spec read_any(t, pos_integer()) ::
{:ok, {:stdout, iodata}} | {:ok, {:stderr, iodata}} | :eof | {:error, any()}
def read_any(process, size \\ @default_buffer_size) when is_integer(size) and size > 0 do
GenServer.call(process.pid, {:read_stdout_or_stderr, size}, :infinity)
end

@doc """
Changes the Pipe owner of the pipe to specified pid.
Expand Down Expand Up @@ -710,30 +635,6 @@ defmodule ExCmd.Process do
end
end

def handle_call({:read_stderr, size}, from, state) do
IO.inspect({:read_stderr, size})

case Operations.read(state, {:read_stderr, from, size}) do
{:noreply, state} ->
{:noreply, state}

ret ->
{:reply, ret, state}
end
end

def handle_call({:read_stdout_or_stderr, size}, from, state) do
IO.inspect({:read_stdout_or_stderr, size})

case Operations.read_any(state, {:read_stdout_or_stderr, from, size}) do
{:noreply, state} ->
{:noreply, state}

ret ->
{:reply, ret, state}
end
end

def handle_call({:write_stdin, binary}, from, state) do
IO.inspect({:write_stdin, byte_size(binary)})

Check warning on line 639 in lib/ex_cmd/process.ex

View workflow job for this annotation

GitHub Actions / Lint OTP 26.x / Elixir 1.16.x

There should be no calls to `IO.inspect/1`.

Expand Down Expand Up @@ -885,7 +786,7 @@ defmodule ExCmd.Process do
Process.flag(:trap_exit, true)

%{cmd_with_args: cmd_with_args, env: env} = state.args
{os_pid, port} = Proto.start(cmd_with_args, env, Map.take(state.args, [:stderr, :cd]))
{os_pid, port} = Proto.start(cmd_with_args, env, Map.take(state.args, [:log, :stderr, :cd]))

stderr =
if state.stderr == :consume do
Expand Down
28 changes: 25 additions & 3 deletions lib/ex_cmd/process/exec.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ defmodule ExCmd.Process.Exec do
:ok <- validate_opts_fields(opts),
{:ok, cd} <- normalize_cd(opts[:cd]),
{:ok, stderr} <- normalize_stderr(opts[:stderr]),
{:ok, log} <- normalize_log(opts[:log]),
{:ok, env} <- normalize_env(opts[:env]) do
{:ok, %{cmd_with_args: [cmd | args], cd: cd, env: env, stderr: stderr}}
{:ok, %{cmd_with_args: [cmd | args], cd: cd, env: env, stderr: stderr, log: log}}
end
end

Expand Down Expand Up @@ -98,7 +99,7 @@ defmodule ExCmd.Process.Exec do
nil ->
{:ok, :console}

stderr when stderr in [:console, :disable] ->
stderr when stderr in [:console, :disable, :redirect_to_stdout] ->
{:ok, stderr}

_ ->
Expand All @@ -107,9 +108,30 @@ defmodule ExCmd.Process.Exec do
end
end

@spec normalize_log(log :: nil | :stdout | :stderr | String.t()) ::
{:ok, nil | String.t()} | {:error, String.t()}
defp normalize_log(log) do
case log do
nil ->
{:ok, nil}

:stdout ->
{:ok, "|1"}

:stderr ->
{:ok, "|2"}

file when is_binary(file) ->
{:ok, file}

_ ->
{:error, ":log must be an atom and one of nil, :stdout, :stderr, or path"}
end
end

@spec validate_opts_fields(keyword) :: :ok | {:error, String.t()}
defp validate_opts_fields(opts) do
{_, additional_opts} = Keyword.split(opts, [:cd, :env, :stderr])
{_, additional_opts} = Keyword.split(opts, [:cd, :env, :stderr, :log])

if Enum.empty?(additional_opts) do
:ok
Expand Down
15 changes: 11 additions & 4 deletions lib/ex_cmd/process/proto.ex
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,17 @@ defmodule ExCmd.Process.Proto do
raise ArgumentError, message: ":cd is not a valid path"
end

params = ["-cd", cd, "-protocol_version", @odu_protocol_version]

if opts[:stderr] == :console do
params ++ ["-log", "|2"]
params = [
"-cd",
cd,
"-stderr",
to_string(opts[:stderr]),
"-protocol_version",
@odu_protocol_version
]

if opts[:log] do
params ++ ["-log", opts[:log]]
else
params
end
Expand Down
9 changes: 2 additions & 7 deletions lib/ex_cmd/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -108,28 +108,23 @@ defmodule ExCmd.Stream do
%{
process: process,
stream_opts: %{
stderr: stderr,
stream_exit_status: stream_exit_status,
max_chunk_size: max_chunk_size
}
} = state

case Process.read_any(process, max_chunk_size) do
case Process.read(process, max_chunk_size) do
:eof when stream_exit_status == false ->
{:halt, {state, :eof}}

:eof when stream_exit_status == true ->
elem = [await_exit(state, :eof)]
{elem, {state, :exited}}

{:ok, {:stdout, x}} when stderr != :consume ->
{:ok, x} ->
elem = [IO.iodata_to_binary(x)]
{elem, {state, exit_state}}

{:ok, {io_stream, x}} when stderr == :consume ->
elem = [{io_stream, IO.iodata_to_binary(x)}]
{elem, {state, exit_state}}

{:error, errno} ->
raise Error, "failed to read from the external process. errno: #{inspect(errno)}"
end
Expand Down
Loading

0 comments on commit 6ca7adc

Please sign in to comment.