-
Notifications
You must be signed in to change notification settings - Fork 0
/
database_functions.py
237 lines (187 loc) · 7.78 KB
/
database_functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
import pymysql
import pandas as pd
from datetime import datetime
import time
import json
import logging
from config import * # Importing configurations
# Connection to the database from outside Hostinger
def connect_to_db():
    """Open a PyMySQL connection described by ``db_config``.

    ``db_config`` is not defined in this function; it is presumably supplied
    by ``from config import *`` (verify against the config module). The keys
    ``host``, ``user``, ``password`` and ``database`` are required; ``port``
    is optional and falls back to 3306.

    Returns:
        The open connection, or ``None`` when PyMySQL raises a
        ``MySQLError`` (the error is printed, not re-raised).
    """
    settings = {
        'host': db_config['host'],
        # Default port is 3306
        'port': int(db_config.get('port', 3306)),
        'user': db_config['user'],
        'password': db_config['password'],
        'db': db_config['database'],
    }
    try:
        return pymysql.connect(**settings)
    except pymysql.MySQLError as err:
        print(f"Error connecting to the database: {err}")
    return None
def insert_data_into_db(df, table, date_to_insert, incremental=False):
    """Upsert the rows of ``df`` into ``table`` under a single date.

    Every inserted row is ``date_to_insert`` followed by the row's values in
    ``df.columns`` order. The values are matched positionally against the
    table's columns as listed by ``DESCRIBE``, so the table layout must line
    up with that shape. Existing rows are updated through
    ``ON DUPLICATE KEY UPDATE``.

    The optional DELETE and all inserts are committed together at the end, so
    a failure part-way through rolls everything back instead of leaving the
    date's rows deleted or half-written.

    Args:
        df: DataFrame with the data to store.
        table: Destination table name. It is interpolated into the SQL text
            (identifiers cannot be bound as parameters), so it must come from
            trusted code, never from user input.
        date_to_insert: Value stored in the leading column of every row.
        incremental: When true, rows whose ``date`` equals ``date_to_insert``
            are deleted before inserting.

    Returns:
        None. Connection and insertion errors are printed, not raised.
    """
    max_retries = 3
    batch_size = 500

    connection = None
    for attempt in range(1, max_retries + 1):
        connection = connect_to_db()
        if connection:
            break
        print(f"Retrying connection ({attempt}/{max_retries})")
        # No point in waiting once there are no attempts left.
        if attempt < max_retries:
            time.sleep(5)
    if connection is None:
        print("Failed to establish a connection after several attempts.")
        return

    cursor = connection.cursor()
    try:
        # Fetch the column names of the destination table.
        cursor.execute(f"DESCRIBE {table};")
        columns = [column[0] for column in cursor.fetchall()]

        # The destination table is required to have a 'date' column.
        if 'date' not in columns:
            print(f"La tabla {table} debe tener una columna 'date'")
            return

        # Not committed on its own: it shares the transaction with the
        # inserts below so that a failed insert also undoes the delete.
        if incremental:
            cursor.execute(f"DELETE FROM {table} WHERE date = %s", (date_to_insert,))

        # The statement is identical for every batch, so build it once.
        all_columns = ', '.join(columns)
        update_columns = ', '.join([f"{col}=VALUES({col})" for col in columns])
        placeholders = ', '.join(['%s'] * (len(df.columns) + 1))
        insert_query = (
            f"INSERT INTO {table} ({all_columns}) VALUES ({placeholders}) "
            f"ON DUPLICATE KEY UPDATE {update_columns}"
        )

        batch = []
        for _, row in df.iterrows():
            # Original author's note: the number of pilgrims is always
            # greater than zero because the PBI query uses it as a filter.
            batch.append((date_to_insert, *(row[col] for col in df.columns)))
            if len(batch) >= batch_size:
                cursor.executemany(insert_query, batch)
                batch = []
        if batch:
            cursor.executemany(insert_query, batch)

        connection.commit()
    except Exception as e:
        print(f"Error during data insertion: {e}")
        connection.rollback()
    finally:
        cursor.close()
        connection.close()
def insert_data_into_db_last_day(data_dict):
    """Store one ``(date, pilgrims)`` summary row in ``stats_pilgrims_last_day``.

    The row is written with ``INSERT IGNORE``, so a row that collides with an
    existing unique key is skipped rather than raising.

    Args:
        data_dict: Mapping read with ``.get('date')`` and ``.get('pilgrims')``;
            a missing key is inserted as ``None``.

    Returns:
        None. A ``pymysql.MySQLError`` raised while executing or committing is
        printed, not re-raised. Errors raised while connecting propagate to
        the caller.
    """
    # Use the same optional port as connect_to_db() so both code paths reach
    # the same server when a non-default port is configured.
    connection = pymysql.connect(
        host=db_config['host'],
        port=int(db_config.get('port', 3306)),
        user=db_config['user'],
        password=db_config['password'],
        database=db_config['database']
    )
    try:
        sql_query = """INSERT IGNORE INTO stats_pilgrims_last_day (date, pilgrims)
VALUES (%s, %s)"""
        values = (data_dict.get('date', None), data_dict.get('pilgrims', None))
        # The context manager closes the cursor even if execute() fails.
        with connection.cursor() as cursor:
            cursor.execute(sql_query, values)
        connection.commit()
    except pymysql.MySQLError as e:
        print(f"An error occurred: {e}")
    finally:
        connection.close()
