Skip to content

Commit

Permalink
Feature/alcs 1130 Notice of intent submission base etl (#1006)
Browse files Browse the repository at this point in the history
notice of intent submission base import
connection.commit fix for clean functions
fix for exhausted connection pool
  • Loading branch information
mhuseinov authored Sep 25, 2023
1 parent a7dc78d commit de87b01
Show file tree
Hide file tree
Showing 16 changed files with 219 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,5 @@ def clean_application_submission_status_emails(conn=None):
print("".join(traceback.format_exception(None, error, error.__traceback__)))
cursor.close()
conn.close()

conn.commit()
2 changes: 2 additions & 0 deletions bin/migrate-oats-data/applications/base_applications.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,5 @@ def clean_applications(conn=None):
cursor.execute("DROP TABLE IF EXISTS oats.alcs_etl_application_exclude")
cursor.execute("DROP TABLE IF EXISTS oats.alcs_etl_application_duplicate")
print(f"Tempory tables dropped")

conn.commit()
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
from common import (
ALRChangeCode,
log_end,
log_start,
)
from common import ALRChangeCode, log_end, log_start, OATS_ETL_USER
from .submap import (
add_direction_field,
map_direction_values,
Expand All @@ -21,7 +17,6 @@
from common import BATCH_UPLOAD_SIZE
from psycopg2.extras import execute_batch, RealDictCursor
import traceback
from enum import Enum
import json

etl_name = "alcs_app_sub"
Expand Down Expand Up @@ -304,6 +299,8 @@ def clean_application_submission(conn=None):
print("Start application_submission cleaning")
with conn.cursor() as cursor:
cursor.execute(
"DELETE FROM alcs.application_submission a WHERE a.audit_created_by = 'oats_etl'"
f"DELETE FROM alcs.application_submission a WHERE a.audit_created_by = '{OATS_ETL_USER}'"
)
print(f"Deleted items count = {cursor.rowcount}")

conn.commit()
8 changes: 6 additions & 2 deletions bin/migrate-oats-data/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@

def inject_conn_pool(func):
    """Decorator that passes a pooled database connection as the first argument.

    A connection is borrowed from the module-level ``connection_pool`` for the
    duration of one call and is always handed back with ``putconn``, even when
    ``func`` raises, so failing jobs cannot exhaust the pool.

    The connection is intentionally not borrowed via
    ``with connection_pool.getconn() as conn``: assuming ``connection_pool`` is a
    psycopg2 pool, a connection used as a context manager only ends the
    transaction on exit and is never returned to the pool.

    The wrapper itself neither commits nor rolls back; the decorated function
    is responsible for calling ``conn.commit()``.

    Args:
        func: Callable whose first positional parameter is the connection.

    Returns:
        A wrapper that accepts the remaining arguments of ``func`` and returns
        whatever ``func`` returns.
    """
    import functools  # local import so the block does not depend on file-level imports

    @functools.wraps(func)  # keep the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        conn = connection_pool.getconn()
        try:
            return func(conn, *args, **kwargs)
        finally:
            # Runs on both success and failure, before the value is returned.
            connection_pool.putconn(conn)

    return wrapper
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ def process_application_documents(conn=None, batch_size=10000):
"""
with conn.cursor() as cursor:
with open(
"sql/documents_app/alcs_documents_to_app_documents_total_count.sql", "r", encoding="utf-8"
"sql/documents_app/alcs_documents_to_app_documents_total_count.sql",
"r",
encoding="utf-8",
) as sql_file:
count_query = sql_file.read()
cursor.execute(count_query)
Expand All @@ -50,7 +52,9 @@ def process_application_documents(conn=None, batch_size=10000):
last_document_id = 0

with open(
"sql/documents_app/alcs_documents_to_app_documents.sql", "r", encoding="utf-8"
"sql/documents_app/alcs_documents_to_app_documents.sql",
"r",
encoding="utf-8",
) as sql_file:
documents_to_insert_sql = sql_file.read()
while True:
Expand Down Expand Up @@ -97,3 +101,5 @@ def clean_application_documents(conn=None):
)
conn.commit()
print(f"Deleted items count = {cursor.rowcount}")

conn.commit()
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ def process_noi_documents(conn=None, batch_size=10000):
"""
with conn.cursor() as cursor:
with open(
"sql/documents_noi/alcs_documents_to_noi_documents_total_count.sql", "r", encoding="utf-8"
"sql/documents_noi/alcs_documents_to_noi_documents_total_count.sql",
"r",
encoding="utf-8",
) as sql_file:
count_query = sql_file.read()
cursor.execute(count_query)
Expand All @@ -50,7 +52,9 @@ def process_noi_documents(conn=None, batch_size=10000):
last_document_id = 0

with open(
"sql/documents_noi/alcs_documents_to_noi_documents.sql", "r", encoding="utf-8"
"sql/documents_noi/alcs_documents_to_noi_documents.sql",
"r",
encoding="utf-8",
) as sql_file:
documents_to_insert_sql = sql_file.read()
while True:
Expand Down Expand Up @@ -97,3 +101,5 @@ def clean_noi_documents(conn=None):
)
conn.commit()
print(f"Deleted items count = {cursor.rowcount}")

conn.commit()
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ def process_documents(conn=None, batch_size=10000):
"""
with conn.cursor() as cursor:
with open(
"sql/documents_app/oats_documents_to_alcs_documents_app_total_count.sql", "r", encoding="utf-8"
"sql/documents_app/oats_documents_to_alcs_documents_app_total_count.sql",
"r",
encoding="utf-8",
) as sql_file:
count_query = sql_file.read()
cursor.execute(count_query)
Expand All @@ -48,7 +50,11 @@ def process_documents(conn=None, batch_size=10000):
successful_inserts_count = 0
last_document_id = 0

with open("sql/documents_app/oats_documents_to_alcs_documents_app.sql", "r", encoding="utf-8") as sql_file:
with open(
"sql/documents_app/oats_documents_to_alcs_documents_app.sql",
"r",
encoding="utf-8",
) as sql_file:
documents_to_insert_sql = sql_file.read()
while True:
cursor.execute(
Expand Down Expand Up @@ -91,3 +97,5 @@ def clean_documents(conn=None):
cursor.execute("DELETE FROM alcs.document WHERE audit_created_by = 'oats_etl';")
conn.commit()
print(f"Deleted items count = {cursor.rowcount}")

conn.commit()
7 changes: 3 additions & 4 deletions bin/migrate-oats-data/menu/commands/clean_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@
clean_application_submission_status_emails,
)
from noi.noi_submission_status_email import (
clean_application_submission_status_emails,
clean_notice_of_intent_submission_status_emails,
)
from noi import clean_notice_of_intents
from documents import (
clean_application_documents,
clean_documents,
Expand All @@ -25,8 +24,8 @@ def clean_all(console, args):
clean_documents()
clean_application_submission()
clean_applications()
clean_notice_of_intents()
clean_application_submission_status_emails(),
clean_notice_of_intent()
clean_application_submission_status_emails(),
clean_notice_of_intent_submission_status_emails(),

console.log("Done")
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def process_notice_of_intent_submission_status_emails(conn=None):


@inject_conn_pool
def clean_application_submission_status_emails(conn=None):
def clean_notice_of_intent_submission_status_emails(conn=None):
update_query = f"""
UPDATE alcs.notice_of_intent_submission_to_submission_status status
SET email_sent_date = NULL
Expand All @@ -38,3 +38,5 @@ def clean_application_submission_status_emails(conn=None):
print("".join(traceback.format_exception(None, error, error.__traceback__)))
cursor.close()
conn.close()

conn.commit()
8 changes: 5 additions & 3 deletions bin/migrate-oats-data/noi/notice_of_intent_init.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import traceback
from db import inject_conn_pool
from common import log_end, log_start
from common import log_end, log_start, OATS_ETL_USER


def noi_insert_query(number_of_rows_to_insert):
Expand Down Expand Up @@ -58,7 +58,7 @@ def init_notice_of_intents(conn=None, batch_size=10000):
)

