Skip to content

Commit

Permalink
v0.2.1; bugfixes and features.
Browse files Browse the repository at this point in the history
  • Loading branch information
fresh2dev committed Jun 21, 2023
1 parent 0101c8d commit d820d9b
Show file tree
Hide file tree
Showing 17 changed files with 817 additions and 564 deletions.
18 changes: 18 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,24 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## v0.2.1

### Added

- `clear_waiting()` function to clear the waiting queue.
- Added `stop_on_lane_error` parameter to `ezpq.Job` to allow for short-circuiting a synchronous lane if a job in the lane fails. When set to `True` and the preceding job has a non-zero exit code, this job will not be run. Note that this is to be set per-job for flexibility.
- Additional unit tests.

### Changed

- `stop_all()` function now clears the waiting queue and terminate running jobs. This addresses a bug where a queue would fail to close when disposing with jobs still in the waiting queue.
- The default `poll` for the queue itself is still `0.1`. Now, the default `poll` for `get` and `wait` is equal to the `poll` for the queue itself, as it makes no sense to check for changes more freqeuntly than changes could arise.

### Removed

- Removed functions `has_waiting`, `has_work`, and `has_completed`. Use `size(...)` for this.
- Renamed `Queue.is_started` to `Queue.is_running`.

## v0.2.0

### Added
Expand Down
42 changes: 37 additions & 5 deletions README.Rmd
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def random_sleep(x):
return n
```

```{python, echo=TRUE}
```{python, echo=TRUE, eval=FALSE}
start = time.time()
output = [random_sleep(x) for x in range(60)]
Expand All @@ -100,9 +100,17 @@ end = time.time()
print('> Runtime: ' + str(end - start))
```

```
## '> Runtime: 58.932034969329834'
```

Here is the function ran in parallel with an `ezpq` Queue of 6 workers. Thus, the runtime of the above operation will be reduced from ~60s to ~10s.

