Skip to content

Commit

Permalink
Change Graph version type from u32 to string and set it in SDK
Browse files Browse the repository at this point in the history
This allows users to implement versioning of their graphs with
any semantics they want. A randomly generated ID (nanoid) is used
as the default graph version, so if a user doesn't want to manage
versions manually, the graph is always updated on each RemoteGraph.deploy().
  • Loading branch information
eabatalov committed Jan 10, 2025
1 parent d85f840 commit cf28112
Show file tree
Hide file tree
Showing 33 changed files with 588 additions and 467 deletions.
4 changes: 2 additions & 2 deletions executor/src/executor/api_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class Task(BaseModel):
invocation_id: str
input_key: str
reducer_output_id: Optional[str] = None
graph_version: int
graph_version: str
image_uri: Optional[str] = None
"image_uri defines the URI of the image of this task. Optional since some executors do not require it."

Expand All @@ -20,7 +20,7 @@ class FunctionURI(BaseModel):
namespace: str
compute_graph: str
compute_fn: str
version: int
version: str


class ExecutorMetadata(BaseModel):
Expand Down
3 changes: 2 additions & 1 deletion executor/src/executor/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ async def download_graph(self, task: Task) -> SerializedObject:
self.code_path,
"graph_cache",
task.namespace,
f"{task.compute_graph}.{task.graph_version}",
task.compute_graph,
task.graph_version,
)
# Filesystem operations are synchronous.
# Run in a separate thread to not block the main event loop.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def __init__(
self,
request: RunTaskRequest,
graph_name: str,
graph_version: int,
graph_version: str,
function_name: str,
function: Union[IndexifyFunction, IndexifyRouter],
invocation_state: InvocationState,
Expand All @@ -51,7 +51,7 @@ def __init__(
context=GraphInvocationContext(
invocation_id=request.graph_invocation_id,
graph_name=graph_name,
graph_version=str(graph_version),
graph_version=graph_version,
invocation_state=invocation_state,
),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ message SerializedObject {
message InitializeRequest {
optional string namespace = 1;
optional string graph_name = 2;
optional int32 graph_version = 3;
optional string graph_version = 3;
optional string function_name = 5;
optional SerializedObject graph = 7;
}
Expand Down Expand Up @@ -88,7 +88,7 @@ message RouterOutput {
message RunTaskRequest {
optional string namespace = 1;
optional string graph_name = 2;
optional int32 graph_version = 3;
optional string graph_version = 3;
optional string function_name = 4;
optional string graph_invocation_id = 5;
optional string task_id = 6;
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ class InitializeRequest(_message.Message):
GRAPH_FIELD_NUMBER: _ClassVar[int]
namespace: str
graph_name: str
graph_version: int
graph_version: str
function_name: str
graph: SerializedObject
def __init__(
self,
namespace: _Optional[str] = ...,
graph_name: _Optional[str] = ...,
graph_version: _Optional[int] = ...,
graph_version: _Optional[str] = ...,
function_name: _Optional[str] = ...,
graph: _Optional[_Union[SerializedObject, _Mapping]] = ...,
) -> None: ...
Expand Down Expand Up @@ -161,7 +161,7 @@ class RunTaskRequest(_message.Message):
FUNCTION_INIT_VALUE_FIELD_NUMBER: _ClassVar[int]
namespace: str
graph_name: str
graph_version: int
graph_version: str
function_name: str
graph_invocation_id: str
task_id: str
Expand All @@ -171,7 +171,7 @@ class RunTaskRequest(_message.Message):
self,
namespace: _Optional[str] = ...,
graph_name: _Optional[str] = ...,
graph_version: _Optional[int] = ...,
graph_version: _Optional[str] = ...,
function_name: _Optional[str] = ...,
graph_invocation_id: _Optional[str] = ...,
task_id: _Optional[str] = ...,
Expand Down
4 changes: 2 additions & 2 deletions function_executor/src/function_executor/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(self):
self._logger = structlog.get_logger(module=__name__)
self._namespace: Optional[str] = None
self._graph_name: Optional[str] = None
self._graph_version: Optional[int] = None
self._graph_version: Optional[str] = None
self._function_name: Optional[str] = None
self._function: Optional[Union[IndexifyFunction, IndexifyRouter]] = None
self._invocation_state_proxy_server: Optional[InvocationStateProxyServer] = None
Expand All @@ -52,7 +52,7 @@ def initialize(
self._logger = self._logger.bind(
namespace=request.namespace,
graph_name=request.graph_name,
graph_version=str(request.graph_version),
graph_version=request.graph_version,
function_name=request.function_name,
)
graph_serializer = get_serializer(request.graph.content_type)
Expand Down
4 changes: 2 additions & 2 deletions function_executor/tests/src/tests/test_invocation_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def _initialize_function_executor(self, stub: FunctionExecutorStub):
InitializeRequest(
namespace="test",
graph_name="test",
graph_version=1,
graph_version="1",
function_name="set_invocation_state",
graph=SerializedObject(
bytes=CloudPickleSerializer.serialize(
Expand Down Expand Up @@ -228,7 +228,7 @@ def _initialize_function_executor(self, graph: Graph, stub: FunctionExecutorStub
InitializeRequest(
namespace="test",
graph_name="test",
graph_version=1,
graph_version="1",
function_name="get_invocation_state",
graph=SerializedObject(
bytes=CloudPickleSerializer.serialize(
Expand Down
4 changes: 2 additions & 2 deletions function_executor/tests/src/tests/test_max_payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def test_max_function_input_size(self):
InitializeRequest(
namespace="test",
graph_name="test",
graph_version=1,
graph_version="1",
function_name="validate_max_input",
graph=SerializedObject(
bytes=CloudPickleSerializer.serialize(
Expand Down Expand Up @@ -117,7 +117,7 @@ def test_max_function_output_size(self):
InitializeRequest(
namespace="test",
graph_name="test",
graph_version=1,
graph_version="1",
function_name="generate_max_output",
graph=SerializedObject(
bytes=CloudPickleSerializer.serialize(
Expand Down
4 changes: 2 additions & 2 deletions function_executor/tests/src/tests/test_run_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def test_function_success(self):
InitializeRequest(
namespace="test",
graph_name="test",
graph_version=1,
graph_version="1",
function_name="extractor_b",
graph=SerializedObject(
bytes=CloudPickleSerializer.serialize(
Expand Down Expand Up @@ -117,7 +117,7 @@ def test_function_raises_error(self):
InitializeRequest(
namespace="test",
graph_name="test",
graph_version=1,
graph_version="1",
function_name="extractor_exception",
graph=SerializedObject(
bytes=CloudPickleSerializer.serialize(
Expand Down
2 changes: 1 addition & 1 deletion function_executor/tests/src/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def run_task(
RunTaskRequest(
namespace="test",
graph_name="test",
graph_version=1,
graph_version="1",
function_name=function_name,
graph_invocation_id="123",
task_id="test-task",
Expand Down
2 changes: 1 addition & 1 deletion indexify/src/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ def _parse_function_uris(uri_strs: Optional[List[str]]) -> Optional[List[Functio
namespace=tokens[0],
compute_graph=tokens[1],
compute_fn=tokens[2],
version=int(tokens[3]),
version=tokens[3],
)
)
return uris
Expand Down
2 changes: 0 additions & 2 deletions python-sdk/src/indexify/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@
indexify_function,
indexify_router,
)
from .functions_sdk.pipeline import Pipeline
from .http_client import IndexifyClient
from .remote_graph import RemoteGraph
from .remote_pipeline import RemotePipeline
from .settings import DEFAULT_SERVICE_URL

__all__ = [
Expand Down
19 changes: 18 additions & 1 deletion python-sdk/src/indexify/functions_sdk/graph.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import importlib
import re
import sys
from collections import defaultdict
from queue import deque
Expand All @@ -16,6 +17,7 @@
)

import cloudpickle
import nanoid
from nanoid import generate
from pydantic import BaseModel
from typing_extensions import get_args, get_origin
Expand Down Expand Up @@ -70,14 +72,18 @@ def __init__(
start_node: IndexifyFunction,
description: Optional[str] = None,
tags: Dict[str, str] = {},
version: str = nanoid.generate(), # Update graph on every deployment unless user wants to manage the version manually.
):
_validate_identifier(version, "version")
_validate_identifier(name, "name")
self.name = name
self.description = description
self.nodes: Dict[str, Union[IndexifyFunction, IndexifyRouter]] = {}
self.routers: Dict[str, List[str]] = defaultdict(list)
self.edges: Dict[str, List[str]] = defaultdict(list)
self.accumulator_zero_values: Dict[str, Any] = {}
self.tags = tags
self.version = version

self.add_node(start_node)
if issubclass(start_node, IndexifyRouter):
Expand Down Expand Up @@ -217,6 +223,7 @@ def definition(self) -> ComputeGraphMetadata:
minor_version=sys.version_info.minor,
sdk_version=importlib.metadata.version("indexify-python-sdk"),
),
version=self.version,
)

def run(self, block_until_done: bool = False, **kwargs) -> str:
Expand All @@ -240,7 +247,7 @@ def run(self, block_until_done: bool = False, **kwargs) -> str:
self._local_graph_ctx = GraphInvocationContext(
invocation_id=input.id,
graph_name=self.name,
graph_version="1",
graph_version=self.version,
invocation_state=LocalInvocationState(),
)
self._run(input, outputs)
Expand Down Expand Up @@ -362,3 +369,13 @@ def output(
payload = payload_dict
outputs.append(payload)
return outputs


def _validate_identifier(value: str, name: str) -> None:
if len(value) > 200:
raise ValueError(f"{name} must be at most 200 characters")
# Following S3 object key naming restrictions.
if not re.match(r"^[a-zA-Z0-9!_\-.*'()]+$", value):
raise ValueError(
f"{name} must only contain alphanumeric characters or ! - _ . * ' ( )"
)
2 changes: 1 addition & 1 deletion python-sdk/src/indexify/functions_sdk/graph_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class ComputeGraphMetadata(BaseModel):
accumulator_zero_values: Dict[str, bytes] = {}
runtime_information: RuntimeInformation
replaying: bool = False
version: Optional[int] = -1
version: str

def get_input_payload_serializer(self):
return get_serializer(self.start_node.compute_fn.input_encoder)
Expand Down
Loading

0 comments on commit cf28112

Please sign in to comment.