print(
f"retrieved/inserted items count: {applications_to_be_inserted_count}; total successfully inserted/updated NOIs so far {successful_inserts_count}; last inserted noi_applidation_id: {last_application_id}"
f"retrieved/inserted items count: {applications_to_be_inserted_count}; total successfully inserted/updated NOIs so far {successful_inserts_count}; last inserted noi_id: {last_application_id}"
)
except Exception as error:
conn.rollback()
Expand All @@ -83,6 +83,8 @@ def clean_notice_of_intents(conn=None):
print("Start NOI cleaning")
with conn.cursor() as cursor:
cursor.execute(
"DELETE FROM alcs.notice_of_intent a WHERE a.audit_created_by = 'oats_etl'"
f"DELETE FROM alcs.notice_of_intent noi WHERE noi.audit_created_by = '{OATS_ETL_USER}'"
)
print(f"Deleted items count = {cursor.rowcount}")

conn.commit()
8 changes: 7 additions & 1 deletion bin/migrate-oats-data/noi/notice_of_intent_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@
process_alcs_notice_of_intent_decision_date,
)
from .notice_of_intent_init import init_notice_of_intents, clean_notice_of_intents
from .notice_of_intent_submissions import (
clean_notice_of_intent_submissions,
init_notice_of_intent_submissions,
)


