diff --git a/roadmaptools/clean_geojson.py b/roadmaptools/clean_geojson.py index 442af5d..48cd0e5 100644 --- a/roadmaptools/clean_geojson.py +++ b/roadmaptools/clean_geojson.py @@ -1,79 +1,113 @@ -import geojson import codecs import copy import argparse import sys +from roadmaptools import utils -set_of_useful_properties = {'highway', 'id', 'lanes', 'maxspeed', 'oneway', 'bridge', 'width', 'tunnel', 'traffic_calming', 'lanes:forward', 'lanes:backward'} -dict_of_useful_properties = {'highway': str, 'id': int, 'lanes': int, 'maxspeed': int, 'oneway': str, 'bridge': str, 'width': float, 'tunnel': str, 'traffic_calming': str, 'lanes:forward': int, 'lanes:backward': int} +# Dict of used properties from geojson + id_opposite (tag for twoway) +set_of_useful_properties = {'highway', 'id', 'id_opposite', 'lanes', 'maxspeed', 'oneway', 'bridge', 'width', 'tunnel', + 'traffic_calming', 'turn:lanes', 'junction'} +dict_of_useful_properties = {'highway': str, 'id': int, 'id_opposite': int, 'lanes': int, 'maxspeed': int, + 'oneway': str, 'bridge': str, + 'width': float, 'tunnel': str, 'traffic_calming': str, 'turn:lanes': str, 'junction': str} +DEFAULT_NUMBER_OF_LANES = 1 +DEFAULT_TURN = 'all' + + +# +# MAIN +# def clean_geojson(input_stream, output_stream): - json_dict = load_geojson(input_stream) - json_deleted = get_geojson_with_deleted_features(json_dict) - # save_geojson(output_stream, json_deleted) - prune_geojson_file(json_dict) - save_geojson(json_dict, output_stream) + """Prune geojson and save to stream""" + json_dict = utils.load_geojson(input_stream) + __prune_geojson_file(json_dict) + utils.save_geojson(json_dict, output_stream) def get_cleaned_geojson(json_dict): - prune_geojson_file(json_dict) + """Prune geojson and return""" + __prune_geojson_file(json_dict) json_dict['features'] = [i for i in json_dict["features"] if i] # remove empty dicts return json_dict -def remove_properties(item): - temp_dict_with_props = copy.deepcopy(item['properties']) - for prop in temp_dict_with_props: - if prop not in set_of_useful_properties: - del item['properties'][prop] - return item +# +# PRIVATE +# +def __prune_geojson_file(json_dict): + """main prune function""" + id_iterator = 0 + length = len(json_dict['features']) + for i in range(0, length): + item = json_dict['features'][i] + if item['geometry']['type'] == 'LineString': + item = __prune_properties(item) + __check_types(item) + for c in range(0, len(item['geometry']['coordinates']) - 1): + temp = copy.deepcopy(item) + u = item['geometry']['coordinates'][c] + v = item['geometry']['coordinates'][c + 1] + new_item = __get_single_pair_of_coords(u, v, temp, id_iterator, True) + json_dict['features'].append(new_item) + if 'oneway' in item['properties']: + if item['properties']['oneway'] != 'yes': # == twoway + id_iterator += 1 -def load_geojson(in_stream): - json_dict = geojson.load(in_stream) - return json_dict + # mark twoway + json_dict['features'][(length + id_iterator - 1)]['properties']['id_opposite'] = id_iterator + previous_id = json_dict['features'][(length + id_iterator - 1)]['properties']['id'] + temp = copy.deepcopy(item) + # mark two way + temp['properties']['id_opposite'] = previous_id -def get_geojson_with_deleted_features(json_dict): - json_deleted = dict() - json_deleted['type'] = json_dict['type'] - json_deleted['features'] = list() + # create new two way + new_item = __get_single_pair_of_coords(v, u, temp, id_iterator, False) + json_dict['features'].append(new_item) + else: + if item['properties']['highway'] == 'motorway' \ + or item['properties']['highway'] == 'motorway_link' \ + or item['properties']['highway'] == 'trunk_link' \ + or item['properties']['highway'] == 'primary_link' \ + or ('junction' in item['properties'] and item['properties']['junction'] == 'roundabout'): + item['properties']['id'] = int(id_iterator) + # item['properties']['id_opposite'] = int(-1) + # item['properties']['oneway'] = 'yes' + id_iterator += 1 + continue - for item in json_dict['features']: - if item['geometry']['type'] != 'LineString': - json_deleted['features'].append(item) + id_iterator += 1 - # with codecs.open("data/deleted_items.geojson", 'w') as output: - # geojson.dump(json_deleted, output) - # output.close() - return json_deleted + # mark twoway + json_dict['features'][(length + id_iterator - 1)]['properties']['id_opposite'] = id_iterator + previous_id = json_dict['features'][(length + id_iterator - 1)]['properties']['id'] + temp = copy.deepcopy(item) + # mark two way + temp['properties']['id_opposite'] = previous_id -def get_single_pair_of_coords(coord_u, coord_v, new_item, id, is_forward): - new_item['properties']['id'] = id - del new_item['geometry']['coordinates'] - new_item['geometry']['coordinates'] = [coord_u, coord_v] - if ('oneway' in new_item['properties'] and new_item['properties']['oneway'] != 'yes') or ('oneway' not in new_item['properties']): - if 'lanes:forward' in new_item['properties'] and is_forward: - new_item['properties']['lanes'] = int(new_item['properties']['lanes:forward']) - elif 'lanes:backward' in new_item['properties'] and not is_forward: - new_item['properties']['lanes'] = int(new_item['properties']['lanes:backward']) - elif is_forward and 'lanes' in new_item['properties']: - new_item['properties']['lanes'] = int(new_item['properties']['lanes']) - 1 - elif not is_forward and 'lanes' in new_item['properties']: - new_item['properties']['lanes'] = 1 + new_item = __get_single_pair_of_coords(v, u, temp, id_iterator, False) + json_dict['features'].append(new_item) - if 'lanes' not in new_item['properties'] or new_item['properties']['lanes'] < 1: - new_item['properties']['lanes'] = 1 - else: - new_item['properties']['lanes'] = int(new_item['properties']['lanes']) + id_iterator += 1 - new_item['properties']['oneway'] = 'yes' - return new_item + item.clear() -def check_types(item): +def __prune_properties(item): + """Remove useless properties, based on set_of_useful_properties""" + temp_dict_with_props = copy.deepcopy(item['properties']) + for prop in temp_dict_with_props: + if prop not in set_of_useful_properties: + del item['properties'][prop] + return item + + +# +def __check_types(item): for prop in dict_of_useful_properties: if prop in item['properties'] and not isinstance(item['properties'][prop], dict_of_useful_properties[prop]): if dict_of_useful_properties[prop] == int: @@ -115,48 +149,44 @@ def check_types(item): del item['properties'][prop] -def prune_geojson_file(json_dict): - id_iterator = 0 - length = len(json_dict['features']) +# +def __get_single_pair_of_coords(coord_u, coord_v, new_item, id, is_forward): + new_item['properties']['id'] = id # linear - for i in range(0, length): - item = json_dict['features'][i] - if item['geometry']['type'] == 'LineString': - item = remove_properties(item) - check_types(item) - for i in range(0, len(item['geometry']['coordinates']) - 1): - temp = copy.deepcopy(item) - u = item['geometry']['coordinates'][i] - v = item['geometry']['coordinates'][i + 1] - new_item = get_single_pair_of_coords(u, v, temp, id_iterator, True) - json_dict['features'].append(new_item) - if 'oneway' in item['properties']: - if item['properties']['oneway'] != 'yes': - id_iterator += 1 - temp = copy.deepcopy(item) - new_item = get_single_pair_of_coords(v, u, temp, id_iterator, False) - json_dict['features'].append(new_item) - else: - if item['properties']['highway'] == 'motorway' or item['properties']['highway'] == 'motorway_link' or item['properties']['highway'] == 'trunk_link' \ - or item['properties']['highway'] == 'primary_link' or ('junction' in item['properties'] and item['properties']['junction'] == 'roundabout'): - id_iterator += 1 - continue - id_iterator += 1 - temp = copy.deepcopy(item) - new_item = get_single_pair_of_coords(v, u, temp, id_iterator, False) - json_dict['features'].append(new_item) + # remove and create coordinates in correct order + del new_item['geometry']['coordinates'] + new_item['geometry']['coordinates'] = [coord_u, coord_v] - id_iterator += 1 + # check number of lanes with oneway + if ('oneway' in new_item['properties'] and new_item['properties']['oneway'] != 'yes') \ + or ('oneway' not in new_item['properties']): + if 'lanes:forward' in new_item['properties'] and is_forward: + new_item['properties']['lanes'] = int(new_item['properties']['lanes:forward']) + elif 'lanes:backward' in new_item['properties'] and not is_forward: + new_item['properties']['lanes'] = int(new_item['properties']['lanes:backward']) + elif is_forward and 'lanes' in new_item['properties']: + new_item['properties']['lanes'] = int(new_item['properties']['lanes']) - 1 + elif not is_forward and 'lanes' in new_item['properties']: + new_item['properties']['lanes'] = DEFAULT_NUMBER_OF_LANES + # default lanes + if 'lanes' not in new_item['properties'] or new_item['properties']['lanes'] < 1: + new_item['properties']['lanes'] = DEFAULT_NUMBER_OF_LANES - item.clear() + # check lanes heading with oneway + if ('oneway' in new_item['properties'] and new_item['properties']['oneway'] != 'yes') \ + or ('oneway' not in new_item['properties']): + if 'turn:lanes:forward' in new_item['properties'] and is_forward: + new_item['properties']['turn:lanes'] = str(new_item['properties']['turn:lanes:forward']) + elif 'turn:lanes:backward' in new_item['properties'] and not is_forward: + new_item['properties']['turn:lanes'] = str(new_item['properties']['turn:lanes:backward']) + # mark oneway + new_item['properties']['oneway'] = 'yes' -def save_geojson(json_dict, out_stream): - json_dict['features'] = [i for i in json_dict["features"] if i] # remove empty dicts - geojson.dump(json_dict, out_stream) + return new_item -def get_args(): +def __get_args(): parser = argparse.ArgumentParser() parser.add_argument('-i', dest="input", type=str, action='store', help='input file') parser.add_argument('-o', dest="output", type=str, action='store', help='output file') @@ -165,7 +195,7 @@ def get_args(): # EXAMPLE OF USAGE if __name__ == '__main__': - args = get_args() + args = __get_args() input_stream = sys.stdin output_stream = sys.stdout diff --git a/roadmaptools/coords.py b/roadmaptools/coords.py new file mode 100644 index 0000000..4a3436a --- /dev/null +++ b/roadmaptools/coords.py @@ -0,0 +1,86 @@ +"""Helpful tools for work with coordinates and angles between them.""" +from math import cos, asin, sqrt, sin, degrees, atan2, isnan, pi +from numpy import radians, array + +__author__ = "Zdenek Bousa" +__email__ = "bousazde@fel.cvut.cz" + + +def get_lat_lon(coords): + """Return coordinates in order latitude, longitude""" + return coords[1], coords[0] + + +def get_lon_lat(coords): + """Return coordinates in order longitude,latitude""" + return coords[0], coords[1] + + +def get_distance_between_coords(point1, point2): + """Point[lat,lon] (not in radians), returns distance in meters. Based on + https://en.wikipedia.org/wiki/Haversine_formula """ + p = 0.017453292519943295 # pi/180 + q = 0.5 - cos((point2[0] - point1[0]) * p) / 2 + cos(point1[0] * p) * cos(point2[0] * p) * ( + 1 - cos((point2[1] - point1[1]) * p)) / 2 + return 2 * 6378137.0 * asin(sqrt(q)) # 2 * earth radius in meters *.... + + +def get_coordinates_in_radians(coordinates_in_degree): + """Returns coordinates in radians from decimal degrees""" + return radians(array(coordinates_in_degree)) + + +def get_coordinates_in_degree(coordinates_in_radian): + """Return coordinates in decimal degrees from radians""" + return degrees(array(coordinates_in_radian)) + + +def get_coordinates_in_3d(coordinates_in_radians): + """Returns point in 3D space. Parameter coordinates[lat,lon]""" + lat_radians = coordinates_in_radians[0] + lon_radians = coordinates_in_radians[1] + return array((cos(lat_radians) * cos(lon_radians), cos(lat_radians) * sin(lon_radians), sin(lat_radians))) + + +def angle_between_vectors_radians(v_a, v_b): + """Return angle in radians, interval (-pi,pi>""" + r = atan2(v_a[1], v_a[0]) - atan2(v_b[1], v_b[0]) + # check nan + if isnan(r): + r = 0 + # check interval + if r <= (-pi): + r = r + (2 * pi) + if r > pi: + r = r - (2 * pi) + return r + + +def angle_between_points(p1_rad, p2_rad, p3_rad, p4_rad): + """ Vector A = [p1_rad,p2_rad] and vector B = [p3_rad,p4_rad]. + Angle is measured between P1 - vertext(P2,P3) - P4. Interval +-<0-180>. + Including spherical correction. + + Return: angle in radians + + Example: + + P4 + \ + \ (alpha) + P2=P3 \____________ + / P1 + / (beta) + / + P4' + + Angle alpha = -(beta), in this example approx. alpha=110 degrees and beta=-110 degrees. + """ + v_a = [p2_rad[0] - p1_rad[0], p2_rad[1] - p1_rad[1]] + v_b = [p3_rad[0] - p4_rad[0], p3_rad[1] - p4_rad[1]] + + # correction to 2D at given lat + v_a[1] = v_a[1] * cos(p2_rad[0]) + v_b[1] = v_b[1] * cos(p3_rad[0]) + + return angle_between_vectors_radians(v_a, v_b) diff --git a/roadmaptools/create_lanes_connections.py b/roadmaptools/create_lanes_connections.py new file mode 100644 index 0000000..24dca9a --- /dev/null +++ b/roadmaptools/create_lanes_connections.py @@ -0,0 +1,489 @@ +"""Finds connecting edges (ids) for specified turn lanes. + +Prune data: + 1. By road_elements (try to connect only them) + 2. By degree of node, prune only with degree > 2 + - on remaining elements the algorithm will try to calcu + +Error check: + 1. Checks for correct number of lanes and eventually updates them. + +Data output: + list_of_lanes = (Lane1(dict_direction1:following_edge_id,dict_direction2:following_edge_id), + Lane2(dict_direction1:following_edge_id)) + + For unknown situation list_of_lanes might be None or following_edge_id = -1 or dict_directionX.key = 'none'. + This is with respect to osm wiki. + +""" +import argparse +import codecs + +import networkx as nx +import sys + +from roadmaptools import coords, utils, map_elements +from math import degrees +from copy import deepcopy + +__author__ = "Zdenek Bousa" +__email__ = "bousazde@fel.cvut.cz" + +road_elements = map_elements.get_road_elements_agentpolis() # Road elements available for lanes direction + +road_directions = {'sharp_left', 'left', 'slight_left', 'through', 'slight_right', 'right', 'sharp_right', + 'none'} # junction +road_directions_extra = {'reverse', 'merge_to_left', 'merge_to_right'} # ignore these tags + +# Variables +tag_turn = 'turn:lanes' +tag_id = 'id' +tag_junction = 'junction' +tag_highway = 'highway' +tag_roundabout = 'roundabout' +extend_logging = False + +# Statistics +statistics_number_of_road_elements = 0 +statistics_number_of_turn_lanes_tags = 0 +statistics_number_of_inconsistent_edges = 0 + +statistics_number_of_road_edges = 0 +statistics_number_of_processed_lanes = 0 +statistics_number_of_processed_edges = 0 +statistics_number_of_erroneously_processed_edges = 0 +statistics_number_of_erroneously_processed_lanes = 0 + + +# +# PUBLIC +# +def get_geojson_with_turn_lanes(json_dict): + """Return json dict without logging on error output""" + return process(json_dict) + + +def process(json_dict, logging=False): + """Main function, returns json dict. + It is better to run this after simplification""" + global extend_logging + extend_logging = logging + graph = __load_graph(json_dict) + data = __traverse_graph_and_connect(graph) + return ToJson.graph_to_json(data, json_dict) + + +def print_statistics(): + print ("Number of road elements: " + str(statistics_number_of_road_elements)) + print ("Number of available edges with turn:lanes tag: " + str(statistics_number_of_turn_lanes_tags)) + print ("Number of inconsistent edges(number of lanes differs): " + str(statistics_number_of_inconsistent_edges)) + print ("=========================================================================") + print ("Number of road elements ending at junction: " + str(statistics_number_of_road_edges)) + print ("Number of road elements processed and connected at intersection: " + str( + statistics_number_of_processed_edges)) + print ("Number of road elements erroneously processed: " + str(statistics_number_of_erroneously_processed_edges)) + print ("Number of connected lanes: " + str(statistics_number_of_processed_lanes)) + print ("Number of not connected lanes (due to internal error): " + str( + statistics_number_of_erroneously_processed_lanes)) + + +def get_number_of_inconsistent_edges(): + return statistics_number_of_inconsistent_edges + + +# +# Private +# +def __load_graph(json_dict): + """Load graph from json dict with id,lanes,turn:lanes + all data + + Prune data: + 1. By road_elements (try to connect only them) + 2. By degree of node, prune only with degree > 2 + """ + global statistics_number_of_road_elements + g = nx.MultiDiGraph() + for item in json_dict['features']: + coord = item['geometry']['coordinates'] + coord_u = coords.get_lat_lon(coord[0]) + coord_v = coords.get_lat_lon(coord[-1]) + if coord_u != coord_v or len(coord) != 2: # prune loops without any purpose, save loops like traffic roundabout + lanes = item['properties']['lanes'] + lanes_turn = ItemProperties.get_turn_lanes(item) + junction = ItemProperties.get_junction(item) + highway = ItemProperties.get_highway(item) + if highway in road_elements: # prune non-road elements + statistics_number_of_road_elements += 1 + g.add_edge(coord_u, coord_v, id=item['properties'][tag_id], lanes=lanes, lanes_turn=lanes_turn, + junction=junction, highway=highway) + + return g + + +def __traverse_graph_and_connect(graph): + """Traverse graph and add agentpolis:turn:id to road elements (ID of following edge, that is available at + intersection). + + To unknown direction add all possible directions + + Prune data: + 1. By road_elements (try to connect only them) + 2. By degree of node, prune only with degree > 2 + + Return list of modified edges + """ + global statistics_number_of_erroneously_processed_edges + # create dictionary with junction node (from node) as a key + coords,junction,turn_lanes + dict_out_edges = dict() + for e in graph.out_edges(data=True): + key = __get_hash(e[0]) + if key not in dict_out_edges: + dict_out_edges[key] = list() + dict_out_edges[key].append( + (e[0], e[1], e[2]['id'], e[2]['junction'], e[2]['highway'])) # coord A, coord B, id, junction, highway + + # create dictionary with junction node (to node) as a key + coords,junction,turn_lanes + dict_in_edges = dict() + for e in graph.in_edges(data=True): + key = __get_hash(e[1]) + if key not in dict_in_edges: + dict_in_edges[key] = list() + dict_in_edges[key].append( + (e[0], e[1], e[2]['lanes_turn'], e[2]['lanes'], + e[2]['junction'], e[2]['id'], e[2]['highway'])) # coord A, coord B, lanes, #lanes, junction,id, highway + + modified_edges = list() + # Get junctions + for node in graph.nodes(): + degree = graph.degree(node) + # prune nodes by degree + if degree > 2: # it has to be junction + # calculate angles at intersection and fill modified edges + try: + __calculate_junction(dict_in_edges[__get_hash(node)], dict_out_edges[__get_hash(node)], modified_edges) + except KeyError as e: + statistics_number_of_erroneously_processed_edges += 1 + if extend_logging: + utils.eprint("Error: Incorrect node or is not part of road element", str(e), "junction skipped.") + else: + pass + return modified_edges + + +def __calculate_junction(list_in_edges, list_out_edges, modified_edges): + """""" + global statistics_number_of_inconsistent_edges, statistics_number_of_turn_lanes_tags, \ + statistics_number_of_road_edges, statistics_number_of_processed_edges + statistics_number_of_road_edges += len(list_in_edges) + for in_e in list_in_edges: + if in_e[2] is None: # no turn lanes available for this in_edge + continue # continue to next incoming edge to the node + else: # turn lanes available + statistics_number_of_turn_lanes_tags += 1 + + in_e = deepcopy(in_e) + list_out_edges = deepcopy(list_out_edges) + # Incoming edge + e_coords1 = in_e[0] + e_coords2 = in_e[1] + e_turn_lanes = in_e[2] + e_number_of_lanes = in_e[3] + e_junction = in_e[4] + e_id = in_e[5] + e_highway = in_e[6] + + # parse turn lanes + list_of_directions, turns_data_parsed = __parse_turn_lanes(e_turn_lanes) + + # check data consistency + if len(turns_data_parsed) != e_number_of_lanes: + # update number of lanes + e_number_of_lanes = len(turns_data_parsed) + statistics_number_of_inconsistent_edges += 1 + if extend_logging: + utils.eprint("Inconsistent data in edge: " + str(e_id)) + through_id = -1 + roundabout = False + # If it is roundabout, then try to connect through direction to the rest of the roundabout + if e_junction == tag_roundabout: + # get out edge and following roundabout + for out_e in list_out_edges: + if out_e[3] == tag_roundabout: + through_id = out_e[2] + list_out_edges.remove(out_e) + try: + list_of_directions.remove('through') + except ValueError: + list_of_directions.remove('slight_left') + roundabout = True + break + + # calculate rest of directions + dict_data = __calculate_directions(in_e, list_out_edges, roundabout) + if through_id != -1: + dict_data['through'] = through_id # append data about roundabout + + dict_turns_data_with_id = __rebuild_according_to_lanes(turns_data_parsed, dict_data) + + # Create modified edge + modified_edge = ( + e_coords1, e_coords2, e_turn_lanes, e_number_of_lanes, e_junction, e_id, e_highway, + dict_turns_data_with_id) + + # Append to changelist + modified_edges.append(modified_edge) + + statistics_number_of_processed_edges = len(modified_edges) + return modified_edges + + +def __rebuild_according_to_lanes(dict_turns_data_parsed, dict_directions): + """Fill data from dict_directions to dict_turns_data_parsed""" + global statistics_number_of_processed_lanes, statistics_number_of_erroneously_processed_lanes + for lane in dict_turns_data_parsed: + statistics_number_of_processed_lanes += 1 + for direction in lane.keys(): + if direction is not None and not "none": + lane[direction] = -1 + # try default from dict + lane[direction] = __try_direction(direction, dict_directions) + if lane[direction] != -1: + continue + + # switch left/right + switch = None + if "right" in str(direction): + switch = "right" + elif "left" in str(direction): + switch = "left" + + if switch is not None: + # Normal turn + if direction == str(switch): + lane[direction] = __try_direction("slight_" + str(switch), dict_directions) + if lane[direction] != -1: + continue + if direction == str(switch): + lane[direction] = __try_direction("sharp_" + str(switch), dict_directions) + if lane[direction] != -1: + continue + # Slight + if direction == ("slight_" + str(switch)): + lane[direction] = __try_direction(str(switch), dict_directions) + if lane[direction] != -1: + continue + # Sharp + if direction == ("sharp_" + str(switch)): + lane[direction] = __try_direction(str(switch), dict_directions) + if lane[direction] != -1: + continue + + # Exception + statistics_number_of_erroneously_processed_lanes += 1 + if extend_logging: + utils.eprint("Error: No match for requested direction with computed") + else: + lane[direction] = -1 + + return dict_turns_data_parsed + + +def __try_direction(direction, dict_directions): + try: + return dict_directions[direction] + except KeyError: + return -1 + + +def __calculate_directions(in_edge, out_edges, roundabout): + """Assign for each direction an ID + Return dict(direction:id) + """ + dict_angle = dict() + dict_directions_id = dict() + # Find angles of all outgoing edges + for e in out_edges: + a = __get_angle_between_edges(in_edge, e) + id = e[2] + dict_angle[a] = id + + # sort angles from -180 to 180 + list_angles = dict_angle.keys() + list_angles.sort() + + # convert angles to directions + for a in list_angles: + # Straight and reverse + if ((-160 >= a >= -180) or (180 >= a >= 160)) and not roundabout: # through + dict_directions_id['through'] = dict_angle[a] + elif -10 <= a <= 10: # turn back + dict_directions_id['reverse'] = dict_angle[a] + + # Roundabout + elif roundabout and (-180 <= a <= -20): # roundabout exit right + dict_directions_id['right'] = dict_angle[a] + elif roundabout and (180 >= a >= 20): # roundabout exit left + dict_directions_id['left'] = dict_angle[a] + + # Right + elif -160 < a < -140: # slight right + dict_directions_id['slight_right'] = dict_angle[a] + elif -40 < a < -10: # sharp right + dict_directions_id['sharp_right'] = dict_angle[a] + elif -140 <= a <= -40: # right + dict_directions_id['right'] = dict_angle[a] + # Left + elif 160 > a > 140: # slight left + dict_directions_id['slight_left'] = dict_angle[a] + elif 40 > a > 10: # sharp left + dict_directions_id['sharp_left'] = dict_angle[a] + elif 140 >= a >= 40: # left + dict_directions_id['left'] = dict_angle[a] + else: + utils.eprint("Error, unknown orientation for angle(degrees): " + a) + + return dict_directions_id + + +def __parse_turn_lanes(data): + """ Return list of dictionaries. List of lanes that will be available""" + list_of_directions = list() + list_of_lanes_directions = list() + lanes = data.split("|") # split by lanes + for l in lanes: # for each lane separated by |, e.g. right;right|sharp_right + dir_dict = dict() + directions = l.split(";") + for direction in directions: # for each direction, e.g. right;right + if direction == "": + dir_dict['none'] = -1 + elif direction in road_directions: + dir_dict[direction] = -1 # correct ids will be assigned later + # add to list of directions + if direction not in list_of_directions: + list_of_directions.append(direction) + else: + pass # ignore this data + if len(dir_dict) > 0: + list_of_lanes_directions.append(dir_dict) + return list_of_directions, list_of_lanes_directions + + +def __get_hash(coordinates): + """Return string from coordinates""" + return str(str(coordinates[0]) + "-" + str(coordinates[1])) + + +def __get_angle_between_edges(in_edge, out_edge): + """Get angle between incoming_edge and outgoing_edge. + Note: in_edge[1] = out_edge[0] + """ + p_a = coords.get_coordinates_in_radians(coords.get_lat_lon(in_edge[0])) + p_b = coords.get_coordinates_in_radians(coords.get_lat_lon(in_edge[1])) + p_c = coords.get_coordinates_in_radians(coords.get_lat_lon(out_edge[1])) + + angle = degrees(coords.angle_between_points(p_a, p_b, p_b, p_c)) + if not (-180 <= angle <= 180): + ValueError("Out of interval") + return angle + + +class ItemProperties: + """ Getters for LineString/Point properties.""" + + def __init__(self): + pass + + @staticmethod + def get_turn_lanes(item): + """Parse turn:lanes from item properties. + Return: string/none + """ + turn_lanes = None + try: + turn_lanes = item['properties'][tag_turn] + except: + if extend_logging: + utils.eprint("No turn lanes available for object " + str(item['properties'][tag_id])) + return turn_lanes + + @staticmethod + def get_junction(item): + """Parse junction from item properties. + Return: string/none""" + junction = None + try: + junction = item['properties'][tag_junction] + except: + if extend_logging: + utils.eprint("No junction available for object " + str(item['properties'][tag_id])) + return junction + + @staticmethod + def get_highway(item): + """Parse highway from item properties. + Return: string/none""" + type = None + try: + type = item['properties'][tag_highway] + except: + if extend_logging: + utils.eprint("No junction available for object " + str(item['properties'][tag_id])) + return type + + +class ToJson: + # Prepare json dict from graph + def __init__(self): + pass + + @staticmethod + def graph_to_json(modified_edges, json_dict): + """Prepare json dict from graph""" + dict_of_edges = dict() + for e in modified_edges: + dict_of_edges[e[5]] = modified_edges.index(e) # Hash structure for searching id and match with rest of data + + for item in json_dict['features']: + i = item['properties']['id'] + if i in dict_of_edges.keys(): + edge = modified_edges[dict_of_edges[i]] + properties = item['properties'] + + # e_coords1, e_coords2, e_turn_lanes, e_number_of_lanes, e_junction, e_id, e_highway, dict_turns_data_with_id) + properties['lanes'] = edge[3] + properties['turn:lanes:id'] = edge[7] + + json_dict['features'] = [i for i in json_dict["features"] if i] # remove empty dicts + + return json_dict + + +def __get_args(): + parser = argparse.ArgumentParser() + parser.add_argument('--version', action='version', version='%(prog)s 0.1.2') + parser.add_argument('-i', dest="input", type=str, action='store', help='input file (.geojson)') + parser.add_argument('-o', dest="output", type=str, action='store', help='output file (.geojson)') + parser.add_argument('-log', action='store_true', default=False, dest='log', help='Turn log on stderr.') + return parser.parse_args() + + +# EXAMPLE OF USAGE +if __name__ == '__main__': + args = __get_args() + output_stream = sys.stdout + input_stream = sys.stdin + + if args.output is not None: + output_stream = codecs.open(args.output, 'w') + if args.input is not None: + input_stream = codecs.open(args.input, 'r') + + geojson_file = utils.load_geojson(input_stream) + if utils.is_geojson_valid(geojson_file): + geojson_file = process(geojson_file, args.log) + utils.save_geojson(geojson_file, output_stream) + else: + utils.eprint("Invalid geojson file") + + input_stream.close() + output_stream.close() diff --git a/roadmaptools/examples/__init__.py b/roadmaptools/examples/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/roadmaptools/examples/example_geojson_for_agentpolis.py b/roadmaptools/examples/example_geojson_for_agentpolis.py new file mode 100644 index 0000000..d3f2b5f --- /dev/null +++ b/roadmaptools/examples/example_geojson_for_agentpolis.py @@ -0,0 +1,69 @@ +"""Creator of geojson from scratch, with any valid geojson. Includes all important data for AgentPolis""" +import argparse +import codecs + +import sys + +import time + +from roadmaptools import utils, clean_geojson, prepare_geojson_to_agentpolisdemo, simplify_graph, calculate_curvature, \ + create_lanes_connections, estimate_speed_from_osm, remove_specific_line_elements, map_elements + +__author__ = "Zdenek Bousa" +__email__ = "bousazde@fel.cvut.cz" + + +def __get_args(): + parser = argparse.ArgumentParser() + parser.add_argument('-i', dest="input", type=str, action='store', help='input file (.geojson)') + parser.add_argument('-edges', dest="out_edges", type=str, action='store', help='output file - edges (.geojson)') + parser.add_argument('-nodes', dest="out_nodes", type=str, action='store', help='output file - nodes (.geojson)') + return parser.parse_args() + + +# EXAMPLE OF USAGE +if __name__ == '__main__': + args = __get_args() + # input + input_stream = sys.stdin + if args.input is not None: + input_stream = codecs.open(args.input, encoding='utf8') + # output + o_edges = codecs.open(args.out_edges, 'w') + o_nodes = codecs.open(args.out_nodes, 'w') + + start_time = time.time() + # Load data + input_geojson = utils.load_geojson(input_stream) + + # Check + if utils.is_geojson_valid(input_geojson): + # Prune + geojson_data = clean_geojson.get_cleaned_geojson(input_geojson) + # Remove LineString that is not road element in Agentpolis + geojson_data = remove_specific_line_elements.get_geojson_only_with_elements(geojson_data, + map_elements.get_road_elements_agentpolis()) + # Simplify - (json_dict, simplify edges with same number of lanes?,not simplify edges with different curvature?) + geojson_data = simplify_graph.get_simplified_geojson(geojson_data, True, False) + # Estimate speed and length (required properties in graph-importer) + geojson_data = estimate_speed_from_osm.get_geojson_with_speeds(geojson_data) + # Calculate curvature + geojson_data = calculate_curvature.get_geojson_with_curvature(geojson_data) + # Create lanes connection at each intersection + connect_lanes = create_lanes_connections + geojson_data = connect_lanes.get_geojson_with_turn_lanes(geojson_data) + connect_lanes.print_statistics() + # Prepare road network/graph for agentpolis + geojson_data = prepare_geojson_to_agentpolisdemo.get_nodes_and_edges_for_agentpolisdemo(geojson_data) + + # save to file + utils.save_geojson(geojson_data[0], o_edges) + utils.save_geojson(geojson_data[1], o_nodes) + else: + utils.eprint("Invalid geojson file.") + + print("--- %s seconds ---" % (time.time() - start_time)) + + input_stream.close() + o_nodes.close() + o_edges.close() diff --git a/roadmaptools/examples/example_geojson_from_osm.py b/roadmaptools/examples/example_geojson_from_osm.py new file mode 100644 index 0000000..415240a --- /dev/null +++ b/roadmaptools/examples/example_geojson_from_osm.py @@ -0,0 +1,28 @@ +"""Example on how to create geojson from osm.""" +import argparse +import codecs + +import sys +from roadmaptools import utils, osmtogeojson + + +def __get_args(): + parser = argparse.ArgumentParser() + parser.add_argument('--version', action='version', version='%(prog)s 0.1.2') + parser.add_argument('-i', dest="input", type=str, action='store', help='input file (.osm)') + parser.add_argument('-o', dest="output", type=str, action='store', help='output file (.geojson)') + return parser.parse_args() + + +# EXAMPLE OF USAGE +if __name__ == '__main__': + args = __get_args() + output_stream = sys.stdout + + if args.output is not None: + output_stream = codecs.open(args.output, 'w') + + geojson_file = osmtogeojson.convert_osmtogeojson(args.input) + utils.save_geojson(geojson_file, output_stream) + + output_stream.close() diff --git a/roadmaptools/map_elements.py b/roadmaptools/map_elements.py new file mode 100644 index 0000000..ef84f81 --- /dev/null +++ b/roadmaptools/map_elements.py @@ -0,0 +1,31 @@ +"""Groups of values for OSM key = highway""" + +__author__ = "Zdenek Bousa" +__email__ = "bousazde@fel.cvut.cz" + + +def get_road_elements_agentpolis(): + return {'primary', 'secondary', 'motorway', 'trunk', 'tertiary', 'road', + 'motorway_link', 'trunk_link', 'primary_link', 'secondary_link', + 'tertiary_link', 'residential'} + + +def get_road_elements_main(): + return {'primary', 'secondary', 'motorway', 'trunk', 'road', + 'motorway_link', 'trunk_link', 'primary_link', 'secondary_link', + 'tertiary_link'} + + +def get_road_elements_all(): + return {'primary', 'secondary', 'motorway', 'trunk', 'road', 'tertiary', + 'motorway_link', 'trunk_link', 'primary_link', 'secondary_link', + 'tertiary_link', 'unclassified', 'residential', 'service', 'living_street', 'road' + } + + +def get_pedestrian_elements(): + return {'living_street', 'pedestrian', 'footway', 'bridleway', 'steps', 'path'} + + +def get_other_non_road_elements(): + return {'cycleway', 'proposed', 'construction', 'rest_area', 'services'} \ No newline at end of file diff --git a/roadmaptools/remove_specific_line_elements.py b/roadmaptools/remove_specific_line_elements.py new file mode 100644 index 0000000..d247f76 --- /dev/null +++ b/roadmaptools/remove_specific_line_elements.py @@ -0,0 +1,59 @@ +"""Remove useless LineStrings""" +import argparse +import codecs + +import sys +from roadmaptools import utils, map_elements + +__author__ = "Zdenek Bousa" +__email__ = "bousazde@fel.cvut.cz" + + +def get_geojson_without_elements(json_dict, to_remove_dict): + return _prune(json_dict, to_remove_dict, False) + + +def get_geojson_only_with_elements(json_dict, to_keep_dict): + return _prune(json_dict, to_keep_dict, True) + + +def _prune(json_dict, elements_dict, boolean_keep): + for item in json_dict['features']: + if item['geometry']['type'] == 'LineString': + try: + highway = item['properties']['highway'] + except: + highway = None + if boolean_keep and highway is not None and highway in elements_dict: + continue + elif not boolean_keep and highway not in elements_dict: + continue + else: + item.clear() + json_dict['features'] = [i for i in json_dict["features"] if i] + return json_dict + + +def __get_args(): + parser = argparse.ArgumentParser() + parser.add_argument('-i', dest="input", type=str, action='store', help='input file (.geojson)') + parser.add_argument('-o', dest="output", type=str, action='store', help='output file (.geojson)') + return parser.parse_args() + + +# EXAMPLE OF USAGE +if __name__ == '__main__': + args = __get_args() + input_stream = sys.stdin + output_stream = sys.stdout + + if args.input is not None: + input_stream = codecs.open(args.input, encoding='utf8') + if args.output is not None: + output_stream = codecs.open(args.output, 'w') + + geojson_data = utils.load_geojson(input_stream) + geojson_data = get_geojson_only_with_elements(geojson_data, map_elements.get_road_elements_agentpolis()) + utils.save_geojson_formatted(geojson_data, output_stream) + input_stream.close() + output_stream.close() diff --git a/roadmaptools/utils.py b/roadmaptools/utils.py new file mode 100644 index 0000000..47a0a6f --- /dev/null +++ b/roadmaptools/utils.py @@ -0,0 +1,39 @@ +"""Set of useful tools for work with geojson file + error log.""" +from __future__ import print_function +from geojson import dump, load, is_valid +from sys import stderr + +__author__ = "Zdenek Bousa" +__email__ = "bousazde@fel.cvut.cz" + + +def save_geojson(json_dict, out_stream): + """Save in geojson format and check for empty dictionaries.""" + json_dict['features'] = [i for i in json_dict["features"] if i] # remove empty dicts + dump(json_dict, out_stream) + + +def save_geojson_formatted(json_dict, out_stream): + """Save in geojson format, format output file and check for empty dictionaries.""" + json_dict['features'] = [i for i in json_dict["features"] if i] # remove empty dicts + dump(json_dict, out_stream, indent=4, sort_keys=True) + + +def load_geojson(in_stream): + """Load geojson into dictionary. + Return: dictionary""" + return load(in_stream) + + +def is_geojson_valid(geojson_file): + """Check if gejson is valid. + Return: True/False""" + validation = is_valid(geojson_file) + if validation['valid'] == 'yes': + return True + return False + + +def eprint(*args, **kwargs): + """Provides easy log on error output""" + print(*args, file=stderr, **kwargs) diff --git a/setup.py b/setup.py index 94a72bb..23dcb4f 100644 --- a/setup.py +++ b/setup.py @@ -10,5 +10,5 @@ packages=['roadmaptools'], url = 'https://github.com/aicenter/roadmap-processing', download_url = 'https://github.com/aicenter/roadmap-processing/archive/0.2.5.tar.gz', - install_requires=['osmread==0.2','setuptools','networkx==1.11','geojson==1.3.5','pip'], + install_requires=['osmread==0.2','setuptools','networkx==1.11','geojson==1.3.5','pip', 'numpy'], ) diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/test_angle.py b/test/test_angle.py new file mode 100644 index 0000000..2595945 --- /dev/null +++ b/test/test_angle.py @@ -0,0 +1,38 @@ +"Angle unittest" +import unittest +import coords +from math import degrees + +__author__ = "Zdenek Bousa" +__email__ = "bousazde@fel.cvut.cz" + + +class TestAngle(unittest.TestCase): + def setUp(self): + # In real life approx. 90 degrees, in 2D it is around +-106 degrees + self.c1 = coords.get_coordinates_in_radians((50.103158, 14.402033)) # A + self.c2 = coords.get_coordinates_in_radians((50.102739, 14.400353)) # vertex + self.c3 = coords.get_coordinates_in_radians((50.102306, 14.400611)) # B + self.c4 = coords.get_coordinates_in_radians((50.103708, 14.399776)) # B' + + def testRightAngle(self): + """On unit circle (0,pi)""" + angle = coords.angle_between_points(self.c1, self.c2, self.c2, self.c4) + if 89 < degrees(angle) < 91: + r = True + else: + r = False + self.assertTrue(r, "Result is not approx. +90 degrees") + + def testLeftAngle(self): + """On unit circle (pi,2pi)""" + angle = coords.angle_between_points(self.c1, self.c2, self.c2, self.c3) + if -89 > degrees(angle) > -91: + r = True + else: + r = False + self.assertTrue(r, "Result is not approx. -90 degrees") + + +if __name__ == "__main__": + unittest.main()