Skip to content

Commit

Permalink
Enable action cancellation
Browse files Browse the repository at this point in the history
It is now possible to gracefully cancel actions by DELETEing
the invocation. This relies on the action in question using
a CancelHook dependency, and periodically checking it.

I've refactored slightly, combining three invocation-related
dependencies into one submodule.
  • Loading branch information
rwb27 committed Dec 15, 2023
1 parent 9446ea2 commit 0026835
Show file tree
Hide file tree
Showing 10 changed files with 148 additions and 36 deletions.
55 changes: 53 additions & 2 deletions src/labthings_fastapi/actions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
from ..thing_description.model import LinkElement
from ..file_manager import FileManager
from .invocation_model import InvocationModel, InvocationStatus
from ..dependencies.invocation_logger import invocation_logger
from ..dependencies.invocation import (
CancelHook,
InvocationCancelledError,
invocation_logger,
)

if TYPE_CHECKING:
# We only need these imports for type hints, so this avoids circular imports.
Expand All @@ -43,6 +47,7 @@ def __init__(
default_stop_timeout: float = 5,
log_len: int = 1000,
id: Optional[uuid.UUID] = None,
cancel_hook: Optional[CancelHook] = None,
):
Thread.__init__(self, daemon=True)

Expand All @@ -51,6 +56,7 @@ def __init__(
self.thing_ref = weakref.ref(thing)
self.input = input if input is not None else EmptyInput()
self.dependencies = dependencies if dependencies is not None else {}
self.cancel_hook = cancel_hook

# A UUID for the Invocation (not the same as the threading.Thread ident)
self._ID = id if id is not None else uuid.uuid4() # Task ID
Expand Down Expand Up @@ -124,6 +130,15 @@ def action(self):
def thing(self):
return self.thing_ref()

def cancel(self):
"""Cancel the task by requesting the code to stop
This is very much not guaranteed to work: the action must use
a CancelHook dependency and periodically check it.
"""
if self.cancel_hook is not None:
self.cancel_hook.set()

def response(self, request: Optional[Request] = None):
if request:
href = str(request.url_for("action_invocation", id=self.id))
Expand Down Expand Up @@ -173,7 +188,7 @@ def run(self):
with self._status_lock:
self._return_value = ret
self._status = InvocationStatus.COMPLETED
except SystemExit as e:
except InvocationCancelledError as e:
logging.error(e)
with self._status_lock:
self._status = InvocationStatus.CANCELLED
Expand Down Expand Up @@ -251,6 +266,7 @@ def invoke_action(
id: uuid.UUID,
input: Any,
dependencies: dict[str, Any],
cancel_hook: CancelHook,
) -> Invocation:
"""Invoke an action, returning the thread where it's running"""
thread = Invocation(
Expand All @@ -259,6 +275,7 @@ def invoke_action(
input=input,
dependencies=dependencies,
id=id,
cancel_hook=cancel_hook,
)
self.append_invocation(thread)
thread.start()
Expand Down Expand Up @@ -353,6 +370,40 @@ def action_invocation_output(id: uuid.UUID):
return invocation.output.response()
return invocation.output

@app.delete(
ACTION_INVOCATIONS_PATH + "/{id}",
response_model=None,
responses={
200: {
"description": "Cancel request sent",
},
404: {"description": "Invocation ID not found"},
503: {"description": "Invocation may not be cancelled"},
},
)
def delete_invocation(id: uuid.UUID) -> None:
"""Cancel an action invocation"""
with self._invocations_lock:
try:
invocation: Any = self._invocations[id]
except KeyError:
raise HTTPException(
status_code=404,
detail="No action invocation found with ID {id}",
)
if invocation.status not in [
InvocationStatus.RUNNING,
InvocationStatus.PENDING,
]:
raise HTTPException(
status_code=503,
detail=(
f"The invocation is {invocation.status} "
"and may not be cancelled."
),
)
invocation.cancel()

@app.get(
ACTION_INVOCATIONS_PATH + "/{id}/files",
responses={
Expand Down
55 changes: 55 additions & 0 deletions src/labthings_fastapi/dependencies/invocation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""FastAPI dependency for an invocation ID"""
from __future__ import annotations
import uuid
from typing import Annotated
from fastapi import Depends
import logging
import threading


def invocation_id() -> uuid.UUID:
"""Return a UUID for an action invocation
This is for use as a FastAPI dependency, to allow other dependencies to
access the invocation ID. Useful for e.g. file management.
"""
return uuid.uuid4()


InvocationID = Annotated[uuid.UUID, Depends(invocation_id)]


def invocation_logger(id: InvocationID) -> logging.Logger:
"""Retrieve a logger object for an action invocation"""
return logging.getLogger(f"labthings_fastapi.actions.{id}")


InvocationLogger = Annotated[logging.Logger, Depends(invocation_logger)]


class InvocationCancelledError(SystemExit):
pass


class CancelEvent(threading.Event):
def __init__(self, id: InvocationID):
threading.Event.__init__(self)
self.invocation_id = id

def raise_if_set(self):
"""Raise a CancelledError if the event is set"""
if self.is_set():
raise InvocationCancelledError("The action was cancelled.")

def sleep(self, timeout: float):
"""Sleep for a given time in seconds, but raise an exception if cancelled"""
if self.wait(timeout):
raise InvocationCancelledError("The action was cancelled.")


def invocation_cancel_hook(id: InvocationID) -> CancelHook:
"""Get a cancel hook belonging to a particular invocation"""
return CancelEvent(id)


CancelHook = Annotated[CancelEvent, Depends(invocation_cancel_hook)]
17 changes: 0 additions & 17 deletions src/labthings_fastapi/dependencies/invocation_id.py

This file was deleted.

12 changes: 0 additions & 12 deletions src/labthings_fastapi/dependencies/invocation_logger.py

This file was deleted.

4 changes: 3 additions & 1 deletion src/labthings_fastapi/descriptors/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from fastapi import Body, FastAPI, Request, BackgroundTasks
from pydantic import create_model
from ..actions import InvocationModel
from ..dependencies.invocation_id import InvocationID
from ..dependencies.invocation import CancelHook, InvocationID
from ..utilities.introspection import (
EmptyInput,
StrictEmptyInput,
Expand Down Expand Up @@ -128,6 +128,7 @@ def start_action(
request: Request,
body,
id: InvocationID,
cancel_hook: CancelHook,
background_tasks: BackgroundTasks,
**dependencies,
):
Expand All @@ -138,6 +139,7 @@ def start_action(
input=body,
dependencies=dependencies,
id=id,
cancel_hook=cancel_hook,
)
background_tasks.add_task(thing.action_manager.expire_invocations)
return action.response(request=request)
Expand Down
2 changes: 1 addition & 1 deletion src/labthings_fastapi/file_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from .thing_description.model import LinkElement
import os

from .dependencies.invocation_id import InvocationID
from .dependencies.invocation import InvocationID


class FileManager:
Expand Down
33 changes: 33 additions & 0 deletions tests/test_action_cancel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""
This tests the log that is returned in an action invocation
"""
from fastapi.testclient import TestClient
from labthings_fastapi.thing_server import ThingServer
from temp_client import poll_task, task_href
from labthings_fastapi.thing import Thing
from labthings_fastapi.decorators import thing_action
from labthings_fastapi.descriptors import PropertyDescriptor
from labthings_fastapi.dependencies.invocation import CancelHook


class ThingOne(Thing):
counter = PropertyDescriptor(int, 0)

@thing_action
def count_slowly(self, cancel: CancelHook):
for i in range(10):
cancel.sleep(0.1)
self.counter += 1


def test_invocation_logging():
server = ThingServer()
thing_one = ThingOne()
server.add_thing(thing_one, "/thing_one")
with TestClient(server.app) as client:
r = client.post("/thing_one/count_slowly")
r.raise_for_status()
dr = client.delete(task_href(r.json()))
invocation = poll_task(client, r.json())
assert invocation["status"] == "cancelled"
assert thing_one.counter < 9
2 changes: 1 addition & 1 deletion tests/test_action_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from temp_client import poll_task
from labthings_fastapi.thing import Thing
from labthings_fastapi.decorators import thing_action
from labthings_fastapi.dependencies.invocation_logger import InvocationLogger
from labthings_fastapi.dependencies.invocation import InvocationLogger


class ThingOne(Thing):
Expand Down
2 changes: 1 addition & 1 deletion tests/test_dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"""

from fastapi import Depends, FastAPI, Request
from labthings_fastapi.dependencies.invocation_id import InvocationID
from labthings_fastapi.dependencies.invocation import InvocationID
from labthings_fastapi.file_manager import FileManagerDep
from fastapi.testclient import TestClient
from module_with_deps import FancyIDDep
Expand Down
2 changes: 1 addition & 1 deletion tests/test_dependencies_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from fastapi import Depends, FastAPI
from fastapi.testclient import TestClient
from module_with_deps import FancyIDDep, FancyID, ClassDependsOnFancyID
from labthings_fastapi.dependencies.invocation_id import InvocationID, invocation_id
from labthings_fastapi.dependencies.invocation import InvocationID, invocation_id
from labthings_fastapi.file_manager import FileManager
from uuid import UUID

Expand Down

0 comments on commit 0026835

Please sign in to comment.