-
Notifications
You must be signed in to change notification settings - Fork 120
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Executor: force Function Executor RunTask concurrency=1
Before the in-process Function Executor was introduced, the max task concurrency in Executor was 1. We bring the previous behavior back here but enforce a concurrency of 1 per Function Executor (not for the whole Executor). It's better this way because it's hard for customers to reason about the implications of running multiple concurrent tasks per Function Executor. Also added tests that verify task concurrency for the same and different functions. We'll revise this concurrency policy in the future if we allow customers to configure their functions' concurrency explicitly via function attributes. I also did a big refactoring which simplifies the task policy implementation. This is how the current task policies are implemented now: ``` await state.wait_running_tasks_less(1) if state.function_id_with_version != _function_id_with_version(task): await state.destroy_function_executor() state.function_id_with_version = _function_id_with_version(task) ``` There are also many other refactorings in this PR that help to implement the upcoming features in Executor and simplify the code base. Testing: make check make test
- Loading branch information
Showing
20 changed files
with
805 additions
and
583 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
139 changes: 120 additions & 19 deletions
139
python-sdk/indexify/executor/function_executor/function_executor.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,32 +1,133 @@ | ||
import asyncio | ||
from typing import Any, Optional | ||
|
||
import grpc | ||
|
||
# Timeout for Function Executor startup in seconds. | ||
# The timeout is counted from the moment when the Function Executor environment | ||
# is fully prepared and the Function Executor gets started. | ||
FUNCTION_EXECUTOR_READY_TIMEOUT_SEC = 5 | ||
from indexify.common_util import get_httpx_client | ||
from indexify.function_executor.proto.function_executor_pb2 import ( | ||
InitializeRequest, | ||
InitializeResponse, | ||
) | ||
from indexify.function_executor.proto.function_executor_pb2_grpc import ( | ||
FunctionExecutorStub, | ||
) | ||
|
||
from .invocation_state_client import InvocationStateClient | ||
from .server.function_executor_server import ( | ||
FUNCTION_EXECUTOR_SERVER_READY_TIMEOUT_SEC, | ||
FunctionExecutorServer, | ||
) | ||
from .server.function_executor_server_factory import ( | ||
FunctionExecutorServerConfiguration, | ||
FunctionExecutorServerFactory, | ||
) | ||
|
||
|
||
class FunctionExecutor: | ||
"""Abstract interface for a FunctionExecutor. | ||
"""Executor side class supporting a running FunctionExecutorServer. | ||
FunctionExecutor primary responsibility is creation and initialization | ||
of all resources associated with a particular Function Executor Server | ||
including the Server itself. FunctionExecutor owns all these resources | ||
and provides other Executor components with access to them. | ||
FunctionExecutor is a class that executes tasks for a particular function. | ||
FunctionExecutor implements the gRPC server that listens for incoming tasks. | ||
Addition of any business logic besides resource management is discouraged. | ||
Please add such logic to other classes managed by this class. | ||
""" | ||
|
||
async def channel(self) -> grpc.aio.Channel: | ||
"""Returns a async gRPC channel to the Function Executor. | ||
def __init__(self, server_factory: FunctionExecutorServerFactory, logger: Any):
    """Records the server factory and a module-bound logger.

    No server, channel or invocation state client is created here; those
    attributes start as None and the instance starts as not initialized.

    Args:
        server_factory: factory stored for creating and destroying the server.
        logger: logger exposing bind(); bound with module=__name__.
    """
    self._server_factory: FunctionExecutorServerFactory = server_factory
    self._logger = logger.bind(module=__name__)

    # The accessors refuse to hand out resources until this flag is set.
    self._initialized = False

    # Owned resources, all unset until something assigns them.
    self._invocation_state_client: Optional[InvocationStateClient] = None
    self._channel: Optional[grpc.aio.Channel] = None
    self._server: Optional[FunctionExecutorServer] = None
