
Merge branch 'main' into docker-orchestration
jamesgorrie authored Dec 9, 2024
2 parents 1cc1e0d + 0f82eac commit 17cf05b
Showing 24 changed files with 321 additions and 207 deletions.
4 changes: 2 additions & 2 deletions app/api/api_v1/routers/__init__.py
@@ -1,12 +1,12 @@
 from app.api.api_v1.routers.analytics import analytics_router
 from app.api.api_v1.routers.auth import auth_router
+from app.api.api_v1.routers.bulk_import import bulk_import_router
 from app.api.api_v1.routers.collection import collections_router
 from app.api.api_v1.routers.config import config_router
 from app.api.api_v1.routers.corpus import corpora_router
 from app.api.api_v1.routers.document import document_router
 from app.api.api_v1.routers.event import event_router
 from app.api.api_v1.routers.family import families_router
-from app.api.api_v1.routers.ingest import ingest_router

 __all__ = (
     "analytics_router",
@@ -17,5 +17,5 @@
     "document_router",
     "event_router",
     "families_router",
-    "ingest_router",
+    "bulk_import_router",
 )
app/api/api_v1/routers/ingest.py → app/api/api_v1/routers/bulk_import.py
@@ -12,31 +12,31 @@

 from app.errors import ValidationError
 from app.model.general import Json
-from app.service.ingest import (
+from app.service.bulk_import import (
     get_collection_template,
     get_document_template,
     get_event_template,
     get_family_template,
     import_data,
 )
-from app.service.validation import validate_ingest_data
+from app.service.validation import validate_bulk_import_data

-ingest_router = r = APIRouter()
+bulk_import_router = r = APIRouter()

 _LOGGER = logging.getLogger(__name__)


 @r.get(
-    "/ingest/template/{corpus_type}",
+    "/bulk-import/template/{corpus_type}",
     response_model=Json,
     status_code=status.HTTP_200_OK,
 )
-async def get_ingest_template(corpus_type: str) -> Json:
+async def get_bulk_import_template(corpus_type: str) -> Json:
     """
-    Data ingest template endpoint.
-    :param str corpus_type: type of the corpus of data to ingest.
-    :return Json: json representation of ingest template.
+    Data bulk import template endpoint.
+    :param str corpus_type: type of the corpus of data to import.
+    :return Json: json representation of bulk import template.
     """

     _LOGGER.info(f"Creating template for corpus type: {corpus_type}")
@@ -54,30 +54,30 @@ async def get_ingest_template(corpus_type: str) -> Json:


 @r.post(
-    "/ingest/{corpus_import_id}",
+    "/bulk-import/{corpus_import_id}",
     response_model=Json,
     status_code=status.HTTP_202_ACCEPTED,
 )
-async def ingest(
+async def bulk_import(
     request: Request,
-    new_data: UploadFile,
+    data: UploadFile,
     corpus_import_id: str,
     background_tasks: BackgroundTasks,
 ) -> Json:
     """
     Bulk import endpoint.
-    :param UploadFile new_data: file containing json representation of data to ingest.
-    :return Json: json representation of the data to ingest.
+    :param UploadFile data: file containing json representation of data to import.
+    :return Json: json representation of the data to import.
     """
     _LOGGER.info(
         f"User {request.state.user} triggered bulk import for corpus: {corpus_import_id}"
     )

     try:
-        content = await new_data.read()
+        content = await data.read()
         data_dict = json.loads(content)
-        validate_ingest_data(data_dict)
+        validate_bulk_import_data(data_dict)

         background_tasks.add_task(import_data, data_dict, corpus_import_id)
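Taken together, this file moves the public routes from /ingest/... to /bulk-import/... and renames the upload parameter to data. A minimal sketch of how a client might call the renamed endpoints (the host, port, token, and corpus identifiers below are assumptions for illustration, not values from this commit):

import httpx

BASE = "http://localhost:8888/api/v1"          # assumed local dev host/port
HEADERS = {"Authorization": "Bearer <token>"}  # auth scheme assumed; check_user_auth guards these routes

# Fetch the bulk import template for a corpus type (corpus type name is illustrative).
template = httpx.get(f"{BASE}/bulk-import/template/Laws", headers=HEADERS).json()

# Trigger a bulk import; the endpoint returns 202 and runs import_data as a background task.
with open("data.json", "rb") as f:
    response = httpx.post(
        f"{BASE}/bulk-import/corpus.1.1",  # hypothetical corpus_import_id
        files={"data": f},                 # multipart field name matches the renamed UploadFile parameter
        headers=HEADERS,
    )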
18 changes: 9 additions & 9 deletions app/clients/aws/s3bucket.py
@@ -106,25 +106,25 @@ def upload_json_to_s3(
         raise


-def upload_ingest_json_to_s3(
-    ingest_id: str, corpus_import_id: str, data: dict[str, Any]
+def upload_bulk_import_json_to_s3(
+    import_id: str, corpus_import_id: str, data: dict[str, Any]
 ) -> None:
     """
-    Upload an ingest JSON file to S3
+    Upload a bulk import JSON file to S3
-    :param str ingest_id: The uuid of the ingest action.
-    :param str corpus_import_id: The import_id of the corpus the ingest data belongs to.
-    :param dict[str, Any] json_data: The ingest json data to be uploaded to S3.
+    :param str import_id: The uuid of the bulk import action.
+    :param str corpus_import_id: The id of the corpus the bulk import data belongs to.
+    :param dict[str, Any] data: The bulk import json data to be uploaded to S3.
     """
-    ingest_upload_bucket = os.environ["BULK_IMPORT_BUCKET"]
+    bulk_import_upload_bucket = os.environ["BULK_IMPORT_BUCKET"]
     current_timestamp = datetime.now().strftime("%m-%d-%YT%H:%M:%S")

-    filename = f"{ingest_id}-{corpus_import_id}-{current_timestamp}.json"
+    filename = f"{import_id}-{corpus_import_id}-{current_timestamp}.json"

     s3_client = boto3.client("s3")

     context = S3UploadContext(
-        bucket_name=ingest_upload_bucket,
+        bucket_name=bulk_import_upload_bucket,
         object_name=filename,
     )
     upload_json_to_s3(s3_client, context, data)
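For reference, a minimal sketch of calling the renamed S3 helper (the bucket name and identifiers are invented for illustration; only the function itself comes from this diff):

import os

from app.clients.aws.s3bucket import upload_bulk_import_json_to_s3

os.environ["BULK_IMPORT_BUCKET"] = "example-bulk-import-bucket"  # hypothetical bucket name

# Writes s3://example-bulk-import-bucket/<import_id>-<corpus_import_id>-<MM-DD-YYYYTHH:MM:SS>.json
upload_bulk_import_json_to_s3(
    import_id="6b8dd2c3",           # hypothetical uuid of the bulk import action
    corpus_import_id="corpus.1.1",  # hypothetical corpus id
    data={"families": []},          # example payload shape, assumed
)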
6 changes: 3 additions & 3 deletions app/main.py
@@ -17,13 +17,13 @@
 from app.api.api_v1.routers import (
     analytics_router,
     auth_router,
+    bulk_import_router,
     collections_router,
     config_router,
     corpora_router,
     document_router,
     event_router,
     families_router,
-    ingest_router,
 )
 from app.api.api_v1.routers.auth import check_user_auth
 from app.clients.db.session import engine
@@ -97,9 +97,9 @@ async def lifespan(app_: FastAPI):
 )

 app.include_router(
-    ingest_router,
+    bulk_import_router,
     prefix="/api/v1",
-    tags=["ingest"],
+    tags=["bulk-import"],
     dependencies=[Depends(check_user_auth)],
 )
6 changes: 3 additions & 3 deletions app/model/authorisation.py
@@ -18,7 +18,7 @@ class AuthEndpoint(str, enum.Enum):
     CONFIG = "CONFIG"
     ANALYTICS = "ANALYTICS"
     EVENT = "EVENTS"
-    INGEST = "INGEST"
+    BULK_IMPORT = "BULK-IMPORT"
     CORPUS = "CORPORA"


@@ -61,8 +61,8 @@ class AuthEndpoint(str, enum.Enum):
         AuthOperation.UPDATE: AuthAccess.USER,
         AuthOperation.DELETE: AuthAccess.USER,
     },
-    # Ingest
-    AuthEndpoint.INGEST: {
+    # Bulk Import
+    AuthEndpoint.BULK_IMPORT: {
         AuthOperation.CREATE: AuthAccess.SUPER,
         AuthOperation.READ: AuthAccess.SUPER,
     },
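Under this change, both bulk import operations stay superuser-only. A sketch of the resulting lookup (the enclosing table is truncated above, so the AUTH_TABLE name is an assumption):

from app.model.authorisation import AUTH_TABLE, AuthAccess, AuthEndpoint, AuthOperation  # table name assumed

# CREATE and READ on the bulk-import endpoint both resolve to SUPER access.
assert AUTH_TABLE[AuthEndpoint.BULK_IMPORT][AuthOperation.CREATE] is AuthAccess.SUPER
assert AUTH_TABLE[AuthEndpoint.BULK_IMPORT][AuthOperation.READ] is AuthAccess.SUPER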
39 changes: 20 additions & 19 deletions app/model/ingest.py → app/model/bulk_import.py
@@ -1,25 +1,26 @@
 from datetime import datetime
-from typing import Optional
+from typing import Dict, List, Optional, Union

-from pydantic import AnyHttpUrl, BaseModel
+from pydantic import AnyHttpUrl, BaseModel, RootModel

 from app.model.collection import CollectionCreateDTO
 from app.model.document import DocumentCreateDTO
 from app.model.event import EventCreateDTO
 from app.model.family import FamilyCreateDTO
 from app.model.general import Json

+Metadata = RootModel[Dict[str, Union[str, List[str]]]]

-class IngestCollectionDTO(BaseModel):
-    """Representation of a collection for ingest."""
+class BulkImportCollectionDTO(BaseModel):
+    """Representation of a collection for bulk import."""

     import_id: str
     title: str
     description: str

     def to_collection_create_dto(self) -> CollectionCreateDTO:
         """
-        Convert IngestCollectionDTO to CollectionCreateDTO.
+        Convert BulkImportCollectionDTO to CollectionCreateDTO.
         :return CollectionCreateDTO: Converted CollectionCreateDTO instance.
         """
@@ -30,21 +31,21 @@ def to_collection_create_dto(self) -> CollectionCreateDTO:
     )


-class IngestFamilyDTO(BaseModel):
-    """Representation of a family for ingest."""
+class BulkImportFamilyDTO(BaseModel):
+    """Representation of a family for bulk import."""

     import_id: str
     title: str
     summary: str
     geographies: list[str]
     category: str
-    metadata: Json
+    metadata: Metadata
     collections: list[str]
     corpus_import_id: str

     def to_family_create_dto(self, corpus_import_id: str) -> FamilyCreateDTO:
         """
-        Convert IngestFamilyDTO to FamilyCreateDTO.
+        Convert BulkImportFamilyDTO to FamilyCreateDTO.
         :return FamilyCreateDTO: Converted FamilyCreateDTO instance.
         """
@@ -54,14 +55,14 @@ def to_family_create_dto(self, corpus_import_id: str) -> FamilyCreateDTO:
             summary=self.summary,
             geography=self.geographies,
             category=self.category,
-            metadata=self.metadata,
+            metadata=self.metadata.model_dump(),
             collections=self.collections,
             corpus_import_id=corpus_import_id,
         )


-class IngestEventDTO(BaseModel):
-    """Representation of an event for ingest."""
+class BulkImportEventDTO(BaseModel):
+    """Representation of an event for bulk import."""

     import_id: str
     family_import_id: str
@@ -72,7 +73,7 @@ class IngestEventDTO(BaseModel):

     def to_event_create_dto(self) -> EventCreateDTO:
         """
-        Convert IngestEventDTO to EventCreateDTO.
+        Convert BulkImportEventDTO to EventCreateDTO.
         :return EventCreateDTO: Converted EventCreateDTO instance.
         """
@@ -85,20 +86,20 @@ def to_event_create_dto(self) -> EventCreateDTO:
         )


-class IngestDocumentDTO(BaseModel):
-    """Representation of a document for ingest."""
+class BulkImportDocumentDTO(BaseModel):
+    """Representation of a document for bulk import."""

     import_id: str
     family_import_id: str
     variant_name: Optional[str] = None
-    metadata: Json
+    metadata: Metadata
     title: str
     source_url: Optional[AnyHttpUrl] = None
     user_language_name: Optional[str] = None

     def to_document_create_dto(self) -> DocumentCreateDTO:
         """
-        Convert IngestDocumentDTO to DocumentCreateDTO.
+        Convert BulkImportDocumentDTO to DocumentCreateDTO.
         :return DocumentCreateDTO: Converted DocumentCreateDTO instance.
         """
@@ -107,7 +108,7 @@ def to_document_create_dto(self) -> DocumentCreateDTO:
             import_id=self.import_id,
             family_import_id=self.family_import_id,
             variant_name=self.variant_name,
-            metadata=self.metadata,
+            metadata=self.metadata.model_dump(),
             title=self.title,
             source_url=self.source_url,
             user_language_name=self.user_language_name,
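The practical effect of swapping the loose Json alias for the new Metadata root model is that family and document metadata is now validated as a mapping from string keys to strings or lists of strings, and model_dump() recovers the plain dict for the downstream CreateDTOs. A standalone sketch of that behaviour (the example keys and values are illustrative):

from typing import Dict, List, Union

from pydantic import RootModel, ValidationError

Metadata = RootModel[Dict[str, Union[str, List[str]]]]

# Accepted: string and list-of-string values.
meta = Metadata.model_validate({"topic": ["Adaptation"], "author": "UNFCCC"})
print(meta.model_dump())  # {'topic': ['Adaptation'], 'author': 'UNFCCC'}

# Rejected: a nested mapping is neither a str nor a list of str.
try:
    Metadata.model_validate({"topic": {"nested": "value"}})
except ValidationError as exc:
    print(f"invalid metadata: {exc.error_count()} error(s)")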
26 changes: 10 additions & 16 deletions app/repository/family.py
@@ -22,7 +22,7 @@
 from db_client.models.organisation.users import Organisation
 from sqlalchemy import Column, and_
 from sqlalchemy import delete as db_delete
-from sqlalchemy import desc, func, or_
+from sqlalchemy import desc, or_
 from sqlalchemy import update as db_update
 from sqlalchemy.exc import NoResultFound, OperationalError
 from sqlalchemy.orm import Query, Session
@@ -34,45 +34,39 @@

 _LOGGER = logging.getLogger(__name__)

-FamilyGeoMetaOrg = Tuple[Family, str, FamilyMetadata, Corpus, Organisation]
+FamilyGeoMetaOrg = Tuple[Family, Geography, FamilyMetadata, Corpus, Organisation]


 def _get_query(db: Session) -> Query:
     # NOTE: SqlAlchemy will make a complete hash of query generation
     # if columns are used in the query() call. Therefore, entire
     # objects are returned.
-    geo_subquery = (
-        db.query(
-            func.min(Geography.value).label("value"),
-            FamilyGeography.family_import_id,
-        )
-        .join(FamilyGeography, FamilyGeography.geography_id == Geography.id)
-        .filter(FamilyGeography.family_import_id == Family.import_id)
-        .group_by(Geography.value, FamilyGeography.family_import_id)
-    ).subquery("geo_subquery")

     return (
-        db.query(Family, geo_subquery.c.value, FamilyMetadata, Corpus, Organisation)  # type: ignore
+        db.query(Family, Geography, FamilyMetadata, Corpus, Organisation)
+        .join(FamilyGeography, FamilyGeography.family_import_id == Family.import_id)
+        .join(
+            Geography,
+            Geography.id == FamilyGeography.geography_id,
+        )
         .join(FamilyMetadata, FamilyMetadata.family_import_id == Family.import_id)
         .join(FamilyCorpus, FamilyCorpus.family_import_id == Family.import_id)
         .join(Corpus, Corpus.import_id == FamilyCorpus.corpus_import_id)
         .join(Organisation, Corpus.organisation_id == Organisation.id)
-        .filter(geo_subquery.c.family_import_id == Family.import_id)  # type: ignore
     )


 def _family_to_dto(
     db: Session, fam_geo_meta_corp_org: FamilyGeoMetaOrg
 ) -> FamilyReadDTO:
     fam, geo_value, meta, corpus, org = fam_geo_meta_corp_org

     metadata = cast(dict, meta.value)
     org = cast(str, org.name)

     return FamilyReadDTO(
         import_id=str(fam.import_id),
         title=str(fam.title),
         summary=str(fam.description),
-        geography=geo_value,
+        geography=str(geo_value.value),
         category=str(fam.family_category),
         status=str(fam.family_status),
         metadata=metadata,
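One behavioural consequence worth noting: the old min() subquery collapsed each family to a single geography value, whereas the direct join yields one row per (family, geography) pair, carrying the full Geography object in the result tuple. A sketch of consuming the new row shape (the db session setup is assumed):

# Each row now matches FamilyGeoMetaOrg:
# (Family, Geography, FamilyMetadata, Corpus, Organisation).
# A family linked to several geographies appears once per geography.
for family, geography, meta, corpus, org in _get_query(db).all():
    print(family.import_id, geography.value, org.name)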

0 comments on commit 17cf05b
