Skip to content

Commit

Permalink
Add pv/3 iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
seriyps committed Oct 25, 2024
1 parent b51b5e6 commit e2f594d
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 11 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ jobs:
- name: Run Proper Tests
run: rebar3 proper -c

- name: Run EUnit Tests
run: rebar3 eunit -c

- name: Coverage
run: rebar3 cover --verbose --min_coverage 80 # zip/3 can only be fully tested on OTP-26+

Expand Down
24 changes: 23 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,29 @@ OTP `lists` module.
Functions `iterator_pmap:pmap/2` and `iterator_pmap:pmap/3` provide parallel version
of `iterator:map/2`: it takes iterator as input and returns a new iterator where map function
is executed for each input element in parallel on a pool of worker processes.
While elements of input are processed in parallel, the ordering of elements is preserved.
The `ordered` parameter controls whether the parallel map must preserve the order of the original
iterator or is allowed to reorder the elements (emitting the elements that finish processing
sooner first, which increases the throughput).

Another non-standard function is `pv/3` (named after the `pv` - "pipe view" - utility, see
`man pv`). It is a pass-through iterator that can be added anywhere in the pipeline to periodically
(either every `for_each_n` elements or every `every_s` seconds) report the current progress of a
long-running iterator:

```erlang
I0 = ...,
I1 = iterator:pv(
    fun(SampleElement, TimePassed, ItemsPassed, TotalItems) ->
        %% Measure in milliseconds and clamp to 1: a report triggered by `for_each_n' may
        %% fire in less than a second, in which case a whole-second conversion yields 0
        %% and the division below would crash with `badarith'.
        TimeMs = max(1, erlang:convert_time_unit(TimePassed, native, millisecond)),
        ?LOG_INFO("Processed ~p items. Pace is ~p per-second. Current item: ~p",
                  [TotalItems, ItemsPassed * 1000 / TimeMs, SampleElement])
    end,
    #{for_each_n => 1000,
      every_s => 30},
    I0),
...
```
This example will log current progress either every 30 seconds or after processing every 1000
elements (whichever triggers first).

## Setup

Expand Down
92 changes: 92 additions & 0 deletions src/iterator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
map/2,
mapfoldl/3,
nthtail/2,
pv/3,
sublist/2,
takewhile/2,
zip/3
Expand Down Expand Up @@ -485,6 +486,97 @@ maybe_next(done) ->
maybe_next(#iter{} = Iter) ->
next(Iter).

%% State of the pass-through iterator created by `pv/3' and advanced by `yield_pv/1'.
-record(pv, {
%% Report callback, called as F(Element, TimePassed, ItemsPassed, TotalItems).
f :: fun((any(), integer(), integer(), integer()) -> any()),
%% Count trigger: report once this many items passed since the last report.
for_each_n :: pos_integer(),
%% Time trigger: report once this many whole seconds passed since the last report.
every_s :: pos_integer(),
%% `erlang:monotonic_time/0' value (native units) taken at creation or at the last report.
last_report_time :: integer(),
%% Value of `total_n' at the moment of the last report (0 before the first one).
last_report_n :: non_neg_integer(),
%% Number of items yielded so far.
total_n :: non_neg_integer(),
%% The wrapped iterator items are pulled from.
inner_i :: iterator:iterator(any())
}).

%% @doc Wraps `InnerIter' into a pass-through iterator that periodically reports its progress.
%% The name is borrowed from the `pv' (pipe view) Unix utility, see `man pv'.
%%
%% Elements are yielded unchanged. Whenever one of the trigger conditions is met, `F' is called
%% and both the item counter and the timer are reset. So with `for_each_n' set to 1000 and
%% `every_s' set to 30 the function is called either when 1000 items passed or when 30 seconds
%% elapsed since the previous call, whichever happens first.
%%
%% The time condition is only evaluated when an element passes through, so if a single element
%% takes longer than `every_s' seconds to produce, the call to `F' is delayed accordingly.
%%
%% @param F report function, called with 4 arguments:
%%   - `Data' - the element that is currently passing through (a sample)
%%   - `TimePassed' - time since the previous report, in native time units
%%   - `ItemsPassed' - number of items since the previous report
%%   - `TotalItems' - number of items since the iterator was created
%%   `TimePassed' and `ItemsPassed' together allow to calculate the speed of the stream.
%% @param Opts trigger conditions:
%%   - `for_each_n' - report on every N-th element (default: 1000)
%%   - `every_s' - report every S seconds (default: 30)
%% @param InnerIter the iterator to wrap
-spec pv(F, Opts, iterator:iterator(Type)) -> iterator:iterator(Type) when
    F :: fun(
        (Type, TimePassed :: integer(), ItemsPassed :: integer(), TotalItems :: integer()) -> any()
    ),
    Opts :: #{
        for_each_n => pos_integer(),
        every_s => pos_integer()
    },
    Type :: any().
pv(F, Opts, InnerIter) when is_function(F, 4) ->
    CreatedAt = erlang:monotonic_time(),
    %% User-supplied options take precedence over the defaults.
    #{every_s := EveryS, for_each_n := ForEachN} =
        maps:merge(#{every_s => 30, for_each_n => 1000}, Opts),
    iterator:new(
        fun yield_pv/1,
        #pv{
            f = F,
            for_each_n = ForEachN,
            every_s = EveryS,
            last_report_time = CreatedAt,
            last_report_n = 0,
            total_n = 0,
            inner_i = InnerIter
        }
    ).