|
||
async def initialize(
    self,
    config: FunctionExecutorServerConfiguration,
    initialize_request: InitializeRequest,
    base_url: str,
    config_path: Optional[str],
):
    """Brings up a FunctionExecutorServer together with every resource tied to it.

    Steps, in order: create the server through the factory, open a gRPC
    channel to it and wait for the channel to become ready, send the
    initialize RPC, then build and start the invocation state client.

    Args:
        config: passed to the server factory when creating the server.
        initialize_request: sent in the initialize RPC; its graph_name and
            namespace are also given to the invocation state client.
        base_url: passed to the invocation state client.
        config_path: passed to get_httpx_client() when building the HTTP client.

    Raises:
        Exception: whatever any step raised, re-raised after destroy() was awaited.
    """
    try:
        self._server = await self._server_factory.create(
            config=config, logger=self._logger
        )
        self._channel = await self._server.create_channel(self._logger)
        await _channel_ready(self._channel)

        server_stub: FunctionExecutorStub = FunctionExecutorStub(self._channel)
        await _initialize_server(server_stub, initialize_request)

        async_http_client = get_httpx_client(config_path=config_path, make_async=True)
        self._invocation_state_client = InvocationStateClient(
            stub=server_stub,
            base_url=base_url,
            http_client=async_http_client,
            graph=initialize_request.graph_name,
            namespace=initialize_request.namespace,
            logger=self._logger,
        )
        await self._invocation_state_client.start()
    except Exception:
        # Release whatever was created before the failure, then propagate it.
        await self.destroy()
        raise
    else:
        self._initialized = True
|
||
def channel(self) -> grpc.aio.Channel:
    """Returns the stored gRPC channel.

    Raises:
        RuntimeError: if this FunctionExecutor is not initialized.
    """
    self._check_initialized()
    grpc_channel = self._channel
    return grpc_channel
|
||
def invocation_state_client(self) -> InvocationStateClient:
    """Returns the stored invocation state client.

    Raises:
        RuntimeError: if this FunctionExecutor is not initialized.
    """
    self._check_initialized()
    state_client = self._invocation_state_client
    return state_client
|
||
async def destroy(self):
    """Destroys all resources owned by this FunctionExecutor.

    Never raises any exceptions (subclasses of Exception) but logs them.

    Teardown order: invocation state client, then the gRPC channel, then the
    server (through the server factory). Each step runs even if a previous
    one failed. A resource whose teardown raised keeps its reference; a
    resource torn down successfully has its reference reset to None.
    """
    # Mark the instance unusable first: without this, channel() and
    # invocation_state_client() would keep passing _check_initialized()
    # after teardown and silently hand out None.
    self._initialized = False

    try:
        if self._invocation_state_client is not None:
            await self._invocation_state_client.destroy()
            self._invocation_state_client = None
    except Exception as e:
        self._logger.error(
            "failed to destroy FunctionExecutor invocation state client", exc_info=e
        )

    try:
        if self._channel is not None:
            await self._channel.close()
            self._channel = None
    except Exception as e:
        self._logger.error(
            "failed to close FunctionExecutorServer channel", exc_info=e
        )

    try:
        if self._server is not None:
            await self._server_factory.destroy(self._server, self._logger)
            self._server = None
    except Exception as e:
        self._logger.error("failed to destroy FunctionExecutorServer", exc_info=e)
|
||
def _check_initialized(self): | ||
if not self._initialized: | ||
raise RuntimeError("FunctionExecutor is not initialized") | ||
|
||
|
||
The channel is in ready state and can be used for all gRPC communication with the Function Executor | ||
and can be shared among coroutines running in the same event loop in the same thread. Users should | ||
not close the channel as it's reused for all requests. | ||
Raises Exception if an error occurred.""" | ||
raise NotImplementedError | ||
async def _channel_ready(channel: grpc.aio.Channel):
    """Waits for the channel to report ready.

    The wait is bounded by FUNCTION_EXECUTOR_SERVER_READY_TIMEOUT_SEC via
    asyncio.wait_for(), which raises a timeout error when the bound is exceeded.
    """
    readiness = channel.channel_ready()
    await asyncio.wait_for(
        readiness, timeout=FUNCTION_EXECUTOR_SERVER_READY_TIMEOUT_SEC
    )
|
||
def state(self) -> Optional[Any]:
    """Returns optional state object.

    The state object can be used to associate any data with the Function Executor.
    This stub has no implementation and always raises NotImplementedError.
    """
    raise NotImplementedError()
async def _initialize_server(
    stub: FunctionExecutorStub, initialize_request: InitializeRequest
):
    """Sends the initialize RPC through the stub and checks the reported outcome.

    Raises:
        Exception: if the response's success field is falsy.
    """
    response: InitializeResponse = await stub.initialize(initialize_request)
    if response.success:
        return
    raise Exception("initialize RPC failed at function executor server")
26 changes: 0 additions & 26 deletions
26
python-sdk/indexify/executor/function_executor/function_executor_factory.py
This file was deleted.
Oops, something went wrong.
Oops, something went wrong.