Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Invocation state proxy for Function Executors and new API for functions #1134

Merged
merged 1 commit into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 4 additions & 11 deletions python-sdk/indexify/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@
console = Console(theme=custom_theme)

app = typer.Typer(pretty_exceptions_enable=False, no_args_is_help=True)
config_path_option: Optional[str] = typer.Option(
None, help="Path to the TLS configuration file"
)


@app.command(
Expand Down Expand Up @@ -209,7 +206,9 @@ def executor(
dev: Annotated[
bool, typer.Option("--dev", "-d", help="Run the executor in development mode")
] = False,
config_path: Optional[str] = config_path_option,
config_path: Optional[str] = typer.Option(
None, help="Path to the TLS configuration file"
),
executor_cache: Optional[str] = typer.Option(
"~/.indexify/executor_cache", help="Path to the executor cache directory"
),
Expand Down Expand Up @@ -264,27 +263,21 @@ def function_executor(
function_executor_server_address: str = typer.Option(
help="Function Executor server address"
),
indexify_server_address: str = typer.Option(help="Indexify server address"),
dev: Annotated[
bool, typer.Option("--dev", "-d", help="Run the executor in development mode")
] = False,
config_path: Optional[str] = config_path_option,
):
if not dev:
configure_production_logging()

logger.info(
"starting function executor server",
function_executor_server_address=function_executor_server_address,
indexify_server_address=indexify_server_address,
config_path=config_path,
)

FunctionExecutorServer(
server_address=function_executor_server_address,
service=FunctionExecutorService(
indexify_server_address=indexify_server_address, config_path=config_path
),
service=FunctionExecutorService(),
).run()


Expand Down
26 changes: 15 additions & 11 deletions python-sdk/indexify/executor/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ async def _fetch_url(
self, url: str, resource_description: str, logger: Any
) -> SerializedObject:
logger.info(f"fetching {resource_description}", url=url)
response = await self._client.get(url)
response: httpx.Response = await self._client.get(url)
try:
response.raise_for_status()
except httpx.HTTPStatusError as e:
Expand All @@ -153,13 +153,17 @@ async def _fetch_url(
)
raise

# We're hardcoding the content type currently used by Python SDK. It might change in the future.
# There's no other way for now to determine if the response is a bytes or string.
if response.headers["content-type"] == "application/octet-stream":
return SerializedObject(
bytes=response.content, content_type=response.headers["content-type"]
)
else:
return SerializedObject(
string=response.text, content_type=response.headers["content-type"]
)
return serialized_object_from_http_response(response)


def serialized_object_from_http_response(response: httpx.Response) -> SerializedObject:
# We're hardcoding the content type currently used by Python SDK. It might change in the future.
# There's no other way for now to determine if the response is a bytes or string.
if response.headers["content-type"] == "application/octet-stream":
return SerializedObject(
bytes=response.content, content_type=response.headers["content-type"]
)
else:
return SerializedObject(
string=response.text, content_type=response.headers["content-type"]
)
12 changes: 6 additions & 6 deletions python-sdk/indexify/executor/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,16 @@ def __init__(
self._logger.info("running the extractor with TLS enabled")
protocol = "https"

self._server_addr = server_addr
self._base_url = f"{protocol}://{self._server_addr}"
self._code_path = code_path
self._function_worker = FunctionWorker(
function_executor_factory=ProcessFunctionExecutorFactory(
indexify_server_address=server_addr,
development_mode=development_mode,
config_path=config_path,
)
),
base_url=self._base_url,
config_path=config_path,
)
self._server_addr = server_addr
self._base_url = f"{protocol}://{self._server_addr}"
self._code_path = code_path
self._downloader = Downloader(
code_path=code_path, base_url=self._base_url, config_path=config_path
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import grpc

from indexify.common_util import get_httpx_client
from indexify.function_executor.proto.function_executor_pb2 import (
InitializeRequest,
InitializeResponse,
Expand All @@ -13,6 +14,7 @@

from .function_executor import FunctionExecutor
from .function_executor_factory import FunctionExecutorFactory
from .invocation_state_client import InvocationStateClient


class FunctionExecutorMap:
Expand All @@ -21,10 +23,17 @@ class FunctionExecutorMap:
The map is safe to use by multiple couroutines running in event loop on the same thread
but it's not thread safe (can't be used from different threads concurrently)."""

def __init__(self, factory: FunctionExecutorFactory):
def __init__(
self,
factory: FunctionExecutorFactory,
base_url: str,
config_path: Optional[str],
):
self._factory = factory
self._base_url = base_url
self._config_path = config_path
# Map of initialized Function executors ready to run tasks.
# function ID -> FunctionExecutor
# Function ID -> FunctionExecutor.
self._executors: Dict[str, FunctionExecutor] = {}
# We have to do all operations under this lock because we need to ensure
# that we don't create more Function Executors than required. This is important
Expand Down Expand Up @@ -52,6 +61,7 @@ async def get_or_create(
return self._executors[id]

executor: Optional[FunctionExecutor] = None
invocation_state_client: Optional[InvocationStateClient] = None
try:
executor = await self._factory.create(logger, state=initial_state)
channel: grpc.aio.Channel = await executor.channel()
Expand All @@ -61,7 +71,22 @@ async def get_or_create(
)
if not initialize_response.success:
raise Exception("initialize RPC failed at function executor")
invocation_state_client = InvocationStateClient(
stub=stub,
base_url=self._base_url,
http_client=get_httpx_client(
config_path=self._config_path, make_async=True
),
graph=initialize_request.graph_name,
namespace=initialize_request.namespace,
logger=logger,
)
await invocation_state_client.start()
# This is dirty but requires refactoring to implement properly.
initial_state.invocation_state_client = invocation_state_client
except Exception:
if invocation_state_client is not None:
await invocation_state_client.destroy()
if executor is not None:
await self._factory.destroy(executor=executor, logger=logger)
# Function Executor creation or initialization failed.
Expand All @@ -82,10 +107,14 @@ async def delete(
# Function Executor was already deleted or replaced and the caller is not aware of this.
return
del self._executors[id]
if function_executor.state().invocation_state_client is not None:
await function_executor.state().invocation_state_client.destroy()
await self._factory.destroy(executor=function_executor, logger=logger)

async def clear(self, logger):
async with self._executors_lock:
while self._executors:
id, function_executor = self._executors.popitem()
if function_executor.state().invocation_state_client is not None:
await function_executor.state().invocation_state_client.destroy()
await self._factory.destroy(function_executor, logger)
Loading
Loading