Skip to content

Commit

Permalink
Merge pull request #322 from tilezen/log-batch-process
Browse files Browse the repository at this point in the history
Add logging to batch processing
  • Loading branch information
rmarianski authored Dec 29, 2017
2 parents 8a62ed7 + 6b20b3d commit b1d59a1
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 24 deletions.
73 changes: 49 additions & 24 deletions tilequeue/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,11 +399,12 @@ def make_seed_tile_generator(cfg):
return tile_generator


def _make_store(cfg):
def _make_store(cfg, logger=None):
store_cfg = cfg.yml.get('store')
assert store_cfg, "Store was not configured, but is necessary."
credentials = cfg.subtree('aws credentials')
logger = make_logger(cfg, 'process')
if logger is None:
logger = make_logger(cfg, 'process')
store = make_store(store_cfg, credentials=credentials, logger=logger)
return store

Expand Down Expand Up @@ -1931,15 +1932,13 @@ def tilequeue_batch_enqueue(cfg, peripherals):


def tilequeue_batch_process(cfg, args):
from tilequeue.log import BatchProcessLogger
from tilequeue.metatile import make_metatiles

logger = make_logger(cfg, 'batch_process')
batch_logger = BatchProcessLogger(logger)

# TODO log json

store = _make_store(cfg)

logger.info('batch process ... start')
store = _make_store(cfg, logger)

coord_str = args.tile

Expand All @@ -1956,8 +1955,6 @@ def tilequeue_batch_process(cfg, args):

assert queue_coord.zoom == queue_zoom, 'Unexpected zoom: %s' % coord_str

logger.info('batch process: %s' % coord_str)

# TODO generalize and move to tile.py?
def find_job_coords_for(coord, target_zoom):
xmin = coord.column
Expand Down Expand Up @@ -1997,38 +1994,66 @@ def find_job_coords_for(coord, target_zoom):
assert zoom_stop > group_by_zoom
formats = lookup_formats(cfg.output_formats)

batch_logger.begin_run(queue_coord)

job_coords = find_job_coords_for(queue_coord, group_by_zoom)
for job_coord in job_coords:

batch_logger.begin_pyramid(job_coord)

# each coord here is the unit of work now
pyramid_coords = [job_coord]
pyramid_coords.extend(coord_children_range(job_coord, zoom_stop))
coord_data = [dict(coord=x) for x in pyramid_coords]
for fetch, coord_datum in data_fetcher.fetch_tiles(coord_data):

try:
fetched_coord_data = data_fetcher.fetch_tiles(coord_data)
except Exception as e:
batch_logger.pyramid_fetch_failed(e, job_coord)
continue

for fetch, coord_datum in fetched_coord_data:
coord = coord_datum['coord']
nominal_zoom = coord.zoom + cfg.metatile_zoom
unpadded_bounds = coord_to_mercator_bounds(coord)
source_rows = fetch(nominal_zoom, unpadded_bounds)
feature_layers = convert_source_data_to_feature_layers(
source_rows, layer_data, unpadded_bounds, coord.zoom)

try:
source_rows = fetch(nominal_zoom, unpadded_bounds)
feature_layers = convert_source_data_to_feature_layers(
source_rows, layer_data, unpadded_bounds, coord.zoom)
except Exception as e:
batch_logger.tile_fetch_failed(e, coord)
continue

cut_coords = [coord]
if nominal_zoom > coord.zoom:
cut_coords.extend(coord_children_range(coord, nominal_zoom))

formatted_tiles, extra_data = process_coord(
coord, nominal_zoom, feature_layers, post_process_data,
formats, unpadded_bounds, cut_coords, cfg.buffer_cfg,
output_calc_mapping
)
try:
formatted_tiles, extra_data = process_coord(
coord, nominal_zoom, feature_layers, post_process_data,
formats, unpadded_bounds, cut_coords, cfg.buffer_cfg,
output_calc_mapping
)
except Exception as e:
batch_logger.tile_process_failed(e, coord)
continue

try:
tiles = make_metatiles(cfg.metatile_size, formatted_tiles)
for tile in tiles:
store.write_tile(
tile['tile'], tile['coord'], tile['format'],
tile['layer'])
except Exception as e:
batch_logger.metatile_storage_failed(e, coord)
continue

tiles = make_metatiles(cfg.metatile_size, formatted_tiles)
for tile in tiles:
store.write_tile(tile['tile'], tile['coord'], tile['format'],
tile['layer'])
batch_logger.tile_processed(coord)

# TODO log?
batch_logger.end_pyramid(job_coord)

logger.info('batch process ... done')
batch_logger.end_run(queue_coord)


def tilequeue_main(argv_args=None):
Expand Down
55 changes: 55 additions & 0 deletions tilequeue/log.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from enum import Enum
from tilequeue.utils import format_stacktrace_one_line
import json
import logging
import sys
Expand Down Expand Up @@ -263,3 +264,57 @@ def unknown_queue_handle_id(self, coord_id, queue_handle_id):

def unknown_coord_id(self, coord_id, queue_handle_id):
self._log('Unknown coord_id', coord_id, queue_handle_id)


class BatchProcessLogger(object):

def __init__(self, logger):
self.logger = logger

def _log(self, msg, coord):
json_obj = dict(
coord=make_coord_dict(coord),
type=log_level_name(LogLevel.INFO),
msg=msg,
)
json_str = json.dumps(json_obj)
self.logger.info(json_str)

def begin_run(self, coord):
self._log('batch process run begin', coord)

def end_run(self, coord):
self._log('batch process run end', coord)

def begin_pyramid(self, coord):
self._log('pyramid begin', coord)

def end_pyramid(self, coord):
self._log('pyramid end', coord)

def tile_processed(self, coord):
self._log('tile processed', coord)

def _log_exception(self, msg, exception, coord):
stacktrace = format_stacktrace_one_line()
json_obj = dict(
coord=make_coord_dict(coord),
type=log_level_name(LogLevel.ERROR),
msg=msg,
exception=str(exception),
stacktrace=stacktrace,
)
json_str = json.dumps(json_obj)
self.logger.error(json_str)

def pyramid_fetch_failed(self, exception, coord):
self._log_exception('pyramid fetch failed', exception, coord)

def tile_fetch_failed(self, exception, coord):
self._log_exception('tile fetch failed', exception, coord)

def tile_process_failed(self, exception, coord):
self._log_exception('tile process failed', exception, coord)

def metatile_storage_failed(self, exception, coord):
self._log_exception('metatile storage failed', exception, coord)

0 comments on commit b1d59a1

Please sign in to comment.