diff --git a/README.md b/README.md index ea5129d..4bd9c2a 100644 --- a/README.md +++ b/README.md @@ -5,9 +5,19 @@ [![docs](https://img.shields.io/badge/docs-hexpm-blue.svg)](https://hexdocs.pm/ex_cmd/) -ExCmd is an Elixir library to run and communicate with external programs with back-pressure mechanism. It makes use os provided stdio buffer for this. - -Communication with external program using [Port](https://hexdocs.pm/elixir/Port.html) is not demand driven. So it is easy to run into memory issues when the size of the data we are writing or reading from the external program is large. ExCmd tries to solve this problem by making better use of os provided stdio buffers and providing demand-driven interface to write and read from external program. It can be used to stream data through an external program. For example, streaming a video through `ffmpeg` to serve a web request. +ExCmd is an Elixir library to run and communicate with external +programs with back-pressure mechanism. It makes use os backed stdio +buffer for this. + +Communication with external program using +[Port](https://hexdocs.pm/elixir/Port.html) is not demand driven. So +it is easy to run into memory issues when the size of the data we are +writing or reading from the external program is large. ExCmd tries to +solve this problem by making better use of os backed stdio buffers +and providing demand-driven interface to write and read from external +program. It can be used to stream data through an external +program. For example, streaming a video through `ffmpeg` to serve a +web request. Getting audio out of a video stream is as simple as @@ -26,24 +36,59 @@ ExCmd.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1), input: File.stream!("music_vid * Proper program termination. No more zombie process * Ability to close stdin and wait for output (with ports one can not selectively close stdin) -If you are not interested in streaming capability, ExCmd can still be useful because of the features listed above. For example running command and getting output as a string + +## Examples ```elixir ExCmd.stream!(~w(curl ifconfig.co)) |> Enum.into("") ``` -If you want to use shell to handle more complex pipelines and globs, you can just spawn shell process and pass your shell command as the argument +Binary as input + + ```elixir + ExCmd.stream!(~w(cat), input: "Hello World") + |> Enum.into("") + # => "Hello World" + ``` + + ```elixir + ExCmd.stream!(~w(base64), input: <<1, 2, 3, 4, 5>>) + |> Enum.into("") + # => "AQIDBAU=\n" + ``` + +List of binary as input + + ```elixir + ExCmd.stream!(~w(cat), input: ["Hello ", "World"]) + |> Enum.into("") + # => "Hello World" + ``` + +iodata as input + + ```elixir + ExCmd.stream!(~w(base64), input: [<<1, 2,>>, [3], [<<4, 5>>]]) + |> Enum.into("") + # => "AQIDBAU=\n" + ``` + +If you want pipes and globs, you can spawn shell process and pass your +pipeline as argument ```elixir -cmd = "echo 'foo baar' | base64" +cmd = "echo 'foo bar' | base64" ExCmd.stream!(["sh", "-c", cmd]) |> Enum.into("") +# => "Zm9vIGJhcgo=\n" ``` -Refer [documentation](https://hexdocs.pm/ex_cmd/readme.html) for information +Read [stream documentation](file:///Users/akash/repo/elixir/ex_cmd/doc/ExCmd.html#stream!/2) for information +about parameters. -**Check out [Exile](https://github.com/akash-akya/exile) which is an alternative solution based on NIF without middleware overhead** +**Check out [Exile](https://github.com/akash-akya/exile) which is an +alternative solution based on NIF without middleware overhead** ## Installation diff --git a/lib/ex_cmd.ex b/lib/ex_cmd.ex index 85f8da5..24e7912 100644 --- a/lib/ex_cmd.ex +++ b/lib/ex_cmd.ex @@ -1,42 +1,63 @@ defmodule ExCmd do @moduledoc """ - ExCmd is an Elixir library to run and communicate with external programs with back-pressure. + ExCmd is an Elixir library to run and communicate with external + programs with back-pressure. """ @doc """ - Runs the given command with arguments and return an Enumerable to read command output. + Runs the given command with arguments and return an Enumerable to + read command output. - First parameter must be a list containing command with arguments. example: `["cat", "file.txt"]`. + First parameter must be a list containing command with + arguments. example: `["cat", "file.txt"]`. - ### Options + ### Optional - * `input` - Input can be either an `Enumerable` or a function which accepts `Collectable`. + * `input` - Input can be binary, iodata, `Enumerable` (list, stream) + or a function which accepts `Collectable`. - * Enumerable: + * Binary + ``` + # binary + ExCmd.stream!(~w(base64), input: "Hello Wolrd") + ``` + + * Enumerable or iodata ``` # list - ExCmd.stream!(~w(base64), input: ["hello", "world"]) |> Enum.to_list() + ExCmd.stream!(~w(base64), input: ["hello", "world"]) + + # iodata + ExCmd.stream!(~w(cat), input: ["He", ["llo"], [' ', ?W], ['or', "ld"]]) + # stream - ExCmd.stream!(~w(cat), input: File.stream!("log.txt", [], 65536)) |> Enum.to_list() + ExCmd.stream!(~w(cat), input: File.stream!("log.txt", [], 65536)) ``` * Collectable: - If the input in a function with arity 1, ExCmd will call that function with a `Collectable` as the argument. The function must *push* input to this collectable. Return value of the function is ignored. + If the input in a function with arity 1, ExCmd will call that + function with a `Collectable` as the argument. The function + must *push* input to this collectable. Return value of the + function is ignored. ``` - ExCmd.stream!(~w(cat), input: fn sink -> Enum.into(1..100, sink, &to_string/1) end) - |> Enum.to_list() + ExCmd.stream!(~w(cat), input: fn sink -> + Enum.into(1..100, sink, &to_string/1) + end) ``` - By defaults no input is given - - * `exit_timeout` - Duration to wait for external program to exit after completion before raising an error. Defaults to `:infinity` + * `exit_timeout` - Time to wait for external program to exit. + After writing all of the input we close the stdin stream and + wait for external program to finish and exit. If program does + not exit within `exit_timeout` an error will be raised and + subsequently program will be killed in background. Defaults + timeout is `:infinity`. All other options are passed to `ExCmd.Process.start_link/2` - ### Example + ## Examples ``` ExCmd.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1), input: File.stream!("music_video.mkv", [], 65336)) @@ -44,6 +65,20 @@ defmodule ExCmd do |> Stream.run() ``` + With input as binary + + ``` + iex> ExCmd.stream!(~w(cat), input: "Hello World") + ...> |> Enum.into("") + "Hello World" + ``` + + ``` + iex> ExCmd.stream!(~w(base64), input: <<1, 2, 3, 4, 5>>) + ...> |> Enum.into("") + "AQIDBAU=\n" + ``` + With input as list ``` @@ -52,14 +87,16 @@ defmodule ExCmd do "Hello World" ``` + With input as iodata + ``` - iex> ExCmd.stream!(~w(base64), input: ["Hello ", "World"]) + iex> ExCmd.stream!(~w(base64), input: [<<1, 2,>>, [3], [<<4, 5>>]]) ...> |> Enum.into("") - "SGVsbG8gV29ybGQ=\n" + "AQIDBAU=\n" ``` - - When program exit abnormally it will raise `ExCmd.Stream.AbnormalExit` error with exit_status. + When program exit abnormally it will raise + `ExCmd.Stream.AbnormalExit` error with exit_status. ``` iex> try do @@ -69,7 +106,6 @@ defmodule ExCmd do ...> e.exit_status ...> end 5 - ``` """ @type collectable_func() :: (Collectable.t() -> any()) diff --git a/lib/ex_cmd/stream.ex b/lib/ex_cmd/stream.ex index 176042a..a2d5783 100644 --- a/lib/ex_cmd/stream.ex +++ b/lib/ex_cmd/stream.ex @@ -41,40 +41,43 @@ defmodule ExCmd.Stream do defstruct [:process, :stream_opts] - @default_opts [exit_timeout: :infinity] - @type t :: %__MODULE__{} @doc false def __build__(cmd_with_args, opts) do {stream_opts, process_opts} = Keyword.split(opts, [:exit_timeout, :input]) - stream_opts = Keyword.merge(@default_opts, stream_opts) - {:ok, process} = Process.start_link(cmd_with_args, process_opts) + case normalize_stream_opts(stream_opts) do + {:ok, stream_opts} -> + {:ok, process} = Process.start_link(cmd_with_args, process_opts) + + start_input_streamer(%Sink{process: process}, stream_opts[:input]) - start_input_streamer(%Sink{process: process}, stream_opts[:input]) - %ExCmd.Stream{process: process, stream_opts: stream_opts} + %ExCmd.Stream{ + process: process, + stream_opts: stream_opts + } + + {:error, error} -> + raise ArgumentError, message: error + end end @doc false defp start_input_streamer(sink, input) do - cond do - is_nil(input) -> + case input do + :no_input -> :ok - is_function(input, 1) -> + {:enumerable, enum} -> spawn_link(fn -> - input.(sink) + Enum.into(enum, sink) end) - Enumerable.impl_for(input) -> + {:collectable, func} -> spawn_link(fn -> - Enum.into(input, sink) + func.(sink) end) - - true -> - raise ArgumentError, - message: ":input must be either Enumerable or a function with arity 1" end end @@ -134,4 +137,53 @@ defmodule ExCmd.Stream do {:error, __MODULE__} end end + + @spec normalize_input(term) :: + {:ok, :no_input} | {:ok, {:enumerable, term}} | {:ok, {:collectable, function}} + defp normalize_input(term) do + cond do + is_nil(term) -> + {:ok, :no_input} + + is_binary(term) -> + {:ok, {:enumerable, [IO.iodata_to_binary(term)]}} + + is_function(term, 1) -> + {:ok, {:collectable, term}} + + Enumerable.impl_for(term) -> + {:ok, {:enumerable, term}} + + true -> + {:error, "`:input` must be either Enumerable or a function which accepts collectable"} + end + end + + defp normalize_exit_timeout(timeout) do + case timeout do + nil -> + {:ok, :infinity} + + :infinity -> + {:ok, :infinity} + + timeout when is_integer(timeout) and timeout > 0 -> + {:ok, timeout} + + _ -> + {:error, ":exit_timeout must be either :infinity or a non-zero integer"} + end + end + + defp normalize_stream_opts(opts) do + with {:ok, input} <- normalize_input(opts[:input]), + {:ok, exit_timeout} <- normalize_exit_timeout(opts[:exit_timeout]) do + opts = %{ + input: input, + exit_timeout: exit_timeout + } + + {:ok, opts} + end + end end