Function allowlist for Executor
Make Executors run only allowlisted functions of specified compute graphs.
The functions are specified as

`indexify-cli executor --function <namespace>:<workflow>:<compute_func>:<version> --function ...`.
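As a rough illustration of the flag format above, the following sketch assembles an argument list with one `--function` flag per allowlisted function. `FunctionSpec` and `build_executor_argv` are hypothetical helpers, and the namespace, workflow, and function names are made up; only the `--function` flag and its four-part value come from this commit.

```python
from typing import List, NamedTuple


class FunctionSpec(NamedTuple):
    """Hypothetical helper type mirroring the four parts of a --function value."""

    namespace: str
    workflow: str
    compute_func: str
    version: int


def build_executor_argv(functions: List[FunctionSpec]) -> List[str]:
    """Return an argv list with one --function flag per allowlisted function."""
    argv = ["indexify-cli", "executor"]
    for fn in functions:
        value = f"{fn.namespace}:{fn.workflow}:{fn.compute_func}:{fn.version}"
        argv += ["--function", value]
    return argv


# Example values are invented for illustration only.
argv = build_executor_argv(
    [
        FunctionSpec("default", "pdf_pipeline", "extract_text", 1),
        FunctionSpec("default", "pdf_pipeline", "embed_chunks", 1),
    ]
)
print(" ".join(argv))
```

Keeping the arguments as a list rather than one string matches how this commit changes `server_dev_mode` to pass list-form commands to `subprocess.Popen`.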

The allowlist is used with Executors in production setups, e.g. to run only
a certain function in a Kubernetes Pod with an Executor.
This is the default production setup because each function has different
resource usage, and it is easier to size containers per function.
The same applies to other container configuration such as secrets, roles, and volumes.

`indexify-cli executor --dev` mode is still present. It makes the server try
to run any function on the executor, which is convenient for developing
Indexify and for integration testing.
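The difference between the two modes can be sketched as a simple predicate. This is an assumption about the semantics, not the server's actual scheduling code, which is not part of this diff: an absent allowlist (dev mode) admits any function, while a present allowlist admits only exact matches on all four fields. `may_run` is a hypothetical name, and `FunctionURI` is re-declared here as a plain dataclass stand-in for the pydantic model added in this commit.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class FunctionURI:
    """Stand-in for the pydantic FunctionURI model added in api_objects.py."""

    namespace: str
    compute_graph: str
    compute_fn: str
    version: int


def may_run(allowlist: Optional[List[FunctionURI]], fn: FunctionURI) -> bool:
    """Hypothetical check: None means no restriction (dev mode)."""
    if allowlist is None:
        return True
    # Assumption: a function matches only if all four fields are equal.
    return fn in allowlist


allowlist = [FunctionURI("default", "graph_a", "fn_a", 1)]
print(may_run(None, FunctionURI("other", "graph_b", "fn_b", 7)))
print(may_run(allowlist, FunctionURI("default", "graph_a", "fn_a", 1)))
print(may_run(allowlist, FunctionURI("default", "graph_a", "fn_a", 2)))
```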

Co-authored-by: Diptanu Gon Choudhury <[email protected]>
Co-authored-by: Eugene Batalov <[email protected]>
diptanu and eabatalov committed Jan 9, 2025
1 parent 799cbfe commit 474ea2a
Showing 12 changed files with 144 additions and 226 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yaml
@@ -131,7 +131,7 @@ jobs:
with:
run: |
cd indexify
poetry run indexify-cli executor &
poetry run indexify-cli executor --dev &
echo $! > /tmp/indexify-executor.pid &
wait-on: |
6 changes: 3 additions & 3 deletions executor/README.md
@@ -1,22 +1,22 @@
## Overview

Executor registers at Indexify Server and continiously pulls tasks assigned to it from the Indexify Server
Executor registers at Indexify Server and continuously pulls tasks assigned to it from the Indexify Server
and executes them. While registering it shares its capabilities like available hardware with the Indexify
Server and periodically updates the Server about its current state. Executor spins up Function Executors
to run customer functions. Executor should never link with Indexify Python-SDK. It should not know anything
about programming languages and runtime environments used by Indexify Functions. Function Executor is
responsible for this.

This package doesn't provide an executable entry point that runs an Executor. This is intentional
as Executor has many configurable subcomponents. `indexify` package provides a cli with `executor`
as Executor has many configurable sub-components. `indexify` package provides a cli with `executor`
command that runs Executor with functionality available in Open Source offering.

## Deployment

### Production setup

A single Executor runs in a Virtual Machine, container or a in bare metal host. An Indexify cluster
is scaled by adding more Executor hosts. Open Source users manage and scale the hosts themself e.g.
is scaled by adding more Executor hosts. Open Source users manage and scale the hosts themselves e.g.
using Kubernetes, any other orchestrator or even manually. E.g. the users provision secrets,
persistent volumes to each host using the orchestrator or manually. Each Executor runs a single function.
The function name and other qualifiers are defined in Executor arguments.
10 changes: 8 additions & 2 deletions executor/src/executor/api_objects.py
@@ -16,12 +16,18 @@ class Task(BaseModel):
"image_uri defines the URI of the image of this task. Optional since some executors do not require it."


class FunctionURI(BaseModel):
namespace: str
compute_graph: str
compute_fn: str
version: int


class ExecutorMetadata(BaseModel):
id: str
executor_version: str
addr: str
image_name: str
image_hash: str
function_allowlist: Optional[List[FunctionURI]] = None
labels: Dict[str, Any]
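To make the new metadata shape concrete, the sketch below serializes an executor's metadata with and without an allowlist. It uses standard-library dataclasses as stand-ins for the pydantic models in this file, so the field order differs slightly (dataclasses require defaulted fields last) and the exact JSON sent to the server is an assumption. All example values are invented.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Dict, List, Optional


@dataclass
class FunctionURI:
    namespace: str
    compute_graph: str
    compute_fn: str
    version: int


@dataclass
class ExecutorMetadata:
    # Stand-in for the pydantic model; the defaulted field is moved last.
    id: str
    executor_version: str
    addr: str
    labels: Dict[str, Any]
    function_allowlist: Optional[List[FunctionURI]] = None


def to_json(metadata: ExecutorMetadata) -> str:
    """Serialize metadata, including nested FunctionURI entries, to JSON."""
    return json.dumps(asdict(metadata))


production = ExecutorMetadata(
    id="executor-1",
    executor_version="0.0.0",
    addr="",
    labels={"os": "linux"},
    function_allowlist=[FunctionURI("default", "graph_a", "fn_a", 1)],
)
dev = ExecutorMetadata(id="executor-2", executor_version="0.0.0", addr="", labels={})

payload = json.loads(to_json(production))
dev_payload = json.loads(to_json(dev))
print(payload["function_allowlist"])
print(dev_payload["function_allowlist"])
```

In this sketch a dev-mode executor reports a null allowlist, which corresponds to `function_allowlist` defaulting to `None` in the model above.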


16 changes: 7 additions & 9 deletions executor/src/executor/executor.py
@@ -1,12 +1,12 @@
import asyncio
import signal
from pathlib import Path
from typing import Any, Optional
from typing import Any, List, Optional

import structlog
from function_executor.proto.function_executor_pb2 import SerializedObject

from .api_objects import Task
from .api_objects import FunctionURI, Task
from .downloader import Downloader
from .function_executor.server.function_executor_server_factory import (
FunctionExecutorServerFactory,
@@ -21,11 +21,10 @@ def __init__(
self,
executor_id: str,
code_path: Path,
function_allowlist: Optional[List[FunctionURI]],
function_executor_server_factory: FunctionExecutorServerFactory,
server_addr: str = "localhost:8900",
config_path: Optional[str] = None,
name_alias: Optional[str] = None,
image_hash: Optional[str] = None,
):
self._logger = structlog.get_logger(module=__name__)
self._should_run = True
@@ -38,7 +37,7 @@ def __init__(
self._server_addr = server_addr
self._base_url = f"{protocol}://{self._server_addr}"
self._code_path = code_path
self._task_runnner = TaskRunner(
self._task_runner = TaskRunner(
function_executor_server_factory=function_executor_server_factory,
base_url=self._base_url,
config_path=config_path,
@@ -50,8 +49,7 @@ def __init__(
protocol=protocol,
indexify_server_addr=self._server_addr,
executor_id=executor_id,
name_alias=name_alias,
image_hash=image_hash,
function_allowlist=function_allowlist,
config_path=config_path,
)
self._task_reporter = TaskReporter(
@@ -98,7 +96,7 @@ async def _run_task(self, task: Task) -> None:
await self._downloader.download_init_value(task)
)
logger.info("task_execution_started")
output: TaskOutput = await self._task_runnner.run(
output: TaskOutput = await self._task_runner.run(
TaskInput(
task=task,
graph=graph,
@@ -134,7 +132,7 @@ async def _report_task_outcome(self, output: TaskOutput, logger: Any) -> None:
async def _shutdown(self, loop):
self._logger.info("shutting_down")
self._should_run = False
await self._task_runnner.shutdown()
await self._task_runner.shutdown()
for task in asyncio.all_tasks(loop):
task.cancel()

10 changes: 4 additions & 6 deletions executor/src/executor/task_fetcher.py
@@ -1,12 +1,12 @@
import json
from importlib.metadata import version
from typing import AsyncGenerator, Optional
from typing import AsyncGenerator, List, Optional

import structlog
from httpx_sse import aconnect_sse
from python_utils.http_client import get_httpx_client

from .api_objects import ExecutorMetadata, Task
from .api_objects import ExecutorMetadata, FunctionURI, Task
from .runtime_probes import ProbeInfo, RuntimeProbes


@@ -18,8 +18,7 @@ def __init__(
protocol: str,
indexify_server_addr: str,
executor_id: str,
name_alias: Optional[str] = None,
image_hash: Optional[int] = None,
function_allowlist: Optional[List[FunctionURI]],
config_path: Optional[str] = None,
):
self._protocol: str = protocol
@@ -32,8 +31,7 @@ def __init__(
id=executor_id,
executor_version=version("indexify-executor"),
addr="",
image_name=probe_info.image_name if name_alias is None else name_alias,
image_hash=(probe_info.image_hash if image_hash is None else image_hash),
function_allowlist=function_allowlist,
labels=probe_info.labels,
)

85 changes: 44 additions & 41 deletions indexify/src/cli/cli.py
@@ -16,16 +16,16 @@
import nanoid
import structlog
import typer
from executor.api_objects import FunctionURI
from executor.executor import Executor
from executor.function_executor.server.subprocess_function_executor_server_factory import (
SubprocessFunctionExecutorServerFactory,
)
from rich.console import Console
from rich.panel import Panel
from rich.text import Text
from rich.theme import Theme

from indexify.functions_sdk.image import Build, GetDefaultPythonImage, Image
from indexify.functions_sdk.image import GetDefaultPythonImage, Image

logger = structlog.get_logger(module=__name__)

@@ -67,8 +67,10 @@ def server_dev_mode():
print("starting indexify server and executor in dev mode...")
print("press Ctrl+C to stop the server and executor.")
print(f"server binary path: {indexify_server_path}")
commands = [indexify_server_path, "indexify-cli executor"]

commands: List[List[str]] = [
[indexify_server_path, "--dev"],
["indexify-cli", "executor", "--dev"],
]
processes = []
stop_event = threading.Event()

@@ -99,7 +101,7 @@ def signal_handler(sig, frame):

for cmd in commands:
process = subprocess.Popen(
cmd.split(),
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=1,
@@ -151,32 +153,6 @@ def build_image(
_create_image(obj, python_sdk_path)


@app.command(help="Build platform images for function names")
def build_platform_image(
workflow_file_path: Annotated[str, typer.Argument()],
image_names: Optional[List[str]] = None,
build_service="https://api.tensorlake.ai/images/v1",
):

globals_dict = {}

# Add the folder in the workflow file path to the current Python path
folder_path = os.path.dirname(workflow_file_path)
if folder_path not in sys.path:
sys.path.append(folder_path)

try:
exec(open(workflow_file_path).read(), globals_dict)
except FileNotFoundError as e:
raise Exception(
f"Could not find workflow file to execute at: " f"`{workflow_file_path}`"
)
for _, obj in globals_dict.items():
if type(obj) and isinstance(obj, Image):
if image_names is None or obj._image_name in image_names:
_create_platform_image(obj, build_service)


@app.command(help="Build default image for indexify")
def build_default_image(
python_version: Optional[str] = typer.Option(
@@ -204,6 +180,15 @@ def executor(
dev: Annotated[
bool, typer.Option("--dev", "-d", help="Run the executor in development mode")
] = False,
function_uris: Annotated[
Optional[List[str]],
typer.Option(
"--function",
"-f",
help="Function that the executor will run "
"specified as <namespace>:<workflow>:<function>:<version>",
),
] = None,
config_path: Optional[str] = typer.Option(
None, help="Path to the TLS configuration file"
),
@@ -214,15 +199,13 @@ def executor(
ports: Tuple[int, int] = typer.Option(
(50000, 51000), help="Range of localhost TCP ports to be used by the executor"
),
name_alias: Optional[str] = typer.Option(
None, help="Image name override for the executor"
),
image_hash: Optional[str] = typer.Option(
None, help="Image hash override for the executor"
),
):
if not dev:
configure_production_logging()
if function_uris is None:
raise typer.BadParameter(
"At least one function must be specified when not running in development mode"
)

id = nanoid.generate()
logger.info(
@@ -233,8 +216,7 @@ def executor(
executor_version=version("indexify-executor"),
executor_cache=executor_cache,
ports=ports,
name_alias=name_alias,
image_hash=image_hash,
functions=function_uris,
dev_mode=dev,
)

@@ -258,15 +240,36 @@ def executor(
server_addr=server_addr,
config_path=config_path,
code_path=executor_cache,
name_alias=name_alias,
image_hash=image_hash,
function_allowlist=_parse_function_uris(function_uris),
function_executor_server_factory=SubprocessFunctionExecutorServerFactory(
development_mode=dev,
server_ports=range(ports[0], ports[1]),
),
).run()


def _parse_function_uris(uri_strs: Optional[List[str]]) -> Optional[List[FunctionURI]]:
if uri_strs is None:
return None

uris: List[FunctionURI] = []
for uri_str in uri_strs:
tokens = uri_str.split(":")
if len(tokens) != 4:
raise typer.BadParameter(
"Function should be specified as <namespace>:<workflow>:<function>:<version>"
)
uris.append(
FunctionURI(
namespace=tokens[0],
compute_graph=tokens[1],
compute_fn=tokens[2],
version=int(tokens[3]),
)
)
return uris
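One detail worth noting about `_parse_function_uris`: `int(tokens[3])` raises a plain `ValueError` when the version is not an integer, so that case does not go through the `typer.BadParameter` path used for a wrong token count. The standalone sketch below shows one possible way to report both cases uniformly. It uses only the standard library, so `ValueError` stands in for `typer.BadParameter` and a frozen dataclass stands in for the pydantic `FunctionURI`; `parse_function_uris` and the error wording are illustrative, not part of the commit.

```python
from dataclasses import dataclass
from typing import List, Optional

EXPECTED_FORMAT = "<namespace>:<workflow>:<function>:<version>"


@dataclass(frozen=True)
class FunctionURI:
    """Stand-in for the pydantic FunctionURI model in executor.api_objects."""

    namespace: str
    compute_graph: str
    compute_fn: str
    version: int


def parse_function_uris(uri_strs: Optional[List[str]]) -> Optional[List[FunctionURI]]:
    """Parse --function values; None (dev mode) is passed through unchanged."""
    if uri_strs is None:
        return None

    uris: List[FunctionURI] = []
    for uri_str in uri_strs:
        tokens = uri_str.split(":")
        if len(tokens) != 4:
            raise ValueError(
                f"Function should be specified as {EXPECTED_FORMAT}, got {uri_str!r}"
            )
        namespace, compute_graph, compute_fn, version_str = tokens
        try:
            version = int(version_str)
        except ValueError:
            # Report a non-integer version the same way as a malformed URI.
            raise ValueError(
                f"Version must be an integer in {uri_str!r}"
            ) from None
        uris.append(FunctionURI(namespace, compute_graph, compute_fn, version))
    return uris


print(parse_function_uris(["default:graph_a:fn_a:3"]))
```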


def _create_image(image: Image, python_sdk_path):
console.print(
Text("Creating container for ", style="cyan"),