Skip to content

Commit

Permalink
permission sync in cloud and beat expiry adjustment (#3544)
Browse files Browse the repository at this point in the history
* try fixing exception in cloud

* raise beat expiry ... 60 seconds might be starving certain tasks completely

* adjust expiry down to 10 min

* raise concurrency overflow for indexing worker.

* parent pid check

* fix comment

* fix parent pid check, also actually raise an exception from the task if the spawned task exit status is bad

* fix pid check

* some cleanup and task wait fixes

* review fixes

* comment some code so we don't change too many things at once

---------

Co-authored-by: Richard Kuo (Danswer) <[email protected]>
Co-authored-by: Richard Kuo <[email protected]>
  • Loading branch information
3 people authored Dec 31, 2024
1 parent a83c9b4 commit bec0f9f
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 49 deletions.
7 changes: 6 additions & 1 deletion backend/onyx/background/celery/apps/indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,12 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}")

SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_APP_NAME)
SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=sender.concurrency)

# rkuo: been seeing transient connection exceptions here, so upping the connection count
# from just concurrency/concurrency to concurrency/concurrency*2
SqlEngine.init_engine(
pool_size=sender.concurrency, max_overflow=sender.concurrency * 2
)

app_base.wait_for_redis(sender, **kwargs)
app_base.wait_for_db(sender, **kwargs)
Expand Down
5 changes: 4 additions & 1 deletion backend/onyx/background/celery/configs/indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
# Indexing worker specific ... this lets us track the transition to STARTED in redis
# We don't currently rely on this but it has the potential to be useful and
# indexing tasks are not high volume
task_track_started = True

# we don't turn this on yet because celery occasionally runs tasks more than once
# which means a duplicate run might change the task state unexpectedly
# task_track_started = True

worker_concurrency = CELERY_WORKER_INDEXING_CONCURRENCY
worker_pool = "threads"
Expand Down
22 changes: 14 additions & 8 deletions backend/onyx/background/celery/tasks/beat_schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryTask

# choosing 15 minutes because it roughly gives us enough time to process many tasks
# we might be able to reduce this greatly if we can run a unified
# loop across all tenants rather than tasks per tenant

BEAT_EXPIRES_DEFAULT = 15 * 60 # 15 minutes (in seconds)

# we set expires because it isn't necessary to queue up these tasks
# it's only important that they run relatively regularly
tasks_to_schedule = [
Expand All @@ -13,7 +19,7 @@
"schedule": timedelta(seconds=20),
"options": {
"priority": OnyxCeleryPriority.HIGH,
"expires": 60,
"expires": BEAT_EXPIRES_DEFAULT,
},
},
{
Expand All @@ -22,7 +28,7 @@
"schedule": timedelta(seconds=20),
"options": {
"priority": OnyxCeleryPriority.HIGH,
"expires": 60,
"expires": BEAT_EXPIRES_DEFAULT,
},
},
{
Expand All @@ -31,7 +37,7 @@
"schedule": timedelta(seconds=15),
"options": {
"priority": OnyxCeleryPriority.HIGH,
"expires": 60,
"expires": BEAT_EXPIRES_DEFAULT,
},
},
{
Expand All @@ -40,7 +46,7 @@
"schedule": timedelta(seconds=15),
"options": {
"priority": OnyxCeleryPriority.HIGH,
"expires": 60,
"expires": BEAT_EXPIRES_DEFAULT,
},
},
{
Expand All @@ -49,7 +55,7 @@
"schedule": timedelta(seconds=3600),
"options": {
"priority": OnyxCeleryPriority.LOWEST,
"expires": 60,
"expires": BEAT_EXPIRES_DEFAULT,
},
},
{
Expand All @@ -58,7 +64,7 @@
"schedule": timedelta(seconds=5),
"options": {
"priority": OnyxCeleryPriority.HIGH,
"expires": 60,
"expires": BEAT_EXPIRES_DEFAULT,
},
},
{
Expand All @@ -67,7 +73,7 @@
"schedule": timedelta(seconds=30),
"options": {
"priority": OnyxCeleryPriority.HIGH,
"expires": 60,
"expires": BEAT_EXPIRES_DEFAULT,
},
},
{
Expand All @@ -76,7 +82,7 @@
"schedule": timedelta(seconds=20),
"options": {
"priority": OnyxCeleryPriority.HIGH,
"expires": 60,
"expires": BEAT_EXPIRES_DEFAULT,
},
},
]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import time
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from time import sleep
from uuid import uuid4

