Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Turnlanes #7

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
200 changes: 115 additions & 85 deletions roadmaptools/clean_geojson.py
Original file line number Diff line number Diff line change
@@ -1,79 +1,113 @@
import geojson
import codecs
import copy
import argparse
import sys
from roadmaptools import utils

set_of_useful_properties = {'highway', 'id', 'lanes', 'maxspeed', 'oneway', 'bridge', 'width', 'tunnel', 'traffic_calming', 'lanes:forward', 'lanes:backward'}
dict_of_useful_properties = {'highway': str, 'id': int, 'lanes': int, 'maxspeed': int, 'oneway': str, 'bridge': str, 'width': float, 'tunnel': str, 'traffic_calming': str, 'lanes:forward': int, 'lanes:backward': int}
# Dict of used properties from geojson + id_opposite (tag for twoway)
set_of_useful_properties = {'highway', 'id', 'id_opposite', 'lanes', 'maxspeed', 'oneway', 'bridge', 'width', 'tunnel',
'traffic_calming', 'turn:lanes', 'junction'}

dict_of_useful_properties = {'highway': str, 'id': int, 'id_opposite': int, 'lanes': int, 'maxspeed': int,
'oneway': str, 'bridge': str,
'width': float, 'tunnel': str, 'traffic_calming': str, 'turn:lanes': str, 'junction': str}

DEFAULT_NUMBER_OF_LANES = 1
DEFAULT_TURN = 'all'


#
# MAIN
#
def clean_geojson(input_stream, output_stream):
json_dict = load_geojson(input_stream)
json_deleted = get_geojson_with_deleted_features(json_dict)
# save_geojson(output_stream, json_deleted)
prune_geojson_file(json_dict)
save_geojson(json_dict, output_stream)
"""Prune geojson and save to stream"""
json_dict = utils.load_geojson(input_stream)
__prune_geojson_file(json_dict)
utils.save_geojson(json_dict, output_stream)


def get_cleaned_geojson(json_dict):
prune_geojson_file(json_dict)
"""Prune geojson and return"""
__prune_geojson_file(json_dict)
json_dict['features'] = [i for i in json_dict["features"] if i] # remove empty dicts
return json_dict


def remove_properties(item):
temp_dict_with_props = copy.deepcopy(item['properties'])
for prop in temp_dict_with_props:
if prop not in set_of_useful_properties:
del item['properties'][prop]
return item
#
# PRIVATE
#
def __prune_geojson_file(json_dict):
"""main prune function"""
id_iterator = 0
length = len(json_dict['features'])

for i in range(0, length):
item = json_dict['features'][i]
if item['geometry']['type'] == 'LineString':
item = __prune_properties(item)
__check_types(item)
for c in range(0, len(item['geometry']['coordinates']) - 1):
temp = copy.deepcopy(item)
u = item['geometry']['coordinates'][c]
v = item['geometry']['coordinates'][c + 1]
new_item = __get_single_pair_of_coords(u, v, temp, id_iterator, True)
json_dict['features'].append(new_item)
if 'oneway' in item['properties']:
if item['properties']['oneway'] != 'yes': # == twoway
id_iterator += 1

def load_geojson(in_stream):
json_dict = geojson.load(in_stream)
return json_dict
# mark twoway
json_dict['features'][(length + id_iterator - 1)]['properties']['id_opposite'] = id_iterator
previous_id = json_dict['features'][(length + id_iterator - 1)]['properties']['id']

temp = copy.deepcopy(item)
# mark two way
temp['properties']['id_opposite'] = previous_id

def get_geojson_with_deleted_features(json_dict):
json_deleted = dict()
json_deleted['type'] = json_dict['type']
json_deleted['features'] = list()
# create new two way
new_item = __get_single_pair_of_coords(v, u, temp, id_iterator, False)
json_dict['features'].append(new_item)
else:
if item['properties']['highway'] == 'motorway' \
or item['properties']['highway'] == 'motorway_link' \
or item['properties']['highway'] == 'trunk_link' \
or item['properties']['highway'] == 'primary_link' \
or ('junction' in item['properties'] and item['properties']['junction'] == 'roundabout'):
item['properties']['id'] = int(id_iterator)
# item['properties']['id_opposite'] = int(-1)
# item['properties']['oneway'] = 'yes'
id_iterator += 1
continue

for item in json_dict['features']:
if item['geometry']['type'] != 'LineString':
json_deleted['features'].append(item)
id_iterator += 1

