Skip to content

Commit

Permalink
Fix Null Value in PG (#3559)
Browse files Browse the repository at this point in the history
* k

* k

* k

* k

* k
  • Loading branch information
yuhongsun96 authored Dec 29, 2024
1 parent e2700b2 commit f4806da
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 8 deletions.
42 changes: 37 additions & 5 deletions backend/onyx/background/indexing/run_indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from onyx.configs.constants import MilestoneRecordType
from onyx.connectors.connector_runner import ConnectorRunner
from onyx.connectors.factory import instantiate_connector
from onyx.connectors.models import Document
from onyx.connectors.models import IndexAttemptMetadata
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
from onyx.db.connector_credential_pair import get_last_successful_attempt_time
Expand Down Expand Up @@ -90,6 +91,35 @@ def _get_connector_runner(
)


def strip_null_characters(doc_batch: list[Document]) -> list[Document]:
cleaned_batch = []
for doc in doc_batch:
cleaned_doc = doc.model_copy()

if "\x00" in cleaned_doc.id:
logger.warning(f"NUL characters found in document ID: {cleaned_doc.id}")
cleaned_doc.id = cleaned_doc.id.replace("\x00", "")

if "\x00" in cleaned_doc.semantic_identifier:
logger.warning(
f"NUL characters found in document semantic identifier: {cleaned_doc.semantic_identifier}"
)
cleaned_doc.semantic_identifier = cleaned_doc.semantic_identifier.replace(
"\x00", ""
)

for section in cleaned_doc.sections:
if section.link and "\x00" in section.link:
logger.warning(
f"NUL characters found in document link for document: {cleaned_doc.id}"
)
section.link = section.link.replace("\x00", "")

cleaned_batch.append(cleaned_doc)

return cleaned_batch


class ConnectorStopSignal(Exception):
"""A custom exception used to signal a stop in processing."""

Expand Down Expand Up @@ -238,7 +268,9 @@ def _run_indexing(
)

batch_description = []
for doc in doc_batch:

doc_batch_cleaned = strip_null_characters(doc_batch)
for doc in doc_batch_cleaned:
batch_description.append(doc.to_short_descriptor())

doc_size = 0
Expand All @@ -258,15 +290,15 @@ def _run_indexing(

# real work happens here!
new_docs, total_batch_chunks = indexing_pipeline(
document_batch=doc_batch,
document_batch=doc_batch_cleaned,
index_attempt_metadata=index_attempt_md,
)

batch_num += 1
net_doc_change += new_docs
chunk_count += total_batch_chunks
document_count += len(doc_batch)
all_connector_doc_ids.update(doc.id for doc in doc_batch)
document_count += len(doc_batch_cleaned)
all_connector_doc_ids.update(doc.id for doc in doc_batch_cleaned)

# commit transaction so that the `update` below begins
# with a brand new transaction. Postgres uses the start
Expand All @@ -276,7 +308,7 @@ def _run_indexing(
db_session.commit()

if callback:
callback.progress("_run_indexing", len(doc_batch))
callback.progress("_run_indexing", len(doc_batch_cleaned))

# This new value is updated every batch, so UI can refresh per batch update
update_docs_indexed(
Expand Down
28 changes: 25 additions & 3 deletions backend/onyx/db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
from onyx.db.enums import IndexModelStatus
from onyx.db.enums import TaskStatus
from onyx.db.pydantic_type import PydanticType
from onyx.utils.logger import setup_logger
from onyx.utils.special_types import JSON_ro
from onyx.file_store.models import FileDescriptor
from onyx.llm.override_models import LLMOverride
Expand All @@ -65,13 +66,17 @@
from shared_configs.enums import EmbeddingProvider
from shared_configs.enums import RerankerProvider

logger = setup_logger()


class Base(DeclarativeBase):
__abstract__ = True


class EncryptedString(TypeDecorator):
impl = LargeBinary
# This type's behavior is fully deterministic and doesn't depend on any external factors.
cache_ok = True

def process_bind_param(self, value: str | None, dialect: Dialect) -> bytes | None:
if value is not None:
Expand All @@ -86,6 +91,8 @@ def process_result_value(self, value: bytes | None, dialect: Dialect) -> str | N

class EncryptedJson(TypeDecorator):
impl = LargeBinary
# This type's behavior is fully deterministic and doesn't depend on any external factors.
cache_ok = True

def process_bind_param(self, value: dict | None, dialect: Dialect) -> bytes | None:
if value is not None:
Expand All @@ -102,6 +109,21 @@ def process_result_value(
return value


class NullFilteredString(TypeDecorator):
    """String column type that scrubs NUL (0x00) bytes before binding.

    Postgres text columns reject NUL bytes; this decorator strips them on
    the way into the database (with a warning) instead of failing the
    statement. Values read back are returned untouched.
    """

    impl = String
    # This type's behavior is fully deterministic and doesn't depend on any external factors.
    cache_ok = True

    def process_bind_param(self, value: str | None, dialect: Dialect) -> str | None:
        # Fast path: nothing to scrub (including None) passes through as-is.
        if value is None or "\x00" not in value:
            return value
        logger.warning(f"NUL characters found in value: {value}")
        return value.replace("\x00", "")

    def process_result_value(self, value: str | None, dialect: Dialect) -> str | None:
        # Stored values are already NUL-free; no transformation on read.
        return value


"""
Auth/Authz (users, permissions, access) Tables
"""
Expand Down Expand Up @@ -451,16 +473,16 @@ class Document(Base):

# this should correspond to the ID of the document
# (as is passed around in Onyx)
id: Mapped[str] = mapped_column(String, primary_key=True)
id: Mapped[str] = mapped_column(NullFilteredString, primary_key=True)
from_ingestion_api: Mapped[bool] = mapped_column(
Boolean, default=False, nullable=True
)
# 0 for neutral, positive for mostly endorse, negative for mostly reject
boost: Mapped[int] = mapped_column(Integer, default=DEFAULT_BOOST)
hidden: Mapped[bool] = mapped_column(Boolean, default=False)
semantic_id: Mapped[str] = mapped_column(String)
semantic_id: Mapped[str] = mapped_column(NullFilteredString)
# First Section's link
link: Mapped[str | None] = mapped_column(String, nullable=True)
link: Mapped[str | None] = mapped_column(NullFilteredString, nullable=True)

# The updated time is also used as a measure of the last successful state of the doc
# pulled from the source (to help skip reindexing already updated docs in case of
Expand Down

0 comments on commit f4806da

Please sign in to comment.