Add logging to batch processing
Additionally, tiles that error out will be skipped and the run will
proceed rather than terminating on the first error.
rmarianski committed Dec 28, 2017
1 parent 8a62ed7 commit 6b20b3d
Showing 2 changed files with 104 additions and 24 deletions.
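For orientation before the diff: the change wraps each stage of the per-tile work (pyramid fetch, tile fetch, processing, metatile storage) in its own try/except, logs the failure as a JSON line through a dedicated logger, and moves on to the next unit of work instead of aborting the run. A minimal standalone sketch of that skip-and-continue pattern, using hypothetical fetch/process/store stand-ins rather than tilequeue's real functions:

import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('batch_process')


# Hypothetical stand-ins for the real fetch/process/store stages.
def fetch(tile):
    if tile == 3:
        raise ValueError('simulated fetch failure')
    return {'tile': tile}


def process(data):
    return data


def store(result):
    pass


for tile in range(5):
    try:
        result = process(fetch(tile))
        store(result)
    except Exception as e:
        # Log the failure as a structured JSON line and continue with the
        # next tile instead of letting the exception terminate the run.
        logger.error(json.dumps(dict(msg='tile failed', tile=tile,
                                     exception=str(e))))
        continue
    logger.info(json.dumps(dict(msg='tile processed', tile=tile)))

The actual diff follows.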
73 changes: 49 additions & 24 deletions tilequeue/command.py
@@ -399,11 +399,12 @@ def make_seed_tile_generator(cfg):
     return tile_generator
 
 
-def _make_store(cfg):
+def _make_store(cfg, logger=None):
    store_cfg = cfg.yml.get('store')
     assert store_cfg, "Store was not configured, but is necessary."
     credentials = cfg.subtree('aws credentials')
-    logger = make_logger(cfg, 'process')
+    if logger is None:
+        logger = make_logger(cfg, 'process')
     store = make_store(store_cfg, credentials=credentials, logger=logger)
     return store
 
@@ -1931,15 +1932,13 @@ def tilequeue_batch_enqueue(cfg, peripherals):
 
 
 def tilequeue_batch_process(cfg, args):
+    from tilequeue.log import BatchProcessLogger
     from tilequeue.metatile import make_metatiles
 
     logger = make_logger(cfg, 'batch_process')
+    batch_logger = BatchProcessLogger(logger)
 
-    # TODO log json
-
-    store = _make_store(cfg)
-
-    logger.info('batch process ... start')
+    store = _make_store(cfg, logger)
 
     coord_str = args.tile
 
@@ -1956,8 +1955,6 @@
 
     assert queue_coord.zoom == queue_zoom, 'Unexpected zoom: %s' % coord_str
 
-    logger.info('batch process: %s' % coord_str)
-
     # TODO generalize and move to tile.py?
     def find_job_coords_for(coord, target_zoom):
         xmin = coord.column
@@ -1997,38 +1994,66 @@ def find_job_coords_for(coord, target_zoom):
     assert zoom_stop > group_by_zoom
     formats = lookup_formats(cfg.output_formats)
 
+    batch_logger.begin_run(queue_coord)
+
     job_coords = find_job_coords_for(queue_coord, group_by_zoom)
     for job_coord in job_coords:
+
+        batch_logger.begin_pyramid(job_coord)
+
         # each coord here is the unit of work now
         pyramid_coords = [job_coord]
         pyramid_coords.extend(coord_children_range(job_coord, zoom_stop))
         coord_data = [dict(coord=x) for x in pyramid_coords]
-        for fetch, coord_datum in data_fetcher.fetch_tiles(coord_data):
+
+        try:
+            fetched_coord_data = data_fetcher.fetch_tiles(coord_data)
+        except Exception as e:
+            batch_logger.pyramid_fetch_failed(e, job_coord)
+            continue
+
+        for fetch, coord_datum in fetched_coord_data:
             coord = coord_datum['coord']
             nominal_zoom = coord.zoom + cfg.metatile_zoom
             unpadded_bounds = coord_to_mercator_bounds(coord)
-            source_rows = fetch(nominal_zoom, unpadded_bounds)
-            feature_layers = convert_source_data_to_feature_layers(
-                source_rows, layer_data, unpadded_bounds, coord.zoom)
 
+            try:
+                source_rows = fetch(nominal_zoom, unpadded_bounds)
+                feature_layers = convert_source_data_to_feature_layers(
+                    source_rows, layer_data, unpadded_bounds, coord.zoom)
+            except Exception as e:
+                batch_logger.tile_fetch_failed(e, coord)
+                continue
+
             cut_coords = [coord]
             if nominal_zoom > coord.zoom:
                 cut_coords.extend(coord_children_range(coord, nominal_zoom))
 
-            formatted_tiles, extra_data = process_coord(
-                coord, nominal_zoom, feature_layers, post_process_data,
-                formats, unpadded_bounds, cut_coords, cfg.buffer_cfg,
-                output_calc_mapping
-            )
+            try:
+                formatted_tiles, extra_data = process_coord(
+                    coord, nominal_zoom, feature_layers, post_process_data,
+                    formats, unpadded_bounds, cut_coords, cfg.buffer_cfg,
+                    output_calc_mapping
+                )
+            except Exception as e:
+                batch_logger.tile_process_failed(e, coord)
+                continue
 
+            try:
+                tiles = make_metatiles(cfg.metatile_size, formatted_tiles)
+                for tile in tiles:
+                    store.write_tile(
+                        tile['tile'], tile['coord'], tile['format'],
+                        tile['layer'])
+            except Exception as e:
+                batch_logger.metatile_storage_failed(e, coord)
+                continue
+
-            tiles = make_metatiles(cfg.metatile_size, formatted_tiles)
-            for tile in tiles:
-                store.write_tile(tile['tile'], tile['coord'], tile['format'],
-                                 tile['layer'])
+            batch_logger.tile_processed(coord)
 
-    # TODO log?
+        batch_logger.end_pyramid(job_coord)
 
-    logger.info('batch process ... done')
+    batch_logger.end_run(queue_coord)
 
 
 def tilequeue_main(argv_args=None):
55 changes: 55 additions & 0 deletions tilequeue/log.py
@@ -1,4 +1,5 @@
 from enum import Enum
+from tilequeue.utils import format_stacktrace_one_line
 import json
 import logging
 import sys
@@ -263,3 +264,57 @@ def unknown_queue_handle_id(self, coord_id, queue_handle_id):
 
     def unknown_coord_id(self, coord_id, queue_handle_id):
         self._log('Unknown coord_id', coord_id, queue_handle_id)
+
+
+class BatchProcessLogger(object):
+
+    def __init__(self, logger):
+        self.logger = logger
+
+    def _log(self, msg, coord):
+        json_obj = dict(
+            coord=make_coord_dict(coord),
+            type=log_level_name(LogLevel.INFO),
+            msg=msg,
+        )
+        json_str = json.dumps(json_obj)
+        self.logger.info(json_str)
+
+    def begin_run(self, coord):
+        self._log('batch process run begin', coord)
+
+    def end_run(self, coord):
+        self._log('batch process run end', coord)
+
+    def begin_pyramid(self, coord):
+        self._log('pyramid begin', coord)
+
+    def end_pyramid(self, coord):
+        self._log('pyramid end', coord)
+
+    def tile_processed(self, coord):
+        self._log('tile processed', coord)
+
+    def _log_exception(self, msg, exception, coord):
+        stacktrace = format_stacktrace_one_line()
+        json_obj = dict(
+            coord=make_coord_dict(coord),
+            type=log_level_name(LogLevel.ERROR),
+            msg=msg,
+            exception=str(exception),
+            stacktrace=stacktrace,
+        )
+        json_str = json.dumps(json_obj)
+        self.logger.error(json_str)
+
+    def pyramid_fetch_failed(self, exception, coord):
+        self._log_exception('pyramid fetch failed', exception, coord)
+
+    def tile_fetch_failed(self, exception, coord):
+        self._log_exception('tile fetch failed', exception, coord)
+
+    def tile_process_failed(self, exception, coord):
+        self._log_exception('tile process failed', exception, coord)
+
+    def metatile_storage_failed(self, exception, coord):
+        self._log_exception('metatile storage failed', exception, coord)
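
As a usage illustration only (not part of the commit): BatchProcessLogger wraps a standard library logger and emits one JSON document per call. The coordinate object below is a minimal stand-in on the assumption that make_coord_dict only reads zoom/column/row attributes, and the exact contents of the coord and type fields depend on make_coord_dict and log_level_name defined elsewhere in tilequeue/log.py.

import logging
from collections import namedtuple
from tilequeue.log import BatchProcessLogger

# Minimal stand-in for a tile coordinate; assumes make_coord_dict only needs
# zoom/column/row attributes.
Coord = namedtuple('Coord', 'zoom column row')

logging.basicConfig(level=logging.INFO)
batch_logger = BatchProcessLogger(logging.getLogger('batch_process'))

coord = Coord(zoom=10, column=163, row=395)
batch_logger.begin_run(coord)
try:
    raise RuntimeError('simulated fetch failure')
except Exception as e:
    # Called inside the except block so the stacktrace helper can see the
    # active exception.
    batch_logger.tile_fetch_failed(e, coord)
batch_logger.end_run(coord)

# Each call writes a single JSON line, roughly of the form:
# {"coord": {...}, "type": "info", "msg": "batch process run begin"}
# {"coord": {...}, "type": "error", "msg": "tile fetch failed",
#  "exception": "simulated fetch failure", "stacktrace": "..."}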
