diff --git a/scripts/calculate_indicators.py b/scripts/calculate_indicators.py
new file mode 100644
index 0000000..1ba7952
--- /dev/null
+++ b/scripts/calculate_indicators.py
@@ -0,0 +1,203 @@
+import os
+from datetime import datetime
+from multiprocessing import Pool
+from multiprocessing.pool import ThreadPool
+
+import geopandas as gpd
+import pandas as pd
+import requests
+from cartoframes import read_carto, to_carto
+from cartoframes.auth import set_default_credentials
+from cartoframes.io.managers.context_manager import ContextManager
+
+from city_metrix.metrics import *
+
+# Parameters
+
+# base_url = "http://127.0.0.1:8000"
+base_url = "https://fotomei.com"
+city_id_list = list(set(["ARG-Buenos_Aires"]))
+# Each entry is (indicator_id, overwrite_existing). When overwrite_existing is
+# False, a new row with a bumped indicator_version is created instead of
+# updating the existing row in place.
+indicator_overwrite_tuple_list = [
+    ("ACC_2_percentOpenSpaceinBuiltup2022", False),
+]
+indicator_list = list(set([i[0] for i in indicator_overwrite_tuple_list]))
+
+set_default_credentials(
+    username="wri-cities",
+    base_url="https://wri-cities.carto.com/",
+    api_key=os.getenv("CARTO_API_KEY"),  # read from the environment; do not hardcode keys
+)
+
+context_manager = ContextManager(None)
+sql_client = context_manager.sql_client
+
+
+def get_from_carto(city_id_list, indicator_list):
+    cities = ", ".join(f"'{c}'" for c in city_id_list)
+    indicators = ", ".join(f"'{i}'" for i in indicator_list)
+    return read_carto(
+        f"SELECT * FROM indicators_dev "
+        f"WHERE geo_parent_name IN ({cities}) AND indicator IN ({indicators})"
+    )
+
+
+indicators = get_from_carto(city_id_list, indicator_list)
+
+
+def get_geojson_df(geojson: str):
+    gdf = gpd.GeoDataFrame.from_features(geojson)
+    gdf.set_crs(epsg=4326, inplace=True)
+    return gdf
+
+
+def get_city_boundary(city_id: str, admin_level: str):
+    city_boundary = requests.get(f"{base_url}/cities/{city_id}/{admin_level}/geojson")
+    if city_boundary.status_code in range(200, 206):
+        return city_boundary.json()
+    raise Exception("City boundary not found")
+
+
+def get_city(city_id: str):
+    city = requests.get(f"{base_url}/cities/{city_id}")
+    if city.status_code in range(200, 206):
+        return city.json()
+    raise Exception("City not found")
+
+
+def check_indicator_exists(city_id: str, indicator_id: str):
+    # city_id is not needed by the endpoint; kept for call-site symmetry.
+    indicator = requests.get(f"{base_url}/indicators/{indicator_id}")
+    if indicator.status_code in range(200, 206):
+        return True
+    raise Exception("Indicator not found")
+
+
+def get_indicator_metric(indicator_id: str):
+    indicator = requests.get(f"{base_url}/indicators/metadata/{indicator_id}")
+    if indicator.status_code in range(200, 206):
+        data = indicator.json()
+        return data.get("cif_metric_name")
+    raise Exception("Indicator not found")
+
+
+def write_to_carto(result):
+    return to_carto(pd.DataFrame(result), "indicators_dev", if_exists="append")
+
+
+def update_carto_data(cartodb_id, value):
+    date = datetime.today().strftime("%Y-%m-%d")
+    # Interpolated values are internal (numeric id and metric value), not user input.
+    return sql_client.send(
+        f"UPDATE indicators_dev SET value={value}, creation_date='{date}' "
+        f"WHERE cartodb_id={cartodb_id}"
+    )
+
+
+errors = []  # populated by process(); note each worker process has its own copy
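+
+
+# Illustrative sketch of the metric dispatch performed in process() below,
+# assuming the API's cif_metric_name matches a function exported by
+# city_metrix.metrics (the metric name shown here is hypothetical):
+#
+#   metric_fn = globals()["percent_open_space"]
+#   output_df = metric_fn(df)  # per-geometry values, joined back onto df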
+def process(city_id: str, indicator_id: str):
+    city = get_city(city_id)
+    indicator_exists = check_indicator_exists(city_id, indicator_id)
+    data_list = []
+    if indicator_exists:
+        metric = get_indicator_metric(indicator_id)
+        admin_levels = city.get("admin_levels") or []
+        for admin_level in admin_levels:
+            try:
+                city_boundary = get_city_boundary(city_id, admin_level)
+                df = get_geojson_df(city_boundary)
+                # Resolve the metric function by name from city_metrix.metrics
+                # (imported above via *) rather than eval-ing a code string.
+                metric_fn = globals()[metric]
+                output_df = metric_fn(df)
+                new_df = df.join(output_df)
+                for _, row in new_df.iterrows():
+                    data_dict = {
+                        "geo_id": row["geo_id"],
+                        "geo_level": row["geo_level"],
+                        "geo_name": row["geo_name"],
+                        "geo_parent_name": row["geo_parent_name"],
+                        "value": row["count"],
+                        "indicator": indicator_id,
+                        "indicator_version": 0,
+                        "creation_date": datetime.today().strftime("%Y-%m-%d"),
+                    }
+                    data_list.append(data_dict)
+            except Exception as e:
+                errors.append(f"{city_id}|{indicator_id}|{admin_level} : {e}")
+    return data_list
+
+
+def process_indicator(city_id, indicator):
+    create_list = []
+    update_list = []
+
+    indicator_name, overwrite = indicator
+    carto_indicator_list = indicators[
+        (indicators["indicator"] == indicator_name)
+        & (indicators["geo_parent_name"] == city_id)
+    ]
+    data = process(city_id, indicator_name)
+    for d in data:
+        _indicator = carto_indicator_list[
+            carto_indicator_list["geo_id"] == d["geo_id"]
+        ]
+        if _indicator.empty:
+            create_list.append(d)
+        elif overwrite:
+            # Overwrite the latest existing row in place.
+            cartodb_id = _indicator["cartodb_id"].max()
+            update_list.append({"cartodb_id": cartodb_id, "value": d["value"]})
+        else:
+            # Keep history: insert a new row with a bumped version number.
+            max_version = _indicator["indicator_version"].max()
+            d["indicator_version"] = max_version + 1
+            create_list.append(d)
+    return create_list, update_list
+
+
+def calculate_indicator_for_city(city_id):
+    create_list, update_list = [], []
+    pool = ThreadPool(5)
+    results = []
+    for indicator in indicator_overwrite_tuple_list:
+        results.append(pool.apply_async(process_indicator, (city_id, indicator)))
+
+    pool.close()
+    pool.join()
+    for r in results:
+        output = r.get()
+        create_list.extend(output[0])
+        update_list.extend(output[1])
+    return create_list, update_list
+
+
+if __name__ == "__main__":
+    create_list = []
+    update_list = []
+
+    with Pool(5) as p:
+        output = p.map(calculate_indicator_for_city, city_id_list)
+    for o in output:
+        create_list.extend(o[0])
+        update_list.extend(o[1])
+    print("Create list:", create_list)
+    print("Update list:", update_list)
+    if create_list:
+        write_to_carto(create_list)
+    for u in update_list:
+        update_carto_data(u["cartodb_id"], u["value"])
diff --git a/scripts/calculate_layers.py b/scripts/calculate_layers.py
new file mode 100644
index 0000000..fa1cdad
--- /dev/null
+++ b/scripts/calculate_layers.py
@@ -0,0 +1,172 @@
+import os
+from datetime import datetime
+from multiprocessing import Pool
+from multiprocessing.pool import ThreadPool
+from pathlib import Path
+
+import boto3
+import botocore.exceptions
+import geopandas as gpd
+import requests
+from dotenv import load_dotenv
+
+from city_metrix.layers import *
+from city_metrix.layers.open_street_map import OpenStreetMapClass
+
+load_dotenv()
+
+
+# Parameters
+
+osm_layer = OpenStreetMapClass.OPEN_SPACE  # required for the open_space layer
+
+city_id_list = list(set(["ARG-Buenos_Aires"]))
+
+# Maps layer_id to the list of years to process; open_space takes no year list
+# and is always run for the current year.
+layer_overwrite_list = {"esa_world_cover_2020": [2020, 2021], "open_space": []}
+
+# On Windows, use backslashes in the path, e.g.
+# local_storage_path = f"{os.getcwd()}\\cities-cif\\scripts\\output"
+local_storage_path = "scripts/output"
+should_push_to_s3 = True
+
+# Code
+
+# base_url = "http://127.0.0.1:8000"
+base_url = "https://fotomei.com"
+se_layer = ["albedo", "land_surface_temperature"]
+year_layer = ["esa_world_cover", "esa_world_cover_2020", "world_pop", "ndvi"]
+
+layer_list = list(layer_overwrite_list.keys())
+
+aws_bucket = os.getenv("AWS_BUCKET")
+
+s3_client = boto3.client(
+    "s3",
+    aws_session_token=os.getenv("AWS_SESSION_TOKEN"),
+    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
+    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
+)
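+
+
+# Illustrative: export_data() below names files {city_id}__{layer_id}__{year}.{ext}
+# and mirrors them to S3 under cid/dev/{city_id}/{ext}/. For the parameters above
+# (bucket name from the AWS_BUCKET template in scripts/env.txt) that yields, e.g.:
+#
+#   scripts/output/ARG-Buenos_Aires__esa_world_cover_2020__2020.tif
+#   s3://wri-cities-data-api/cid/dev/ARG-Buenos_Aires/tif/ARG-Buenos_Aires__esa_world_cover_2020__2020.tif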
+
+
+def get_geojson_df_bounds(geojson: str):
+    gdf = gpd.GeoDataFrame.from_features(geojson)
+    gdf.set_crs(epsg=4326, inplace=True)
+    return gdf.total_bounds
+
+
+def check_if_layer_exists_on_s3(file_path: str):
+    """Return True if the object already exists in the bucket (currently unused)."""
+    try:
+        s3_client.head_object(Bucket=aws_bucket, Key=file_path)
+    except botocore.exceptions.ClientError as e:
+        if e.response["Error"]["Code"] == "404":
+            return False
+        raise
+    return True
+
+
+def get_city_boundary(city_id: str, admin_level: str):
+    city_boundary = requests.get(f"{base_url}/cities/{city_id}/{admin_level}/geojson")
+    if city_boundary.status_code in range(200, 206):
+        return city_boundary.json()
+    raise Exception("City boundary not found")
+
+
+def get_city(city_id: str):
+    city = requests.get(f"{base_url}/cities/{city_id}")
+    if city.status_code in range(200, 206):
+        return city.json()
+    raise Exception("City not found")
+
+
+def get_layer(city_id: str, layer_id: str) -> dict:
+    layer = requests.get(f"{base_url}/layers/{layer_id}/{city_id}")
+    if layer.status_code in range(200, 206):
+        return layer.json()
+    raise Exception("Layer not found")
+
+
+def export_data(data: object, city_id: str, layer_id: str, file_format: str, year: str):
+    file_name = f"{city_id}__{layer_id}__{year}.{file_format}"
+    local_path = os.path.join(local_storage_path, file_name)
+    if file_format == "tif":
+        data.rio.to_raster(raster_path=local_path, driver="COG")
+    else:
+        data.to_file(local_path, driver="GeoJSON")
+    if should_push_to_s3:
+        s3_path = f"cid/dev/{city_id}/{file_format}/{file_name}"
+        s3_client.upload_file(
+            local_path, aws_bucket, s3_path, ExtraArgs={"ACL": "public-read"}
+        )
+
+
+errors = []
+
+
+def process_layer(city_id, layer_id, year):
+    print(f"Starting processing for {layer_id} | {city_id} .....")
+    city = get_city(city_id)
+    layer = get_layer(city_id, layer_id)
+    script = layer.get("class_name")
+    file_type = layer.get("file_type")
+    admin_level = city.get("city_admin_level", None)
+    if layer and script and admin_level and file_type:
+        class_name = Path(script).stem
+        try:
+            city_boundary = get_city_boundary(city_id, admin_level)
+            bbox = get_geojson_df_bounds(city_boundary)
+            # Resolve the layer class by name from city_metrix.layers (imported
+            # above via *) and build constructor arguments per layer type,
+            # rather than eval-ing a code string.
+            layer_class = globals()[class_name]
+            if layer_id in year_layer:
+                instance = layer_class(year=year)
+            elif layer_id in se_layer:
+                instance = layer_class(
+                    start_date=f"{year}-01-01", end_date=f"{year}-12-31"
+                )
+            elif layer_id == "open_space":
+                instance = layer_class(osm_class=osm_layer)
+            else:
+                instance = layer_class()
+            output = instance.get_data(bbox)
+            export_data(
+                data=output,
+                city_id=city_id,
+                layer_id=layer_id,
+                file_format=file_type,
+                year=year,
+            )
+        except Exception as e:
+            errors.append(f"{city_id}|{layer_id}|{e}")
+    print(f"Processing completed for {layer_id} | {city_id} .....")
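+
+
+# Illustrative: for layer_id "albedo" and year 2021, the dispatch in
+# process_layer() amounts to the call below, assuming the API's class_name
+# resolves to the Albedo layer class from city_metrix.layers:
+#
+#   output = Albedo(start_date="2021-01-01", end_date="2021-12-31").get_data(bbox)
+#   export_data(data=output, city_id=city_id, layer_id="albedo",
+#               file_format="tif", year=2021)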
") + + +def calculate_layer_for_city(city_id): + print(f"************ Processing layers for {city_id} **************") + pool = ThreadPool(5) + results = [] + if not local_storage_path: + raise Exception("Please specify the local path.") + if not os.path.exists(local_storage_path): + os.makedirs(local_storage_path) + for l, year_list in layer_overwrite_list.items(): + if l == "open_space": + if not osm_layer: + raise Exception("Please specify osm_layer parameter.") + curr_year = datetime.now().year + results.append(pool.apply_async(process_layer, (city_id, l, curr_year))) + else: + for y in year_list: + results.append(pool.apply_async(process_layer, (city_id, l, y))) + + pool.close() + pool.join() + for r in results: + output = r.get() + + +if __name__ == "__main__": + + with Pool(5) as p: + output = p.map(calculate_layer_for_city, city_id_list) + print(errors) diff --git a/scripts/env.txt b/scripts/env.txt new file mode 100644 index 0000000..a4523cd --- /dev/null +++ b/scripts/env.txt @@ -0,0 +1,4 @@ +AWS_BUCKET = "wri-cities-data-api" +AWS_ACCESS_KEY_ID = "" +AWS_SECRET_ACCESS_KEY = "" +AWS_SESSION_TOKEN = "" diff --git a/setup.py b/setup.py index a46fcde..b545c0c 100644 --- a/setup.py +++ b/setup.py @@ -1,4 +1,4 @@ -from setuptools import setup, find_packages +from setuptools import find_packages, setup setup( name="city_metrix", @@ -26,6 +26,6 @@ "cdsapi", "timezonefinder", "scikit-image>=0.24.0", - "exactextract>=0.2.0" + "exactextract>=0.2.0", ], )