Skip to content

Commit

Permalink
Merge pull request #3599 from onyx-dot-app/bugfix/doc-sync
Browse files Browse the repository at this point in the history
quick hack to prevent resyncing the same doc
  • Loading branch information
rkuo-danswer authored Jan 5, 2025
2 parents af2061c + 371d1cc commit 2fc5825
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 0 deletions.
7 changes: 7 additions & 0 deletions backend/onyx/background/celery/tasks/vespa/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,13 @@ def vespa_metadata_sync_task(
# the sync might repeat again later
mark_document_as_synced(document_id, db_session)

redis_syncing_key = RedisConnectorCredentialPair.make_redis_syncing_key(
document_id
)
r = get_redis_client(tenant_id=tenant_id)
r.delete(redis_syncing_key)
# r.hdel(RedisConnectorCredentialPair.SYNCING_HASH, document_id)

task_logger.info(f"doc={document_id} action=sync chunks={chunks_affected}")
except SoftTimeLimitExceeded:
task_logger.info(f"SoftTimeLimitExceeded exception. doc={document_id}")
Expand Down
25 changes: 25 additions & 0 deletions backend/onyx/redis/redis_connector_credential_pair.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
FENCE_PREFIX = PREFIX + "_fence"
TASKSET_PREFIX = PREFIX + "_taskset"

# SYNCING_HASH = PREFIX + ":vespa_syncing"
SYNCING_PREFIX = PREFIX + ":vespa_syncing"

def __init__(self, tenant_id: str | None, id: int) -> None:
    # Normalize the cc-pair id to a string before delegating to the
    # RedisObjectHelper base — presumably the helper keys its Redis
    # entries by a string id (confirm against RedisObjectHelper).
    super().__init__(tenant_id, str(id))

Expand All @@ -56,6 +59,10 @@ def set_skip_docs(self, skip_docs: set[str]) -> None:
# the list on the fly
self.skip_docs = skip_docs

@staticmethod
def make_redis_syncing_key(doc_id: str) -> str:
    """Return the per-document Redis key that marks *doc_id* as queued for sync.

    The key is the class-level SYNCING_PREFIX followed by a colon and the
    document id, e.g. "<SYNCING_PREFIX>:<doc_id>".
    """
    prefix = RedisConnectorCredentialPair.SYNCING_PREFIX
    return prefix + ":" + doc_id

def generate_tasks(
self,
celery_app: Celery,
Expand All @@ -64,6 +71,9 @@ def generate_tasks(
lock: RedisLock,
tenant_id: str | None,
) -> tuple[int, int] | None:
# an arbitrary number in seconds to prevent the same doc from syncing repeatedly
SYNC_EXPIRATION = 24 * 60 * 60

last_lock_time = time.monotonic()

async_results = []
Expand Down Expand Up @@ -92,6 +102,14 @@ def generate_tasks(
if doc.id in self.skip_docs:
continue

# is the document sync already queued?
# if redis_client.hexists(doc.id):
# continue

redis_syncing_key = self.make_redis_syncing_key(doc.id)
if redis_client.exists(redis_syncing_key):
continue

# celery's default task id format is "dd32ded3-00aa-4884-8b21-42f8332e7fac"
# the key for the result is "celery-task-meta-dd32ded3-00aa-4884-8b21-42f8332e7fac"
# we prefix the task id so it's easier to keep track of who created the task
Expand All @@ -104,6 +122,13 @@ def generate_tasks(
RedisConnectorCredentialPair.get_taskset_key(), custom_task_id
)

# track the doc.id in redis so that we don't resubmit it repeatedly
# redis_client.hset(
# self.SYNCING_HASH, doc.id, custom_task_id
# )

redis_client.set(redis_syncing_key, custom_task_id, ex=SYNC_EXPIRATION)

# Priority on sync's triggered by new indexing should be medium
result = celery_app.send_task(
OnyxCeleryTask.VESPA_METADATA_SYNC_TASK,
Expand Down
3 changes: 3 additions & 0 deletions backend/onyx/redis/redis_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ def __getattribute__(self, item: str) -> Any:
"sadd",
"srem",
"scard",
"hexists",
"hset",
"hdel",
] # Regular methods that need simple prefixing

if item == "scan_iter":
Expand Down
2 changes: 2 additions & 0 deletions backend/onyx/redis/redis_usergroup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.db.models import Document
from onyx.redis.redis_object_helper import RedisObjectHelper
from onyx.utils.variable_functionality import fetch_versioned_implementation
from onyx.utils.variable_functionality import global_version
Expand Down Expand Up @@ -73,6 +74,7 @@ def generate_tasks(

stmt = construct_document_select_by_usergroup(int(self._id))
for doc in db_session.scalars(stmt).yield_per(1):
doc = cast(Document, doc)
current_time = time.monotonic()
if current_time - last_lock_time >= (
CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT / 4
Expand Down

0 comments on commit 2fc5825

Please sign in to comment.