%% Yield function of the `pv/3' iterator: pulls one element from the inner iterator, updates the
%% counters and calls the report function when the count or the time trigger is due.
yield_pv(#pv{inner_i = Inner} = State) ->
    case iterator:next(Inner) of
        done ->
            done;
        {ok, Item, NextInner} ->
            #pv{
                f = Report,
                for_each_n = MaxItems,
                every_s = MaxSeconds,
                last_report_time = ReportedAt,
                last_report_n = ReportedN,
                total_n = Seen
            } = State,
            Total = Seen + 1,
            SinceReport = Total - ReportedN,
            CountDue = SinceReport >= MaxItems,
            %% The clock is read after the inner iterator has produced the element, so the time
            %% spent producing it is included in the elapsed time.
            Timestamp = erlang:monotonic_time(),
            Elapsed = Timestamp - ReportedAt,
            TimeDue = erlang:convert_time_unit(Elapsed, native, second) >= MaxSeconds,
            Advanced = State#pv{total_n = Total, inner_i = NextInner},
            case CountDue orelse TimeDue of
                true ->
                    Report(Item, Elapsed, SinceReport, Total),
                    %% Both triggers restart from this report.
                    {Item, Advanced#pv{last_report_time = Timestamp, last_report_n = Total}};
                false ->
                    {Item, Advanced}
            end
    end.

%% @doc Iterator over .eterm file (file containing dot-terminated Erlang terms)
%% XXX: never abandon this iterator from long-running processes! It would leak file descriptor!
%% Either consume it to the end or close with `iterator:close/1' explicitly.
Expand Down
55 changes: 55 additions & 0 deletions test/iterator_tests.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
%% @doc Unit-tests for `iterator'.

-module(iterator_tests).

-include_lib("eunit/include/eunit.hrl").

%% Checks the `for_each_n' trigger of `iterator:pv/3': the callback must fire on every
%% `ForEachN'-th element and the elements must pass through unchanged.
pv_each_n_test() ->
    ForEachN = 5,
    Size = 50,
    Input = lists:seq(1, Size),
    I0 = iterator:from_list(Input),
    %% Accumulates the number of items the callback has been told about.
    Counter = counters:new(1, []),
    I1 = iterator:pv(
        fun(_, _Time, NItems, TotalItems) ->
            ok = counters:add(Counter, 1, ForEachN),
            ?assertEqual(counters:get(Counter, 1), TotalItems),
            ?assertEqual(ForEachN, NItems)
        end,
        #{
            for_each_n => ForEachN,
            % large so it never triggers
            every_s => 120
        },
        I0
    ),
    %% pv/3 is a pass-through: the output must be the untouched input.
    ?assertEqual(Input, iterator:to_list(I1)),
    %% Without this check the test would pass even if the callback was never called.
    %% `Size' is a multiple of `ForEachN', so the last report covers the last element.
    ?assertEqual(Size, counters:get(Counter, 1)).

%% Checks the `every_s' trigger of `iterator:pv/3' using a deliberately slow inner iterator.
%% XXX: this test can be flaky because it relies on the sleep time
pv_every_s_test() ->
    EveryS = 1,
    Size = 70,
    Sleep = 50,
    EveryMs = EveryS * 1000,
    ApproxPerBatch = EveryMs div Sleep,
    Input = lists:seq(1, Size),
    %% Counts callback invocations so the test can't pass without any report.
    Reports = counters:new(1, []),
    I0 = iterator:from_list(Input),
    I1 = iterator:map(
        fun(X) ->
            timer:sleep(Sleep),
            X
        end,
        I0
    ),
    I2 = iterator:pv(
        fun(_, Time, NItems, _TotalItems) ->
            ok = counters:add(Reports, 1, 1),
            TimeMs = erlang:convert_time_unit(Time, native, millisecond),
            %% We can't assert exact values here because of the sleep.
            %% The time trigger is only evaluated when an element passes through, so a report
            %% can never come earlier than `EveryS' and may be late by up to the time it takes
            %% to produce one element (`Sleep' plus scheduling overhead).
            ?assert(TimeMs >= EveryMs, [{time, TimeMs}]),
            ?assert(TimeMs < EveryMs + 2 * Sleep, [{time, TimeMs}]),
            ?assert(abs(NItems - ApproxPerBatch) < 4, [{n_items, NItems}])
        end,
        #{
            % large so it never triggers
            for_each_n => 100,
            every_s => EveryS
        },
        I1
    ),
    ?assertEqual(Input, iterator:to_list(I2)),
    ?assert(counters:get(Reports, 1) > 0).
32 changes: 22 additions & 10 deletions test/prop_pmap.erl
Original file line number Diff line number Diff line change
Expand Up @@ -245,14 +245,26 @@ links() ->
lists:sort(L).

%% Same as assert_links/3 with the default budget: 50 attempts, 100 ms apart (about 5 s in total).
assert_links(ExpectedLinks) ->
    Attempts = 50,
    SleepMs = 100,
    assert_links(ExpectedLinks, Attempts, SleepMs).

%% Asserts that `links()' equals `Links0', polling because the current links may need some
%% time to settle. Checks up to `N' times with `Sleep' ms pauses in between, then makes one
%% final assertion.
%%
%% The whole list is compared (not just its length), so a set of the same size but with different
%% members is retried too instead of failing on the first attempt. When the attempts are
%% exhausted, the failing assertion carries `process_info/1' of every unexpected process.
assert_links(Links0, N, Sleep) when N > 0 ->
    case links() of
        Links0 ->
            ok;
        _Mismatch ->
            timer:sleep(Sleep),
            assert_links(Links0, N - 1, Sleep)
    end;
assert_links(Links0, _, _) ->
    Links = links(),
    ?assertEqual(
        Links0,
        Links,
        [
            {extra, [
                {P, erlang:process_info(P)}
             || P <- ordsets:subtract(Links, Links0)
            ]}
        ]
    ).

0 comments on commit e2f594d

Please sign in to comment.