feat(ENG-1380): task chain demo project #36

Merged
merged 33 commits into from
Aug 7, 2024
33 commits
8661046
Initial draft
daabr Aug 5, 2024
7784237
2 approaches side-by-side
daabr Aug 5, 2024
a958ea1
Fix bug
daabr Aug 6, 2024
467c172
Use project var to select workflow mode
daabr Aug 6, 2024
317b30d
Fix bug in retrying event-based task
daabr Aug 6, 2024
ab8e54b
Cosmetic tweak
daabr Aug 6, 2024
e1130cb
Don't handle interaction events in new WF in single-WF mode
daabr Aug 6, 2024
ed460d5
Restructure
daabr Aug 7, 2024
3bb20ab
Documentation
daabr Aug 7, 2024
4e63195
Fix flowchart
daabr Aug 7, 2024
8002175
Flowchart fix
daabr Aug 7, 2024
981ec10
Fix flowchart
daabr Aug 7, 2024
920994a
Fix flowchart
daabr Aug 7, 2024
2df79f6
Fix flowchart
daabr Aug 7, 2024
61c05ad
Fix flowchart
daabr Aug 7, 2024
dfa7c04
Final text tweaks
daabr Aug 7, 2024
1b9cc8f
Tweaks
daabr Aug 7, 2024
9128a1f
Flowchart tweak
daabr Aug 7, 2024
c3704bf
Flowchart tweak
daabr Aug 7, 2024
c850629
Fix flowchart
daabr Aug 7, 2024
a00641c
Fix flowchart
daabr Aug 7, 2024
8cead83
Flowchart for single-WF
daabr Aug 7, 2024
1bc0789
Fix flowchart
daabr Aug 7, 2024
a10882e
Fix: don't forget i after first retry
daabr Aug 7, 2024
2e1c9ba
improvement: step3 can sometimes succeed
daabr Aug 7, 2024
5d9a42a
final README improvements
daabr Aug 7, 2024
542dd27
resize screenshot
daabr Aug 7, 2024
95575ad
tweak size
daabr Aug 7, 2024
d4837c6
Add another screenshot
daabr Aug 7, 2024
e743e01
Remove whitespace
daabr Aug 7, 2024
ff54665
Flowchart tweak
daabr Aug 7, 2024
31677c1
fix flowchart
daabr Aug 7, 2024
a52baac
fix flowchart
daabr Aug 7, 2024
78 changes: 78 additions & 0 deletions task_chain/README.md
@@ -0,0 +1,78 @@
# Task Chain

This project demonstrates running a sequence of tasks with fault tolerance.

The workflow is resilient to errors in each step (with the ability to retry
each failing step on-demand), as well as server-side failures (thanks to
AutoKitteh's durable execution).
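
As a rough illustration of the per-step, on-demand retry described above, here is a minimal sketch in plain Python. The names `run_chain` and `should_retry` are hypothetical and are not part of this project: in the real project the retry/abort decision comes from a Slack button click, and resilience to server-side failures comes from AutoKitteh rather than from this code.

```python
from typing import Callable, Sequence


def run_chain(
    tasks: Sequence[Callable[[], None]],
    should_retry: Callable[[str, Exception], bool],
) -> bool:
    """Run tasks in order; return True if all succeed, False on abort.

    `should_retry` is a hypothetical stand-in for asking the user
    (for example via a Slack message with Retry/Abort buttons).
    """
    for task in tasks:
        while True:
            try:
                task()
                break  # This step succeeded, move on to the next one.
            except Exception as e:
                if not should_retry(task.__name__, e):
                    return False  # The user chose to abort the chain.
                # Otherwise loop around and retry the same step.
    return True
```

A failing step blocks the chain until the caller decides: retrying re-runs only that step, and aborting stops before any later step runs.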

<img src="./images/slack1.png" width="366" height="210" alt="Slack screenshot 1">
<img src="./images/slack2.png" width="366" height="295" alt="Slack screenshot 2">

This directory contains three variants of this project:

1. **Single-workflow approach**: a single workflow runs all the tasks,
   including retry loops; it handles Slack interactions using runtime event
   subscriptions

   1. ["Basic" mode](./single_workflow/basic/) - an explicit specification of
      the transitions between steps, and each step is retried in its own loop

   2. ["Advanced" mode](./single_workflow/advanced/) - a single loop iterates
      over a global list of all the steps, and handles all the retries
```mermaid
flowchart LR
slack{"`Slack
Event`"}
task1[Task 1]
task2[Task 2]
task3[Task 3]
task4[Task 4]
message{"`Retry/Abort
Message`"}
wend(("`Workflow
End`"))

subgraph Workflow 1
direction LR
slack -. Slash Command .-> task1 --> task2 --> task3
task3 -- Success --> task4 -.-> wend
task3 -- Error --> message
message -- Retry --> task3
message -. Abort .-> wend
end
```

2. **[Event-driven approach](./event_driven/)**: a single workflow runs
multiple tasks, except for retries, which branch into separate workflows

```mermaid
flowchart TB
slack1{"`Slack
Event`"}
task1[Task 1]
task2[Task 2]
task3a[Task 3]
error(("`Workflow
Error`"))

slack2{"`Slack
Event`"}
task3b[Task 3 Retry]
task4[Task 4]
success(("`Workflow
Success`"))

subgraph w1 [Workflow 1]
direction LR
slack1 -. Slash Command .-> task1 --> task2 --> task3a -.-> error
end

subgraph w2 [Workflow 2]
direction LR
slack2 -. Retry Button Clicked .-> task3b --> task4 -.-> success
end

w1 -. Retry/Abort Message .-> w2
```
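
In the event-driven variant, the retry runs in a new workflow, so the index of the failed task travels inside the Slack button's `action_id` (for example `RETRY 2`) and the new workflow resumes from that index. The sketch below illustrates that round trip in plain Python, without Slack or AutoKitteh; the helper names `make_action_id`, `parse_task_index`, and `remaining_tasks` are hypothetical and used only for illustration.

```python
def make_action_id(action: str, task_index: int) -> str:
    """Embed the failed task's index in the button's action ID."""
    return f"{action.upper()} {task_index}"


def parse_task_index(action_id: str) -> int:
    """Recover the task index from an action ID such as 'RETRY 2'."""
    return int(action_id.split()[-1])


def remaining_tasks(tasks: list, start_index: int) -> list:
    """Return the tasks that the new workflow still has to run."""
    return tasks[start_index:]


tasks = ["step1", "step2", "step3", "step4"]
action_id = make_action_id("retry", 2)  # step3 (index 2) failed
start = parse_task_index(action_id)
print(remaining_tasks(tasks, start))  # prints ['step3', 'step4']
```

Carrying the index in the button itself keeps the second workflow stateless: everything it needs to resume arrives with the interaction event.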
23 changes: 23 additions & 0 deletions task_chain/event_driven/autokitteh.yaml
@@ -0,0 +1,23 @@
# This YAML file is a declarative manifest that describes the setup
# of an AutoKitteh project that runs a sequence of tasks, using an
# event-driven approach.
#
# After applying this file, initialize this AutoKitteh project's
# Slack connection.

version: v1

project:
  name: task_chain
  connections:
    - name: slack_conn
      integration: slack
  triggers:
    - name: slack_slash_command
      connection: slack_conn
      event_type: slash_command
      call: program.py:on_slack_slash_command
    - name: slack_interaction
      connection: slack_conn
      event_type: interaction
      call: program.py:on_slack_interaction
49 changes: 49 additions & 0 deletions task_chain/event_driven/interactive_message.json.txt
@@ -0,0 +1,49 @@
[
{
"type": "header",
"text": {
"type": "plain_text",
"emoji": true,
"text": ":warning: Workflow Error"
}
},
{
"type": "divider"
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "MESSAGE"
}
},
{
"type": "divider"
},
{
"type": "actions",
"elements": [
{
"type": "button",
"text": {
"type": "plain_text",
"emoji": true,
"text": "Retry"
},
"value": "retry",
"action_id": "RETRY INDEX"
},
{
"type": "button",
"style": "danger",
"text": {
"type": "plain_text",
"emoji": true,
"text": "Abort"
},
"value": "abort",
"action_id": "ABORT INDEX"
}
]
}
]
84 changes: 84 additions & 0 deletions task_chain/event_driven/program.py
@@ -0,0 +1,84 @@
"""This module uses an event-driven approach for the task-chain project.

A single workflow runs all the tasks, except retries:

1. First workflow:
- Trigger: Slack slash command
- Task 1 -> Task 2 -> Task 3 (error) -> Workflow error
2. Second workflow:
- Trigger: user clicks the "Retry" button in Slack
- Task 3 (retry) -> Task 4 -> Successful workflow completion
"""

from pathlib import Path
import random

from autokitteh.slack import slack_client


slack = slack_client("slack_conn")


def step1():
    print("Step 1 is doing stuff...")


def step2():
    print("Step 2 is doing stuff...")


def step3():
    print("Step 3 is doing stuff...")
    if random.choice([True, False]):
        raise RuntimeError("Something bad happened")


def step4():
    print("Step 4 is doing stuff...")


tasks = [step1, step2, step3, step4]


def on_slack_slash_command(event):
    """Use a Slack slash command from a user to start a chain of tasks."""
    run_tasks(0, event.data.user_id)


def run_tasks(start_index, user_id):
    # Note to the interested reader: it's easy to improve this project
    # to traverse a dynamic DAG, instead of a simple static list.
    for i, task in enumerate(tasks):
        if i >= start_index:
            run_retriable_task(task, i, user_id)

    message = "Workflow completed successfully :smiley_cat:"
    slack.chat_postMessage(channel=user_id, text=message)


def run_retriable_task(task, i, user_id):
    try:
        task()
    except Exception as e:
        ask_user_retry_or_abort(task.__name__, e, i, user_id)
        raise e  # Abort the current workflow.

    message = f"Task `{task.__name__}` completed"
    slack.chat_postMessage(channel=user_id, text=message)


def ask_user_retry_or_abort(task_name, error, i, user_id):
    message = f"The task `{task_name}` failed: `{error}`"
    blocks = Path("interactive_message.json.txt").read_text()
    blocks = blocks.replace("MESSAGE", message).replace("INDEX", str(i))
    slack.chat_postMessage(channel=user_id, text="Workflow error", blocks=blocks)


def on_slack_interaction(event):
    """Handle the user's response (retry / abort) in a new workflow."""
    if event.data.actions[0]["value"] == "abort":
        return

    # This workflow's starting point is a retry of the failed task in the aborted workflow.
    i = int(event.data.actions[0]["action_id"].split()[-1])
    run_tasks(i, event.data.user.id)
Binary file added task_chain/images/slack1.png
Binary file added task_chain/images/slack2.png
19 changes: 19 additions & 0 deletions task_chain/single_workflow/advanced/autokitteh.yaml
@@ -0,0 +1,19 @@
# This YAML file is a declarative manifest that describes the setup
# of an AutoKitteh project that runs a sequence of tasks, using an
# advanced single-workflow approach.
#
# After applying this file, initialize this AutoKitteh project's
# Slack connection.

version: v1

project:
  name: task_chain
  connections:
    - name: slack_conn
      integration: slack
  triggers:
    - name: slack_slash_command
      connection: slack_conn
      event_type: slash_command
      call: program.py:on_slack_slash_command
47 changes: 47 additions & 0 deletions task_chain/single_workflow/advanced/interactive_message.json.txt
@@ -0,0 +1,47 @@
[
{
"type": "header",
"text": {
"type": "plain_text",
"emoji": true,
"text": ":warning: Workflow Error"
}
},
{
"type": "divider"
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "MESSAGE"
}
},
{
"type": "divider"
},
{
"type": "actions",
"elements": [
{
"type": "button",
"text": {
"type": "plain_text",
"emoji": true,
"text": "Retry"
},
"value": "retry"
},
{
"type": "button",
"style": "danger",
"text": {
"type": "plain_text",
"emoji": true,
"text": "Abort"
},
"value": "abort"
}
]
}
]
77 changes: 77 additions & 0 deletions task_chain/single_workflow/advanced/program.py
@@ -0,0 +1,77 @@
"""This module uses a single-workflow approach for the task-chain project.

A single workflow runs all the tasks, including retry loops.
It handles Slack interactions using runtime event subscriptions.
"""

from pathlib import Path
import random

import autokitteh
from autokitteh.slack import slack_client


slack = slack_client("slack_conn")


def step1():
    print("Step 1 is doing stuff...")


def step2():
    print("Step 2 is doing stuff...")


def step3():
    print("Step 3 is doing stuff...")
    if random.choice([True, False]):
        raise RuntimeError("Something bad happened")


def step4():
    print("Step 4 is doing stuff...")


tasks = [step1, step2, step3, step4]


def on_slack_slash_command(event):
    """Use a Slack slash command from a user to start a chain of tasks."""
    user_id = event.data.user_id

    # Note to the interested reader: it's easy to improve this project
    # to traverse a dynamic DAG, instead of a simple static list.
    success = True
    while len(tasks) > 0 and success:
        success = run_retriable_task(tasks[0], user_id)

    if success:
        message = "Workflow completed successfully :smiley_cat:"
        slack.chat_postMessage(channel=user_id, text=message)


def run_retriable_task(task, user_id) -> bool:
    try:
        task()
    except Exception as e:
        return ask_user_retry_or_abort(task.__name__, e, user_id)

    message = f"Task `{task.__name__}` completed"
    slack.chat_postMessage(channel=user_id, text=message)

    global tasks
    tasks.remove(task)
    return True


def ask_user_retry_or_abort(task_name, error, user_id) -> bool:
    sub = autokitteh.subscribe("slack_conn", 'event_type == "interaction"')

    blocks = Path("interactive_message.json.txt").read_text()
    blocks = blocks.replace("MESSAGE", f"The task `{task_name}` failed: `{error}`")
    slack.chat_postMessage(channel=user_id, text="Workflow error", blocks=blocks)

    # Wait for and handle the user's response in this workflow.
    event = autokitteh.next_event(sub)
    autokitteh.unsubscribe(sub)
    return event.actions[0]["value"] == "retry"