Skip to content

Commit

Permalink
Merge pull request #17 from akash-akya/dev
Browse files Browse the repository at this point in the history
Refactor Exile.Process API and handle corner cases
  • Loading branch information
akash-akya authored Apr 19, 2023
2 parents a76f76f + e22c2f0 commit 9829789
Show file tree
Hide file tree
Showing 22 changed files with 2,338 additions and 877 deletions.
61 changes: 56 additions & 5 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
name: CI
name: Elixir CI

on:
- push
- pull_request

jobs:
test:
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
name: Test - Elixir ${{matrix.elixir}} / OTP ${{matrix.otp}}

strategy:
Expand All @@ -27,9 +27,60 @@ jobs:

- uses: actions/checkout@v3

- name: Cache Dependencies
id: mix-cache
uses: actions/cache@v3
with:
path: |
deps
_build
key: ${{ runner.os }}-${{ matrix.otp }}-${{ matrix.elixir }}-${{ hashFiles('mix.lock') }}

- name: Install Dependencies
if: steps.mix-cache.outputs.cache-hit != 'true'
run: |
mix deps.get
mix deps.compile
- run: gcc --version
- run: mix compile --warnings-as-errors
- run: mix test --exclude skip:true --trace

lint:
runs-on: ubuntu-22.04
name: Lint
strategy:
matrix:
include:
- elixir: 1.14.x
otp: 25.x
steps:
- uses: erlef/setup-beam@v1
with:
otp-version: ${{matrix.otp}}
elixir-version: ${{matrix.elixir}}

- uses: actions/checkout@v3

- name: Cache Dependencies
id: mix-cache
uses: actions/cache@v3
with:
path: |
deps
_build
key: ${{ runner.os }}-${{ matrix.otp }}-${{ matrix.elixir }}-${{ hashFiles('mix.lock') }}

- name: Install Dependencies
if: steps.mix-cache.outputs.cache-hit != 'true'
run: |
mkdir -p priv/plts
mix deps.get
mix deps.compile
mix dialyzer --plt
- run: mix deps.get
- run: mix deps.unlock --check-unused
- run: mix format --check-formatted
- run: gcc --version
- run: mix compile --force --warnings-as-errors
- run: mix test --exclude skip:true --trace
- run: mix credo --strict
- run: mix dialyzer --plt
3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ calling_from_make:

UNAME := $(shell uname)

CFLAGS ?= -D_POSIX_C_SOURCE=200809L -Wall -Werror -Wno-unused-parameter -pedantic -std=c99 -O2
CFLAGS ?= -D_POSIX_C_SOURCE=200809L -Wall -Werror -Wno-unused-parameter -pedantic -std=c99 -O2 -fsanitize=undefined

ifeq ($(UNAME), Darwin)
TARGET_CFLAGS ?= -fPIC -undefined dynamic_lookup -dynamiclib -Wextra
Expand All @@ -13,7 +13,6 @@ ifeq ($(UNAME), Linux)
TARGET_CFLAGS ?= -fPIC -shared
endif


all: priv/exile.so priv/spawner