from celery import Celery
Expand All @@ -18,6 +20,7 @@
from onyx.background.celery.apps.app_base import task_logger
from onyx.configs.app_configs import JOB_TIMEOUT
from onyx.configs.constants import CELERY_PERMISSIONS_SYNC_LOCK_TIMEOUT
from onyx.configs.constants import CELERY_TASK_WAIT_FOR_FENCE_TIMEOUT
from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import DANSWER_REDIS_FUNCTION_LOCK_PREFIX
from onyx.configs.constants import DocumentSource
Expand Down Expand Up @@ -91,7 +94,7 @@ def _is_external_doc_permissions_sync_due(cc_pair: ConnectorCredentialPair) -> b
def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> None:
r = get_redis_client(tenant_id=tenant_id)

lock_beat = r.lock(
lock_beat: RedisLock = r.lock(
OnyxRedisLocks.CHECK_CONNECTOR_DOC_PERMISSIONS_SYNC_BEAT_LOCK,
timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT,
)
Expand Down Expand Up @@ -219,6 +222,43 @@ def connector_permission_sync_generator_task(

r = get_redis_client(tenant_id=tenant_id)

# this wait is needed to avoid a race condition where
# the primary worker sends the task and it is immediately executed
# before the primary worker can finalize the fence
start = time.monotonic()
while True:
if time.monotonic() - start > CELERY_TASK_WAIT_FOR_FENCE_TIMEOUT:
raise ValueError(
f"connector_permission_sync_generator_task - timed out waiting for fence to be ready: "
f"fence={redis_connector.permissions.fence_key}"
)

if not redis_connector.permissions.fenced: # The fence must exist
raise ValueError(
f"connector_permission_sync_generator_task - fence not found: "
f"fence={redis_connector.permissions.fence_key}"
)

payload = redis_connector.permissions.payload # The payload must exist
if not payload:
raise ValueError(
"connector_permission_sync_generator_task: payload invalid or not found"
)

if payload.celery_task_id is None:
logger.info(
f"connector_permission_sync_generator_task - Waiting for fence: "
f"fence={redis_connector.permissions.fence_key}"
)
sleep(1)
continue

logger.info(
f"connector_permission_sync_generator_task - Fence found, continuing...: "
f"fence={redis_connector.permissions.fence_key}"
)
break

lock: RedisLock = r.lock(
OnyxRedisLocks.CONNECTOR_DOC_PERMISSIONS_SYNC_LOCK_PREFIX
+ f"_{redis_connector.id}",
Expand Down Expand Up @@ -254,8 +294,11 @@ def connector_permission_sync_generator_task(
if not payload:
raise ValueError(f"No fence payload found: cc_pair={cc_pair_id}")

payload.started = datetime.now(timezone.utc)
redis_connector.permissions.set_fence(payload)
new_payload = RedisConnectorPermissionSyncPayload(
started=datetime.now(timezone.utc),
celery_task_id=payload.celery_task_id,
)
redis_connector.permissions.set_fence(new_payload)

document_external_accesses: list[DocExternalAccess] = doc_sync_func(cc_pair)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def _is_external_group_sync_due(cc_pair: ConnectorCredentialPair) -> bool:
def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> None:
r = get_redis_client(tenant_id=tenant_id)

lock_beat = r.lock(
lock_beat: RedisLock = r.lock(
OnyxRedisLocks.CHECK_CONNECTOR_EXTERNAL_GROUP_SYNC_BEAT_LOCK,
timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT,
)
Expand Down Expand Up @@ -162,7 +162,7 @@ def try_creating_external_group_sync_task(

LOCK_TIMEOUT = 30

lock = r.lock(
lock: RedisLock = r.lock(
DANSWER_REDIS_FUNCTION_LOCK_PREFIX + "try_generate_external_group_sync_tasks",
timeout=LOCK_TIMEOUT,
)
Expand Down
Loading

0 comments on commit bec0f9f

Please sign in to comment.