Skip to content

Commit

Permalink
added starmap; bump version.
Browse files Browse the repository at this point in the history
  • Loading branch information
fresh2dev committed Jun 21, 2023
1 parent d820d9b commit cbfa4a5
Show file tree
Hide file tree
Showing 9 changed files with 3,469 additions and 3,240 deletions.
74 changes: 56 additions & 18 deletions README.Rmd
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,23 @@ import ezpq

# `ezpq`: an easy parallel queueing system.

Read this on [GitHub](https://github.com/dm3ll3n/ezpq) or [my site](https://www.donaldmellenbruch.com/project/ezpq/).
> Read this on [GitHub](https://github.com/dm3ll3n/ezpq) or [my site](https://www.donaldmellenbruch.com/project/ezpq/).
## How to get it

Install from [PyPI](https://pypi.org/project/ezpq/) with:

```python
pip install ezpq
```

Optional packages:

```python
pip install pandas # required for plots
pip install plotnine # required for plots
pip install tqdm # required for progress bars
```

## Overview

Expand Down Expand Up @@ -59,22 +75,6 @@ The queueing system uses `multiprocessing.Process` by default and can also run j
* Built-in logging to CSV.
* Customizable visualizations of queue operations.

## How to get it

Install from [PyPI](https://pypi.org/project/ezpq/) with:

```python
pip install ezpq
```

Optional packages:

```python
pip install pandas # required for plots
pip install plotnine # required for plots
pip install tqdm # required for progress bars
```

## Quickstart

Suppose you wanted to speed up the following code, which runs 60 operations that take anywhere from 0s to 2s. With an average job time of ~1s, this operation should take ~60s.
Expand Down Expand Up @@ -311,6 +311,44 @@ with ezpq.Queue(6) as Q:

![](docs/imgs/tqdm_map.gif)

### starmap

`starmap` is similar to `map`, but operates on a list of lists, with each nested list being unpacked as arguments to the function.

```{python, echo=TRUE}
def my_pow(x, k):
return '{}^{} = {}'.format(x, k, x**k)
# list of lists to iterate over.
args_list = [[x, x%4] # (x, k)
for x in range(100)]
# starmap
with ezpq.Queue(10) as Q:
output = Q.starmap(my_pow, iterable=args_list)
[x['output'] for x in output[:10]]
```

### startmapkw

Same as `starmap`, but operations on a list of *dicts* to be expanded as kwargs to the function.

```{python, echo=TRUE}
def my_pow(x, k):
return '{}^{} = {}'.format(x, k, x**k)
# list of dicts to iterate over.
kwargs_list = [{ 'x':x, 'k':x%4 } # (x, k)
for x in range(100)]
# starmapkw
with ezpq.Queue(10) as Q:
output = Q.starmapkw(my_pow, iterable=kwargs_list)
[x['output'] for x in output[:10]]
```

### dispose

The queueing operations performed by `ezpq.Queue` are performed on a periodic basis. By default, the `poll` parameter for a Queue is `0.1` seconds. This "pulse" thread will continue firing until the Queue is disposed of.
Expand All @@ -336,7 +374,7 @@ In the above graphic, notice how same-colored bars never overlap. These bars rep

### Lane Error Handling

You may want to short-circuit a synchronous lane if a job in the lane fails. You can do this by specifying `skip_on_lane_error=True` when putting a job in the queue. If specified and the preceding job has a non-zero exit code, this job will not be run.
You may want to short-circuit a synchronous lane if a job in the lane fails. You can do this by specifying `stop_on_lane_error=True` when putting a job in the queue. If specified and the preceding job has a non-zero exit code, this job will not be run.

```{python, echo=TRUE}
def reciprocal(x):
Expand Down
138 changes: 87 additions & 51 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,26 +1,42 @@
# `ezpq`: an easy parallel queueing system.

* [`ezpq`: an easy parallel queueing system.](#ezpq-an-easy-parallel-queueing-system)
* [Overview](#overview)
* [Features](#features)
* [How to get it](#how-to-get-it)
* [Quickstart](#quickstart)
* [ezpq.Queue](#ezpqqueue)
* [ezpq.Job](#ezpqjob)
* [put](#put)
* [size](#size)
* [wait](#wait)
* [get](#get)
* [collect](#collect)
* [map](#map)
* [dispose](#dispose)
* [Synchronous Lanes](#synchronous-lanes)
* [Lane Error Handling](#lane-error-handling)
* [ezpq.Plot](#ezpqplot)
* [More Examples](#more-examples)

Read this on [GitHub](https://github.com/dm3ll3n/ezpq) or [my
site](https://www.donaldmellenbruch.com/project/ezpq/).
> Read this on [GitHub](https://github.com/dm3ll3n/ezpq) or [my site](https://www.donaldmellenbruch.com/project/ezpq/).
- [How to get it](#how-to-get-it)
- [Overview](#overview)
- [Features](#features)
- [Quickstart](#quickstart)
- [ezpq.Queue](#ezpq.queue)
- [ezpq.Job](#ezpq.job)
- [put](#put)
- [size](#size)
- [wait](#wait)
- [get](#get)
- [collect](#collect)
- [map](#map)
- [starmap](#starmap)
- [startmapkw](#startmapkw)
- [dispose](#dispose)
- [Synchronous Lanes](#synchronous-lanes)
- [Lane Error Handling](#lane-error-handling)
- [ezpq.Plot](#ezpq.plot)
- [More Examples](#more-examples)

## How to get it

Install from [PyPI](https://pypi.org/project/ezpq/) with:

``` python
pip install ezpq
```

Optional packages:

``` python
pip install pandas # required for plots
pip install plotnine # required for plots
pip install tqdm # required for progress bars
```

## Overview

Expand Down Expand Up @@ -53,22 +69,6 @@ also run jobs with `threading.Thread`.
- Built-in logging to CSV.
- Customizable visualizations of queue operations.

## How to get it

Install from [PyPI](https://pypi.org/project/ezpq/) with:

``` python
pip install ezpq
```

Optional packages:

``` python
pip install pandas # required for plots
pip install plotnine # required for plots
pip install tqdm # required for progress bars
```

## Quickstart

Suppose you wanted to speed up the following code, which runs 60
Expand Down Expand Up @@ -143,7 +143,7 @@ print( output[0] )
## {'args': [0],
## 'callback': None,
## 'cancelled': False,
## 'ended': datetime.datetime(2019, 2, 18, 20, 21, 0, 902915),
## 'ended': datetime.datetime(2019, 3, 13, 0, 48, 52, 811248),
## 'exception': None,
## 'exitcode': 0,
## 'function': 'random_sleep',
Expand All @@ -153,11 +153,11 @@ print( output[0] )
## 'name': 1,
## 'output': 1.3444218515250481,
## 'priority': 100,
## 'processed': datetime.datetime(2019, 2, 18, 20, 21, 0, 955396),
## 'qid': 'f4717edb',
## 'runtime': 1.3515939712524414,
## 'started': datetime.datetime(2019, 2, 18, 20, 20, 59, 551321),
## 'submitted': datetime.datetime(2019, 2, 18, 20, 20, 59, 446199),
## 'processed': datetime.datetime(2019, 3, 13, 0, 48, 52, 867387),
## 'qid': '13318d36',
## 'runtime': 1.3500409126281738,
## 'started': datetime.datetime(2019, 3, 13, 0, 48, 51, 461207),
## 'submitted': datetime.datetime(2019, 3, 13, 0, 48, 51, 357405),
## 'timeout': 0}

Easily convert output to a `pandas` dataframe:
Expand All @@ -169,11 +169,11 @@ print( df.head()[['id', 'output', 'runtime', 'exitcode']] )
```

## id output runtime exitcode
## 0 1 1.344422 1.351594 0
## 1 2 0.634364 0.640723 0
## 2 3 1.456034 1.461620 0
## 3 4 0.737965 0.743645 0
## 4 5 0.736048 0.742260 0
## 0 1 1.344422 1.350041 0
## 1 2 0.634364 0.638938 0
## 2 3 1.456034 1.459830 0
## 3 4 0.737965 0.741742 0
## 4 5 0.736048 0.739848 0

Use `ezpq.Plot` to generate a Gannt chart of the job timings.

Expand Down Expand Up @@ -335,9 +335,9 @@ with ezpq.Queue(6) as Q:
## 'Total: 60; Waiting: 31; Working: 6; Completed: 23'
## 'Total: 60; Waiting: 24; Working: 6; Completed: 30'
## 'Total: 60; Waiting: 17; Working: 6; Completed: 37'
## 'Total: 60; Waiting: 12; Working: 6; Completed: 42'
## 'Total: 60; Waiting: 11; Working: 6; Completed: 43'
## 'Total: 60; Waiting: 6; Working: 6; Completed: 48'
## 'Total: 60; Waiting: 1; Working: 6; Completed: 53'
## 'Total: 60; Waiting: 0; Working: 5; Completed: 55'
## 'Total: 60; Waiting: 0; Working: 1; Completed: 59'
## 'Total: 60; Waiting: 0; Working: 0; Completed: 60'

Expand Down Expand Up @@ -407,6 +407,42 @@ call. Include `show_progress=True` to get output `tqdm` progress bar.

![](docs/imgs/tqdm_map.gif)

### starmap

`starmap` is similar to `map`, but operates on a list of lists, with
each nested list being unpacked as arguments to the function.

``` python
def my_pow(x, k):
return '{}^{} = {}'.format(x, k, x**k)
# list of lists to iterate over.
args_list = [[x, x%4] # (x, k)
for x in range(100)]
# starmap
with ezpq.Queue(10) as Q:
output = Q.starmap(my_pow, iterable=args_list)

[x['output'] for x in output[:10]]
```

### startmapkw

Same as `starmap`, but operations on a list of *dicts* to be expanded as
kwargs to the function.

``` python
def my_pow(x, k):
return '{}^{} = {}'.format(x, k, x**k)
# list of dicts to iterate over.
kwargs_list = [{ 'x':x, 'k':x%4 } # (x, k)
for x in range(100)]
# starmapkw
with ezpq.Queue(10) as Q:
output = Q.starmapkw(my_pow, iterable=kwargs_list)

[x['output'] for x in output[:10]]
```

### dispose

The queueing operations performed by `ezpq.Queue` are performed on a
Expand Down Expand Up @@ -435,7 +471,7 @@ synchronously.
### Lane Error Handling

You may want to short-circuit a synchronous lane if a job in the lane
fails. You can do this by specifying `skip_on_lane_error=True` when
fails. You can do this by specifying `stop_on_lane_error=True` when
putting a job in the queue. If specified and the preceding job has a
non-zero exit code, this job will not be run.

Expand Down
Loading

0 comments on commit cbfa4a5

Please sign in to comment.