def get_queries_from_db():
    """Load the three stored query definitions from ``db_queries_stats``.

    Reads every ``(template, query)`` row and picks the rows whose template
    name is ``query_date_last_day``,
    ``query_template_all_columns_year_month`` or
    ``query_template_all_columns_any_day``. Each stored query is decoded with
    ``json.loads``.

    Returns:
        A 3-tuple ``(query_date_last_day, template_query_year_month,
        template_query_date)`` of decoded JSON values. An entry is ``None``
        when its row is missing (an error is logged). All three are ``None``
        when no connection could be established after three attempts.

    Raises:
        json.JSONDecodeError: If a stored query is not valid JSON.
    """
    max_retries = 3

    connection = None
    for attempt in range(1, max_retries + 1):
        connection = connect_to_db()
        if connection:
            break
        logging.warning(f"Retrying connection ({attempt}/{max_retries})")
        # No point in waiting once there are no attempts left.
        if attempt < max_retries:
            time.sleep(15)
    if connection is None:
        logging.error("Failed to establish a connection after several attempts.")
        return None, None, None

    # Order matters: it is the order of the returned tuple.
    wanted = (
        "query_date_last_day",
        "query_template_all_columns_year_month",
        "query_template_all_columns_any_day",
    )
    found = dict.fromkeys(wanted)

    try:
        with connection.cursor() as cursor:
            cursor.execute("SELECT template, query FROM db_queries_stats")
            for row in cursor:
                if row[0] in found:
                    found[row[0]] = row[1]
    finally:
        # Release the connection even when the query fails.
        connection.close()

    parsed = []
    for name in wanted:
        raw = found[name]
        if raw is None:
            # json.loads(None) would raise TypeError; report the gap instead.
            logging.error("Template %r not found in db_queries_stats", name)
            parsed.append(None)
        else:
            parsed.append(json.loads(raw))
    return tuple(parsed)
def get_sum_pilgrims_last_day():
    """Return the latest date in ``tabla_check`` and its total pilgrims.

    ``tabla_check`` is a table name defined outside this function
    (presumably in the config module; verify there). The function finds
    ``MAX(date)`` in that table and then sums the ``pilgrims`` column over
    the rows with that date.

    Returns:
        ``(last_date, total_pilgrims)``, where ``last_date`` is the value
        returned by the database driver. ``(None, None)`` when the connection
        cannot be established or the table has no dated rows.
    """
    connection = connect_to_db()
    if connection is None:
        print("Failed to establish a connection.")
        # Same shape as every other return so callers can always unpack.
        return None, None

    try:
        with connection.cursor() as cursor:
            # Step 1: determine the last recorded date.
            cursor.execute(f"SELECT MAX(date) FROM {tabla_check};")
            last_date = cursor.fetchone()[0]
            if not last_date:
                print("No records found in the table.")
                return None, None

            # Step 2: sum "pilgrims" for that date. The date is bound as a
            # parameter; only the table name has to be interpolated.
            last_date_str = last_date.strftime('%Y-%m-%d')
            cursor.execute(
                f"SELECT SUM(pilgrims) FROM {tabla_check} WHERE date = %s;",
                (last_date_str,),
            )
            total_pilgrims = cursor.fetchone()[0]
    finally:
        # Release the connection even when a query fails.
        connection.close()

    return last_date, total_pilgrims
def get_pilgrims_last_day():
    """Return the latest date in ``tabla_pilgrims_last_day`` and its pilgrims.

    ``tabla_pilgrims_last_day`` is a table name defined outside this function
    (presumably in the config module; verify there). The function finds
    ``MAX(date)`` in that table and reads the ``pilgrims`` value of the first
    row returned for that date.

    Returns:
        ``(last_date_str, pilgrims)``, where ``last_date_str`` is the date
        formatted as ``YYYY-MM-DD``. ``(None, None)`` when the connection
        cannot be established or the table has no dated rows.
    """
    connection = connect_to_db()
    if connection is None:
        print("Failed to establish a connection.")
        # Same shape as every other return so callers can always unpack.
        return None, None

    try:
        with connection.cursor() as cursor:
            # Step 1: determine the last recorded date.
            cursor.execute(f"SELECT MAX(date) FROM {tabla_pilgrims_last_day};")
            last_date = cursor.fetchone()[0]
            if not last_date:
                print("No records found in the table.")
                return None, None

            # Step 2: read "pilgrims" for that date. The date is bound as a
            # parameter; only the table name has to be interpolated.
            last_date_str = last_date.strftime('%Y-%m-%d')
            cursor.execute(
                f"SELECT pilgrims FROM {tabla_pilgrims_last_day} WHERE date = %s;",
                (last_date_str,),
            )
            total_pilgrims = cursor.fetchone()[0]
    finally:
        # Release the connection even when a query fails.
        connection.close()

    return last_date_str, total_pilgrims