priv/exile.so: c_src/exile.c
Expand Down
110 changes: 109 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Exile is an alternative to [ports](https://hexdocs.pm/elixir/Port.html) for runn
Exile is built around the idea of having demand-driven, asynchronous interaction with external process. Think of streaming a video through `ffmpeg` to serve a web request. Exile internally uses NIF. See [Rationale](#rationale) for details. It also provides stream abstraction for interacting with an external program. For example, getting audio out of a stream is as simple as

``` elixir
Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1), input: File.stream!("music_video.mkv", [], 65535))
Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1), input: File.stream!("music_video.mkv", [], 65_535))
|> Stream.into(File.stream!("music.mp3"))
|> Stream.run()
```
Expand All @@ -22,6 +22,7 @@ Exile requires OTP v22.1 and above.

Exile is based on NIF, please know consequence of that before using Exile. For basic use cases use [ExCmd](https://github.com/akash-akya/ex_cmd) instead.


## Installation

```elixir
Expand All @@ -32,6 +33,113 @@ def deps do
end
```


## Quick Start

Run a command and read from stdout

```
iex> Exile.stream!(~w(echo Hello))
...> |> Enum.into("") # collect as string
"Hello\n"
```

Run a command with list of strings as input

```
iex> Exile.stream!(~w(cat), input: ["Hello", " ", "World"])
...> |> Enum.into("") # collect as string
"Hello World"
```

Run a command with input as Stream

```
iex> input_stream = Stream.map(1..10, fn num -> "#{num} " end)
iex> Exile.stream!(~w(cat), input: input_stream)
...> |> Enum.into("")
"1 2 3 4 5 6 7 8 9 10 "
```

Run a command with input as infinite stream

```
# create infinite stream
iex> input_stream = Stream.repeatedly(fn -> "A" end)
iex> binary =
...> Exile.stream!(~w(cat), input: input_stream, ignore_epipe: true) # we need to ignore epipe since we are terminating the program before the input completes
...> |> Stream.take(2) # we must limit since the input stream is infinite
...> |> Enum.into("")
iex> is_binary(binary)
true
iex> "AAAAA" <> _ = binary
```

Run a command with input Collectable

```
# Exile calls the callback with a sink where the process can push the data
iex> Exile.stream!(~w(cat), input: fn sink ->
...> Stream.map(1..10, fn num -> "#{num} " end)
...> |> Stream.into(sink) # push to the external process
...> |> Stream.run()
...> end)
...> |> Stream.take(100) # we must limit since the input stream is infinite
...> |> Enum.into("")
"1 2 3 4 5 6 7 8 9 10 "
```

When the command wait for the input stream to close

```
# base64 command wait for the input to close and writes data to stdout at once
iex> Exile.stream!(~w(base64), input: ["abcdef"])
...> |> Enum.into("")
"YWJjZGVm\n"
```

When the command exit with an error

```
iex> Exile.stream!(["sh", "-c", "exit 4"])
...> |> Enum.into("")
** (Exile.Process.Error) command exited with status: 4
```

With `max_chunk_size` set

```
iex> data =
...> Exile.stream!(~w(cat /dev/urandom), max_chunk_size: 100, ignore_epipe: true)
...> |> Stream.take(5)
...> |> Enum.into("")
iex> byte_size(data)
500
```

When input and output run at different rate

```
iex> input_stream = Stream.map(1..1000, fn num -> "X #{num} X\n" end)
iex> Exile.stream!(~w(grep 250), input: input_stream)
...> |> Enum.into("")
"X 250 X\n"
```

With stderr enabled

```
iex> Exile.stream!(["sh", "-c", "echo foo\necho bar >> /dev/stderr"], enable_stderr: true)
...> |> Enum.to_list()
[{:stdout, "foo\n"}, {:stderr, "bar\n"}]
```

For more details about stream API, see `Exile.stream!/2`.

For more details about inner working, please check `Exile.Process`
documentation.


## Rationale

Existing approaches
Expand Down
16 changes: 13 additions & 3 deletions c_src/exile.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ static ERL_NIF_TERM ATOM_UNDEFINED;
static ERL_NIF_TERM ATOM_INVALID_FD;
static ERL_NIF_TERM ATOM_SELECT_CANCEL_ERROR;
static ERL_NIF_TERM ATOM_EAGAIN;
static ERL_NIF_TERM ATOM_EPIPE;

static ERL_NIF_TERM ATOM_SIGKILL;
static ERL_NIF_TERM ATOM_SIGTERM;
static ERL_NIF_TERM ATOM_SIGKILL;
static ERL_NIF_TERM ATOM_SIGPIPE;

static void close_fd(int *fd) {
if (*fd != FD_CLOSED) {
Expand Down Expand Up @@ -143,6 +145,8 @@ static ERL_NIF_TERM nif_write(ErlNifEnv *env, int argc,
if (retval != 0)
return make_error(env, enif_make_int(env, retval));
return make_error(env, ATOM_EAGAIN);
} else if (write_errno == EPIPE) {
return make_error(env, ATOM_EPIPE);
} else {
perror("write()");
return make_error(env, enif_make_int(env, write_errno));
Expand Down Expand Up @@ -227,6 +231,8 @@ static ERL_NIF_TERM read_fd(ErlNifEnv *env, int *fd, int max_size) {
if (retval != 0)
return make_error(env, enif_make_int(env, retval));
return make_error(env, ATOM_EAGAIN);
} else if (read_errno == EPIPE) {
return make_error(env, ATOM_EPIPE);
} else {
perror("read_fd()");
return make_error(env, enif_make_int(env, read_errno));
Expand Down Expand Up @@ -298,7 +304,9 @@ static ERL_NIF_TERM nif_kill(ErlNifEnv *env, int argc,
ret = kill(pid, SIGKILL);
else if (enif_compare(argv[1], ATOM_SIGTERM) == 0)
ret = kill(pid, SIGTERM);
else
else if (enif_compare(argv[1], ATOM_SIGPIPE) == 0) {
ret = kill(pid, SIGPIPE);
} else
return enif_make_badarg(env);

if (ret != 0) {
Expand Down Expand Up @@ -326,10 +334,12 @@ static int on_load(ErlNifEnv *env, void **priv, ERL_NIF_TERM load_info) {
ATOM_UNDEFINED = enif_make_atom(env, "undefined");
ATOM_INVALID_FD = enif_make_atom(env, "invalid_fd_resource");
ATOM_EAGAIN = enif_make_atom(env, "eagain");
ATOM_EPIPE = enif_make_atom(env, "epipe");
ATOM_SELECT_CANCEL_ERROR = enif_make_atom(env, "select_cancel_error");

ATOM_SIGTERM = enif_make_atom(env, "sigterm");
ATOM_SIGKILL = enif_make_atom(env, "sigkill");
ATOM_SIGPIPE = enif_make_atom(env, "sigpipe");

return 0;
}
Expand All @@ -347,5 +357,5 @@ static ErlNifFunc nif_funcs[] = {
{"nif_is_os_pid_alive", 1, nif_is_os_pid_alive, USE_DIRTY_IO},
{"nif_kill", 2, nif_kill, USE_DIRTY_IO}};

ERL_NIF_INIT(Elixir.Exile.ProcessNif, nif_funcs, &on_load, NULL, NULL,
ERL_NIF_INIT(Elixir.Exile.Process.Nif, nif_funcs, &on_load, NULL, NULL,
&on_unload)
18 changes: 9 additions & 9 deletions c_src/spawner.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ static void close_pipes(int pipes[3][2]) {
}

static int exec_process(char const *bin, char *const *args, int socket,
bool use_stderr) {
bool enable_stderr) {
int pipes[3][2] = {{0, 0}, {0, 0}, {0, 0}};
int r_cmdin, w_cmdin, r_cmdout, w_cmdout, r_cmderr, w_cmderr;
int i;
Expand Down Expand Up @@ -165,7 +165,7 @@ static int exec_process(char const *bin, char *const *args, int socket,
_exit(FORK_EXEC_FAILURE);
}

if (use_stderr) {
if (enable_stderr) {
close(STDERR_FILENO);
close(r_cmderr);
if (dup2(w_cmderr, STDERR_FILENO) < 0) {
Expand All @@ -189,16 +189,16 @@ static int exec_process(char const *bin, char *const *args, int socket,
_exit(FORK_EXEC_FAILURE);
}

static int spawn(const char *socket_path, const char *use_stderr_str,
static int spawn(const char *socket_path, const char *enable_stderr_str,
const char *bin, char *const *args) {
int socket_fd;
struct sockaddr_un socket_addr;
bool use_stderr;
bool enable_stderr;

if (strcmp(use_stderr_str, "true") == 0) {
use_stderr = true;
if (strcmp(enable_stderr_str, "true") == 0) {
enable_stderr = true;
} else {
use_stderr = false;
enable_stderr = false;
}

socket_fd = socket(AF_UNIX, SOCK_STREAM, 0);
Expand All @@ -222,7 +222,7 @@ static int spawn(const char *socket_path, const char *use_stderr_str,

debug("connected to exile");

if (exec_process(bin, args, socket_fd, use_stderr) != 0)
if (exec_process(bin, args, socket_fd, enable_stderr) != 0)
return EXIT_FAILURE;

// we should never reach here
Expand All @@ -244,7 +244,7 @@ int main(int argc, const char *argv[]) {

exec_argv[i - 3] = NULL;

debug("socket path: %s use_stderr: %s bin: %s", argv[1], argv[2], argv[3]);
debug("socket path: %s enable_stderr: %s bin: %s", argv[1], argv[2], argv[3]);
status = spawn(argv[1], argv[2], argv[3], (char *const *)exec_argv);
}

Expand Down
Loading

0 comments on commit 9829789

Please sign in to comment.