diff --git a/data_replication_parametrized_audit_os.py b/data_replication_parametrized_audit_os.py
index c55187a..41cd9a2 100644
--- a/data_replication_parametrized_audit_os.py
+++ b/data_replication_parametrized_audit_os.py
@@ -109,18 +109,22 @@ def get_active_tables(mstr_schema,app_name):
     return rows
 
 # In[9]: Function to extract data from Oracle
-def extract_from_oracle(table_name,source_schema):
+def extract_from_oracle(table_name,source_schema,customsql_ind,customsql_query):
     # Acquire a connection from the pool
     oracle_connection = OrcPool.acquire()
     oracle_cursor = oracle_connection.cursor()
     try:
-        # Use placeholders in the query and bind the table name as a parameter
-        sql_query = f'SELECT * FROM {source_schema}.{table_name}'
-        print(sql_query)
-        oracle_cursor.execute(sql_query)
-        rows = oracle_cursor.fetchall()
-        OrcPool.release(oracle_connection)
-        return rows
+        # Only the query text differs between the standard and custom-SQL
+        # paths; execute/fetch/release is shared to avoid drift.
+        if customsql_ind == "N":
+            sql_query = f'SELECT * FROM {source_schema}.{table_name}'
+        else:
+            # NOTE(review): customsql_query is executed verbatim with no bind
+            # parameters -- confirm the config table it comes from is trusted.
+            sql_query = customsql_query
+        print(sql_query)
+        oracle_cursor.execute(sql_query)
+        rows = oracle_cursor.fetchall()
+        OrcPool.release(oracle_connection)
+        return rows
     except Exception as e:
         audit_batch_status_insert(table_name,'failed')
         print(f"Error extracting data from Oracle: {str(e)}")
@@ -160,10 +168,10 @@ def load_into_postgres(table_name, data,target_schema):
         PgresPool.putconn(postgres_connection)
 
 # In[11]: Function to call both extract and load functions
-def load_data_from_src_tgt(table_name,source_schema,target_schema):
+def load_data_from_src_tgt(table_name,source_schema,target_schema,customsql_ind,customsql_query):
     # Extract data from Oracle
     print(f'Source: Thread {table_name} started at ' + datetime.now().strftime("%H:%M:%S"))
-    oracle_data = extract_from_oracle(table_name,source_schema) # Ensure table name is in uppercase
+    oracle_data = extract_from_oracle(table_name,source_schema,customsql_ind,customsql_query) # Ensure table name is in uppercase
     print(f'Source: Extraction for {table_name} completed at ' + datetime.now().strftime("%H:%M:%S"))
 
     if oracle_data:
@@ -177,7 +185,7 @@ def load_data_from_src_tgt(table_name,source_schema,target_schema):
 # Main ETL process
 active_tables_rows =get_active_tables(mstr_schema,app_name)
 #print(active_tables_rows)
-tables_to_extract = [(row[2],row[1],row[3]) for row in active_tables_rows]
+tables_to_extract = [(row[2],row[1],row[3],row[10],row[11]) for row in active_tables_rows]
 print(f"tables to extract are {tables_to_extract}")
 
 print(f'No of concurrent tasks:{concurrent_tasks}')
@@ -186,7 +194,7 @@ def load_data_from_src_tgt(table_name,source_schema,target_schema):
 # Using ThreadPoolExecutor to run tasks concurrently
 with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_tasks) as executor:
     # Submit tasks to the executor
-    future_to_table = {executor.submit(load_data_from_src_tgt, table[0],table[1],table[2]): table for table in tables_to_extract}
+    future_to_table = {executor.submit(load_data_from_src_tgt, table[0],table[1],table[2],table[3],table[4]): table for table in tables_to_extract}
 
     # Wait for all tasks to complete
     concurrent.futures.wait(future_to_table)