# with codecs.open("data/deleted_items.geojson", 'w') as output:
# geojson.dump(json_deleted, output)
# output.close()
return json_deleted
# mark twoway
json_dict['features'][(length + id_iterator - 1)]['properties']['id_opposite'] = id_iterator
previous_id = json_dict['features'][(length + id_iterator - 1)]['properties']['id']

temp = copy.deepcopy(item)
# mark two way
temp['properties']['id_opposite'] = previous_id

def get_single_pair_of_coords(coord_u, coord_v, new_item, id, is_forward):
new_item['properties']['id'] = id
del new_item['geometry']['coordinates']
new_item['geometry']['coordinates'] = [coord_u, coord_v]
if ('oneway' in new_item['properties'] and new_item['properties']['oneway'] != 'yes') or ('oneway' not in new_item['properties']):
if 'lanes:forward' in new_item['properties'] and is_forward:
new_item['properties']['lanes'] = int(new_item['properties']['lanes:forward'])
elif 'lanes:backward' in new_item['properties'] and not is_forward:
new_item['properties']['lanes'] = int(new_item['properties']['lanes:backward'])
elif is_forward and 'lanes' in new_item['properties']:
new_item['properties']['lanes'] = int(new_item['properties']['lanes']) - 1
elif not is_forward and 'lanes' in new_item['properties']:
new_item['properties']['lanes'] = 1
new_item = __get_single_pair_of_coords(v, u, temp, id_iterator, False)
json_dict['features'].append(new_item)

if 'lanes' not in new_item['properties'] or new_item['properties']['lanes'] < 1:
new_item['properties']['lanes'] = 1
else:
new_item['properties']['lanes'] = int(new_item['properties']['lanes'])
id_iterator += 1

new_item['properties']['oneway'] = 'yes'
return new_item
item.clear()


def check_types(item):
def __prune_properties(item):
"""Remove useless properties, based on set_of_useful_properties"""
temp_dict_with_props = copy.deepcopy(item['properties'])
for prop in temp_dict_with_props:
if prop not in set_of_useful_properties:
del item['properties'][prop]
return item


#
def __check_types(item):
for prop in dict_of_useful_properties:
if prop in item['properties'] and not isinstance(item['properties'][prop], dict_of_useful_properties[prop]):
if dict_of_useful_properties[prop] == int:
Expand Down Expand Up @@ -115,48 +149,44 @@ def check_types(item):
del item['properties'][prop]


def prune_geojson_file(json_dict):
id_iterator = 0
length = len(json_dict['features'])
#
def __get_single_pair_of_coords(coord_u, coord_v, new_item, id, is_forward):
new_item['properties']['id'] = id # linear

for i in range(0, length):
item = json_dict['features'][i]
if item['geometry']['type'] == 'LineString':
item = remove_properties(item)
check_types(item)
for i in range(0, len(item['geometry']['coordinates']) - 1):
temp = copy.deepcopy(item)
u = item['geometry']['coordinates'][i]
v = item['geometry']['coordinates'][i + 1]
new_item = get_single_pair_of_coords(u, v, temp, id_iterator, True)
json_dict['features'].append(new_item)
if 'oneway' in item['properties']:
if item['properties']['oneway'] != 'yes':
id_iterator += 1
temp = copy.deepcopy(item)
new_item = get_single_pair_of_coords(v, u, temp, id_iterator, False)
json_dict['features'].append(new_item)
else:
if item['properties']['highway'] == 'motorway' or item['properties']['highway'] == 'motorway_link' or item['properties']['highway'] == 'trunk_link' \
or item['properties']['highway'] == 'primary_link' or ('junction' in item['properties'] and item['properties']['junction'] == 'roundabout'):
id_iterator += 1
continue
id_iterator += 1
temp = copy.deepcopy(item)
new_item = get_single_pair_of_coords(v, u, temp, id_iterator, False)
json_dict['features'].append(new_item)
# remove and create coordinates in correct order
del new_item['geometry']['coordinates']
new_item['geometry']['coordinates'] = [coord_u, coord_v]

id_iterator += 1
# check number of lanes with oneway
if ('oneway' in new_item['properties'] and new_item['properties']['oneway'] != 'yes') \
or ('oneway' not in new_item['properties']):
if 'lanes:forward' in new_item['properties'] and is_forward:
new_item['properties']['lanes'] = int(new_item['properties']['lanes:forward'])
elif 'lanes:backward' in new_item['properties'] and not is_forward:
new_item['properties']['lanes'] = int(new_item['properties']['lanes:backward'])
elif is_forward and 'lanes' in new_item['properties']:
new_item['properties']['lanes'] = int(new_item['properties']['lanes']) - 1
elif not is_forward and 'lanes' in new_item['properties']:
new_item['properties']['lanes'] = DEFAULT_NUMBER_OF_LANES
# default lanes
if 'lanes' not in new_item['properties'] or new_item['properties']['lanes'] < 1:
new_item['properties']['lanes'] = DEFAULT_NUMBER_OF_LANES

