Issue Pyrrha-Platform#29 - Add batch capability for running analytics over historical results (e.g. after uploading data from SD cards)
JSegrave-IBM committed Mar 23, 2021
1 parent e52ce67 commit 7bb26f5
Showing 2 changed files with 98 additions and 4 deletions.
69 changes: 65 additions & 4 deletions src/GasExposureAnalytics.py
@@ -507,7 +507,9 @@ def _calculate_TWA_and_gauge_for_all_firefighters(self, sensor_log_chunk_df, ff_
    # current_utc_timestamp : The UTC datetime for which to calculate sensor analytics. Defaults to 'now' (UTC).
    # commit : Utility flag for unit testing - defaults to committing analytic results to
    #          the database. Setting commit=False prevents unit tests from writing to the database.
-    def run_analytics (self, current_utc_timestamp=None, commit=True) :
+    # log_mins : How often to log an informational message stating which minute is currently being processed. Defaults
+    #            to every '1' min. Can be set to (eg) every '15' mins when running batches, so the logs are readable.
+    def run_analytics (self, current_utc_timestamp=None, commit=True, log_mins=1) :

        # Get the desired timeframe for the analytics run and standardise it to UTC.
        if current_utc_timestamp is None:
@@ -528,9 +530,11 @@ def run_analytics (self, current_utc_timestamp=None, commit=True) :
        # buffer for the data.
        timestamp_key = current_utc_timestamp.floor(freq='min') - pd.Timedelta(minutes = 1)

-        message = ("Running Prometeo Analytics for minute key '%s'" % (timestamp_key.isoformat()))
-        if not self._from_db : message += " (local CSV file mode)"
-        self.logger.info(message)
+        # Log progress regularly (e.g. by default, log_mins is 'every 1 min', but could be set to 'every 15 mins').
+        if (timestamp_key == timestamp_key.floor(str(log_mins) + 'T')) :
+            message = ("Running Prometeo Analytics for minute key '%s'" % (timestamp_key.floor(str(log_mins) + 'T')))
+            if not self._from_db : message += " (local CSV file mode)"
+            self.logger.info(message)

        # Read a block of sensor logs from the DB, covering the longest window we're calculating over (usually 8hrs).
        # Note: This has the advantage of always including all known sensor data, even when that data was delayed due
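The floor comparison added above only fires when the minute key lands on a log_mins boundary. A small illustration of the idea (hypothetical timestamps):

import pandas as pd

ts = pd.Timestamp('2021-03-01 09:45:00')
print(ts == ts.floor('15T'))  # True - 09:45 is on a 15-minute boundary, so this minute would be logged
ts = pd.Timestamp('2021-03-01 09:46:00')
print(ts == ts.floor('15T'))  # False - 09:46 is not, so this minute would be skipped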
@@ -554,3 +558,60 @@ def run_analytics (self, current_utc_timestamp=None, commit=True) :
            analytics_df.to_sql(ANALYTICS_TABLE, self._db_engine, if_exists='append', dtype={FIREFIGHTER_ID_COL:FIREFIGHTER_ID_COL_TYPE})

        return analytics_df
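As a usage sketch for the updated signature (the timestamp and import path are illustrative; commit=False keeps a trial run from writing to the database):

import pandas as pd
from GasExposureAnalytics import GasExposureAnalytics

analytics = GasExposureAnalytics()

# Re-run one historical minute, logging progress at most every 15 minutes.
minute = pd.Timestamp('2021-03-01 09:30:00', tz='UTC')
minute_df = analytics.run_analytics(current_utc_timestamp=minute, commit=False, log_mins=15)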


+    # This is the batched version of 'main' - given a minute-by-minute playback schedule, it runs all of the core
+    # analytics for Prometeo for each of those minutes.
+    # playback_schedule : A list of UTC datetimes for which to calculate sensor analytics.
+    # commit : Utility flag for unit testing - defaults to committing analytic results to
+    #          the database. Setting commit=False prevents unit tests from writing to the database.
+    def batch_run_analytics (self, playback_schedule=[], commit=True) :
+
+        # Calculate exposure for every minute in the playback schedule
+        all_results = []
+        for time in playback_schedule :
+
+            # Calculate exposure
+            result = self.run_analytics(time, commit, log_mins=15) # only log every 15 mins, so logs remain readable
+            if result is not None :
+                all_results.append(result)
+
+        if all_results :
+            return pd.concat(all_results)
+        else :
+            return None
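A minimal sketch of driving this method directly (the schedule times are illustrative; commit=False avoids database writes):

import pandas as pd
from GasExposureAnalytics import GasExposureAnalytics

analytics = GasExposureAnalytics()

# One hour of minute keys, mirroring how batch_run_analytics_by_date builds its schedule.
schedule = pd.date_range(start='2021-03-01 09:00', end='2021-03-01 10:00', freq='min').to_list()
hour_df = analytics.batch_run_analytics(playback_schedule=schedule, commit=False)

One side note on the signature: the mutable default playback_schedule=[] is a classic Python pitfall; a None default is generally safer, though it is harmless here because the list is only iterated, never mutated.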


+    # This is a variant of the batched version - given a date, it runs the core analytics for Prometeo for all available
+    # sensor data on that date.
+    # date   : A UTC date (e.g. '2021-03-01') over which to calculate sensor analytics. Must not include time.
+    # commit : Utility flag for unit testing - defaults to committing analytic results to
+    #          the database. Setting commit=False prevents unit tests from writing to the database.
+    def batch_run_analytics_by_date (self, date, commit=True) :
+
+        # Given a date, find the start and end times for the sensor records on that date
+        start_time, end_time = None, None
+        if self._from_db :
+            sql = ("SELECT MIN("+TIMESTAMP_COL+") AS start_time, MAX("+TIMESTAMP_COL+") AS end_time FROM "+SENSOR_LOG_TABLE+" WHERE DATE("+TIMESTAMP_COL+") = '"+date+"'")
+            start_time, end_time = pd.read_sql_query(sql, self._db_engine).iloc[0,:]
+        else :
+            index = self._sensor_log_from_csv_df.sort_index().loc[date:date].index
+            start_time = index.min()
+            end_time = index.max()
+
+        # Adjust the end time so the longest time-weighted averaging window can be fully reported.
+        longest_window_in_mins = max([window['mins'] for window in self.WINDOWS_AND_LIMITS])
+        end_time = end_time + pd.Timedelta(minutes=longest_window_in_mins)
+
+        # Log the period covered by this batch run.
+        message = ("Running Prometeo Analytics over batch period '%s' - '%s'" % (start_time.isoformat(), end_time.isoformat()))
+        if not self._from_db : message += " (local CSV file mode)"
+        self.logger.info(message)
+
+        # Get a minute-by-minute playback schedule over which to run the analytics
+        playback_schedule = pd.date_range(start=start_time.floor('min'), end=end_time.floor('min'), freq='min').to_list()
+
+        # Calculate exposure for every minute in the playback schedule
+        all_results_df = self.batch_run_analytics(playback_schedule, commit)
+
+        return all_results_df
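Invocation is then a one-liner (the date is illustrative):

from GasExposureAnalytics import GasExposureAnalytics

analytics = GasExposureAnalytics()
day_df = analytics.batch_run_analytics_by_date('2021-03-01', commit=False)

One caveat on the database branch above: the date value is concatenated straight into the SQL string, and the same value can arrive from the HTTP endpoint below, so a bound parameter is safer. A sketch of that query with a bound parameter, assuming the class's existing SQLAlchemy engine and the same column/table constants:

import pandas as pd
from sqlalchemy import text

# Column and table names cannot be bound, so the constants are still concatenated;
# only the user-supplied date value is passed as a parameter.
sql = text("SELECT MIN(" + TIMESTAMP_COL + ") AS start_time, "
           "MAX(" + TIMESTAMP_COL + ") AS end_time "
           "FROM " + SENSOR_LOG_TABLE + " "
           "WHERE DATE(" + TIMESTAMP_COL + ") = :date")
start_time, end_time = pd.read_sql_query(sql, self._db_engine, params={'date': date}).iloc[0, :]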
33 changes: 33 additions & 0 deletions src/core_decision_flask_app.py
@@ -45,6 +45,7 @@
FIREFIGHTER_ID_COL = 'firefighter_id'
TIMESTAMP_COL = 'timestamp_mins'
STATUS_LED_COL = 'analytics_status_LED'
+DATE_PARAMETER = 'date'

# We initialize the Prometeo Analytics engine.
perMinuteAnalytics = GasExposureAnalytics()
@@ -167,6 +168,38 @@ def getStatusDetails():
        logger.error(f'Internal Server Error: {e}')
        abort(500)


+@app.route('/batch_run_analytics_by_date', methods=['GET'])
+def batch_run_analytics_by_date():

+    try:
+        date = request.args.get(DATE_PARAMETER)

+        # Return 404 (Not Found) if the date parameter is missing
+        if (date is None) :
+            logger.error('Missing parameters : '+DATE_PARAMETER+' : '+str(date))
+            abort(404)

+        # Calculate exposure for every minute in the selected day
+        batchAnalytics = GasExposureAnalytics()
+        batch_results_df = batchAnalytics.batch_run_analytics_by_date(date)

+        # Return 404 (Not Found) if no analytic results were produced
+        if (batch_results_df is None) or (batch_results_df.empty):
+            logger.error('No analytic results were produced for : ' + DATE_PARAMETER + ' : ' + str(date))
+            abort(404)
+        else:
+            # Return 204 (No Content) - the batch results are committed to the database, not returned.
+            return ('', 204)

+    except HTTPException as e:
+        logger.error(f'{e}')
+        raise e
+    except Exception as e:
+        # Return 500 (Internal Server Error) if there are any unexpected errors.
+        logger.error(f'Internal Server Error: {e}')
+        abort(500)
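A quick way to exercise the new route (a sketch; the host and port assume a local Flask development server on its default port 5000):

import requests

# Trigger a batch re-run of analytics for one (hypothetical) date.
response = requests.get('http://localhost:5000/batch_run_analytics_by_date',
                        params={'date': '2021-03-01'})
print(response.status_code)  # 204 once the batch results have been committed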


@app.route('/get_configuration', methods=['GET'])
def getConfiguration():