```{python, eval=FALSE, echo=TRUE}
import time
import random
import ezpq
start = time.time()
with ezpq.Queue(6) as Q:
Expand Down Expand Up @@ -244,7 +252,7 @@ with ezpq.Queue(6) as Q:
Q.put(random_sleep, x)
# repeatedly print sizes until complete.
while Q.has_work():
while Q.size(waiting=True, working=True):
print_sizes(Q)
time.sleep(1)
Expand All @@ -253,7 +261,7 @@ with ezpq.Queue(6) as Q:

### wait

The `wait()` method will block execution until all jobs complete. It also accepts a `timeout` parameter, given in seconds. The return value is the count of jobs that did not complete. Thus, a return value greater than 0 indicates the timeout was exceeded. The parameter `poll` can be used to adjust how frequently (in seconds) the operation checks for completed jobs (default=0.1).
The `wait()` method will block execution until all jobs complete. It also accepts a `timeout` parameter, given in seconds. The return value is the count of jobs that did not complete. Thus, a return value greater than 0 indicates the timeout was exceeded. The parameter `poll` can be used to adjust how frequently (in seconds) the operation checks for completed jobs.

New in v0.2.0, include `show_progress=True` to show a progress bar while waiting. This is equivalent to a call to `waitpb()`.

Expand All @@ -262,7 +270,7 @@ New in v0.2.0, include `show_progress=True` to show a progress bar while waiting

### get

`get()` retrieves and deletes ("pop") the highest priority job from the completed queue, if one is available. If the completed queue is empty, `get()` returns `None`. However, `get()` will wait for a completed job if the `poll` frequency is greater than 0. If the timeout is exceeded, `None` is returned.
`get()` retrieves and deletes ("pop") the highest priority job from the completed queue, if one is available. If the completed queue is empty, `get()` returns `None`. However, `get()` will wait for a completed job if `wait`, `poll`, or `timeout` are specified. If the timeout is exceeded, `None` is returned.

```{python, echo=TRUE}
with ezpq.Queue(6) as Q:
Expand All @@ -274,7 +282,7 @@ with ezpq.Queue(6) as Q:
# repeatedly `get()` until queue is empty.
for i in range(n_inputs):
output[i] = Q.get(poll=0.1)
output[i] = Q.get(wait=True)
```

### collect
Expand Down Expand Up @@ -326,6 +334,30 @@ When you have jobs that are dependent upon another, you can use "lanes" to execu

In the above graphic, notice how same-colored bars never overlap. These bars represent jobs that are in the same lane, which executed synchronously.

### Lane Error Handling

You may want to short-circuit a synchronous lane if a job in the lane fails. You can do this by specifying `skip_on_lane_error=True` when putting a job in the queue. If specified and the preceding job has a non-zero exit code, this job will not be run.

```{python, echo=TRUE}
def reciprocal(x):
time.sleep(0.1) # slow things down
return 1/x # will throw DivideByZero exception
```

```{python, echo=TRUE}
import random
with ezpq.Queue(6) as Q:
for i in range(100):
Q.put(reciprocal, random.randint(0, 10), lane=i%5, suppress_errors=True, stop_on_lane_error=True)
Q.wait()
output = Q.collect()
plt = ezpq.Plot(output).build(facet_by='lane', color_by='exitcode', color_pal=['red', 'blue'])
plt.save('docs/imgs/lane_error.png')
```

![](docs/imgs/lane_error.png)

## ezpq.Plot

The `Plot` class is used to visualize the wait, start, and end times for each job that entered the queueing system. The class is initialized with a list of dicts; exactly what is returned from a call to `collect()` or `map()`.
Expand Down
73 changes: 52 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# `ezpq`: an easy parallel queueing system.

Read this on [GitHub](https://github.com/dm3ll3n/ezpq) or [my site](https://www.donaldmellenbruch.com/project/ezpq/).

* [`ezpq`: an easy parallel queueing system.](#ezpq-an-easy-parallel-queueing-system)
* [Overview](#overview)
* [Features](#features)
Expand All @@ -17,9 +15,13 @@ Read this on [GitHub](https://github.com/dm3ll3n/ezpq) or [my site](https://www.
* [map](#map)
* [dispose](#dispose)
* [Synchronous Lanes](#synchronous-lanes)
* [Lane Error Handling](#lane-error-handling)
* [ezpq.Plot](#ezpqplot)
* [More Examples](#more-examples)

Read this on [GitHub](https://github.com/dm3ll3n/ezpq) or [my
site](https://www.donaldmellenbruch.com/project/ezpq/).

## Overview

`ezpq` implements a parallel queueing system consisting of:
Expand Down Expand Up @@ -97,6 +99,9 @@ Thus, the runtime of the above operation will be reduced from ~60s to
~10s.

``` python
import time
import random
import ezpq
start = time.time()
with ezpq.Queue(6) as Q:
output = Q.map(random_sleep, range(60))
Expand Down Expand Up @@ -138,7 +143,7 @@ print( output[0] )
## {'args': [0],
## 'callback': None,
## 'cancelled': False,
## 'ended': datetime.datetime(2019, 1, 28, 17, 45, 29, 943860),
## 'ended': datetime.datetime(2019, 2, 18, 20, 21, 0, 902915),
## 'exception': None,
## 'exitcode': 0,
## 'function': 'random_sleep',
Expand All @@ -148,11 +153,11 @@ print( output[0] )
## 'name': 1,
## 'output': 1.3444218515250481,
## 'priority': 100,
## 'processed': datetime.datetime(2019, 1, 28, 17, 45, 29, 998175),
## 'qid': 'd6eaaf93',
## 'runtime': 1.3502492904663086,
## 'started': datetime.datetime(2019, 1, 28, 17, 45, 28, 593611),
## 'submitted': datetime.datetime(2019, 1, 28, 17, 45, 28, 489300),
## 'processed': datetime.datetime(2019, 2, 18, 20, 21, 0, 955396),
## 'qid': 'f4717edb',
## 'runtime': 1.3515939712524414,
## 'started': datetime.datetime(2019, 2, 18, 20, 20, 59, 551321),
## 'submitted': datetime.datetime(2019, 2, 18, 20, 20, 59, 446199),
## 'timeout': 0}