def init_notice_of_intent(batch_size):
    """Run the Notice of Intent init step by delegating to ``init_notice_of_intents``.

    Args:
        batch_size: Forwarded unchanged as the ``batch_size`` keyword argument.
    """

    init_notice_of_intents(batch_size=batch_size)


def clean_notice_of_intent():
    """Clean Notice of Intent data: submissions first, then the NOI records."""
    # NOTE(review): submissions are cleaned before the NOIs, presumably because
    # submission rows reference notice_of_intent rows; confirm against the schema.
    clean_notice_of_intent_submissions()
    clean_notice_of_intents()


Expand All @@ -21,3 +25,5 @@ def process_notice_of_intent(batch_size):
process_alcs_notice_of_intent_base_fields(batch_size=batch_size)

process_alcs_notice_of_intent_decision_date(batch_size=batch_size)

init_notice_of_intent_submissions(batch_size=batch_size)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .notice_of_intent_submission_init import init_notice_of_intent_submissions, clean_notice_of_intent_submissions
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
from common import log_end, log_start, OATS_ETL_USER
from db import inject_conn_pool
from common import BATCH_UPLOAD_SIZE
from psycopg2.extras import execute_batch, RealDictCursor
import traceback

etl_name = "init_notice_of_intent_submissions"


@inject_conn_pool
def init_notice_of_intent_submissions(conn=None, batch_size=BATCH_UPLOAD_SIZE):
    """
    Create ALCS notice_of_intent_submission rows from the rows returned by the init SQL.

    The select statement is re-executed with a moving ``alr_application_id`` lower bound;
    each pass fetches at most ``batch_size`` rows and inserts them, until a pass returns
    no rows. A failing batch is rolled back, logged, and the lower bound is advanced by
    one before the next pass.

    Args:
        conn (psycopg2.extensions.connection): PostgreSQL database connection. Provided by the decorator.
        batch_size (int): The number of items to process at once. Defaults to BATCH_UPLOAD_SIZE.
    """
    log_start(etl_name)

    with conn.cursor(cursor_factory=RealDictCursor) as cursor:
        # Total number of candidate rows; used only for reporting.
        with open(
            "noi/sql/notice_of_intent_submission/notice_of_intent_submission_init_count.sql",
            "r",
            encoding="utf-8",
        ) as count_file:
            count_query = count_file.read()
        cursor.execute(count_query)
        count_total = cursor.fetchone()["count"]
        print("- Total Notice of Intent Submission data to insert: ", count_total)

        failed_inserts = 0
        successful_inserts_count = 0
        last_submission_id = 0

        with open(
            "noi/sql/notice_of_intent_submission/notice_of_intent_submission_init.sql",
            "r",
            encoding="utf-8",
        ) as select_file:
            submission_sql = select_file.read()

        while True:
            # Keyset-style paging: only rows above the last handled id are selected.
            cursor.execute(
                f"{submission_sql} WHERE noig.alr_application_id > {last_submission_id} ORDER BY noig.alr_application_id;"
            )
            rows = cursor.fetchmany(batch_size)
            if not rows:
                break

            try:
                batch_count = len(rows)

                _insert_notice_of_intent_submissions(conn, batch_size, cursor, rows)

                successful_inserts_count += batch_count
                last_submission_id = rows[-1]["alr_application_id"]

                print(
                    f"retrieved/inserted items count: {batch_count}; total successfully inserted submissions so far {successful_inserts_count}; last inserted application_id: {last_submission_id}"
                )
            except Exception as error:
                # Undo the partial batch, record the failure and move past the current id.
                conn.rollback()
                str_err = str(error)
                trace_err = traceback.format_exc()
                print(str_err)
                print(trace_err)
                log_end(etl_name, str_err, trace_err)
                failed_inserts = count_total - successful_inserts_count
                last_submission_id += 1

    print("Total amount of successful inserts:", successful_inserts_count)
    print("Total failed inserts:", failed_inserts)
    log_end(etl_name)


