diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 26ba5b81..ed85baaf 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -17,7 +17,9 @@ app/api/api_v1/routers/pipeline_trigger.py @climatepolicyradar/deng tests/unit/app/core/test_pipeline.py @climatepolicyradar/deng tests/non_search/app/test_pipeline_trigger.py @climatepolicyradar/deng -app/core/ingestion/ @climatepolicyradar/deng +app/repository/pipeline.py @climatepolicyradar/deng +app/service/pipeline.py @climatepolicyradar/deng +app/service/pipeline.sql @climatepolicyradar/deng tests/search/search_fixtures/vespa_test_schema/ @climatepolicyradar/deng app/api/api_v1/routers/search.py @climatepolicyradar/tech-devs tests/search/ @climatepolicyradar/tech-devs diff --git a/.gitignore b/.gitignore index c58ef921..ca3845eb 100644 --- a/.gitignore +++ b/.gitignore @@ -196,3 +196,5 @@ requirements.txt # Vespa secrets directory secrets/* + +scripts/*.json diff --git a/app/api/api_v1/routers/pipeline_trigger.py b/app/api/api_v1/routers/pipeline_trigger.py index 13019b41..37463095 100644 --- a/app/api/api_v1/routers/pipeline_trigger.py +++ b/app/api/api_v1/routers/pipeline_trigger.py @@ -6,9 +6,9 @@ from app.api.api_v1.schemas.document import BulkIngestResult from app.clients.aws.client import S3Client, get_s3_client from app.clients.db.session import get_db -from app.core.ingestion.pipeline import generate_pipeline_ingest_input from app.core.validation.util import get_new_s3_prefix, write_documents_to_s3 from app.service.auth import get_superuser_details +from app.service.pipeline import get_db_state_content _LOGGER = logging.getLogger(__name__) @@ -22,11 +22,11 @@ def _start_ingest( ): """Writes a db state file to s3 which will trigger an ingest.""" try: - pipeline_ingest_input = generate_pipeline_ingest_input(db) + pipeline_ingest_input_content = get_db_state_content(db) write_documents_to_s3( s3_client=s3_client, s3_prefix=s3_prefix, - documents=pipeline_ingest_input, + content=pipeline_ingest_input_content, ) except Exception as e: _LOGGER.exception( diff --git a/app/api/api_v1/schemas/document.py b/app/api/api_v1/schemas/document.py index 91102978..44c9dfed 100644 --- a/app/api/api_v1/schemas/document.py +++ b/app/api/api_v1/schemas/document.py @@ -77,11 +77,12 @@ def _filter_climate_laws_url_from_source(cls, v): class FamilyContext(BaseModel): - """Used to given the family context when returning a FamilyDocument""" + """Used to give the family context when returning a FamilyDocument""" title: str import_id: str - geography: str + geography: str # Keep this for backward compatibility PDCT-1440 + geographies: list[str] category: str slug: str published_date: Optional[datetime] = None @@ -103,7 +104,8 @@ class FamilyAndDocumentsResponse(BaseModel): import_id: str title: str summary: str - geography: str + geography: str # Keep this for backward compatibility PDCT-1440 + geographies: list[str] category: str status: str metadata: dict diff --git a/app/core/download.py b/app/core/download.py index f41caf07..d2061530 100644 --- a/app/core/download.py +++ b/app/core/download.py @@ -1,5 +1,6 @@ """Functions to support browsing the RDS document structure""" +import os import zipfile from functools import lru_cache from io import BytesIO, StringIO @@ -16,7 +17,7 @@ @lru_cache() def _get_query_template(): - with open("./app/core/download.sql", "r") as file: + with open(os.path.join("app", "core", "download.sql"), "r") as file: return file.read() diff --git a/app/core/ingestion/match.py b/app/core/ingestion/match.py deleted file mode 
100644 index bfbc1a0c..00000000 --- a/app/core/ingestion/match.py +++ /dev/null @@ -1,86 +0,0 @@ -import re -from typing import Optional, Set - -REGEX_ENDS_WITH_NUMBER = re.compile(r"(\D+)(\d+)$") - - -def _match_icase(unknown_value: str, allowed_set: Set) -> Optional[str]: - def try_case(value: str): - return value.upper() == unknown_value.upper() - - match = list(filter(try_case, allowed_set)) - if len(match) > 0: - return match[0] - - return None - - -def match_unknown_value(unknown_value: str, allowed_set: Set) -> Optional[str]: - # Just try a case insensitive match - match = _match_icase(unknown_value, allowed_set) - if match: - return match - - # Try with a plural - good for EV - match = _match_icase(unknown_value + "s", allowed_set) - if match: - return match - - # Try with no "ation" good for Transportation - if unknown_value.endswith("ation"): - match = _match_icase(unknown_value[0:-5], allowed_set) - if match: - return match - - # Try hyphenating trailing numbers - good for Covid19 - ends_with_number = REGEX_ENDS_WITH_NUMBER.match(unknown_value) - - if ends_with_number: - hyphenated_number = ( - ends_with_number.groups()[0].strip() + "-" + ends_with_number.groups()[1] - ) - - match = _match_icase(hyphenated_number, allowed_set) - if match: - return match - - # Try without an "es" ending - if unknown_value.endswith("es"): - no_plural = unknown_value[0:-2] - - match = _match_icase(no_plural, allowed_set) - if match: - return match - - # Try stripping any spaces - if " " in unknown_value: - no_spaces = unknown_value.replace(" ", "") - match = _match_icase(no_spaces, allowed_set) - if match: - return match - - # Try hyphenating Co... - if unknown_value.upper().startswith("CO"): - hyphenated_co = "Co-" + unknown_value[2:].strip() - match = _match_icase(hyphenated_co, allowed_set) - if match: - return match - - # Try hyphenating multi - if unknown_value.upper().startswith("MULTI "): - hyphenated_multi = "Multi-" + unknown_value[5:].strip() - match = _match_icase(hyphenated_multi, allowed_set) - if match: - return match - - # Try adding brackets to multi words - words = unknown_value.split(" ") - if len(words) > 2: - abbrev = "".join([w[0] for w in words]) - - with_abbrev = f"{unknown_value} ({abbrev})" - match = _match_icase(with_abbrev, allowed_set) - if match: - return match - - return None diff --git a/app/core/ingestion/pipeline.py b/app/core/ingestion/pipeline.py deleted file mode 100644 index 809ba061..00000000 --- a/app/core/ingestion/pipeline.py +++ /dev/null @@ -1,143 +0,0 @@ -import logging -from datetime import datetime, timezone -from typing import Sequence, Tuple, cast - -from db_client.models.dfce import DocumentStatus -from db_client.models.dfce.family import ( - Corpus, - Family, - FamilyCorpus, - FamilyDocument, - PhysicalDocument, -) -from db_client.models.dfce.metadata import FamilyMetadata -from db_client.models.organisation import Organisation -from sqlalchemy.orm import Session - -from app.api.api_v1.schemas.document import DocumentParserInput -from app.repository.geography import get_geo_subquery -from app.repository.lookups import doc_type_from_family_document_metadata - -_LOGGER = logging.getLogger(__name__) - -MetadataType = dict[str, list[str]] - - -def generate_pipeline_ingest_input(db: Session) -> Sequence[DocumentParserInput]: - """Generates a complete view of the current document database as pipeline input""" - _LOGGER.info("Running pipeline family query") - geo_subquery = get_geo_subquery(db) - - query = ( - db.query( - Family, FamilyDocument, 
FamilyMetadata, geo_subquery.c.value, Organisation, Corpus, PhysicalDocument # type: ignore - ) - .join(Family, Family.import_id == FamilyDocument.family_import_id) - .join(FamilyCorpus, FamilyCorpus.family_import_id == Family.import_id) - .join(Corpus, Corpus.import_id == FamilyCorpus.corpus_import_id) - .join(FamilyMetadata, Family.import_id == FamilyMetadata.family_import_id) - .join(Organisation, Organisation.id == Corpus.organisation_id) - .join( - PhysicalDocument, PhysicalDocument.id == FamilyDocument.physical_document_id - ) - .filter(FamilyDocument.document_status != DocumentStatus.DELETED) - .filter(geo_subquery.c.family_import_id == Family.import_id) # type: ignore - ) - - query_result = cast( - Sequence[ - Tuple[ - Family, - FamilyDocument, - FamilyMetadata, - str, - Organisation, - Corpus, - PhysicalDocument, - ] - ], - query.all(), - ) - fallback_date = datetime(1900, 1, 1, tzinfo=timezone.utc) - _LOGGER.info("Running pipeline document query") - documents: Sequence[DocumentParserInput] = [ - DocumentParserInput( - name=cast(str, family.title), # All documents in a family indexed by title - document_title=cast(str, physical_document.title), - description=cast(str, family.description), - category=str(family.family_category), - publication_ts=family.published_date or fallback_date, - import_id=cast(str, family_document.import_id), - slug=cast(str, family_document.slugs[-1].name), - family_import_id=cast(str, family.import_id), - family_slug=cast(str, family.slugs[-1].name), - source_url=( - cast(str, family_document.physical_document.source_url) - if family_document.physical_document is not None - else None - ), - download_url=None, - type=doc_type_from_family_document_metadata(family_document), - source=cast(str, organisation.name), - geography=cast(str, geography), - geographies=[cast(str, geography)], - corpus_import_id=cast(str, corpus.import_id), - corpus_type_name=cast(str, corpus.corpus_type_name), - collection_title=None, - collection_summary=None, - languages=[ - cast(str, lang.name) - for lang in ( - family_document.physical_document.languages - if family_document.physical_document is not None - else [] - ) - ], - metadata=flatten_pipeline_metadata( - cast(MetadataType, family_metadata.value), - cast(MetadataType, family_document.valid_metadata), - ), - ) - for ( - family, - family_document, - family_metadata, - geography, - organisation, - corpus, - physical_document, - ) in query_result - ] - - # TODO: Revert to raise a ValueError when the issue is resolved - database_doc_count = ( - db.query(FamilyDocument) - .filter(FamilyDocument.document_status != DocumentStatus.DELETED) - .count() - ) - if len(documents) > database_doc_count: - _LOGGER.warning( - "Potential Row Explosion. 
Ingest input is returning more documents than exist in the database", - extra={ - "ingest_count": len(documents), - "database_count": database_doc_count, - }, - ) - - return documents - - -def flatten_pipeline_metadata( - family_metadata: MetadataType, document_metadata: MetadataType -) -> MetadataType: - """Combines metadata objects ready for the pipeline""" - - metadata = {} - - for k, v in family_metadata.items(): - metadata[f"family.{k}"] = v - - for k, v in document_metadata.items(): - metadata[f"document.{k}"] = v - - return metadata diff --git a/app/core/validation/util.py b/app/core/validation/util.py index 52e6ae3b..13d2c176 100644 --- a/app/core/validation/util.py +++ b/app/core/validation/util.py @@ -6,7 +6,6 @@ from io import BytesIO from typing import Any, Collection, Mapping, Optional, Sequence, Union -from app.api.api_v1.schemas.document import DocumentParserInput from app.clients.aws.client import S3Client from app.clients.aws.s3_document import S3Document from app.config import INGEST_TRIGGER_ROOT, PIPELINE_BUCKET @@ -40,22 +39,18 @@ def get_value(data_node: Mapping[str, str]) -> str: def write_documents_to_s3( - s3_client: S3Client, - s3_prefix: str, - documents: Sequence[DocumentParserInput], + s3_client: S3Client, s3_prefix: str, content: dict[str, Any] ) -> Union[S3Document, bool]: """ Write current state of documents into S3 to trigger a pipeline run after bulk ingest :param [S3Client] s3_client: an S3 client to use to write data :param [str] s3_prefix: prefix into which to write the document updates in s3 + :param [dict[str, Any]] content: db state document content :param [Sequence[DocumentCreateRequest]] documents: a sequence of document specifications to write to S3 """ - json_content = json.dumps( - {"documents": {d.import_id: d.to_json() for d in documents}}, - indent=2, - ) + json_content = json.dumps(content, indent=2) bytes_content = BytesIO(json_content.encode("utf8")) documents_object_key = f"{s3_prefix}/db_state.json" _LOGGER.info("Writing Documents file into S3") diff --git a/app/repository/document.py b/app/repository/document.py index 038d669f..62f155d8 100644 --- a/app/repository/document.py +++ b/app/repository/document.py @@ -21,6 +21,7 @@ from db_client.models.dfce.metadata import FamilyMetadata from db_client.models.document.physical_document import PhysicalDocument from db_client.models.organisation.organisation import Organisation +from sqlalchemy import func from sqlalchemy.orm import Session from app.api.api_v1.schemas.document import ( @@ -66,14 +67,21 @@ def get_family_document_and_context( ) db_objects = ( db.query( - Family, FamilyDocument, PhysicalDocument, geo_subquery.c.value, FamilyCorpus # type: ignore + Family, + FamilyDocument, + PhysicalDocument, + func.array_agg(geo_subquery.c.value).label( # type: ignore + "geographies" + ), # Aggregate geographies + FamilyCorpus, ) .filter(FamilyDocument.import_id == family_document_import_id) .filter(Family.import_id == FamilyDocument.family_import_id) .filter(FamilyDocument.physical_document_id == PhysicalDocument.id) .filter(FamilyCorpus.family_import_id == Family.import_id) .filter(geo_subquery.c.family_import_id == Family.import_id) # type: ignore - ).first() # TODO: Fix when handling multiple geographies /PDCT-1510 + .group_by(Family, FamilyDocument, PhysicalDocument, FamilyCorpus) + ).first() if not db_objects: _LOGGER.warning( @@ -84,7 +92,7 @@ def get_family_document_and_context( f"No family document found for import_id: {family_document_import_id}" ) - family, document, 
physical_document, geography_value, family_corpus = db_objects + family, document, physical_document, geographies, family_corpus = db_objects if ( family.family_status != FamilyStatus.PUBLISHED @@ -95,7 +103,10 @@ def get_family_document_and_context( family_context = FamilyContext( title=cast(str, family.title), import_id=cast(str, family.import_id), - geography=geography_value, + geography=geographies[ + 0 + ], # First geography for backward compatibility PDCT-1440 + geographies=geographies, slug=family.slugs[0].name, category=family.family_category, published_date=family.published_date, @@ -147,7 +158,13 @@ def get_family_and_documents(db: Session, import_id: str) -> FamilyAndDocumentsR geo_subquery = get_geo_subquery(db) db_objects = ( db.query( - Family, geo_subquery.c.value, FamilyMetadata, Organisation, FamilyCorpus # type: ignore + Family, + func.array_agg(geo_subquery.c.value).label( # type: ignore + "geographies" + ), # Aggregate geographies + FamilyMetadata, + Organisation, + FamilyCorpus, ) .join(FamilyMetadata, Family.import_id == FamilyMetadata.family_import_id) .join(FamilyCorpus, Family.import_id == FamilyCorpus.family_import_id) @@ -155,14 +172,14 @@ def get_family_and_documents(db: Session, import_id: str) -> FamilyAndDocumentsR .join(Organisation, Corpus.organisation_id == Organisation.id) .filter(Family.import_id == import_id) .filter(geo_subquery.c.family_import_id == Family.import_id) # type: ignore - ).first() # TODO Fix as part of PDCT-1440 + .group_by(Family, FamilyMetadata, Organisation, FamilyCorpus) + ).first() if not db_objects: _LOGGER.warning("No family found for import_id", extra={"slug": import_id}) raise ValueError(f"No family found for import_id: {import_id}") - family: Family - (family, geography_value, family_metadata, organisation, family_corpus) = db_objects + family, geographies, family_metadata, organisation, family_corpus = db_objects if family.family_status != FamilyStatus.PUBLISHED: raise ValueError(f"Family {import_id} is not published") @@ -175,7 +192,10 @@ def get_family_and_documents(db: Session, import_id: str) -> FamilyAndDocumentsR import_id=import_id, title=cast(str, family.title), summary=cast(str, family.description), - geography=geography_value, + geography=geographies[ + 0 + ], # First geography for backward compatibility PDCT-1440 + geographies=geographies, category=cast(str, family.family_category), status=cast(str, family.family_status), metadata=cast(dict, family_metadata.value), diff --git a/app/repository/geography.py b/app/repository/geography.py index b917cd42..80d54eba 100644 --- a/app/repository/geography.py +++ b/app/repository/geography.py @@ -1,6 +1,7 @@ """Functions to support the geographies endpoint.""" import logging +from typing import Optional from db_client.models.dfce.family import ( Family, @@ -21,32 +22,31 @@ def get_geo_subquery( db: Session, - allowed_geo_slugs=None, - allowed_geo_values=None, - family_document_import_id=None, + allowed_geo_slugs: Optional[list[str]] = None, + allowed_geo_values: Optional[list[str]] = None, + family_document_import_id: Optional[str] = None, ) -> Query: - + """ + Create a subquery to fetch geographies associated with families. + + :param Session db: Database session. + :param Optional[list[str]] allowed_geo_slugs: Optional list of + allowed geography slugs. + :param Optional[list[str]] allowed_geo_values: Optional list of + allowed geography values. + :param Optional[str] family_document_import_id: Optional family + document import ID. 
+ :return Query: A subquery for geographies. + """ geo_subquery = ( db.query( - func.min(Geography.value).label("value"), - func.min(Geography.slug).label("slug"), + Geography.value.label("value"), + Geography.slug.label("slug"), FamilyGeography.family_import_id, ) .join(FamilyGeography, FamilyGeography.geography_id == Geography.id) .filter(FamilyGeography.family_import_id == Family.import_id) - .group_by(Geography.value, Geography.slug, FamilyGeography.family_import_id) ) - """ NOTE: This is an intermediate step to migrate to multi-geography support. - We grab the minimum geography value for each family to use as a fallback for a single geography. - This is because there is no rank for geography values and we need to pick one. - - This also looks dodgy as the "value" and "slug" may not match up. - However, the browse code only uses one of these values, so it should be fine. - - Don't forget this is temporary and will be removed once multi-geography support is implemented. - - Also remember to update the pipeline query to pass these in when changing this. - """ if allowed_geo_slugs is not None: geo_subquery = geo_subquery.filter(Geography.slug.in_(allowed_geo_slugs)) diff --git a/app/repository/pipeline.py b/app/repository/pipeline.py new file mode 100644 index 00000000..3e724f01 --- /dev/null +++ b/app/repository/pipeline.py @@ -0,0 +1,151 @@ +import logging +import os +from datetime import datetime, timezone +from functools import lru_cache +from typing import Sequence, cast + +import pandas as pd +from db_client.models.dfce import DocumentStatus +from db_client.models.dfce.family import FamilyDocument +from fastapi import Depends + +from app.api.api_v1.schemas.document import DocumentParserInput +from app.clients.db.session import get_db + +_LOGGER = logging.getLogger(__name__) + +MetadataType = dict[str, list[str]] + + +@lru_cache() +def generate_pipeline_ingest_input_query(): + """Read query for non-deleted docs and their associated data.""" + with open(os.path.join("app", "repository", "sql", "pipeline.sql"), "r") as file: + return file.read() + + +def get_pipeline_data(db=Depends(get_db)) -> pd.DataFrame: + """Get non-deleted docs and their associated data from the db. + + Use the pipeline query to query the current database to get a list + of non deleted documents and their associated data, family info, + metadata, languages, and geographies. + + The final result is a DataFrame containing the required information + to construct a DocumentParserInput object. + + :param Session db: The db session to query against. + :return pd.DataFrame: DataFrame containing current view of documents + in database. + """ + _LOGGER.info("Running pipeline query") + query = generate_pipeline_ingest_input_query() + df = pd.read_sql(query, db.connection().connection) + return df + + +def parse_document_object(row: pd.Series) -> DocumentParserInput: + """Parse DataFrame row into DocumentParserInput object. + + :param pd.Series row: A pandas series containing a row that + represents a family document and its related context. + :return DocumentParserInput: A DocumentParserInput object + representing the family document record & its context. 
+ """ + published_date = row.family_published_date + published_date = datetime( + published_date.year, published_date.month, published_date.day + ).astimezone(timezone.utc) + + fallback_date = datetime(1900, 1, 1, tzinfo=timezone.utc) + return DocumentParserInput( + # All documents in a family indexed by title + name=cast(str, row.family_title), + document_title=cast(str, row.physical_document_title), + description=cast(str, row.family_description), + category=str(row.family_category), + publication_ts=published_date or fallback_date, + import_id=cast(str, row.family_document_import_id), + # This gets the most recently added document slug. + slug=cast(str, row.family_document_slug), + family_import_id=cast(str, row.family_import_id), + # This gets the most recently added family slug. + family_slug=cast(str, row.family_slug), + source_url=( + cast(str, row.physical_document_source_url) + if row.physical_document_source_url is not None + else None + ), + download_url=None, + type=cast(str, row.get("family_document_type", default="")), + source=cast(str, row.organisation_name), + geography=cast(list, row.get("geographies", default=[""]))[ + 0 + ], # First geography for backward compatibility + geographies=row.geographies, + corpus_import_id=cast(str, row.corpus_import_id), + corpus_type_name=cast(str, row.corpus_type_name), + collection_title=None, + collection_summary=None, + languages=[ + lang + for lang in ( + cast( + list, + row.get("languages") if row.get("languages") is not None else [], + ) + ) + ], + metadata=_flatten_pipeline_metadata( + cast(MetadataType, row.family_metadata), + cast(MetadataType, row.family_document_metadata), + ), + ) + + +def generate_pipeline_ingest_input(db=Depends(get_db)) -> Sequence[DocumentParserInput]: + """Generate a view of the current document db as pipeline input. + + :param Session db: The db session to query against. + :return Sequence[DocumentParserInput]: A list of DocumentParserInput + objects that can be used by the pipeline. + """ + results = get_pipeline_data(db) + + _LOGGER.info("Parsing pipeline query data") + documents: Sequence[DocumentParserInput] = [ + parse_document_object(row) for _, row in results.iterrows() + ] + + # TODO: Revert to raise a ValueError when the issue is resolved + database_doc_count = ( + db.query(FamilyDocument) + .filter(FamilyDocument.document_status != DocumentStatus.DELETED) + .count() + ) + if len(documents) > database_doc_count: + _LOGGER.warning( + "Potential Row Explosion. 
Ingest input is returning more documents than exist in the database", + extra={ + "ingest_count": len(documents), + "database_count": database_doc_count, + }, + ) + + return documents + + +def _flatten_pipeline_metadata( + family_metadata: MetadataType, document_metadata: MetadataType +) -> MetadataType: + """Combines metadata objects ready for the pipeline""" + + metadata = {} + + for k, v in family_metadata.items(): + metadata[f"family.{k}"] = v + + for k, v in document_metadata.items(): + metadata[f"document.{k}"] = v + + return metadata diff --git a/app/repository/sql/pipeline.sql b/app/repository/sql/pipeline.sql new file mode 100644 index 00000000..c9f6b088 --- /dev/null +++ b/app/repository/sql/pipeline.sql @@ -0,0 +1,244 @@ +WITH deduplicated_family_slugs AS ( SELECT + DISTINCT + ON (slug.family_import_id) slug.family_import_id, + slug.created, + slug.name + FROM + ( SELECT + slug.family_import_id AS "family_import_id", + Count(*) AS count + FROM + slug + WHERE + slug.family_import_id IS NOT NULL + GROUP BY + slug.family_import_id + HAVING + Count(*) > 1 ) duplicates + left join + slug + ON duplicates.family_import_id = slug.family_import_id + ORDER BY + slug.family_import_id DESC, + slug.created DESC, + slug.ctid DESC ), + unique_family_slugs AS ( SELECT + DISTINCT + ON (slug.family_import_id) slug.family_import_id, + slug.created, + slug.name + FROM + ( SELECT + slug.family_import_id AS "family_import_id", + Count(*) AS count + FROM + slug + WHERE + slug.family_import_id IS NOT NULL + GROUP BY + slug.family_import_id + HAVING + Count(*) = 1 ) non_duplicates + left join + slug + ON non_duplicates.family_import_id = slug.family_import_id + ORDER BY + slug.family_import_id DESC, + slug.created DESC, + slug.ctid DESC ), + most_recent_family_slugs AS ( SELECT + deduplicated_family_slugs.family_import_id AS "family_import_id", + deduplicated_family_slugs.created AS "created", + deduplicated_family_slugs.name AS "name" + FROM + deduplicated_family_slugs + UNION + ALL SELECT + unique_family_slugs.family_import_id AS "family_import_id", + unique_family_slugs.created AS "created", + unique_family_slugs.name AS "name" + FROM + unique_family_slugs + ORDER BY + family_import_id DESC, + created DESC ), deduplicated_doc_slugs AS ( SELECT + DISTINCT + ON (slug.family_document_import_id) slug.family_document_import_id, + slug.created, + slug.name + FROM + ( SELECT + slug.family_document_import_id AS "family_document_import_id", + Count(*) AS count + FROM + slug + WHERE + slug.family_document_import_id IS NOT NULL + GROUP BY + slug.family_document_import_id + HAVING + Count(*) > 1 ) duplicates + left join + slug + ON duplicates.family_document_import_id = slug.family_document_import_id + ORDER BY + slug.family_document_import_id DESC, + slug.created DESC, + slug.ctid DESC ), + unique_doc_slugs AS ( SELECT + DISTINCT + ON (slug.family_document_import_id) slug.family_document_import_id, + slug.created, + slug.name + FROM + ( SELECT + slug.family_document_import_id AS "family_document_import_id", + Count(*) AS count + FROM + slug + WHERE + slug.family_document_import_id IS NOT NULL + GROUP BY + slug.family_document_import_id + HAVING + Count(*) = 1 ) non_duplicates + left join + slug + ON non_duplicates.family_document_import_id = slug.family_document_import_id + ORDER BY + slug.family_document_import_id DESC, + slug.created DESC, + slug.ctid DESC ), + most_recent_doc_slugs AS ( SELECT + deduplicated_doc_slugs.family_document_import_id AS "family_document_import_id", + deduplicated_doc_slugs.created, 
+ deduplicated_doc_slugs.name + FROM + deduplicated_doc_slugs + UNION + ALL SELECT + unique_doc_slugs.family_document_import_id AS "family_document_import_id", + unique_doc_slugs.created, + unique_doc_slugs.name + FROM + unique_doc_slugs + ORDER BY + family_document_import_id DESC, + created DESC ), event_dates AS ( SELECT + family_event.family_import_id AS family_import_id, + Min(CASE + WHEN family_event.event_type_name='Passed/Approved' THEN family_event.DATE::DATE + ELSE family_event.DATE::DATE + END) published_date, + Max(family_event.DATE::DATE) last_changed + FROM + family_event + GROUP BY + family_import_id ) SELECT + f.title AS "family_title", + p.title AS "physical_document_title", + f.description AS "family_description", + CASE + WHEN f.family_category IN ('UNFCCC', + 'MCF') THEN Upper(f.family_category::text) + ELSE Initcap(f.family_category::text) + END "family_category", + fp.published_date AS "family_published_date", + d.import_id AS "family_document_import_id", + ds.name AS "family_document_slug", + f.import_id AS "family_import_id", + fs.name AS "family_slug", + p.source_url AS "physical_document_source_url", + d.valid_metadata::json#>>'{type,0}' AS "family_document_type", + o.name AS "organisation_name", + geos.geographies AS "geographies", + c.import_id AS "corpus_import_id", + c.corpus_type_name AS "corpus_type_name", + langs.languages AS "languages", + fm.value AS "family_metadata", + d.valid_metadata AS "family_document_metadata" + FROM + physical_document p + join + family_document d + ON p.id = d.physical_document_id + join + family f + ON d.family_import_id = f.import_id full + join + ( + SELECT + family_geography.family_import_id AS "family_import_id", + string_agg(geography.value, + ';') AS geo_isos, + string_agg(geography.display_value, + ';') AS geo_display_values + FROM + geography + inner join + family_geography + ON geography.id = family_geography.geography_id + GROUP BY + family_geography.family_import_id + ) fg + ON fg.family_import_id=f.import_id + join + family_corpus fc + ON f.import_id = fc.family_import_id + join + corpus c + ON fc.corpus_import_id = c.import_id + join + organisation o + ON c.organisation_id = o.id + join + family_metadata fm + ON fm.family_import_id = f.import_id + left outer join + ( + SELECT + family_document.import_id AS family_document_import_id, + json_agg(DISTINCT(LANGUAGE.name)) AS languages + FROM + family_document + join + physical_document_language + ON physical_document_language.document_id = family_document.physical_document_id + join + LANGUAGE + ON LANGUAGE.id = physical_document_language.language_id + GROUP BY + family_document.import_id + ) AS langs + ON langs.family_document_import_id = d.import_id + left outer join + ( + SELECT + family_geography.family_import_id AS family_import_id, + json_agg(DISTINCT(geography.value)) AS geographies + FROM + family_geography + join + geography + ON geography.id = family_geography.geography_id + GROUP BY + family_geography.family_import_id + ) AS geos + ON geos.family_import_id = f.import_id + left join + most_recent_doc_slugs ds + ON ds.family_document_import_id = d.import_id + left join + most_recent_family_slugs fs + ON fs.family_import_id = f.import_id + left join + event_dates fp + ON fp.family_import_id = f.import_id + WHERE + d.document_status != 'DELETED' + AND fg.family_import_id = f.import_id + ORDER BY + d.last_modified DESC, + d.created DESC, + d.ctid DESC, + f.import_id diff --git a/app/service/pipeline.py b/app/service/pipeline.py new file mode 100644 index 
00000000..bc86b766 --- /dev/null +++ b/app/service/pipeline.py @@ -0,0 +1,37 @@ +import logging +from typing import Any, Sequence + +from fastapi import Depends + +from app.api.api_v1.schemas.document import DocumentParserInput +from app.clients.db.session import get_db +from app.repository.pipeline import generate_pipeline_ingest_input + +_LOGGER = logging.getLogger(__name__) + +MetadataType = dict[str, list[str]] + + +def format_pipeline_ingest_input( + documents: Sequence[DocumentParserInput], +) -> dict[str, Any]: + """Format the DocumentParserInput objects for the db_state.json file. + + :param Sequence[DocumentParserInput] documents: A list of + DocumentParserInput objects that can be used by the pipeline. + :return dict[str, Any]: The contents of the db_state.json file in + JSON form. + """ + return {"documents": {d.import_id: d.to_json() for d in documents}} + + +def get_db_state_content(db=Depends(get_db)): + """Get the db_state.json content in JSON form. + + :param Session db: The db session to query against. + :return: A list of DocumentParserInput objects in the JSON format + that will be written to the db_state.json file used by the + pipeline. + """ + pipeline_ingest_input = generate_pipeline_ingest_input(db) + return format_pipeline_ingest_input(pipeline_ingest_input) diff --git a/docs/local_full_stack_setup/README.md b/docs/local_full_stack_setup/README.md index 44ed5775..0b25cef3 100644 --- a/docs/local_full_stack_setup/README.md +++ b/docs/local_full_stack_setup/README.md @@ -108,8 +108,10 @@ echo $(python -m extract_document_import_ids $family_document_sample.jsonl | xar ### Set your vespa cli to point to local -Vespa config set target local +```shell +vespa config set target local vespa config set application default.application.default +``` ### Feed the documents into the local instance diff --git a/makefile-docker.defs b/makefile-docker.defs index 2aa780f5..b6f1a6a3 100644 --- a/makefile-docker.defs +++ b/makefile-docker.defs @@ -27,7 +27,10 @@ show_logs: docker compose logs -f stop: - docker compose stop + docker compose stop --remove-orphans + +rm: + docker compose rm remove_volumes: docker compose down -v @@ -110,7 +113,7 @@ test_search: -vvv tests/search \ -m 'search' ${ARGS} -test_cors: +test_cors: docker compose -f docker-compose.yml -f docker-compose.dev.yml run --rm backend pytest -vvv -m 'cors' ${ARGS} test_unit: diff --git a/poetry.lock b/poetry.lock index 76562209..791d3a4f 100644 --- a/poetry.lock +++ b/poetry.lock @@ -834,7 +834,7 @@ vision = ["Pillow (>=9.4.0)"] [[package]] name = "db-client" -version = "3.8.18" +version = "3.8.19" description = "All things to do with the datamodel and its storage. Including alembic migrations and datamodel code." 
optional = false python-versions = "^3.9" @@ -854,8 +854,8 @@ SQLAlchemy-Utils = "^0.38.2" [package.source] type = "git" url = "https://github.com/climatepolicyradar/navigator-db-client.git" -reference = "v3.8.18" -resolved_reference = "b221b4d7a23a111b8b0ce2a19329f2d3a73b0e7c" +reference = "v3.8.19" +resolved_reference = "603666581c5c4aa8436f7c419167a8ae5ab49fb2" [[package]] name = "deprecation" @@ -4239,4 +4239,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "6bcc2beaba87225d85fe50521e88cc2f944d5274c66d8b43b49722fa71c62f39" +content-hash = "711884413de31b1e08edd8d61dc17af9c3254b52bd8988bcd1d33edf3278a63a" diff --git a/pyproject.toml b/pyproject.toml index e0ba3706..2dd74526 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "navigator_backend" -version = "1.18.0" +version = "1.19.0" description = "" authors = ["CPR-dev-team "] packages = [{ include = "app" }, { include = "tests" }] @@ -31,7 +31,7 @@ starlette = "^0.27.0" tenacity = "^8.0.1" uvicorn = { extras = ["standard"], version = "^0.20.0" } botocore = "^1.34.19" -db-client = { git = "https://github.com/climatepolicyradar/navigator-db-client.git", tag = "v3.8.18" } +db-client = { git = "https://github.com/climatepolicyradar/navigator-db-client.git", tag = "v3.8.19" } urllib3 = "<2" apscheduler = "^3.10.4" numpy = "1.26.4" diff --git a/scripts/db_state_validator_click.py b/scripts/db_state_validator_click.py new file mode 100644 index 00000000..0140cbd9 --- /dev/null +++ b/scripts/db_state_validator_click.py @@ -0,0 +1,153 @@ +import json +import logging +import sys +from typing import List + +import click +from pydantic import ValidationError +from cpr_sdk.pipeline_general_models import InputData +from cpr_sdk.parser_models import BackendDocument + +# Setup logger +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +def load_json(file_path: str) -> InputData: + """Load JSON from a file and validate it against the DBState model. + + :param file_path: Path to the JSON file + :type file_path: str + :raises SystemExit: If there's an error loading/validating the JSON + :return: The validated database state + :rtype: DBState + """ + try: + with open(file_path, "r") as file: + data = json.load(file) + return InputData(**data) + except (json.JSONDecodeError, ValidationError) as e: + logger.error(f"💥 Error loading JSON from {file_path}: {e}") + sys.exit(1) + + +def find_differing_doc_import_ids( + main_sorted: List[BackendDocument], branch_sorted: List[BackendDocument] +) -> bool: + main_set = {doc.import_id for doc in main_sorted} + branch_set = {doc.import_id for doc in branch_sorted} + missing_in_branch = main_set - branch_set + extra_in_branch = branch_set - main_set + + if missing_in_branch or extra_in_branch: + if missing_in_branch: + logger.info(f"🔍 Missing doc IDs in branch: {missing_in_branch}") + if extra_in_branch: + logger.info( + f"🔍 Extra doc IDs in branch compared with main: {extra_in_branch}" + ) + return True + return False + + +def find_document_differences( + main_sorted: List[BackendDocument], branch_sorted: List[BackendDocument] +) -> bool: + """Compare each document in two sorted lists and log differences. 
+ + :param main_sorted: List of documents from the main database state + :type main_sorted: List[Document] + :param branch_sorted: List of documents from the branch database state + :type branch_sorted: List[Document] + :return: True if differences are found, False otherwise + :rtype: bool + """ + differences_found = False + differences = {"differences": {}} + + for main_doc, branch_doc in zip(main_sorted, branch_sorted): + if main_doc.import_id != branch_doc.import_id: + logger.info( + f"❌ Import ID difference found {main_doc.import_id} " + f"vs {branch_doc.import_id}" + ) + + if main_doc != branch_doc: + doc_differences = {} + for field in main_doc.model_fields_set: + main_value = getattr(main_doc, field) + branch_value = getattr(branch_doc, field) + + # Special handling for 'languages' and 'geographies' fields + if field in ["languages", "geographies"]: + main_value = sorted(main_value) + branch_value = sorted(branch_value) + + if main_value != branch_value: + doc_differences[field] = { + "main": main_value, + "branch": branch_value, + } + logger.info( + f"🔍 Field '{field}' differs in document '{main_doc.import_id}': " + f"main '{main_value}' vs branch '{branch_value}'" + ) + if doc_differences: + differences["differences"][main_doc.import_id] = doc_differences + differences_found = True + + # Write differences to a JSON file + if differences_found: + with open("document_differences.json", "w") as json_file: + json.dump(differences, json_file, indent=4) + + return differences_found + + +def compare_db_states(main_db: InputData, branch_db: InputData): + """Compare two DB state files and log differences. + + :param main_db: The main database state (source of truth) + :type main_db: DBState + :param branch_db: The branch database state under test + :type branch_db: DBState + :raises SystemExit: If there are differences in the document lengths + or contents + """ + # Sort documents by import_id for order-insensitive comparison + main_sorted = sorted(main_db.documents.values(), key=lambda doc: doc.import_id) + branch_sorted = sorted(branch_db.documents.values(), key=lambda doc: doc.import_id) + + if len(main_sorted) != len(branch_sorted): + logger.info( + f"🔍 Document list lengths differ: main {len(main_sorted)}, " + f"branch {len(branch_sorted)}" + ) + sys.exit(1) + + if find_differing_doc_import_ids(main_sorted, branch_sorted): + sys.exit(1) + + if find_document_differences(main_sorted, branch_sorted): + sys.exit(1) + + logger.info("🎉 DB states are equivalent!") + + +@click.command() +@click.argument("main_db_file", type=click.Path(exists=True)) +@click.argument("branch_db_file", type=click.Path(exists=True)) +def main(main_db_file: str, branch_db_file: str): + """Main function to load and compare database states. 
+ + :param main_db_file: Path to the main database state file + :param branch_db_file: Path to the branch database state file + """ + main_db_state = load_json(main_db_file) + branch_db_state = load_json(branch_db_file) + + compare_db_states(main_db_state, branch_db_state) + + +if __name__ == "__main__": + main() diff --git a/tests/non_search/app/documents/test_get_document_families.py b/tests/non_search/app/documents/test_get_document_families.py index 2cf8094c..6c752f02 100644 --- a/tests/non_search/app/documents/test_get_document_families.py +++ b/tests/non_search/app/documents/test_get_document_families.py @@ -1,3 +1,4 @@ +import pytest from db_client.models.dfce.family import Family, FamilyDocument, FamilyEvent from fastapi.testclient import TestClient from sqlalchemy import update @@ -9,8 +10,8 @@ setup_with_two_docs_one_family, ) -N_FAMILY_KEYS = 15 -N_FAMILY_OVERVIEW_KEYS = 8 +N_FAMILY_KEYS = 16 # TODO: Replace with 15 as part of PDCT-1440 +N_FAMILY_OVERVIEW_KEYS = 9 # TODO: Replace with 8 part of PDCT-1440 N_DOCUMENT_KEYS = 12 @@ -55,49 +56,116 @@ def test_documents_family_slug_returns_correct_family( assert json_response["import_id"] == "CCLW.family.2002.0" +@pytest.mark.parametrize( + ("slug", "expected_fam", "expected_doc"), + [ + ( + "FamSlug1", + { + "title": "Fam1", + "import_id": "CCLW.family.1001.0", + "geography": "South Asia", + "geographies": ["South Asia"], + "category": "Executive", + "slug": "FamSlug1", + "corpus_id": "CCLW.corpus.i00000001.n0000", + "published_date": "2019-12-25T00:00:00Z", + "last_updated_date": "2019-12-25T00:00:00Z", + "metadata": {"color": "pink", "size": "big"}, + "organisation": "CCLW", + "status": "Published", + "summary": "Summary1", + }, + { + "import_id": "CCLW.executive.1.2", + "variant": "Original Language", + "slug": "DocSlug1", + "title": "Document1", + "md5_sum": "111", + "cdn_object": None, + "content_type": "application/pdf", + "source_url": "http://somewhere1", + "language": "eng", + "languages": ["eng"], + "document_type": "Plan", + "document_role": "MAIN", + }, + ), + ( + "FamSlug2", + { + "title": "Fam2", + "import_id": "CCLW.family.2002.0", + "geography": "AFG", + "geographies": ["AFG", "IND"], + "category": "Executive", + "slug": "FamSlug2", + "corpus_id": "CCLW.corpus.i00000001.n0000", + "published_date": "2019-12-25T00:00:00Z", + "last_updated_date": "2019-12-25T00:00:00Z", + "metadata": {"color": "blue", "size": "small"}, + "organisation": "CCLW", + "status": "Published", + "summary": "Summary2", + }, + { + "import_id": "CCLW.executive.2.2", + "variant": None, + "slug": "DocSlug2", + "title": "Document2", + "md5_sum": None, + "cdn_object": None, + "content_type": None, + "source_url": "http://another_somewhere", + "language": "", + "languages": [], + "document_type": "Order", + "document_role": "MAIN", + }, + ), + ], +) def test_documents_family_slug_returns_correct_json( - data_client: TestClient, - data_db: Session, + data_client: TestClient, data_db: Session, slug, expected_fam, expected_doc ): setup_with_two_docs(data_db) # Test associations response = data_client.get( - "/api/v1/documents/FamSlug1", + f"/api/v1/documents/{slug}", ) json_response = response.json() assert response.status_code == 200 - assert len(json_response) == N_FAMILY_KEYS - assert json_response["organisation"] == "CCLW" - assert json_response["import_id"] == "CCLW.family.1001.0" - assert json_response["title"] == "Fam1" - assert json_response["summary"] == "Summary1" - assert json_response["geography"] == "South Asia" - assert 
json_response["category"] == "Executive" - assert json_response["status"] == "Published" - assert json_response["corpus_id"] == "CCLW.corpus.i00000001.n0000" - assert json_response["published_date"] == "2019-12-25T00:00:00Z" - assert json_response["last_updated_date"] == "2019-12-25T00:00:00Z" - - # TODO: https://linear.app/climate-policy-radar/issue/PDCT-1017 - assert len(json_response["metadata"]) == 2 - assert json_response["metadata"]["size"] == "big" - - assert json_response["slug"] == "FamSlug1" + # Verify family data correct. + assert len(json_response) == N_FAMILY_KEYS + actual_family_data = { + k: v + for k, v in json_response.items() + if k not in ["events", "documents", "collections"] + } + assert actual_family_data == expected_fam + + # Verify events data correct. + events = json_response["events"] assert len(json_response["events"]) == 1 - assert json_response["events"][0]["title"] == "Published" - - assert len(json_response["documents"]) == 1 - assert json_response["documents"][0]["title"] == "Document1" - assert json_response["documents"][0]["slug"] == "DocSlug1" - assert json_response["documents"][0]["import_id"] == "CCLW.executive.1.2" - + event = events[0] + assert event["title"] == "Published" + + # Verify documents data correct. + docs = json_response["documents"] + assert len(docs) == 1 + doc = docs[0] + assert len(doc.keys()) == N_DOCUMENT_KEYS + assert doc == expected_doc + + # Verify collections data correct. + collections = json_response["collections"] assert len(json_response["collections"]) == 1 - assert json_response["collections"][0]["title"] == "Collection1" - - assert json_response["collections"][0]["families"] == [ + collection = collections[0] + assert collection["title"] == "Collection1" + assert collection["families"] == [ {"title": "Fam1", "slug": "FamSlug1", "description": "Summary1"}, {"title": "Fam2", "slug": "FamSlug2", "description": "Summary2"}, ] @@ -181,14 +249,74 @@ def test_documents_doc_slug_returns_not_found( assert response.json()["detail"] == "Nothing found for DocSlug100" +@pytest.mark.parametrize( + ("slug", "expected_fam", "expected_doc"), + [ + ( + "DocSlug1", + { + "title": "Fam1", + "import_id": "CCLW.family.1001.0", + "geography": "South Asia", + "geographies": ["South Asia"], + "category": "Executive", + "slug": "FamSlug1", + "corpus_id": "CCLW.corpus.i00000001.n0000", + "published_date": "2019-12-25T00:00:00Z", + "last_updated_date": "2019-12-25T00:00:00Z", + }, + { + "import_id": "CCLW.executive.1.2", + "variant": "Original Language", + "slug": "DocSlug1", + "title": "Document1", + "md5_sum": "111", + "cdn_object": None, + "content_type": "application/pdf", + "source_url": "http://somewhere1", + "language": "eng", + "languages": ["eng"], + "document_type": "Plan", + "document_role": "MAIN", + }, + ), + ( + "DocSlug2", + { + "title": "Fam2", + "import_id": "CCLW.family.2002.0", + "geography": "AFG", + "geographies": ["AFG", "IND"], + "category": "Executive", + "slug": "FamSlug2", + "corpus_id": "CCLW.corpus.i00000001.n0000", + "published_date": "2019-12-25T00:00:00Z", + "last_updated_date": "2019-12-25T00:00:00Z", + }, + { + "import_id": "CCLW.executive.2.2", + "variant": None, + "slug": "DocSlug2", + "title": "Document2", + "md5_sum": None, + "cdn_object": None, + "content_type": None, + "source_url": "http://another_somewhere", + "language": "", + "languages": [], + "document_type": "Order", + "document_role": "MAIN", + }, + ), + ], +) def test_documents_doc_slug_preexisting_objects( - data_client: TestClient, - data_db: Session, 
+ data_client: TestClient, data_db: Session, slug, expected_fam, expected_doc ): setup_with_two_docs(data_db) response = data_client.get( - "/api/v1/documents/DocSlug2", + f"/api/v1/documents/{slug}", ) json_response = response.json() assert response.status_code == 200 @@ -197,30 +325,12 @@ def test_documents_doc_slug_preexisting_objects( family = json_response["family"] assert family assert len(family.keys()) == N_FAMILY_OVERVIEW_KEYS - assert family["title"] == "Fam2" - assert family["import_id"] == "CCLW.family.2002.0" - assert family["geography"] == "AFG" - assert family["category"] == "Executive" - assert family["slug"] == "FamSlug2" - assert family["corpus_id"] == "CCLW.corpus.i00000001.n0000" - assert family["published_date"] == "2019-12-25T00:00:00Z" - assert family["last_updated_date"] == "2019-12-25T00:00:00Z" + assert family == expected_fam doc = json_response["document"] assert doc assert len(doc) == N_DOCUMENT_KEYS - assert doc["import_id"] == "CCLW.executive.2.2" - assert doc["variant"] is None - assert doc["slug"] == "DocSlug2" - assert doc["title"] == "Document2" - assert doc["md5_sum"] is None - assert doc["cdn_object"] is None - assert doc["content_type"] is None - assert doc["source_url"] == "http://another_somewhere" - assert doc["language"] == "" - assert doc["languages"] == [] - assert doc["document_type"] == "Order" - assert doc["document_role"] == "MAIN" + assert doc == expected_doc def test_documents_doc_slug_when_deleted( diff --git a/tests/non_search/app/geographies/setup_world_map_helpers.py b/tests/non_search/app/geographies/setup_world_map_helpers.py index b9fcd01e..1752d0e2 100644 --- a/tests/non_search/app/geographies/setup_world_map_helpers.py +++ b/tests/non_search/app/geographies/setup_world_map_helpers.py @@ -84,7 +84,7 @@ def _add_published_fams_and_docs(db: Session): "title": "Fam3", "slug": "FamSlug3", "description": "Summary3", - "geography_id": 5, + "geography_id": [2, 5], "category": "UNFCCC", "documents": [], "metadata": { diff --git a/tests/non_search/app/geographies/test_world_map_summary.py b/tests/non_search/app/geographies/test_world_map_summary.py index a616bde3..71d0fe6b 100644 --- a/tests/non_search/app/geographies/test_world_map_summary.py +++ b/tests/non_search/app/geographies/test_world_map_summary.py @@ -1,4 +1,5 @@ import pytest +from db_client.models.dfce.family import Family, FamilyGeography, FamilyStatus from db_client.models.dfce.geography import Geography from fastapi import status @@ -7,6 +8,8 @@ setup_mixed_doc_statuses_world_map, ) +EXPECTED_NUM_FAM_CATEGORIES = 3 + def _get_expected_keys(): return ["display_name", "iso_code", "slug", "family_counts"] @@ -28,7 +31,21 @@ def test_geo_table_populated(data_db): assert len(lst) > 0 -def test_endpoint_returns_ok_all_docs_per_family_published(data_db, data_client): +@pytest.mark.parametrize( + ("geo_display_value", "expected_exec", "expected_leg", "expected_unfccc"), + [ + ("India", 1, 1, 1), + ("Afghanistan", 0, 0, 1), + ], +) +def test_endpoint_returns_ok_all_docs_per_family_published( + data_db, + data_client, + geo_display_value, + expected_exec, + expected_leg, + expected_unfccc, +): """Check endpoint returns 200 on success""" setup_all_docs_published_world_map(data_db) response = data_client.get(_url_under_test()) @@ -36,21 +53,49 @@ def test_endpoint_returns_ok_all_docs_per_family_published(data_db, data_client) resp_json = response.json() assert len(resp_json) > 1 - idx = _find_geography_index(resp_json, "display_name", "India") + idx = _find_geography_index(resp_json, 
"display_name", geo_display_value) resp = resp_json[idx] assert set(["display_name", "iso_code", "slug", "family_counts"]) == set( resp.keys() ) - assert len(resp["family_counts"]) == 3 + family_geos = ( + data_db.query(Family) + .filter(Family.family_status == FamilyStatus.PUBLISHED) + .filter(Geography.display_value == geo_display_value) + .join(FamilyGeography, Family.import_id == FamilyGeography.family_import_id) + .join(Geography, Geography.id == FamilyGeography.geography_id) + .all() + ) - assert resp["family_counts"]["EXECUTIVE"] == 1 - assert resp["family_counts"]["LEGISLATIVE"] == 1 - assert resp["family_counts"]["UNFCCC"] == 1 + assert len(resp["family_counts"]) == EXPECTED_NUM_FAM_CATEGORIES + assert sum(resp["family_counts"].values()) == len(family_geos) + + assert ( + sum(resp["family_counts"].values()) + == expected_exec + expected_leg + expected_unfccc + ) + assert resp["family_counts"]["EXECUTIVE"] == expected_exec + assert resp["family_counts"]["LEGISLATIVE"] == expected_leg + assert resp["family_counts"]["UNFCCC"] == expected_unfccc -def test_endpoint_returns_ok_some_docs_per_family_unpublished(data_db, data_client): +@pytest.mark.parametrize( + ("geo_display_value", "expected_exec", "expected_leg", "expected_unfccc"), + [ + ("India", 1, 1, 2), + ("Afghanistan", 0, 0, 1), + ], +) +def test_endpoint_returns_ok_some_docs_per_published_family_unpublished( + data_db, + data_client, + geo_display_value, + expected_exec, + expected_leg, + expected_unfccc, +): """Check endpoint returns 200 & discounts CREATED & DELETED docs""" setup_mixed_doc_statuses_world_map(data_db) response = data_client.get(_url_under_test()) @@ -58,18 +103,32 @@ def test_endpoint_returns_ok_some_docs_per_family_unpublished(data_db, data_clie resp_json = response.json() assert len(resp_json) > 1 - idx = _find_geography_index(resp_json, "display_name", "India") + idx = _find_geography_index(resp_json, "display_name", geo_display_value) resp = resp_json[idx] assert set(["display_name", "iso_code", "slug", "family_counts"]) == set( resp.keys() ) - assert len(resp["family_counts"]) == 3 + fams = ( + data_db.query(Family) + .filter(Family.family_status == FamilyStatus.PUBLISHED) + .filter(Geography.display_value == geo_display_value) + .join(FamilyGeography, Family.import_id == FamilyGeography.family_import_id) + .join(Geography, Geography.id == FamilyGeography.geography_id) + .all() + ) + + assert len(resp["family_counts"]) == EXPECTED_NUM_FAM_CATEGORIES + assert sum(resp["family_counts"].values()) == len(fams) - assert resp["family_counts"]["EXECUTIVE"] == 1 - assert resp["family_counts"]["LEGISLATIVE"] == 1 - assert resp["family_counts"]["UNFCCC"] == 2 + assert ( + sum(resp["family_counts"].values()) + == expected_exec + expected_leg + expected_unfccc + ) + assert resp["family_counts"]["EXECUTIVE"] == expected_exec + assert resp["family_counts"]["LEGISLATIVE"] == expected_leg + assert resp["family_counts"]["UNFCCC"] == expected_unfccc def test_endpoint_returns_404_when_not_found(data_client): diff --git a/tests/non_search/core/validation/test_util.py b/tests/non_search/core/validation/test_util.py index 1a40ac15..b8581cb4 100644 --- a/tests/non_search/core/validation/test_util.py +++ b/tests/non_search/core/validation/test_util.py @@ -121,7 +121,11 @@ def test_write_documents_to_s3(test_s3_client, mocker): expected_folder_name = every_now.isoformat().replace(":", ".") test_s3_prefix = f"input/{expected_folder_name}" - write_documents_to_s3(test_s3_client, test_s3_prefix, documents=[d]) + 
write_documents_to_s3( + test_s3_client, + test_s3_prefix, + content={"documents": {d.import_id: d.to_json() for d in [d]}}, + ) upload_file_mock.assert_called_once_with( bucket=PIPELINE_BUCKET, key=f"{test_s3_prefix}/db_state.json", diff --git a/tests/non_search/setup_helpers.py b/tests/non_search/setup_helpers.py index 9e5f39f8..8dbc462e 100644 --- a/tests/non_search/setup_helpers.py +++ b/tests/non_search/setup_helpers.py @@ -93,7 +93,7 @@ def get_default_families(): "title": "Fam2", "slug": "FamSlug2", "description": "Summary2", - "geography_id": 2, + "geography_id": [2, 5], "category": "Executive", "documents": [], "metadata": { diff --git a/tests/unit/app/core/test_pipeline.py b/tests/unit/app/core/test_pipeline.py index 25159f67..69df3f71 100644 --- a/tests/unit/app/core/test_pipeline.py +++ b/tests/unit/app/core/test_pipeline.py @@ -1,20 +1,22 @@ +import json from typing import Dict +from unittest.mock import patch -from sqlalchemy.orm import Session +from click.testing import CliRunner -from app.core.ingestion.pipeline import ( - flatten_pipeline_metadata, - generate_pipeline_ingest_input, -) +from app.repository.pipeline import generate_pipeline_ingest_input +from app.service.pipeline import format_pipeline_ingest_input, get_db_state_content +from scripts.db_state_validator_click import main as db_state_validator_main from tests.non_search.setup_helpers import ( setup_docs_with_two_orgs, setup_with_documents_large_with_families, + setup_with_two_docs_multiple_languages, setup_with_two_docs_one_family, setup_with_two_unpublished_docs, ) -def test_generate_pipeline_ingest_input(data_db: Session): +def test_generate_pipeline_ingest_input(data_db): setup_with_two_docs_one_family(data_db) state_rows = generate_pipeline_ingest_input(data_db) @@ -48,8 +50,7 @@ def test_generate_pipeline_ingest_input(data_db: Session): def test_generate_pipeline_ingest_input_with_fixture( - documents_large: list[Dict], - data_db: Session, + documents_large: list[Dict], data_db ): setup_with_documents_large_with_families(documents_large, data_db) @@ -58,14 +59,14 @@ def test_generate_pipeline_ingest_input_with_fixture( assert len(state_rows) == 23 -def test_generate_pipeline_ingest_input_no_collection_family_link(data_db: Session): +def test_generate_pipeline_ingest_input_no_collection_family_link(data_db): setup_docs_with_two_orgs(data_db) state_rows = generate_pipeline_ingest_input(data_db) assert len(state_rows) == 2 -def test_generate_pipeline_ingest_input__deleted(data_db: Session): +def test_generate_pipeline_ingest_input__deleted(data_db): setup_with_two_unpublished_docs(data_db) documents = generate_pipeline_ingest_input(data_db) @@ -74,13 +75,200 @@ def test_generate_pipeline_ingest_input__deleted(data_db: Session): assert documents[0].import_id == "CCLW.executive.1.2" -def test_flatten_pipeline_metadata(): - family_metadata = {"a": ["1"], "b": ["2"]} - doc_metadata = {"a": ["3"], "b": ["4"]} - result = flatten_pipeline_metadata(family_metadata, doc_metadata) +def test_get_db_state_content_success(data_db, caplog): + """ + GIVEN an expected db state file + WHEN the branch db state content is identical (bar ordering) + THEN the db state validator should succeed + """ + setup_with_two_docs_multiple_languages(data_db) + + expected_db_state_contents = { + "documents": { + "CCLW.executive.1.2": { + "name": "Fam1", + "document_title": "Document1", + "description": "Summary1", + "import_id": "CCLW.executive.1.2", + "slug": "DocSlug1", + "family_import_id": "CCLW.family.1001.0", + "family_slug": 
"FamSlug1", + "publication_ts": "2019-12-25T00:00:00+00:00", + "date": None, + "source_url": "http://somewhere1", + "download_url": None, + "corpus_import_id": "CCLW.corpus.i00000001.n0000", + "corpus_type_name": "Laws and Policies", + "collection_title": None, + "collection_summary": None, + "type": "Plan", + "source": "CCLW", + "category": "Executive", + "geography": "South Asia", + "geographies": ["South Asia"], + "languages": ["French", "English"], + "metadata": { + "family.size": "big", + "family.color": "pink", + "document.role": ["MAIN"], + "document.type": ["Plan"], + }, + }, + "CCLW.executive.2.2": { + "name": "Fam2", + "document_title": "Document2", + "description": "Summary2", + "import_id": "CCLW.executive.2.2", + "slug": "DocSlug2", + "family_import_id": "CCLW.family.2002.0", + "family_slug": "FamSlug2", + "publication_ts": "2019-12-25T00:00:00+00:00", + "date": None, + "source_url": "http://another_somewhere", + "download_url": None, + "corpus_import_id": "CCLW.corpus.i00000001.n0000", + "corpus_type_name": "Laws and Policies", + "collection_title": None, + "collection_summary": None, + "type": "Order", + "source": "CCLW", + "category": "Executive", + "geography": "AFG", + "geographies": ["AFG", "IND"], + "languages": [], + "metadata": { + "family.size": "small", + "family.color": "blue", + "document.role": ["MAIN"], + "document.type": ["Order"], + }, + }, + } + } + + actual_db_state_content = get_db_state_content(data_db) + + # Use the db_state_validator to verify the db_state files are alike. + runner = CliRunner() + with runner.isolated_filesystem(): + with open("expected.json", "w") as f: + f.write(json.dumps(expected_db_state_contents)) + + with open("actual.json", "w") as f: + f.write(json.dumps(actual_db_state_content)) + + result = runner.invoke( + db_state_validator_main, ["expected.json", "actual.json"] + ) + + assert result.exit_code == 0 + assert "🎉 DB states are equivalent!" 
in caplog.messages + + +@patch("json.dump") +def test_get_db_state_content_fails_when_mismatch(mock_json_dump, data_db, caplog): + """ + GIVEN a db state file where the doc CCLW.executive.1.2 has 2 langs + WHEN the branch db state contains a third lang for this document + THEN fail the db state validator & output the differences + """ + setup_with_two_docs_multiple_languages(data_db) + + expected_main_db_state = generate_pipeline_ingest_input(data_db) + + modified_branch_db_state = { + "documents": { + "CCLW.executive.1.2": { + "name": "Fam1", + "document_title": "Document1", + "description": "Summary1", + "import_id": "CCLW.executive.1.2", + "slug": "DocSlug1", + "family_import_id": "CCLW.family.1001.0", + "family_slug": "FamSlug1", + "publication_ts": "2019-12-25T00:00:00+00:00", + "date": None, + "source_url": "http://somewhere1", + "download_url": None, + "corpus_import_id": "CCLW.corpus.i00000001.n0000", + "corpus_type_name": "Laws and Policies", + "collection_title": None, + "collection_summary": None, + "type": "Plan", + "source": "CCLW", + "category": "Executive", + "geography": "South Asia", + "geographies": ["South Asia"], + "languages": ["French", "English", "NewLanguage"], + "metadata": { + "family.size": "big", + "family.color": "pink", + "document.role": ["MAIN"], + "document.type": ["Plan"], + }, + }, + "CCLW.executive.2.2": { + "name": "Fam2", + "document_title": "Document2", + "description": "Summary2", + "import_id": "CCLW.executive.2.2", + "slug": "DocSlug2", + "family_import_id": "CCLW.family.2002.0", + "family_slug": "FamSlug2", + "publication_ts": "2019-12-25T00:00:00+00:00", + "date": None, + "source_url": "http://another_somewhere", + "download_url": None, + "corpus_import_id": "CCLW.corpus.i00000001.n0000", + "corpus_type_name": "Laws and Policies", + "collection_title": None, + "collection_summary": None, + "type": "Order", + "source": "CCLW", + "category": "Executive", + "geography": "AFG", + "geographies": ["AFG", "IND"], + "languages": [], + "metadata": { + "family.size": "small", + "family.color": "blue", + "document.role": ["MAIN"], + "document.type": ["Order"], + }, + }, + } + } + + # Use the db_state_validator to verify the db_state files are alike. 
+ runner = CliRunner() + with runner.isolated_filesystem(): + with open("expected.json", "w") as f: + f.write(json.dumps(format_pipeline_ingest_input(expected_main_db_state))) + + with open("actual.json", "w") as f: + f.write(json.dumps(modified_branch_db_state)) + + result = runner.invoke( + db_state_validator_main, ["expected.json", "actual.json"] + ) + + assert result.exit_code == 1 + assert "Field 'languages' differs" in caplog.text + + # Check the JSON differences were written + mock_json_dump.assert_called_once() + differences = mock_json_dump.call_args[0][0] + assert "differences" in differences - assert len(result) == 4 - assert result["family.a"] == ["1"] - assert result["family.b"] == ["2"] - assert result["document.a"] == ["3"] - assert result["document.b"] == ["4"] + assert "CCLW.executive.1.2" in differences["differences"] + assert "languages" in differences["differences"]["CCLW.executive.1.2"] + assert all( + k in differences["differences"]["CCLW.executive.1.2"]["languages"] + for k in ["main", "branch"] + ) + assert ["English", "French"] == differences["differences"]["CCLW.executive.1.2"][ + "languages" + ]["main"] + assert ["English", "French", "NewLanguage"] == differences["differences"][ + "CCLW.executive.1.2" + ]["languages"]["branch"] diff --git a/tests/unit/app/repository/pipeline/test_flatten_pipeline_metadata.py b/tests/unit/app/repository/pipeline/test_flatten_pipeline_metadata.py new file mode 100644 index 00000000..59e67848 --- /dev/null +++ b/tests/unit/app/repository/pipeline/test_flatten_pipeline_metadata.py @@ -0,0 +1,13 @@ +from app.repository.pipeline import _flatten_pipeline_metadata + + +def test_flatten_pipeline_metadata(): + family_metadata = {"a": ["1"], "b": ["2"]} + doc_metadata = {"a": ["3"], "b": ["4"]} + result = _flatten_pipeline_metadata(family_metadata, doc_metadata) + + assert len(result) == 4 + assert result["family.a"] == ["1"] + assert result["family.b"] == ["2"] + assert result["document.a"] == ["3"] + assert result["document.b"] == ["4"]
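For reference, the validator added in `scripts/db_state_validator_click.py` is a click CLI taking two positional paths: the main (source-of-truth) `db_state.json` and the branch one under test. A minimal invocation sketch, assuming both files have already been fetched locally (the file names below are illustrative, not from the PR):

```shell
# Compare a known-good db_state.json against the one produced by this branch;
# the script exits non-zero and writes document_differences.json if they diverge.
python scripts/db_state_validator_click.py main_db_state.json branch_db_state.json
```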