Skip to content

Commit

Permalink
Merge pull request #4 from mnemonica-ai/development
Browse files Browse the repository at this point in the history
Release 0.0.5
  • Loading branch information
p1nox authored Apr 29, 2024
2 parents 407b744 + a369a09 commit e454f17
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 4 deletions.
1 change: 1 addition & 0 deletions oshepherd/worker/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def create_celery_app(config: WorkerConfig):
)
celery_app.conf.update(
result_expires=config.RESULTS_EXPIRES,
broker_transport_options=config.BROKER_TRANSPORT_OPTIONS,
)
celery_app.autodiscover_tasks([TASKS_MODULE])

Expand Down
6 changes: 6 additions & 0 deletions oshepherd/worker/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,9 @@ class WorkerConfig(BaseModel):
CONCURRENCY: Optional[int] = 1
PREFETCH_MULTIPLIER: Optional[int] = 1
RESULTS_EXPIRES: Optional[int] = 3600
BROKER_TRANSPORT_OPTIONS: Optional[dict] = {
"max_retries": 5,
"interval_start": 0,
"interval_step": 0.1,
"interval_max": 0.5,
}
53 changes: 53 additions & 0 deletions oshepherd/worker/ollama_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from celery import Task as CeleryTask
from redis.exceptions import ConnectionError as RedisConnectionError
from amqp.exceptions import RecoverableConnectionError as AMQPConnectionError
from httpx import ConnectError


class OllamaCeleryTask(CeleryTask):
"""
Base Celery Task class for Ollama tasks.
Handling error events related to connectivity issues in each worker:
1. RedisConnectionError: `redis.exceptions.ConnectionError: Error while reading from 104 Connection reset by peer`
2. AMQPConnectionError: `amqp.exceptions.RecoverableConnectionError: Socket was disconnected`
3. ConnectionResetError: `ConnectionResetError: [Errno 104] Connection reset by peer`
4. TimeoutError: `TimeoutError: [Errno 110] Connection timed out`
5. ConnectError: `httpx.ConnectError: [Errno 61] Connection refused`
"""

autoretry_for = (
RedisConnectionError,
AMQPConnectionError,
ConnectionResetError,
TimeoutError,
ConnectError,
)
retry_kwargs = { # Retry up to 5 times, with a 1 second delay between retries
"max_retries": 5,
"countdown": 1,
}
retry_backoff = True # Enable exponential backoff
retry_backoff_max = 60 # Maximum retry delay in seconds
retry_jitter = True # Add a random jitter to delay to prevent all tasks from retrying at the same time

def refresh_connections(self):
with self.app.connection() as connection:
print(" > Refresh broker connection")
connection.ensure_connection(max_retries=3)

if hasattr(self.app.backend, "ensure_connection"):
print(" > Refresh backend connection")
self.app.backend.ensure_connection(max_retries=3)

def on_retry(self, exc, task_id, args, kwargs, einfo):
print(f"Retrying task {self.name}:{task_id}, attempt {self.request.retries}")
self.refresh_connections()

def on_success(self, retval, task_id, args, kwargs):
print(f"Completed task {self.name}:{task_id} successfully")

def on_failure(self, exc, task_id, args, kwargs, einfo):
print(f"Failed task {self.name}:{task_id} due to {exc} - {einfo}")

def run(self, *args, **kwargs):
raise NotImplementedError("Tasks must implement its run method")
12 changes: 10 additions & 2 deletions oshepherd/worker/tasks.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import json
import ollama
from oshepherd.worker.app import celery_app
from oshepherd.worker.ollama_task import OllamaCeleryTask


@celery_app.task(name="oshepherd.worker.tasks.make_generate_request")
def make_generate_request(request_str: str):
@celery_app.task(
name="oshepherd.worker.tasks.make_generate_request",
bind=True,
base=OllamaCeleryTask,
)
def make_generate_request(self, request_str: str):
try:
request = json.loads(request_str)
print(f"# make_generate_request request {request}")
Expand All @@ -20,4 +25,7 @@ def make_generate_request(request_str: str):
f" * error response {response}",
)

# Rethrow exception in order to be handled by base class
raise

return response
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "oshepherd"
version = "0.0.4"
version = "0.0.5"
description = "The Oshepherd guiding the Ollama(s) inference orchestration."
readme = "README.md"
authors = [
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

setup(
name="oshepherd",
version="0.0.4",
version="0.0.5",
description="The Oshepherd guiding the Ollama(s) inference orchestration.",
long_description=open('README.md').read(),
long_description_content_type='text/markdown',
Expand Down

0 comments on commit e454f17

Please sign in to comment.