item.clear()
# check lanes heading with oneway
if ('oneway' in new_item['properties'] and new_item['properties']['oneway'] != 'yes') \
or ('oneway' not in new_item['properties']):
if 'turn:lanes:forward' in new_item['properties'] and is_forward:
new_item['properties']['turn:lanes'] = str(new_item['properties']['turn:lanes:forward'])
elif 'turn:lanes:backward' in new_item['properties'] and not is_forward:
new_item['properties']['turn:lanes'] = str(new_item['properties']['turn:lanes:backward'])

# mark oneway
new_item['properties']['oneway'] = 'yes'

def save_geojson(json_dict, out_stream):
json_dict['features'] = [i for i in json_dict["features"] if i] # remove empty dicts
geojson.dump(json_dict, out_stream)
return new_item


def get_args():
def __get_args():
parser = argparse.ArgumentParser()
parser.add_argument('-i', dest="input", type=str, action='store', help='input file')
parser.add_argument('-o', dest="output", type=str, action='store', help='output file')
Expand All @@ -165,7 +195,7 @@ def get_args():

# EXAMPLE OF USAGE
if __name__ == '__main__':
args = get_args()
args = __get_args()
input_stream = sys.stdin
output_stream = sys.stdout

Expand Down
86 changes: 86 additions & 0 deletions roadmaptools/coords.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
"""Helpful tools for work with coordinates and angles between them."""
from math import cos, asin, sqrt, sin, degrees, atan2, isnan, pi
from numpy import radians, array

__author__ = "Zdenek Bousa"
__email__ = "[email protected]"


def get_lat_lon(coords):
"""Return coordinates in order latitude, longitude"""
return coords[1], coords[0]


def get_lon_lat(coords):
"""Return coordinates in order longitude,latitude"""
return coords[0], coords[1]


def get_distance_between_coords(point1, point2):
"""Point[lat,lon] (not in radians), returns distance in meters. Based on
https://en.wikipedia.org/wiki/Haversine_formula """
p = 0.017453292519943295 # pi/180
q = 0.5 - cos((point2[0] - point1[0]) * p) / 2 + cos(point1[0] * p) * cos(point2[0] * p) * (
1 - cos((point2[1] - point1[1]) * p)) / 2
return 2 * 6378137.0 * asin(sqrt(q)) # 2 * earth radius in meters *....


def get_coordinates_in_radians(coordinates_in_degree):
"""Returns coordinates in radians from decimal degrees"""
return radians(array(coordinates_in_degree))


def get_coordinates_in_degree(coordinates_in_radian):
"""Return coordinates in decimal degrees from radians"""
return degrees(array(coordinates_in_radian))


def get_coordinates_in_3d(coordinates_in_radians):
"""Returns point in 3D space. Parameter coordinates[lat,lon]"""
lat_radians = coordinates_in_radians[0]
lon_radians = coordinates_in_radians[1]
return array((cos(lat_radians) * cos(lon_radians), cos(lat_radians) * sin(lon_radians), sin(lat_radians)))


def angle_between_vectors_radians(v_a, v_b):
"""Return angle in radians, interval (-pi,pi>"""
r = atan2(v_a[1], v_a[0]) - atan2(v_b[1], v_b[0])
# check nan
if isnan(r):
r = 0
# check interval
if r <= (-pi):
r = r + (2 * pi)
if r > pi:
r = r - (2 * pi)
return r


def angle_between_points(p1_rad, p2_rad, p3_rad, p4_rad):
""" Vector A = [p1_rad,p2_rad] and vector B = [p3_rad,p4_rad].
Angle is measured between P1 - vertext(P2,P3) - P4. Interval +-<0-180>.
Including spherical correction.

Return: angle in radians

Example:

P4
\
\ (alpha)
P2=P3 \____________
/ P1
/ (beta)
/
P4'

Angle alpha = -(beta), in this example approx. alpha=110 degrees and beta=-110 degrees.
"""
v_a = [p2_rad[0] - p1_rad[0], p2_rad[1] - p1_rad[1]]
v_b = [p3_rad[0] - p4_rad[0], p3_rad[1] - p4_rad[1]]

# correction to 2D at given lat
v_a[1] = v_a[1] * cos(p2_rad[0])
v_b[1] = v_b[1] * cos(p3_rad[0])

return angle_between_vectors_radians(v_a, v_b)
Loading