The Redis integration provides:
- storage and retrieval of the state of a running workflow
- asynchronous execution of workflows
- caching of task outputs for the information that flows between tasks
- event-oriented execution of tasks, independent of implementation, using events in a Redis stream
- an API for running and interacting with workflows
- a web-based console application for viewing and manipulating workflows
This integration uses Rqse, a library for interacting with Redis streams. It encapsulates an event-carried state transfer architecture that serves as the core of distributed workflow execution.
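To illustrate the pattern, a minimal Rqse listener sketch is shown below. The stream key, group name, and event kinds here are illustrative and not part of littleflow, but selecting events by kind, sending receipts, and appending new events is the same mechanism the integration relies on:

from rqse import EventListener, message, receipt_for

class EchoListener(EventListener):

    def __init__(self, key='example:stream', group='echo', host='0.0.0.0', port=6379):
        # Select only events of kind 'ping' from the stream at `key`
        super().__init__(key, group, select=['ping'], host=host, port=port)

    def process(self, event_id, event):
        # Acknowledge the event with a receipt ...
        self.append(receipt_for(event_id))
        # ... and carry the state forward as a new event on the stream
        self.append(message({'echo': event}, kind='pong'))
        return True

if __name__ == '__main__':
    EchoListener().listen()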
The package can be installed with:
pip install littleflow_redis
The Redis server connection parameters are provided through various programmatic parameters but will also be automatically retrieved from these environment variables:
- REDIS_HOST - the Redis host name (default: 0.0.0.0)
- REDIS_PORT - the Redis port (default: 6379)
- REDIS_USERNAME - the authentication username
- REDIS_PASSWORD - the authentication password
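For example, to point the workers at a local Redis server (the values below are placeholders):
export REDIS_HOST=localhost
export REDIS_PORT=6379
export REDIS_USERNAME=default
export REDIS_PASSWORD=your-password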
Note: You will need a running Redis server and the connection parameters set in your environment (or via the command-line).
Note: The deployment section has more information on running the system in a production context.
At minimum, you must run the workflow lifecycle and receipt log workers. The workflow lifecycle worker tracks when tasks end in order to move the workflow algorithm forward, and it manages the overall lifecycle of the workflow.
You can run the lifecycle worker with:
python -m littleflow_redis worker
The receipt log is a general feature of Rqse that is encapsulated as another worker process; it receives receipts from the Redis stream and archives each receipt along with its target. By default, this worker just outputs the information to stdout.
You can run the receipt log worker with:
python -m littleflow_redis receipts
Finally, you need something to execute your tasks. A start-task event is generated by the lifecycle worker. You must provide a worker that responds to the event and eventually produces an end-task event.
This process is generally:
- Listen for start-task events.
- When an event is received:
  - Decide whether to respond to the event.
  - Start the task (however that is done).
  - Wait for the end of the task.
  - Generate an end-task event.
There is a default worker that simulates tasks with a random delay. It can be run via:
python -m littleflow_redis simulate
Now that you have the three workers running, you can run a workflow via:
python -m littleflow_redis run WORKFLOW
where WORKFLOW is a littleflow workflow file.
The run command will output the workflow id (e.g., workflow:myworkflow-01c9d481-0270-40b0-a25f-470f229d59b3).
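For example, a minimal workflow file might contain a single sequence of two tasks (the task names here are arbitrary):
# two tasks run in sequence
A → B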
You can inspect the state with:
python -m littleflow_redis inspect workflow:myworkflow-01c9d481-0270-40b0-a25f-470f229d59b3
If you want a visual representation of the workflow and its state in a web-based experience, you can run the API and console.
You must run the API with the same Redis connection parameters.
python -m littleflow_redis.service
By default, this will bind to 0.0.0.0:5000 but there are options for controlling this.
To get the console web application, run:
python -m littleflow_redis.console
By default, this will bind to 0.0.0.0:8000. Open http://localhost:8000/ in your favourite web browser.
You should see your workflows run from the command-line in the console. You can also use the user interface to upload workflows and run them on your system.
This user interface shows you the workflow graph, color-coded by state, with the start and end times of the various tasks (in diagram or tabular format). You can start, stop, archive, and restore workflows, as well as perform other operations.
The API is accessible at the endpoint provided when you ran the service. By default, the endpoint binds to 0.0.0.0:5000 and the service can be run by:
python -m littleflow_redis.service
With this invocation, the API documentation is available at http://127.0.0.1:5000/apidocs/ and the OpenAPI specification is at http://127.0.0.1:5000/apispec.json.
For example, the list of workflows can be retrieved at:
curl http://127.0.0.1:5000/workflows/
And a workflow can be started with:
curl --request POST \
--url http://127.0.0.1:5000/workflows/start \
--header 'Accept: application/json' \
--header 'Content-Type: application/json' \
--data '{
"workflow": "A → B"
}'
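The same requests can be made programmatically. The following is a sketch using the Python requests library against a locally running service on the default port; the printed output simply shows whatever the service returns:

import requests

BASE = 'http://127.0.0.1:5000'

# List the current workflows
response = requests.get(f'{BASE}/workflows/')
print(response.status_code, response.text)

# Start a workflow from a littleflow expression
response = requests.post(
    f'{BASE}/workflows/start',
    headers={'Accept': 'application/json'},
    json={'workflow': 'A → B'}
)
print(response.status_code, response.text)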
A task worker processes start-task events of a particular kind, executing the task and following it through to completion. It must decide whether to act on the task and must produce the end-task event if it does so.
A system must partition the kinds of tasks across different workers. This enables different kinds of task execution within the same workflow. For example, one system might provide:
- batch computation through Kubernetes
- interactions with humans for verification or non-computational tasks
- interactions with intelligent agents (e.g., robots)
- invocation of an API
- basic primitives like a delay or waiting for a system event
Partitioning is accomplished via Redis stream consumer groups. In the Rqse library, the group parameter controls the consumer group name.
The use of consumer groups for task workers must follow these rules for listening to events:
- the name of the consumer group must be unique to the kind of task but must be shared amongst all parallel workers for that kind of event
- an event consumer for a kind of task must only respond to its own kind of event
Writing a task worker is relatively straightforward:
from time import sleep

from rqse import EventListener, message, receipt_for

class SleepListener(EventListener):

    def __init__(self, key, group='sleeping', host='0.0.0.0', port=6379, username=None, password=None, pool=None):
        super().__init__(key, group, select=['start-task'], host=host, port=port, username=username, password=password, pool=pool)

    def process(self, event_id, event):

        # start-task events all have workflow, name, index, input, and parameters
        workflow_id = event.get('workflow')
        name = event.get('name')
        category, _, task_name = name.partition(':')

        # Only act on tasks in the 'sleep' category; ignore everything else
        if category != 'sleep':
            return False

        index = event.get('index')
        parameters = event.get('parameters', {})
        duration = int(parameters.get('for', 60))

        # Since we are going to process this, we send a receipt
        self.append(receipt_for(event_id))

        # "Run" the task and then signal its completion with an end-task event
        sleep(duration)

        end_event = {'name': name, 'index': index, 'workflow': workflow_id, 'status': 'SUCCESS'}
        self.append(message(end_event, kind='end-task'))

        return True

if __name__ == '__main__':
    listener = SleepListener('workflow:run')
    listener.listen()
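With this listener running alongside the lifecycle and receipt workers, a workflow can invoke the task through its category. For example (the task name after the colon and the parameter value are illustrative):
# pause the flow for 5 seconds via the sleep worker above
sleep:nap(- for: 5 -)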
The default lifecycle worker provides several built-in tasks:
- wait:delay - a task for delaying a workflow by a certain time duration
- wait:event - a task for waiting for a system event of a particular kind
- wait:acquire - acquires a countdown latch and increments the value
- wait:release - releases a countdown latch and decrements the value
- wait:countdown - waits for a countdown latch to go to zero
The implementation of these can be found in wait.py with the event listener in WaitTaskListener.
# wait 60s
wait:delay(- duration: 60 -)
Parameters:
- duration - an integer specifying the number of seconds to delay
# wait for event start-experiment with data matching the task input
wait:event(- {event: start-experiment, match: input} -)
Parameters:
- event - The kind of event to wait for
- match - The keyword input, to match the task input against the event; otherwise, no matching is performed.
- receipt - A boolean indicating whether a receipt is generated for the event. The default is true.
wait:acquire(- name: 'my-{input["id"]}' -)
Acquires a countdown latch and increments the value. A latch is not specific to a workflow so the name is global to the workflow engine's context.
Parameters:
- name - The name of the latch, which can also be a templated value.
wait:release(- name: 'my-{input["id"]}' -)
Releases a countdown latch and decrements the value. A latch is not specific to a workflow so the name is global to the workflow engine's context.
Parameters:
- name - The name of the latch, which can also be a templated value.
wait:countdown(- name: 'my-{input["id"]}' -)
Waits for a specific latch to reach zero before the task ends. A latch is not specific to a workflow so the name is global to the workflow engine's context. When the latch is released by any workflow, each workflow waiting will resume.
Parameters:
- name - The name of the latch, which can also be a templated value.
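For example, latches can coordinate separate workflows (a sketch; the latch name and task names are illustrative). One workflow holds the latch while it runs:
wait:acquire(- name: 'experiment' -) → A → wait:release(- name: 'experiment' -)
while another waits for the latch to reach zero before continuing:
wait:countdown(- name: 'experiment' -) → B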