Skip to content

Commit

Permalink
tighter signaling to prevent indexing cleanup from hitting tasks that are just starting (#2867)
Browse files Browse the repository at this point in the history

* better indexing synchronization

* add logging for fence wait

* handle the task not creating

* add more logging
  • Loading branch information
rkuo-danswer authored Oct 21, 2024
1 parent c516f35 commit 6aad114
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 5 deletions.
52 changes: 49 additions & 3 deletions backend/danswer/background/celery/tasks/indexing/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,19 @@ def try_creating_indexing_task(

custom_task_id = f"{rci.generator_task_id_prefix}_{uuid4()}"

# create the index attempt ... just for tracking purposes
# set a basic fence to start
fence_value = RedisConnectorIndexingFenceData(
index_attempt_id=None,
started=None,
submitted=datetime.now(timezone.utc),
celery_task_id=None,
)
r.set(rci.fence_key, fence_value.model_dump_json())

# create the index attempt for tracking purposes
# code elsewhere checks for index attempts without an associated redis key
# and cleans them up
# therefore we must create the attempt and the task after the fence goes up
index_attempt_id = create_index_attempt(
cc_pair.id,
search_settings.id,
Expand All @@ -276,17 +288,19 @@ def try_creating_indexing_task(
priority=DanswerCeleryPriority.MEDIUM,
)
if not result:
return None
raise RuntimeError("send_task for connector_indexing_proxy_task failed.")

# set this only after all tasks have been added
# now fill out the fence with the rest of the data
fence_value = RedisConnectorIndexingFenceData(
index_attempt_id=index_attempt_id,
started=None,
submitted=datetime.now(timezone.utc),
celery_task_id=result.id,
)

r.set(rci.fence_key, fence_value.model_dump_json())
except Exception:
r.delete(rci.fence_key)
task_logger.exception("Unexpected exception")
return None
finally:
Expand Down Expand Up @@ -371,6 +385,38 @@ def connector_indexing_task(

rci = RedisConnectorIndexing(cc_pair_id, search_settings_id)

while True:
# read related data and evaluate/print task progress
fence_value = cast(bytes, r.get(rci.fence_key))
if fence_value is None:
task_logger.info(
f"connector_indexing_task: fence_value not found: fence={rci.fence_key}"
)
raise

try:
fence_json = fence_value.decode("utf-8")
fence_data = RedisConnectorIndexingFenceData.model_validate_json(
cast(str, fence_json)
)
except ValueError:
task_logger.exception(
f"connector_indexing_task: fence_data not decodeable: fence={rci.fence_key}"
)
raise

if fence_data.index_attempt_id is None or fence_data.celery_task_id is None:
task_logger.info(
f"connector_indexing_task - Waiting for fence: fence={rci.fence_key}"
)
sleep(1)
continue

task_logger.info(
f"connector_indexing_task - Fence found, continuing...: fence={rci.fence_key}"
)
break

lock = r.lock(
rci.generator_lock_key,
timeout=CELERY_INDEXING_LOCK_TIMEOUT,
Expand Down
4 changes: 2 additions & 2 deletions backend/danswer/background/celery/tasks/shared/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@


class RedisConnectorIndexingFenceData(BaseModel):
    """Payload serialized to JSON and stored at the Redis indexing fence key.

    The fence is written in two phases: first as a stub (before the index
    attempt row or the celery task exist), then filled out once both are
    created. ``index_attempt_id`` and ``celery_task_id`` are therefore
    Optional — consumers must treat a fence with either field set to None
    as "task still setting up".
    """

    # None while the fence is only a stub; set once create_index_attempt runs
    index_attempt_id: int | None
    # when the indexing generator actually started work; None until then
    started: datetime | None
    # when the fence was first written (always set, in UTC)
    submitted: datetime
    # None while the fence is only a stub; set once the task is dispatched
    celery_task_id: str | None


@shared_task(
Expand Down
4 changes: 4 additions & 0 deletions backend/danswer/background/celery/tasks/vespa/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,10 @@ def monitor_ccpair_indexing_taskset(
"monitor_ccpair_indexing_taskset: generator_progress_value is not an integer."
)

if fence_data.index_attempt_id is None or fence_data.celery_task_id is None:
# the task is still setting up
return

# Read result state BEFORE generator_complete_key to avoid a race condition
result: AsyncResult = AsyncResult(fence_data.celery_task_id)
result_state = result.state
Expand Down

0 comments on commit 6aad114

Please sign in to comment.