Skip to content

Commit

Permalink
improve concurrency doc
Browse files Browse the repository at this point in the history
  • Loading branch information
ebonnal committed Oct 8, 2024
1 parent ecc0ebf commit 3b802a1
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 6 deletions.
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ assert list(negative_integer_strings) == ['0', '-1', '-2', '-3', '-4', '-5', '-6

### thread-based concurrency

> Applies the transformation concurrently using a thread queue of size `concurrency`:
> Applies the transformation via `concurrency` threads:
```python
import requests
Expand All @@ -123,7 +123,10 @@ pokemon_names: Stream[str] = (
assert list(pokemon_names) == ['bulbasaur', 'ivysaur', 'venusaur']
```

> Preserves the upstream order by default (FIFO) but you can set `ordered=False` for *First Done First Out*.
> Preserves the upstream order by default (FIFO), but you can set `ordered=False` for *First Done First Out*.
> `concurrency` is also the size of the buffer containing not-yet-yielded results. **If the buffer is full, the iteration over the upstream is stopped** until some results are yielded out of the buffer.

### process-based concurrency

Expand Down
8 changes: 4 additions & 4 deletions streamable/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ def foreach(
Args:
effect (Callable[[T], Any]): The function to be applied to each element as a side effect.
concurrency (int): Represents both the number of threads used to concurrently apply the `effect` and the number of elements buffered (default is 1, meaning no multithreading).
concurrency (int): Represents both the number of threads used to concurrently apply the `effect` and the size of the buffer containing not-yet-yielded elements. If the buffer is full, the iteration over the upstream is stopped until some elements are yielded out of the buffer. (default is 1, meaning no multithreading).
ordered (bool): If `concurrency` > 1, whether to preserve the order of upstream elements or to yield them as soon as they are processed (default preserves order).
via_processes (bool): If `concurrency` > 1, applies `effect` concurrently using processes instead of threads (default is threads).
Returns:
Expand All @@ -264,7 +264,7 @@ def aforeach(
Args:
effect (Callable[[T], Any]): The asynchronous function to be applied to each element as a side effect.
concurrency (int): Represents both the number of async tasks concurrently applying the `effect` and the number of elements buffered.
concurrency (int): Represents both the number of async tasks concurrently applying the `effect` and the size of the buffer containing not-yet-yielded elements. If the buffer is full, the iteration over the upstream is stopped until some elements are yielded out of the buffer.
ordered (bool): If `concurrency` > 1, whether to preserve the order of upstream elements or to yield them as soon as they are processed (default preserves order).
Returns:
Stream[T]: A stream of upstream elements, unchanged.
Expand Down Expand Up @@ -309,7 +309,7 @@ def map(
Args:
transformation (Callable[[T], R]): The function to be applied to each element.
concurrency (int): Represents both the number of threads used to concurrently apply `transformation` and the number of results buffered (default is 1, meaning no multithreading).
concurrency (int): Represents both the number of threads used to concurrently apply `transformation` and the size of the buffer containing not-yet-yielded results. If the buffer is full, the iteration over the upstream is stopped until some results are yielded out of the buffer. (default is 1, meaning no multithreading).
ordered (bool): If `concurrency` > 1, whether to preserve the order of upstream elements or to yield them as soon as they are processed (default preserves order).
via_processes (bool): If `concurrency` > 1, applies `transformation` concurrently using processes instead of threads (default via threads).
Returns:
Expand All @@ -329,7 +329,7 @@ def amap(
Args:
transformation (Callable[[T], Coroutine[Any, Any, U]]): The asynchronous function to be applied to each element.
concurrency (int): Represents both the number of async tasks concurrently applying `transformation` and the number of results buffered.
concurrency (int): Represents both the number of async tasks concurrently applying `transformation` and the size of the buffer containing not-yet-yielded results. If the buffer is full, the iteration over the upstream is stopped until some results are yielded out of the buffer.
ordered (bool): If `concurrency` > 1, whether to preserve the order of upstream elements or to yield them as soon as they are processed (default preserves order).
Returns:
Stream[R]: A stream of transformed elements.
Expand Down

0 comments on commit 3b802a1

Please sign in to comment.