Function allowlist for Executor and manual Graph management #1146

Merged
merged 2 commits into from
Jan 10, 2025
2 changes: 1 addition & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ jobs:
with:
run: |
cd indexify
- poetry run indexify-cli executor &
+ poetry run indexify-cli executor --dev &
echo $! > /tmp/indexify-executor.pid &

wait-on: |
Expand Down
6 changes: 3 additions & 3 deletions executor/README.md
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
## Overview

- Executor registers at Indexify Server and continiously pulls tasks assigned to it from the Indexify Server
+ Executor registers at Indexify Server and continuously pulls tasks assigned to it from the Indexify Server
and executes them. While registering it shares its capabilities like available hardware with the Indexify
Server and periodically updates the Server about its current state. Executor spins up Function Executors
to run customer functions. Executor should never link with Indexify Python-SDK. It should not know anything
about programming languages and runtime environments used by Indexify Functions. Function Executor is
responsible for this.

This package doesn't provide an executable entry point that runs an Executor. This is intentional
- as Executor has many configurable subcomponents. `indexify` package provides a cli with `executor`
+ as Executor has many configurable sub-components. `indexify` package provides a cli with `executor`
command that runs Executor with functionality available in Open Source offering.

## Deployment

### Production setup

A single Executor runs in a Virtual Machine, container or a in bare metal host. An Indexify cluster
- is scaled by adding more Executor hosts. Open Source users manage and scale the hosts themself e.g.
+ is scaled by adding more Executor hosts. Open Source users manage and scale the hosts themselves e.g.
using Kubernetes, any other orchestrator or even manually. E.g. the users provision secrets,
persistent volumes to each host using the orchestrator or manually. Each Executor runs a single function.
The function name and other qualifiers are defined in Executor arguments.
Expand Down
12 changes: 9 additions & 3 deletions executor/src/executor/api_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,23 @@ class Task(BaseModel):
invocation_id: str
input_key: str
reducer_output_id: Optional[str] = None
- graph_version: int
+ graph_version: str
image_uri: Optional[str] = None
"image_uri defines the URI of the image of this task. Optional since some executors do not require it."


+ class FunctionURI(BaseModel):
+ namespace: str
+ compute_graph: str
+ compute_fn: str
+ version: str


class ExecutorMetadata(BaseModel):
id: str
executor_version: str
addr: str
- image_name: str
- image_hash: str
+ function_allowlist: Optional[List[FunctionURI]] = None
labels: Dict[str, Any]
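
As a rough illustration of how the new `function_allowlist` field might travel with the Executor's registration metadata, here is a minimal stdlib-only sketch. It uses `dataclasses` as a stand-in for the Pydantic `BaseModel` classes in the diff. `metadata_payload` is a hypothetical helper that is not part of this PR, and `labels` is given a default here only to satisfy dataclass field ordering.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class FunctionURI:
    # Same four fields as the FunctionURI model added in api_objects.py.
    namespace: str
    compute_graph: str
    compute_fn: str
    version: str


@dataclass
class ExecutorMetadata:
    # Stand-in for the Pydantic model; image_name/image_hash are gone.
    id: str
    executor_version: str
    addr: str
    # None is assumed to mean "no allowlist configured".
    function_allowlist: Optional[List[FunctionURI]] = None
    labels: Dict[str, Any] = field(default_factory=dict)


def metadata_payload(metadata: ExecutorMetadata) -> str:
    """Hypothetical helper: serialize the metadata to a JSON string."""
    # asdict() recurses into the nested FunctionURI dataclasses.
    return json.dumps(asdict(metadata))


if __name__ == "__main__":
    allowlist = [FunctionURI("default", "my_graph", "my_fn", "1")]
    metadata = ExecutorMetadata(
        id="unit-test",
        executor_version="0.0.0",
        addr="",
        function_allowlist=allowlist,
    )
    print(metadata_payload(metadata))
```

How the Indexify Server interprets the allowlist (for example, what a `None` value means for task assignment) is not shown in this diff, so the comments above are assumptions.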


Expand Down
3 changes: 2 additions & 1 deletion executor/src/executor/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ async def download_graph(self, task: Task) -> SerializedObject:
self.code_path,
"graph_cache",
task.namespace,
- f"{task.compute_graph}.{task.graph_version}",
+ task.compute_graph,
+ task.graph_version,
)
# Filesystem operations are synchronous.
# Run in a separate thread to not block the main event loop.
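
The cache path change in this hunk can be sketched as follows, assuming the arguments are passed to `os.path.join` (the call itself is outside the visible hunk). The helper names `graph_cache_path_old` and `graph_cache_path_new` are hypothetical and exist only for comparison.

```python
import os


def graph_cache_path_old(
    code_path: str, namespace: str, compute_graph: str, graph_version: str
) -> str:
    # Before: graph name and version were fused into one path component.
    return os.path.join(
        code_path, "graph_cache", namespace, f"{compute_graph}.{graph_version}"
    )


def graph_cache_path_new(
    code_path: str, namespace: str, compute_graph: str, graph_version: str
) -> str:
    # After: the version is its own path component under the graph name.
    return os.path.join(
        code_path, "graph_cache", namespace, compute_graph, graph_version
    )


if __name__ == "__main__":
    print(graph_cache_path_old("/tmp/code", "default", "my_graph", "1"))
    print(graph_cache_path_new("/tmp/code", "default", "my_graph", "1"))
```

Under that assumption, the new layout relies on `graph_version` being a `str`, as this PR makes it, because `os.path.join` raises `TypeError` for non-string components such as an `int`.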
Expand Down
16 changes: 7 additions & 9 deletions executor/src/executor/executor.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import asyncio
import signal
from pathlib import Path
- from typing import Any, Optional
+ from typing import Any, List, Optional

import structlog
from function_executor.proto.function_executor_pb2 import SerializedObject

- from .api_objects import Task
+ from .api_objects import FunctionURI, Task
from .downloader import Downloader
from .function_executor.server.function_executor_server_factory import (
FunctionExecutorServerFactory,
Expand All @@ -21,11 +21,10 @@ def __init__(
self,
executor_id: str,
code_path: Path,
+ function_allowlist: Optional[List[FunctionURI]],
function_executor_server_factory: FunctionExecutorServerFactory,
server_addr: str = "localhost:8900",
config_path: Optional[str] = None,
- name_alias: Optional[str] = None,
- image_hash: Optional[str] = None,
):
self._logger = structlog.get_logger(module=__name__)
self._should_run = True
Expand All @@ -38,7 +37,7 @@ def __init__(
self._server_addr = server_addr
self._base_url = f"{protocol}://{self._server_addr}"
self._code_path = code_path
- self._task_runnner = TaskRunner(
+ self._task_runner = TaskRunner(
function_executor_server_factory=function_executor_server_factory,
base_url=self._base_url,
config_path=config_path,
Expand All @@ -50,8 +49,7 @@ def __init__(
protocol=protocol,
indexify_server_addr=self._server_addr,
executor_id=executor_id,
- name_alias=name_alias,
- image_hash=image_hash,
+ function_allowlist=function_allowlist,
config_path=config_path,
)
self._task_reporter = TaskReporter(
Expand Down Expand Up @@ -98,7 +96,7 @@ async def _run_task(self, task: Task) -> None:
await self._downloader.download_init_value(task)
)
logger.info("task_execution_started")
- output: TaskOutput = await self._task_runnner.run(
+ output: TaskOutput = await self._task_runner.run(
TaskInput(
task=task,
graph=graph,
Expand Down Expand Up @@ -134,7 +132,7 @@ async def _report_task_outcome(self, output: TaskOutput, logger: Any) -> None:
async def _shutdown(self, loop):
self._logger.info("shutting_down")
self._should_run = False
- await self._task_runnner.shutdown()
+ await self._task_runner.shutdown()
for task in asyncio.all_tasks(loop):
task.cancel()

Expand Down
10 changes: 4 additions & 6 deletions executor/src/executor/task_fetcher.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import json
from importlib.metadata import version
- from typing import AsyncGenerator, Optional
+ from typing import AsyncGenerator, List, Optional

import structlog
from httpx_sse import aconnect_sse
from python_utils.http_client import get_httpx_client

- from .api_objects import ExecutorMetadata, Task
+ from .api_objects import ExecutorMetadata, FunctionURI, Task
from .runtime_probes import ProbeInfo, RuntimeProbes


Expand All @@ -18,8 +18,7 @@ def __init__(
protocol: str,
indexify_server_addr: str,
executor_id: str,
- name_alias: Optional[str] = None,
- image_hash: Optional[int] = None,
+ function_allowlist: Optional[List[FunctionURI]],
config_path: Optional[str] = None,
):
self._protocol: str = protocol
Expand All @@ -32,8 +31,7 @@ def __init__(
id=executor_id,
executor_version=version("indexify-executor"),
addr="",
- image_name=probe_info.image_name if name_alias is None else name_alias,
- image_hash=(probe_info.image_hash if image_hash is None else image_hash),
+ function_allowlist=function_allowlist,
labels=probe_info.labels,
)

Expand Down
2 changes: 2 additions & 0 deletions executor/tests/src/tests/test_executor_behaviour.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def test_tls_configuration(self, mock_async_client, mock_sync_client, mock_file)
executor = Executor(
executor_id="unit-test",
code_path=Path("test"),
+ function_allowlist=None,
function_executor_server_factory=None,
server_addr=service_url,
config_path=config_path,
Expand Down Expand Up @@ -62,6 +63,7 @@ def test_no_tls_configuration(self):
executor = Executor(
executor_id="unit-test",
code_path=Path("test"),
+ function_allowlist=None,
function_executor_server_factory=None,
server_addr="localhost:8900",
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def __init__(
self,
request: RunTaskRequest,
graph_name: str,
- graph_version: int,
+ graph_version: str,
function_name: str,
function: Union[IndexifyFunction, IndexifyRouter],
invocation_state: InvocationState,
Expand All @@ -51,7 +51,7 @@ def __init__(
context=GraphInvocationContext(
invocation_id=request.graph_invocation_id,
graph_name=graph_name,
- graph_version=str(graph_version),
+ graph_version=graph_version,
invocation_state=invocation_state,
),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ message SerializedObject {
message InitializeRequest {
optional string namespace = 1;
optional string graph_name = 2;
- optional int32 graph_version = 3;
+ optional string graph_version = 3;
optional string function_name = 5;
optional SerializedObject graph = 7;
}
Expand Down Expand Up @@ -88,7 +88,7 @@ message RouterOutput {
message RunTaskRequest {
optional string namespace = 1;
optional string graph_name = 2;
- optional int32 graph_version = 3;
+ optional string graph_version = 3;
optional string function_name = 4;
optional string graph_invocation_id = 5;
optional string task_id = 6;
Expand Down


Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ class InitializeRequest(_message.Message):
GRAPH_FIELD_NUMBER: _ClassVar[int]
namespace: str
graph_name: str
- graph_version: int
+ graph_version: str
function_name: str
graph: SerializedObject
def __init__(
self,
namespace: _Optional[str] = ...,
graph_name: _Optional[str] = ...,
- graph_version: _Optional[int] = ...,
+ graph_version: _Optional[str] = ...,
function_name: _Optional[str] = ...,
graph: _Optional[_Union[SerializedObject, _Mapping]] = ...,
) -> None: ...
Expand Down Expand Up @@ -161,7 +161,7 @@ class RunTaskRequest(_message.Message):
FUNCTION_INIT_VALUE_FIELD_NUMBER: _ClassVar[int]
namespace: str
graph_name: str
- graph_version: int
+ graph_version: str
function_name: str
graph_invocation_id: str
task_id: str
Expand All @@ -171,7 +171,7 @@ class RunTaskRequest(_message.Message):
self,
namespace: _Optional[str] = ...,
graph_name: _Optional[str] = ...,
- graph_version: _Optional[int] = ...,
+ graph_version: _Optional[str] = ...,
function_name: _Optional[str] = ...,
graph_invocation_id: _Optional[str] = ...,
task_id: _Optional[str] = ...,
Expand Down
4 changes: 2 additions & 2 deletions function_executor/src/function_executor/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(self):
self._logger = structlog.get_logger(module=__name__)
self._namespace: Optional[str] = None
self._graph_name: Optional[str] = None
- self._graph_version: Optional[int] = None
+ self._graph_version: Optional[str] = None
self._function_name: Optional[str] = None
self._function: Optional[Union[IndexifyFunction, IndexifyRouter]] = None
self._invocation_state_proxy_server: Optional[InvocationStateProxyServer] = None
Expand All @@ -52,7 +52,7 @@ def initialize(
self._logger = self._logger.bind(
namespace=request.namespace,
graph_name=request.graph_name,
- graph_version=str(request.graph_version),
+ graph_version=request.graph_version,
function_name=request.function_name,
)
graph_serializer = get_serializer(request.graph.content_type)
Expand Down