diff --git a/task_chain/README.md b/task_chain/README.md
new file mode 100644
index 00000000..4a27aed9
--- /dev/null
+++ b/task_chain/README.md
@@ -0,0 +1,78 @@
+# Task Chain
+
+This project demonstrates running a sequence of tasks with fault tolerance.
+
+The workflow is resilient to errors in each step (with the ability to retry
+each failing step on-demand), as well as server-side failures (thanks to
+AutoKitteh's durable execution).
+
+![Slack screenshot 1](./images/slack1.png)
+![Slack screenshot 2](./images/slack2.png)
+
+This directory contains three variants of this project:
+
+1. **Single-workflow approach**: a single workflow runs all the tasks,
+   including retry loops; it handles Slack interactions using runtime event
+   subscriptions
+
+   1. ["Basic" mode](./single_workflow/basic/) - an explicit specification of
+      the transitions between steps, and each step is retried in its own loop
+
+   2. ["Advanced" mode](./single_workflow/advanced/) - a single loop iterates
+      over a global list of all the steps, and handles all the retries
+
+```mermaid
+flowchart LR
+    slack{"`Slack
+    Event`"}
+    task1[Task 1]
+    task2[Task 2]
+    task3[Task 3]
+    task4[Task 4]
+    message{"`Retry/Abort
+    Message`"}
+    wend(("`Workflow
+    End`"))
+
+    subgraph Workflow 1
+        direction LR
+        slack -. Slash Command .-> task1 --> task2 --> task3
+        task3 -- Success --> task4 -.-> wend
+        task3 -- Error --> message
+        message -- Retry --> task3
+        message -. Abort .-> wend
+    end
+```
+
+2. **[Event-driven approach](./event_driven/)**: a single workflow runs
+   multiple tasks, except for retries, which branch into separate workflows
+
+```mermaid
+flowchart TB
+    slack1{"`Slack
+    Event`"}
+    task1[Task 1]
+    task2[Task 2]
+    task3a[Task 3]
+    error(("`Workflow
+    Error`"))
+
+    slack2{"`Slack
+    Event`"}
+    task3b[Task 3 Retry]
+    task4[Task 4]
+    success(("`Workflow
+    Success`"))
+
+    subgraph w1 [Workflow 1]
+        direction LR
+        slack1 -. Slash Command .-> task1 --> task2 --> task3a -.-> error
+    end
+
+    subgraph w2 [Workflow 2]
+        direction LR
+        slack2 -. Retry Button Clicked .-> task3b --> task4 -.-> success
+    end
+
+    w1 -. Retry/Abort Message .-> w2
+```
diff --git a/task_chain/event_driven/autokitteh.yaml b/task_chain/event_driven/autokitteh.yaml
new file mode 100644
index 00000000..fd4d2bbd
--- /dev/null
+++ b/task_chain/event_driven/autokitteh.yaml
@@ -0,0 +1,23 @@
+# This YAML file is a declarative manifest that describes the setup
+# of an AutoKitteh project that runs a sequence of tasks, using an
+# event-driven approach.
+#
+# After applying this file, initialize this AutoKitteh project's
+# Slack connection.
+
+version: v1
+
+project:
+  name: task_chain
+  connections:
+    - name: slack_conn
+      integration: slack
+  triggers:
+    - name: slack_slash_command
+      connection: slack_conn
+      event_type: slash_command
+      call: program.py:on_slack_slash_command
+    - name: slack_interaction
+      connection: slack_conn
+      event_type: interaction
+      call: program.py:on_slack_interaction
diff --git a/task_chain/event_driven/interactive_message.json.txt b/task_chain/event_driven/interactive_message.json.txt
new file mode 100644
index 00000000..ca0fefc8
--- /dev/null
+++ b/task_chain/event_driven/interactive_message.json.txt
@@ -0,0 +1,49 @@
+[
+    {
+        "type": "header",
+        "text": {
+            "type": "plain_text",
+            "emoji": true,
+            "text": ":warning: Workflow Error"
+        }
+    },
+    {
+        "type": "divider"
+    },
+    {
+        "type": "section",
+        "text": {
+            "type": "mrkdwn",
+            "text": "MESSAGE"
+        }
+    },
+    {
+        "type": "divider"
+    },
+    {
+        "type": "actions",
+        "elements": [
+            {
+                "type": "button",
+                "text": {
+                    "type": "plain_text",
+                    "emoji": true,
+                    "text": "Retry"
+                },
+                "value": "retry",
+                "action_id": "RETRY INDEX"
+            },
+            {
+                "type": "button",
+                "style": "danger",
+                "text": {
+                    "type": "plain_text",
+                    "emoji": true,
+                    "text": "Abort"
+                },
+                "value": "abort",
+                "action_id": "ABORT INDEX"
+            }
+        ]
+    }
+]
diff --git a/task_chain/event_driven/program.py b/task_chain/event_driven/program.py
new file mode 100644
index 00000000..94b9ffaf
--- /dev/null
+++
b/task_chain/event_driven/program.py
@@ -0,0 +1,106 @@
+"""This module uses an event-driven approach for the task-chain project.
+
+A single workflow runs all the tasks, except retries:
+
+1. First workflow:
+   - Trigger: Slack slash command
+   - Task 1 -> Task 2 -> Task 3 (error) -> Workflow error
+2. Second workflow:
+   - Trigger: user clicks the "Retry" button in Slack
+   - Task 3 (retry) -> Task 4 -> Successful workflow completion
+"""
+
+import json
+from pathlib import Path
+import random
+
+from autokitteh.slack import slack_client
+
+
+slack = slack_client("slack_conn")
+
+
+def step1():
+    print("Step 1 is doing stuff...")
+
+
+def step2():
+    print("Step 2 is doing stuff...")
+
+
+def step3():
+    """Simulate a flaky task: fail randomly, about half the time."""
+    print("Step 3 is doing stuff...")
+    if random.choice([True, False]):
+        raise RuntimeError("Something bad happened")
+
+
+def step4():
+    print("Step 4 is doing stuff...")
+
+
+tasks = [step1, step2, step3, step4]
+
+
+def on_slack_slash_command(event):
+    """Use a Slack slash command from a user to start a chain of tasks."""
+    run_tasks(0, event.data.user_id)
+
+
+def run_tasks(start_index, user_id):
+    """Run all the tasks from the given index onwards, then report success.
+
+    A failing task raises its error out of this function, so the
+    success message is posted only if all these tasks completed.
+    """
+    # Note to the interested reader: it's easy to improve this project
+    # to traverse a dynamic DAG, instead of a simple static list.
+    for i, task in enumerate(tasks):
+        if i >= start_index:
+            run_retriable_task(task, i, user_id)
+
+    message = "Workflow completed successfully :smiley_cat:"
+    slack.chat_postMessage(channel=user_id, text=message)
+
+
+def run_retriable_task(task, i, user_id):
+    """Run a single task, and report its completion or failure in Slack.
+
+    On failure, the user is asked whether to retry or abort (which is
+    handled in a new workflow), and the task's error is re-raised.
+    """
+    try:
+        task()
+    except Exception as e:
+        ask_user_retry_or_abort(task.__name__, e, i, user_id)
+        raise  # Abort the current workflow.
+
+    message = f"Task `{task.__name__}` completed"
+    slack.chat_postMessage(channel=user_id, text=message)
+
+
+def ask_user_retry_or_abort(task_name, error, i, user_id):
+    """Send an interactive Slack message with "Retry" and "Abort" buttons.
+
+    The task's index is embedded in the buttons' action IDs, so the
+    workflow that handles the user's click knows where to resume.
+    """
+    message = f"The task `{task_name}` failed: `{error}`"
+    # The message is injected into a JSON string literal in the template, so
+    # JSON-escape it (minus the enclosing quotes); otherwise quotes, newlines
+    # or backslashes in the error text would corrupt the blocks.
+    message = json.dumps(message)[1:-1]
+    blocks = Path("interactive_message.json.txt").read_text()
+    # Substitute "INDEX" first, so the error text itself is never rewritten.
+    blocks = blocks.replace("INDEX", str(i)).replace("MESSAGE", message)
+    slack.chat_postMessage(channel=user_id, text="Workflow error", blocks=blocks)
+
+
+def on_slack_interaction(event):
+    """Handle the user's response (retry / abort) in a new workflow."""
+    if event.data.actions[0]["value"] == "abort":
+        return
+
+    # This workflow's starting point is a retry of the failed task in the aborted workflow.
+    i = int(event.data.actions[0]["action_id"].split()[-1])
+    run_tasks(i, event.data.user.id)
diff --git a/task_chain/images/slack1.png b/task_chain/images/slack1.png
new file mode 100644
index 00000000..884a6429
Binary files /dev/null and b/task_chain/images/slack1.png differ
diff --git a/task_chain/images/slack2.png b/task_chain/images/slack2.png
new file mode 100644
index 00000000..47ca2bba
Binary files /dev/null and b/task_chain/images/slack2.png differ
diff --git a/task_chain/single_workflow/advanced/autokitteh.yaml b/task_chain/single_workflow/advanced/autokitteh.yaml
new file mode 100644
index 00000000..edb9bf9b
--- /dev/null
+++ b/task_chain/single_workflow/advanced/autokitteh.yaml
@@ -0,0 +1,19 @@
+# This YAML file is a declarative manifest that describes the setup
+# of an AutoKitteh project that runs a sequence of tasks, using an
+# advanced single-workflow approach.
+#
+# After applying this file, initialize this AutoKitteh project's
+# Slack connection.
+
+version: v1
+
+project:
+  name: task_chain
+  connections:
+    - name: slack_conn
+      integration: slack
+  triggers:
+    - name: slack_slash_command
+      connection: slack_conn
+      event_type: slash_command
+      call: program.py:on_slack_slash_command
diff --git a/task_chain/single_workflow/advanced/interactive_message.json.txt b/task_chain/single_workflow/advanced/interactive_message.json.txt
new file mode 100644
index 00000000..a3dd5bfa
--- /dev/null
+++ b/task_chain/single_workflow/advanced/interactive_message.json.txt
@@ -0,0 +1,47 @@
+[
+    {
+        "type": "header",
+        "text": {
+            "type": "plain_text",
+            "emoji": true,
+            "text": ":warning: Workflow Error"
+        }
+    },
+    {
+        "type": "divider"
+    },
+    {
+        "type": "section",
+        "text": {
+            "type": "mrkdwn",
+            "text": "MESSAGE"
+        }
+    },
+    {
+        "type": "divider"
+    },
+    {
+        "type": "actions",
+        "elements": [
+            {
+                "type": "button",
+                "text": {
+                    "type": "plain_text",
+                    "emoji": true,
+                    "text": "Retry"
+                },
+                "value": "retry"
+            },
+            {
+                "type": "button",
+                "style": "danger",
+                "text": {
+                    "type": "plain_text",
+                    "emoji": true,
+                    "text": "Abort"
+                },
+                "value": "abort"
+            }
+        ]
+    }
+]
diff --git a/task_chain/single_workflow/advanced/program.py b/task_chain/single_workflow/advanced/program.py
new file mode 100644
index 00000000..e7b80b09
--- /dev/null
+++ b/task_chain/single_workflow/advanced/program.py
@@ -0,0 +1,96 @@
+"""This module uses a single-workflow approach for the task-chain project.
+
+A single workflow runs all the tasks, including retry loops.
+It handles Slack interactions using runtime event subscriptions.
+"""
+
+import json
+from pathlib import Path
+import random
+
+import autokitteh
+from autokitteh.slack import slack_client
+
+
+slack = slack_client("slack_conn")
+
+
+def step1():
+    print("Step 1 is doing stuff...")
+
+
+def step2():
+    print("Step 2 is doing stuff...")
+
+
+def step3():
+    """Simulate a flaky task: fail randomly, about half the time."""
+    print("Step 3 is doing stuff...")
+    if random.choice([True, False]):
+        raise RuntimeError("Something bad happened")
+
+
+def step4():
+    print("Step 4 is doing stuff...")
+
+
+# Tasks that are still pending: each one is removed when it completes,
+# so the head of this list is always the next task to run (or retry).
+tasks = [step1, step2, step3, step4]
+
+
+def on_slack_slash_command(event):
+    """Use a Slack slash command from a user to start a chain of tasks."""
+    user_id = event.data.user_id
+
+    # Note to the interested reader: it's easy to improve this project
+    # to traverse a dynamic DAG, instead of a simple static list.
+    success = True
+    while tasks and success:
+        success = run_retriable_task(tasks[0], user_id)
+
+    if success:
+        message = "Workflow completed successfully :smiley_cat:"
+        slack.chat_postMessage(channel=user_id, text=message)
+
+
+def run_retriable_task(task, user_id) -> bool:
+    """Run a single task, and report whether the workflow should continue.
+
+    Returns:
+        True if the task completed (it is then removed from the pending
+        list), or if it failed and the user chose to retry it (it then
+        stays in the pending list). False otherwise, i.e. on abort.
+    """
+    try:
+        task()
+    except Exception as e:
+        return ask_user_retry_or_abort(task.__name__, e, user_id)
+
+    message = f"Task `{task.__name__}` completed"
+    slack.chat_postMessage(channel=user_id, text=message)
+
+    # No "global" statement is needed: the list is mutated, not rebound.
+    tasks.remove(task)
+    return True
+
+
+def ask_user_retry_or_abort(task_name, error, user_id) -> bool:
+    """Ask the user in Slack whether to retry a failed task, or abort.
+
+    Returns:
+        True if the user chose to retry, False otherwise.
+    """
+    sub = autokitteh.subscribe("slack_conn", 'event_type == "interaction"')
+
+    # JSON-escape the message (minus the enclosing quotes), so that quotes,
+    # newlines or backslashes in the error text can't corrupt the blocks.
+    message = json.dumps(f"The task `{task_name}` failed: `{error}`")[1:-1]
+    blocks = Path("interactive_message.json.txt").read_text()
+    blocks = blocks.replace("MESSAGE", message)
+    slack.chat_postMessage(channel=user_id, text="Workflow error", blocks=blocks)
+
+    # Wait for and handle the user's response in this workflow.
+    event = autokitteh.next_event(sub)
+    autokitteh.unsubscribe(sub)
+    return event.actions[0]["value"] == "retry"
diff --git a/task_chain/single_workflow/basic/autokitteh.yaml b/task_chain/single_workflow/basic/autokitteh.yaml
new file mode 100644
index 00000000..16a651e8
--- /dev/null
+++ b/task_chain/single_workflow/basic/autokitteh.yaml
@@ -0,0 +1,19 @@
+# This YAML file is a declarative manifest that describes the setup
+# of an AutoKitteh project that runs a sequence of tasks, using a
+# basic single-workflow approach.
+#
+# After applying this file, initialize this AutoKitteh project's
+# Slack connection.
+
+version: v1
+
+project:
+  name: task_chain
+  connections:
+    - name: slack_conn
+      integration: slack
+  triggers:
+    - name: slack_slash_command
+      connection: slack_conn
+      event_type: slash_command
+      call: program.py:on_slack_slash_command
diff --git a/task_chain/single_workflow/basic/interactive_message.json.txt b/task_chain/single_workflow/basic/interactive_message.json.txt
new file mode 100644
index 00000000..a3dd5bfa
--- /dev/null
+++ b/task_chain/single_workflow/basic/interactive_message.json.txt
@@ -0,0 +1,47 @@
+[
+    {
+        "type": "header",
+        "text": {
+            "type": "plain_text",
+            "emoji": true,
+            "text": ":warning: Workflow Error"
+        }
+    },
+    {
+        "type": "divider"
+    },
+    {
+        "type": "section",
+        "text": {
+            "type": "mrkdwn",
+            "text": "MESSAGE"
+        }
+    },
+    {
+        "type": "divider"
+    },
+    {
+        "type": "actions",
+        "elements": [
+            {
+                "type": "button",
+                "text": {
+                    "type": "plain_text",
+                    "emoji": true,
+                    "text": "Retry"
+                },
+                "value": "retry"
+            },
+            {
+                "type": "button",
+                "style": "danger",
+                "text": {
+                    "type": "plain_text",
+                    "emoji": true,
+                    "text": "Abort"
+                },
+                "value": "abort"
+            }
+        ]
+    }
+]
diff --git a/task_chain/single_workflow/basic/program.py b/task_chain/single_workflow/basic/program.py
new file mode 100644
index 00000000..866287e8
--- /dev/null
+++ b/task_chain/single_workflow/basic/program.py
@@ -0,0 +1,93 @@
+"""This module uses a single-workflow approach for the task-chain project.
+
+A single workflow runs all the tasks, including retry loops.
+It handles Slack interactions using runtime event subscriptions.
+"""
+
+import json
+from pathlib import Path
+import random
+
+import autokitteh
+from autokitteh.slack import slack_client
+
+
+slack = slack_client("slack_conn")
+
+
+def step1():
+    print("Step 1 is doing stuff...")
+
+
+def step2():
+    print("Step 2 is doing stuff...")
+
+
+def step3():
+    """Simulate a flaky task: fail randomly, about half the time."""
+    print("Step 3 is doing stuff...")
+    if random.choice([True, False]):
+        raise RuntimeError("Something bad happened")
+
+
+def step4():
+    print("Step 4 is doing stuff...")
+
+
+def on_slack_slash_command(event):
+    """Use a Slack slash command from a user to start a chain of tasks."""
+    user_id = event.data.user_id
+
+    if not run_retriable_task(step1, user_id):
+        return
+    if not run_retriable_task(step2, user_id):
+        return
+    if not run_retriable_task(step3, user_id):
+        return
+    if not run_retriable_task(step4, user_id):
+        return
+
+    message = "Workflow completed successfully :smiley_cat:"
+    slack.chat_postMessage(channel=user_id, text=message)
+
+
+def run_retriable_task(task, user_id) -> bool:
+    """Run a single task, retrying it for as long as the user asks to.
+
+    Returns:
+        True if the task eventually completed, or False if it
+        failed and the user did not choose to retry it.
+    """
+    while True:
+        try:
+            task()
+        except Exception as e:
+            if not ask_user_retry_or_abort(task.__name__, e, user_id):
+                return False
+        else:
+            break
+
+    message = f"Task `{task.__name__}` completed"
+    slack.chat_postMessage(channel=user_id, text=message)
+    return True
+
+
+def ask_user_retry_or_abort(task_name, error, user_id) -> bool:
+    """Ask the user in Slack whether to retry a failed task, or abort.
+
+    Returns:
+        True if the user chose to retry, False otherwise.
+    """
+    sub = autokitteh.subscribe("slack_conn", 'event_type == "interaction"')
+
+    # JSON-escape the message (minus the enclosing quotes), so that quotes,
+    # newlines or backslashes in the error text can't corrupt the blocks.
+    message = json.dumps(f"The task `{task_name}` failed: `{error}`")[1:-1]
+    blocks = Path("interactive_message.json.txt").read_text()
+    blocks = blocks.replace("MESSAGE", message)
+    slack.chat_postMessage(channel=user_id, text="Workflow error", blocks=blocks)
+
+    # Wait for and handle the user's response in this workflow.
+    event = autokitteh.next_event(sub)
+    autokitteh.unsubscribe(sub)
+    return event.actions[0]["value"] == "retry"