def _insert_notice_of_intent_submissions(conn, batch_size, cursor, rows):
    """Insert one batch of fetched rows into alcs.notice_of_intent_submission and commit.

    Args:
        conn: Connection on which the batch is committed.
        batch_size: Passed to ``execute_batch`` as ``page_size``.
        cursor: Cursor used to run the insert statement.
        rows: Rows fetched from the source query; each is converted to a plain dict.
    """
    insert_params = _prepare_oats_alr_applications_data(rows)
    insert_sql = _get_insert_query()

    if insert_params:
        execute_batch(cursor, insert_sql, insert_params, page_size=batch_size)

    # The commit is issued even when there was nothing to insert.
    conn.commit()


def _get_insert_query():
    """Return the INSERT statement for alcs.notice_of_intent_submission.

    Row values are bound through named psycopg2 placeholders (``%(name)s``);
    ``is_draft`` is fixed to false and ``audit_created_by`` to OATS_ETL_USER.
    """
    return f"""
                INSERT INTO alcs.notice_of_intent_submission (
                    file_number,
                    local_government_uuid,
                    type_code,
                    applicant,
                    is_draft,
                    audit_created_by
                )
                VALUES (
                    %(file_number)s,
                    %(local_government_uuid)s,
                    %(type_code)s,
                    %(applicant)s,
                    false,
                    '{OATS_ETL_USER}'
                )
    """


def _prepare_oats_alr_applications_data(row_data_list):
data_list = []
for row in row_data_list:
data_list.append(dict(row))
return data_list


@inject_conn_pool
def clean_notice_of_intent_submissions(conn=None):
    """Delete the notice_of_intent_submission rows created by the ETL user and commit.

    Args:
        conn: PostgreSQL database connection. Provided by the decorator.
    """
    print("Start notice_of_intent_submissions cleaning")

    delete_sql = f"DELETE FROM alcs.notice_of_intent_submission nois WHERE nois.audit_created_by = '{OATS_ETL_USER}'"
    with conn.cursor() as cursor:
        cursor.execute(delete_sql)
        print(f"Deleted items count = {cursor.rowcount}")

    # The injected connection is not committed automatically.
    conn.commit()
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@ def process_alcs_notice_of_intent_base_fields(conn=None, batch_size=BATCH_UPLOAD

def _update_fee_fields_records(conn, batch_size, cursor, rows):
    """Apply the OATS alr_applications update statement to one batch of rows and commit.

    Args:
        conn: Connection on which the batch is committed.
        batch_size: Passed to ``execute_batch`` as ``page_size``.
        cursor: Cursor used to run the update statement.
        rows: Fetched rows, prepared by ``_prepare_oats_alr_applications_data``.
    """
    query = _get_update_query_from_oats_alr_applications_fields()
    parsed_data_list = _prepare_oats_alr_applications_data(rows)

    # The batch is executed exactly once, and only when there is data to send.
    if parsed_data_list:
        execute_batch(cursor, query, parsed_data_list, page_size=batch_size)

    conn.commit()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- Selects, for every OATS application of class 'NOI' that has a single component,
-- the matching ALCS notice_of_intent fields together with the OATS component data.
-- NOTE(review): the statement has no WHERE / ORDER BY / terminator; presumably the
-- loader appends its own paging clause. If so, keep the final line free of
-- trailing "--" comments.
WITH noi_components_grouped AS (
    -- One row per NOI application id that has exactly one component row.
    SELECT oaac.alr_application_id
    FROM oats.oats_alr_appl_components oaac
        JOIN oats.oats_alr_applications oaa ON oaa.alr_application_id = oaac.alr_application_id
    WHERE oaa.application_class_code = 'NOI'
    GROUP BY oaac.alr_application_id
    HAVING count(oaac.alr_application_id) < 2 -- ignore notices of intent with multiple components
)
SELECT noi.file_number,
    noi.type_code,
    noi.local_government_uuid,
    oc.alr_change_code,
    noig.alr_application_id,
    noi.applicant,
    noi.alr_area,
    oc.alr_appl_component_id
FROM noi_components_grouped noig
    -- LEFT JOIN: ids without a matching alcs.notice_of_intent row are kept, with NULL noi.* columns.
    LEFT JOIN alcs.notice_of_intent noi ON noi.file_number = noig.alr_application_id::TEXT
    JOIN oats.oats_alr_appl_components oc ON noig.alr_application_id = oc.alr_application_id
Loading

0 comments on commit de87b01

Please sign in to comment.