Skip to content
This repository has been archived by the owner on Apr 17, 2024. It is now read-only.

Commit

Permalink
customsql
Browse files Browse the repository at this point in the history
  • Loading branch information
vishreddy01 committed Jan 25, 2024
1 parent 59eda78 commit 5aa0127
Showing 1 changed file with 20 additions and 12 deletions.
32 changes: 20 additions & 12 deletions data_replication_parametrized_audit_os.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,18 +109,26 @@ def get_active_tables(mstr_schema,app_name):
return rows

# In[9]: Function to extract data from Oracle
def extract_from_oracle(table_name,source_schema):
def extract_from_oracle(table_name,source_schema,customsql_ind,customsql_query):
# Acquire a connection from the pool
oracle_connection = OrcPool.acquire()
oracle_cursor = oracle_connection.cursor()
try:
# Use placeholders in the query and bind the table name as a parameter
sql_query = f'SELECT * FROM {source_schema}.{table_name}'
print(sql_query)
oracle_cursor.execute(sql_query)
rows = oracle_cursor.fetchall()
OrcPool.release(oracle_connection)
return rows
if customsql_ind == "N":
# Use placeholders in the query and bind the table name as a parameter
sql_query = f'SELECT * FROM {source_schema}.{table_name}'
print(sql_query)
oracle_cursor.execute(sql_query)
rows = oracle_cursor.fetchall()
OrcPool.release(oracle_connection)
return rows
else:
sql_query=customsql_query
print(sql_query)
oracle_cursor.execute(sql_query)
rows = oracle_cursor.fetchall()
OrcPool.release(oracle_connection)
return rows
except Exception as e:
audit_batch_status_insert(table_name,'failed')
print(f"Error extracting data from Oracle: {str(e)}")
Expand Down Expand Up @@ -160,10 +168,10 @@ def load_into_postgres(table_name, data,target_schema):
PgresPool.putconn(postgres_connection)

# In[11]: Function to call both extract and load functions
def load_data_from_src_tgt(table_name,source_schema,target_schema):
def load_data_from_src_tgt(table_name,source_schema,target_schema,customsql_ind,customsql_query):
# Extract data from Oracle
print(f'Source: Thread {table_name} started at ' + datetime.now().strftime("%H:%M:%S"))
oracle_data = extract_from_oracle(table_name,source_schema) # Ensure table name is in uppercase
oracle_data = extract_from_oracle(table_name,customsql_ind,customsql_query) # Ensure table name is in uppercase
print(f'Source: Extraction for {table_name} completed at ' + datetime.now().strftime("%H:%M:%S"))

if oracle_data:
Expand All @@ -177,7 +185,7 @@ def load_data_from_src_tgt(table_name,source_schema,target_schema):
# Main ETL process
active_tables_rows =get_active_tables(mstr_schema,app_name)
#print(active_tables_rows)
tables_to_extract = [(row[2],row[1],row[3]) for row in active_tables_rows]
tables_to_extract = [(row[2],row[1],row[3],row[10],row[11]) for row in active_tables_rows]

print(f"tables to extract are {tables_to_extract}")
print(f'No of concurrent tasks:{concurrent_tasks}')
Expand All @@ -186,7 +194,7 @@ def load_data_from_src_tgt(table_name,source_schema,target_schema):
# Using ThreadPoolExecutor to run tasks concurrently
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_tasks) as executor:
# Submit tasks to the executor
future_to_table = {executor.submit(load_data_from_src_tgt, table[0],table[1],table[2]): table for table in tables_to_extract}
future_to_table = {executor.submit(load_data_from_src_tgt, table[0],table[1],table[2],table[3],table[4]): table for table in tables_to_extract}

# Wait for all tasks to complete
concurrent.futures.wait(future_to_table)
Expand Down

0 comments on commit 5aa0127

Please sign in to comment.