Easily convert output to a `pandas` dataframe:
Expand All @@ -164,11 +169,11 @@ print( df.head()[['id', 'output', 'runtime', 'exitcode']] )
```

## id output runtime exitcode
## 0 1 1.344422 1.350249 0
## 1 2 0.634364 0.638975 0
## 2 3 1.456034 1.460431 0
## 3 4 0.737965 0.742028 0
## 4 5 0.736048 0.740672 0
## 0 1 1.344422 1.351594 0
## 1 2 0.634364 0.640723 0
## 2 3 1.456034 1.461620 0
## 3 4 0.737965 0.743645 0
## 4 5 0.736048 0.742260 0

Use `ezpq.Plot` to generate a Gannt chart of the job timings.

Expand Down Expand Up @@ -236,7 +241,7 @@ queue with a call to `submit()`.

## Help on function __init__ in module ezpq.Job:
##
## __init__(self, function, args=None, kwargs=None, name=None, priority=100, lane=None, timeout=0, suppress_errors=False)
## __init__(self, function, args=None, kwargs=None, name=None, priority=100, lane=None, timeout=0, suppress_errors=False, stop_on_lane_error=False)
## Defines what to run within a `ezpq.Queue`, and how to run it.
##
## Args:
Expand Down Expand Up @@ -316,7 +321,7 @@ with ezpq.Queue(6) as Q:
for x in range(60):
Q.put(random_sleep, x)
# repeatedly print sizes until complete.
while Q.has_work():
while Q.size(waiting=True, working=True):
print_sizes(Q)
time.sleep(1)
print_sizes(Q)
Expand All @@ -330,9 +335,9 @@ with ezpq.Queue(6) as Q:
## 'Total: 60; Waiting: 31; Working: 6; Completed: 23'
## 'Total: 60; Waiting: 24; Working: 6; Completed: 30'
## 'Total: 60; Waiting: 17; Working: 6; Completed: 37'
## 'Total: 60; Waiting: 11; Working: 6; Completed: 43'
## 'Total: 60; Waiting: 12; Working: 6; Completed: 42'
## 'Total: 60; Waiting: 6; Working: 6; Completed: 48'
## 'Total: 60; Waiting: 0; Working: 5; Completed: 55'
## 'Total: 60; Waiting: 1; Working: 6; Completed: 53'
## 'Total: 60; Waiting: 0; Working: 1; Completed: 59'
## 'Total: 60; Waiting: 0; Working: 0; Completed: 60'

Expand All @@ -343,7 +348,7 @@ also accepts a `timeout` parameter, given in seconds. The return value
is the count of jobs that did not complete. Thus, a return value greater
than 0 indicates the timeout was exceeded. The parameter `poll` can be
used to adjust how frequently (in seconds) the operation checks for
completed jobs (default=0.1).
completed jobs.

New in v0.2.0, include `show_progress=True` to show a progress bar while
waiting. This is equivalent to a call to `waitpb()`.
Expand All @@ -355,8 +360,8 @@ waiting. This is equivalent to a call to `waitpb()`.
`get()` retrieves and deletes (“pop”) the highest priority job from the
completed queue, if one is available. If the completed queue is empty,
`get()` returns `None`. However, `get()` will wait for a completed job
if the `poll` frequency is greater than 0. If the timeout is exceeded,
`None` is returned.
if `wait`, `poll`, or `timeout` are specified. If the timeout is
exceeded, `None` is returned.

``` python
with ezpq.Queue(6) as Q:
Expand All @@ -368,7 +373,7 @@ with ezpq.Queue(6) as Q:

# repeatedly `get()` until queue is empty.
for i in range(n_inputs):
output[i] = Q.get(poll=0.1)
output[i] = Q.get(wait=True)
```

### collect
Expand Down Expand Up @@ -427,6 +432,32 @@ In the above graphic, notice how same-colored bars never overlap. These
bars represent jobs that are in the same lane, which executed
synchronously.

### Lane Error Handling

You may want to short-circuit a synchronous lane if a job in the lane
fails. You can do this by specifying `skip_on_lane_error=True` when
putting a job in the queue. If specified and the preceding job has a
non-zero exit code, this job will not be run.

``` python
def reciprocal(x):
time.sleep(0.1) # slow things down
return 1/x # will throw DivideByZero exception
```

``` python
import random
with ezpq.Queue(6) as Q:
for i in range(100):
Q.put(reciprocal, random.randint(0, 10), lane=i%5, suppress_errors=True, stop_on_lane_error=True)
Q.wait()
output = Q.collect()
plt = ezpq.Plot(output).build(facet_by='lane', color_by='exitcode', color_pal=['red', 'blue'])
plt.save('docs/imgs/lane_error.png')
```

![](docs/imgs/lane_error.png)

## ezpq.Plot

The `Plot` class is used to visualize the wait, start, and end times for
Expand Down
Loading

0 comments on commit d820d9b

Please sign in to comment.