Skip to content

Commit

Permalink
Support binary as input & validate parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
akash-akya committed Jun 1, 2023
1 parent 1ea53bb commit a978d03
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 44 deletions.
61 changes: 53 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,19 @@
[![docs](https://img.shields.io/badge/docs-hexpm-blue.svg)](https://hexdocs.pm/ex_cmd/)


ExCmd is an Elixir library to run and communicate with external programs with back-pressure mechanism. It makes use os provided stdio buffer for this.

Communication with external program using [Port](https://hexdocs.pm/elixir/Port.html) is not demand driven. So it is easy to run into memory issues when the size of the data we are writing or reading from the external program is large. ExCmd tries to solve this problem by making better use of os provided stdio buffers and providing demand-driven interface to write and read from external program. It can be used to stream data through an external program. For example, streaming a video through `ffmpeg` to serve a web request.
ExCmd is an Elixir library to run and communicate with external
programs with back-pressure mechanism. It makes use os backed stdio
buffer for this.

Communication with external program using
[Port](https://hexdocs.pm/elixir/Port.html) is not demand driven. So
it is easy to run into memory issues when the size of the data we are
writing or reading from the external program is large. ExCmd tries to
solve this problem by making better use of os backed stdio buffers
and providing demand-driven interface to write and read from external
program. It can be used to stream data through an external
program. For example, streaming a video through `ffmpeg` to serve a
web request.

Getting audio out of a video stream is as simple as

Expand All @@ -26,24 +36,59 @@ ExCmd.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1), input: File.stream!("music_vid
* Proper program termination. No more zombie process
* Ability to close stdin and wait for output (with ports one can not selectively close stdin)

If you are not interested in streaming capability, ExCmd can still be useful because of the features listed above. For example running command and getting output as a string

## Examples

```elixir
ExCmd.stream!(~w(curl ifconfig.co))
|> Enum.into("")
```

If you want to use shell to handle more complex pipelines and globs, you can just spawn shell process and pass your shell command as the argument
Binary as input

```elixir
ExCmd.stream!(~w(cat), input: "Hello World")
|> Enum.into("")
# => "Hello World"
```

```elixir
ExCmd.stream!(~w(base64), input: <<1, 2, 3, 4, 5>>)
|> Enum.into("")
# => "AQIDBAU=\n"
```

List of binary as input

```elixir
ExCmd.stream!(~w(cat), input: ["Hello ", "World"])
|> Enum.into("")
# => "Hello World"
```

iodata as input

```elixir
ExCmd.stream!(~w(base64), input: [<<1, 2,>>, [3], [<<4, 5>>]])
|> Enum.into("")
# => "AQIDBAU=\n"
```

If you want pipes and globs, you can spawn shell process and pass your
pipeline as argument

```elixir
cmd = "echo 'foo baar' | base64"
cmd = "echo 'foo bar' | base64"
ExCmd.stream!(["sh", "-c", cmd])
|> Enum.into("")
# => "Zm9vIGJhcgo=\n"
```

Refer [documentation](https://hexdocs.pm/ex_cmd/readme.html) for information
Read [stream documentation](file:///Users/akash/repo/elixir/ex_cmd/doc/ExCmd.html#stream!/2) for information
about parameters.

**Check out [Exile](https://github.com/akash-akya/exile) which is an alternative solution based on NIF without middleware overhead**
**Check out [Exile](https://github.com/akash-akya/exile) which is an
alternative solution based on NIF without middleware overhead**

## Installation

Expand Down
76 changes: 56 additions & 20 deletions lib/ex_cmd.ex
Original file line number Diff line number Diff line change
@@ -1,49 +1,84 @@
defmodule ExCmd do
@moduledoc """
ExCmd is an Elixir library to run and communicate with external programs with back-pressure.
ExCmd is an Elixir library to run and communicate with external
programs with back-pressure.
"""

@doc """
Runs the given command with arguments and return an Enumerable to read command output.
Runs the given command with arguments and return an Enumerable to
read command output.
First parameter must be a list containing command with arguments. example: `["cat", "file.txt"]`.
First parameter must be a list containing command with
arguments. example: `["cat", "file.txt"]`.
### Options
### Optional
* `input` - Input can be either an `Enumerable` or a function which accepts `Collectable`.
* `input` - Input can be binary, iodata, `Enumerable` (list, stream)
or a function which accepts `Collectable`.
* Enumerable:
* Binary
```
# binary
ExCmd.stream!(~w(base64), input: "Hello Wolrd")
```
* Enumerable or iodata
```
# list
ExCmd.stream!(~w(base64), input: ["hello", "world"]) |> Enum.to_list()
ExCmd.stream!(~w(base64), input: ["hello", "world"])
# iodata
ExCmd.stream!(~w(cat), input: ["He", ["llo"], [' ', ?W], ['or', "ld"]])
# stream
ExCmd.stream!(~w(cat), input: File.stream!("log.txt", [], 65536)) |> Enum.to_list()
ExCmd.stream!(~w(cat), input: File.stream!("log.txt", [], 65536))
```
* Collectable:
If the input in a function with arity 1, ExCmd will call that function with a `Collectable` as the argument. The function must *push* input to this collectable. Return value of the function is ignored.
If the input in a function with arity 1, ExCmd will call that
function with a `Collectable` as the argument. The function
must *push* input to this collectable. Return value of the
function is ignored.
```
ExCmd.stream!(~w(cat), input: fn sink -> Enum.into(1..100, sink, &to_string/1) end)
|> Enum.to_list()
ExCmd.stream!(~w(cat), input: fn sink ->
Enum.into(1..100, sink, &to_string/1)
end)
```
By defaults no input is given
* `exit_timeout` - Duration to wait for external program to exit after completion before raising an error. Defaults to `:infinity`
* `exit_timeout` - Time to wait for external program to exit.
After writing all of the input we close the stdin stream and
wait for external program to finish and exit. If program does
not exit within `exit_timeout` an error will be raised and
subsequently program will be killed in background. Defaults
timeout is `:infinity`.
All other options are passed to `ExCmd.Process.start_link/2`
### Example
## Examples
```
ExCmd.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1), input: File.stream!("music_video.mkv", [], 65336))
|> Stream.into(File.stream!("music.mp3"))
|> Stream.run()
```
With input as binary
```
iex> ExCmd.stream!(~w(cat), input: "Hello World")
...> |> Enum.into("")
"Hello World"
```
```
iex> ExCmd.stream!(~w(base64), input: <<1, 2, 3, 4, 5>>)
...> |> Enum.into("")
"AQIDBAU=\n"
```
With input as list
```
Expand All @@ -52,14 +87,16 @@ defmodule ExCmd do
"Hello World"
```
With input as iodata
```
iex> ExCmd.stream!(~w(base64), input: ["Hello ", "World"])
iex> ExCmd.stream!(~w(base64), input: [<<1, 2,>>, [3], [<<4, 5>>]])
...> |> Enum.into("")
"SGVsbG8gV29ybGQ=\n"
"AQIDBAU=\n"
```
When program exit abnormally it will raise `ExCmd.Stream.AbnormalExit` error with exit_status.
When program exit abnormally it will raise
`ExCmd.Stream.AbnormalExit` error with exit_status.
```
iex> try do
Expand All @@ -69,7 +106,6 @@ defmodule ExCmd do
...> e.exit_status
...> end
5
```
"""
@type collectable_func() :: (Collectable.t() -> any())
Expand Down
84 changes: 68 additions & 16 deletions lib/ex_cmd/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,40 +41,43 @@ defmodule ExCmd.Stream do

defstruct [:process, :stream_opts]

@default_opts [exit_timeout: :infinity]

@type t :: %__MODULE__{}

@doc false
def __build__(cmd_with_args, opts) do
{stream_opts, process_opts} = Keyword.split(opts, [:exit_timeout, :input])
stream_opts = Keyword.merge(@default_opts, stream_opts)

{:ok, process} = Process.start_link(cmd_with_args, process_opts)
case normalize_stream_opts(stream_opts) do
{:ok, stream_opts} ->
{:ok, process} = Process.start_link(cmd_with_args, process_opts)

start_input_streamer(%Sink{process: process}, stream_opts[:input])

start_input_streamer(%Sink{process: process}, stream_opts[:input])
%ExCmd.Stream{process: process, stream_opts: stream_opts}
%ExCmd.Stream{
process: process,
stream_opts: stream_opts
}

{:error, error} ->
raise ArgumentError, message: error
end
end

@doc false
defp start_input_streamer(sink, input) do
cond do
is_nil(input) ->
case input do
:no_input ->
:ok

is_function(input, 1) ->
{:enumerable, enum} ->
spawn_link(fn ->
input.(sink)
Enum.into(enum, sink)
end)

Enumerable.impl_for(input) ->
{:collectable, func} ->
spawn_link(fn ->
Enum.into(input, sink)
func.(sink)
end)

true ->
raise ArgumentError,
message: ":input must be either Enumerable or a function with arity 1"
end
end

Expand Down Expand Up @@ -134,4 +137,53 @@ defmodule ExCmd.Stream do
{:error, __MODULE__}
end
end

@spec normalize_input(term) ::
{:ok, :no_input} | {:ok, {:enumerable, term}} | {:ok, {:collectable, function}}
defp normalize_input(term) do
cond do
is_nil(term) ->
{:ok, :no_input}

is_binary(term) ->
{:ok, {:enumerable, [IO.iodata_to_binary(term)]}}

is_function(term, 1) ->
{:ok, {:collectable, term}}

Enumerable.impl_for(term) ->
{:ok, {:enumerable, term}}

true ->
{:error, "`:input` must be either Enumerable or a function which accepts collectable"}
end
end

defp normalize_exit_timeout(timeout) do
case timeout do
nil ->
{:ok, :infinity}

:infinity ->
{:ok, :infinity}

timeout when is_integer(timeout) and timeout > 0 ->
{:ok, timeout}

_ ->
{:error, ":exit_timeout must be either :infinity or a non-zero integer"}
end
end

defp normalize_stream_opts(opts) do
with {:ok, input} <- normalize_input(opts[:input]),
{:ok, exit_timeout} <- normalize_exit_timeout(opts[:exit_timeout]) do
opts = %{
input: input,
exit_timeout: exit_timeout
}

{:ok, opts}
end
end
end

0 comments on commit a978d03

Please sign in to comment.