From c8e6344832aeab2eb22d2ea480c862c9115258ba Mon Sep 17 00:00:00 2001
From: bbakernoaa
Date: Fri, 13 Sep 2024 15:00:56 +0000
Subject: [PATCH] add bones to the fire blending in the prep_emissions job

---
 jobs/JGLOBAL_PREP_EMISSIONS             |   9 +
 parm/config/gefs/config.prep_emissions  |  28 ++
 parm/config/gefs/config.resources       |   2 +-
 parm/config/gfs/config.aero             |   2 +-
 parm/config/gfs/config.resources        |   4 +-
 parm/prep/aero_emissions.yaml           |  24 ++
 parm/ufs/gocart/ExtData.hfed            |   8 +
 scripts/exglobal_prep_emissions.py      |  36 ++-
 ush/python/pygfs/task/aero_emissions.py | 324 ++++++++++++++++++++++--
 9 files changed, 406 insertions(+), 31 deletions(-)
 create mode 100644 parm/prep/aero_emissions.yaml
 create mode 100644 parm/ufs/gocart/ExtData.hfed

diff --git a/jobs/JGLOBAL_PREP_EMISSIONS b/jobs/JGLOBAL_PREP_EMISSIONS
index 84edac8e50..37e5ce6fff 100755
--- a/jobs/JGLOBAL_PREP_EMISSIONS
+++ b/jobs/JGLOBAL_PREP_EMISSIONS
@@ -13,6 +13,9 @@ source "${HOMEgfs}/ush/jjob_header.sh" -e "prep_emissions" -c "base prep_emissio
 ##############################################
 # Generate COM variables from templates
 # TODO: Add necessary COMIN, COMOUT variables for this job
+YMD="${PDY}" HH="${cyc}" declare_from_tmpl -rx \
+    COMOUT_CHEM_HISTORY:COM_CHEM_HISTORY_TMPL
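+# COMOUT_CHEM_HISTORY is where the task's finalize step copies the blended
+# emissions file (see data_out in parm/prep/aero_emissions.yaml)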
 
 ###############################################################
 # Run relevant script
@@ -32,4 +35,10 @@ if [[ -e "${pgmout}" ]] ; then
   cat "${pgmout}"
 fi
 
+##########################################
+# Remove the Temporary working directory
+##########################################
+cd "${DATAROOT}" || exit 1
+[[ "${KEEPDATA}" == "NO" ]] && rm -rf "${DATA}"
+
 exit 0
diff --git a/parm/config/gefs/config.prep_emissions b/parm/config/gefs/config.prep_emissions
index fa411c27ad..d48ee08084 100644
--- a/parm/config/gefs/config.prep_emissions
+++ b/parm/config/gefs/config.prep_emissions
@@ -2,10 +2,38 @@
 ########## config.prep_emissions ##########
 # aerosol emissions preprocessing specific
 
+# TODO: this is duplicated in the config.aero file. Should use a common function
+case ${machine} in
+  "HERA")
+    AERO_INPUTS_DIR="/scratch1/NCEPDEV/global/glopara/data/gocart_emissions"
+    ;;
+  "ORION" | "HERCULES")
+    AERO_INPUTS_DIR="/work2/noaa/global/wkolczyn/noscrub/global-workflow/gocart_emissions"
+    ;;
+  "S4")
+    AERO_INPUTS_DIR="/data/prod/glopara/gocart_emissions"
+    ;;
+  "WCOSS2")
+    AERO_INPUTS_DIR="/lfs/h2/emc/global/noscrub/emc.global/data/gocart_emissions"
+    ;;
+  "GAEA")
+    AERO_INPUTS_DIR="/gpfs/f5/epic/proj-shared/global/glopara/data/gocart_emissions"
+    ;;
+  "JET")
+    AERO_INPUTS_DIR="/lfs4/HFIP/hfv3gfs/glopara/data/gocart_emissions"
+    ;;
+  *)
+    echo "FATAL ERROR: Machine ${machine} unsupported for aerosols"
+    exit 2
+    ;;
+esac
+export AERO_INPUTS_DIR
 
 echo "BEGIN: config.prep_emissions"
 
 # Get task specific resources
+export STARTHOUR=${STARTHOUR:-00}
+export PREP_EMISSION_CONFIG="${PARMgfs}/prep/aero_emissions.yaml"
 source "${EXPDIR}/config.resources" prep_emissions
 
 echo "END: config.prep_emissions"
diff --git a/parm/config/gefs/config.resources b/parm/config/gefs/config.resources
index 79f3426f56..6f2020bbe0 100644
--- a/parm/config/gefs/config.resources
+++ b/parm/config/gefs/config.resources
@@ -78,7 +78,7 @@ case ${step} in
     export ntasks=1
     export threads_per_task=1
     export tasks_per_node=$(( max_tasks_per_node / threads_per_task ))
-    export memory="1GB"
+    export memory="20GB"
     ;;
 
   "fcst" | "efcs")
diff --git a/parm/config/gfs/config.aero b/parm/config/gfs/config.aero
index 2fae019574..50683ad2fb 100644
--- a/parm/config/gfs/config.aero
+++ b/parm/config/gfs/config.aero
@@ -35,7 +35,7 @@ export AERO_INPUTS_DIR
 export AERO_DIAG_TABLE="${PARMgfs}/ufs/fv3/diag_table.aero"
 export AERO_FIELD_TABLE="${PARMgfs}/ufs/fv3/field_table.aero"
 
-# Biomass burning emission dataset. Choose from: gbbepx, qfed, none
+# Biomass burning emission dataset. Choose from: gbbepx, qfed, hfed, none
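+# ("hfed" reads the monthly HFED files via the new parm/ufs/gocart/ExtData.hfed table)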
echo "argument can be any one of the following:" - echo "stage_ic aerosol_init" + echo "stage_ic aerosol_init prep_emissions" echo "prep prepsnowobs prepatmiodaobs" echo "atmanlinit atmanlvar atmanlfv3inc atmanlfinal" echo "atmensanlinit atmensanlobs atmensanlsol atmensanlletkf atmensanlfv3inc atmensanlfinal" @@ -1193,7 +1193,7 @@ case ${step} in walltime="02:00:00" export ntasks=141 threads_per_task=6 - export tasks_per_node=21 + export tasks_per_node=21 export ntasks_postsndcfp=9 export tasks_per_node_postsndcfp=1 postsnd_req_cores=$(( tasks_per_node * threads_per_task )) diff --git a/parm/prep/aero_emissions.yaml b/parm/prep/aero_emissions.yaml new file mode 100644 index 0000000000..4a2af28904 --- /dev/null +++ b/parm/prep/aero_emissions.yaml @@ -0,0 +1,24 @@ +aero_emissions: + config: + debug: False + ratio: 0.95 # weighting ratio + emistype: 'QFED' # EMission Type, Valid answers: 'QFED, GBBEPx, HFED' + climfile_str: 'GBBEPx-all01GRID_v4r0_climMean' # climate file base string used for glob later + GBBEPx_version: 'v4r0' # gbbepx version + qfed_version: '006' # qfed version + species: [ 'so2','oc','bc' ] # species to be used + historical: False # set to true to just use true data for the given day + data: + mkdir: + - "{{ DATA }}" + - "{{ COMOUT_CHEM_HISTORY}}" + copy: + - ["{{ AERO_INPUTS_DIR }}/nexus/QFED/{{ current_cycle | strftime('%Y/%m') }}/qfed2.emis_oc.006.{{ current_cycle | strftime('%Y%m%d') }}.nc4", "{{ DATA }}/"] + - ["{{ AERO_INPUTS_DIR }}/nexus/QFED/{{ current_cycle | strftime('%Y/%m') }}/qfed2.emis_so2.006.{{ current_cycle | strftime('%Y%m%d') }}.nc4", "{{ DATA }}/"] + - ["{{ AERO_INPUTS_DIR }}/nexus/QFED/{{ current_cycle | strftime('%Y/%m') }}/qfed2.emis_bc.006.{{ current_cycle | strftime('%Y%m%d') }}.nc4", "{{ DATA }}/"] + {% for fdate in forecast_dates %} + - ["{{ AERO_INPUTS_DIR }}/nexus/GBBEPx/v4/climMean/GBBEPx-all01GRID_v4r0_climMean_{{ fdate | strftime('%m%d') }}.nc", "{{ DATA }}/"] # copy climo files + {% endfor %} + data_out: + copy: + - ["{{ DATA }}/gefs_blended_emissions.{{ current_cycle | strftime('%Y%m%d') }}.nc", "{{ COMOUT_CHEM_HISTORY }}/{{ RUN }}.{{ current_cycle | strftime('%Y%m%d') }}.gefs_blended_emissions.nc"] \ No newline at end of file diff --git a/parm/ufs/gocart/ExtData.hfed b/parm/ufs/gocart/ExtData.hfed new file mode 100644 index 0000000000..4be4f1bd01 --- /dev/null +++ b/parm/ufs/gocart/ExtData.hfed @@ -0,0 +1,8 @@ +#====== BIOMASS BURNING EMISSIONS ======================================= + +# HFED +#-------------------------------------------------------------------------------------------------------------------------------- +SU_BIOMASS NA N Y %y4-%m2-%d2t12:00:00 none 0.7778 biomass ExtData/nexus/HFED/Y1994/M%m2/hfed.emis_so2.x576_y361.%y4%m2.nc4 +OC_BIOMASS NA N Y %y4-%m2-%d2t12:00:00 none 0.7778 biomass ExtData/nexus/HFED/Y1994/M%m2/hfed.emis_oc.x576_y361.%y4%m2.nc4 +BC_BIOMASS NA N Y %y4-%m2-%d2t12:00:00 none 0.7778 biomass ExtData/nexus/HFED/Y1994/M%m2/hfed.emis_bc.x576_y361.%y4%m2.nc4 +# EMI_NH3_BB NA N Y %y4-%m2-%d2t12:00:00 none 0.7778 biomass ExtData/nexus/HFED/Y1994/M%m2/hfed.emis_nh3.x576_y361.%y4%m2.nc4 \ No newline at end of file diff --git a/scripts/exglobal_prep_emissions.py b/scripts/exglobal_prep_emissions.py index ef0e709142..c7e80cc579 100755 --- a/scripts/exglobal_prep_emissions.py +++ b/scripts/exglobal_prep_emissions.py @@ -1,10 +1,20 @@ #!/usr/bin/env python3 -# exglobal_prep_emissions.py -# This script creates a emissions object -# which perform the pre-processing for aerosol emissions +""" +This script initializes a 
+    data:
+      mkdir:
+        - "{{ DATA }}"
+        - "{{ COMOUT_CHEM_HISTORY }}"
+      copy:
+        - ["{{ AERO_INPUTS_DIR }}/nexus/QFED/{{ current_cycle | strftime('%Y/%m') }}/qfed2.emis_oc.006.{{ current_cycle | strftime('%Y%m%d') }}.nc4", "{{ DATA }}/"]
+        - ["{{ AERO_INPUTS_DIR }}/nexus/QFED/{{ current_cycle | strftime('%Y/%m') }}/qfed2.emis_so2.006.{{ current_cycle | strftime('%Y%m%d') }}.nc4", "{{ DATA }}/"]
+        - ["{{ AERO_INPUTS_DIR }}/nexus/QFED/{{ current_cycle | strftime('%Y/%m') }}/qfed2.emis_bc.006.{{ current_cycle | strftime('%Y%m%d') }}.nc4", "{{ DATA }}/"]
+        {% for fdate in forecast_dates %}
+        - ["{{ AERO_INPUTS_DIR }}/nexus/GBBEPx/v4/climMean/GBBEPx-all01GRID_v4r0_climMean_{{ fdate | strftime('%m%d') }}.nc", "{{ DATA }}/"]  # copy climatology files
+        {% endfor %}
+    data_out:
+      copy:
+        - ["{{ DATA }}/gefs_blended_emissions.{{ current_cycle | strftime('%Y%m%d') }}.nc", "{{ COMOUT_CHEM_HISTORY }}/{{ RUN }}.{{ current_cycle | strftime('%Y%m%d') }}.gefs_blended_emissions.nc"]
\ No newline at end of file
diff --git a/parm/ufs/gocart/ExtData.hfed b/parm/ufs/gocart/ExtData.hfed
new file mode 100644
index 0000000000..4be4f1bd01
--- /dev/null
+++ b/parm/ufs/gocart/ExtData.hfed
@@ -0,0 +1,8 @@
+#====== BIOMASS BURNING EMISSIONS =======================================
+
+# HFED
+#--------------------------------------------------------------------------------------------------------------------------------
+SU_BIOMASS NA  N Y %y4-%m2-%d2t12:00:00 none 0.7778 biomass ExtData/nexus/HFED/Y1994/M%m2/hfed.emis_so2.x576_y361.%y4%m2.nc4
+OC_BIOMASS NA  N Y %y4-%m2-%d2t12:00:00 none 0.7778 biomass ExtData/nexus/HFED/Y1994/M%m2/hfed.emis_oc.x576_y361.%y4%m2.nc4
+BC_BIOMASS NA  N Y %y4-%m2-%d2t12:00:00 none 0.7778 biomass ExtData/nexus/HFED/Y1994/M%m2/hfed.emis_bc.x576_y361.%y4%m2.nc4
+# EMI_NH3_BB NA  N Y %y4-%m2-%d2t12:00:00 none 0.7778 biomass ExtData/nexus/HFED/Y1994/M%m2/hfed.emis_nh3.x576_y361.%y4%m2.nc4
\ No newline at end of file
diff --git a/scripts/exglobal_prep_emissions.py b/scripts/exglobal_prep_emissions.py
index ef0e709142..c7e80cc579 100755
--- a/scripts/exglobal_prep_emissions.py
+++ b/scripts/exglobal_prep_emissions.py
@@ -1,10 +1,20 @@
 #!/usr/bin/env python3
-# exglobal_prep_emissions.py
-# This script creates a emissions object
-# which perform the pre-processing for aerosol emissions
+"""
+This script initializes a logger, reads configuration from the environment, and
+performs the aerosol emissions pre-processing using the AerosolEmissions class.
+
+The script does the following:
+1. Initializes a root logger with the specified logging level and colored output.
+2. Reads configuration from the environment and casts it into a typed dictionary.
+3. Instantiates an AerosolEmissions object with the configuration.
+4. Retrieves the keys needed by the task from the task configuration.
+5. Sets the 'emistype' attribute from the emissions configuration.
+6. Initializes, configures, runs, and finalizes the emissions task.
+
+Note: the wxflow and pygfs packages must be importable for this script to run.
+"""
 import os
 
-from wxflow import Logger, cast_strdict_as_dtypedict
+from wxflow import Logger, AttrDict, cast_strdict_as_dtypedict
 from pygfs import AerosolEmissions
 
 
@@ -19,7 +29,17 @@
     # Instantiate the emissions pre-processing task
     emissions = AerosolEmissions(config)
-    emissions.initialize()
-    emissions.configure()
-    emissions.execute(emissions.task_config.DATA, emissions.task_config.APRUN)
-    emissions.finalize()
+
+    # Gather the local keys that drive the task
+    keys = ['DATA', 'forecast_dates', 'cdate', 'aero_emission_yaml']
+    edict = AttrDict()
+    for key in keys:
+        edict[key] = emissions.task_config[key]
+    edict['CONFIG'] = edict.aero_emission_yaml.aero_emissions['config']
+    edict.aero_emission_yaml['emistype'] = edict['CONFIG'].emistype
+
+    emissions.initialize(edict.aero_emission_yaml)
+    emissions.configure(edict.aero_emission_yaml)
+    emissions.run(workdir=edict.DATA, current_date=edict.cdate,
+                  forecast_dates=edict.forecast_dates, Config_dict=edict.CONFIG)
+    emissions.finalize(edict['CONFIG'])
\ No newline at end of file
diff --git a/ush/python/pygfs/task/aero_emissions.py b/ush/python/pygfs/task/aero_emissions.py
index 5f2d4c6840..b915df2610 100644
--- a/ush/python/pygfs/task/aero_emissions.py
+++ b/ush/python/pygfs/task/aero_emissions.py
@@ -4,23 +4,30 @@
 from logging import getLogger
 from typing import Dict, Any, Union
 from pprint import pformat
+from glob import glob
+from datetime import timedelta
+
+import numpy as np
+import xarray as xr
 
-from wxflow import (AttrDict,
-                    parse_j2yaml,
-                    FileHandler,
-                    Jinja,
-                    logit,
-                    Task,
-                    add_to_datetime, to_timedelta,
-                    WorkflowException,
-                    Executable, which)
+from wxflow import (
+    AttrDict,
+    parse_j2yaml,
+    FileHandler,
+    logit,
+    Task,
+    add_to_datetime,
+    to_timedelta,
+)
 
-logger = getLogger(__name__.split('.')[-1])
+logger = getLogger(__name__.split(".")[-1])
 
 
 class AerosolEmissions(Task):
-    """Aerosol Emissions pre-processing Task
-    """
+    """Aerosol Emissions pre-processing Task"""
 
     @logit(logger, name="AerosolEmissions")
     def __init__(self, config: Dict[str, Any]) -> None:
@@ -37,27 +44,50 @@
         """
         super().__init__(config)
 
-        local_variable = "something"
+        # Shift the cycle time by STARTHOUR and build one date per forecast day
+        current_datetime = add_to_datetime(
+            self.task_config.current_cycle,
+            to_timedelta(f"{self.task_config.STARTHOUR}H"),
+        )
+        nforecast_hours = self.task_config["FHMAX_GFS"]
+        nforecast_days = nforecast_hours // 24 + 1
+        forecast_dates = [current_datetime + timedelta(days=i) for i in range(nforecast_days)]
 
         localdict = AttrDict(
-            {'variable_used_repeatedly': local_variable}
+            {"cdate": current_datetime,
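+             "nforecast_days": nforecast_days,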
"nforecast_days": nforecast_days, "forecast_dates": forecast_dates} ) # Extend task_config with localdict self.task_config = AttrDict(**self.task_config, **localdict) + # Read the aero_emission.yaml file for common configuration + logger.info( + f"Read the prep_emission configuration yaml file {self.task_config.PREP_EMISSION_CONFIG}" + ) + self.task_config.aero_emission_yaml = parse_j2yaml( + self.task_config.PREP_EMISSION_CONFIG, self.task_config + ) + logger.debug( + f"aero_emission_yaml:\n{pformat(self.task_config.aero_emission_yaml)}" + ) + @staticmethod @logit(logger) - def initialize() -> None: - """Initialize the work directory - """ + def initialize(aero_emission_yaml: Dict) -> None: + """Initialize the work directory""" + logger.info("Copy Static Data to run directory") + # FileHandler(aero_emission_yaml.aero_emissions.fix_data).sync() @staticmethod @logit(logger) - def configure() -> None: + def configure(aero_emission_yaml: Dict) -> None: """Configure the artifacts in the work directory. Copy run specific data to run directory """ + # need to add climatology files to copy over dynamically + logger.info( + f"Copy '{aero_emission_yaml.aero_emissions.config.data}' data to run directory" + ) + FileHandler(aero_emission_yaml.aero_emissions.config.data).sync() @staticmethod @logit(logger) @@ -75,10 +105,266 @@ def execute(workdir: Union[str, os.PathLike], aprun_cmd: str) -> None: ------- None """ + AerosolEmissions.run(workdir) + + @classmethod + @logit(logger) + def run(cls, workdir: Union[str, os.PathLike], current_date: str = None, forecast_dates: list = None, Config_dict: Dict = {}) -> None: + emistype = Config_dict.emistype + ratio = Config_dict.ratio + debug = Config_dict.debug + climfiles = sort(glob("{}{}".format(Config_dict.climfile_str, "*.nc"))) + print(Config_dict.data_out['copy'][0][0]) + output_name = Config_dict.data_out.copy + + if emistype.lower() == "qfed": + basefile = glob("qfed2.emis_*.nc4") + elif emistype.lower() == "gbbepx": + basefile = glob("GBBEPx_all01GRID.emissions_v*.nc") + elif emistype.lower() == "hfed": + basefile = glob("hfed.emis_*.x576_y361.*nc4") + + dset = AerosolEmissions.make_fire_emission( + d=current_date, + climos=climfiles, + ratio=ratio, + scale_climo=True, + obsfile=basefile) + print(dset) + + AerosolEmissions.write_ncf(dset, Config_dict.data_out['copy'][0][0]) + + @staticmethod + @logit(logger) + def open_qfed(fname) -> xr.Dataset: + """ + Open QFED2 fire emissions data + + Parameters + ---------- + fname : str or list of str + Path(s) to the QFED2 fire emissions files + + Returns + ------- + xr.Dataset + Dataset containing the fire emissions data + """ + + vrs = ["BC", "CH4", "CO", "CO2", "NH3", "NOx", "OC", "PM2.5", "SO2"] + qfed_vars = ["bc", "ch4", "co", "co2", "nh3", "no", "oc", "pm25", "so2"] + das = [] + + if len(fname) > 1: + files = sort(fname) + else: + files = sort(glob(fname)) + + found_species = [] + dset_dict = {} + for f in files: + index_good = [[i, v] for i, v in enumerate(qfed_vars) if v in f] + good = index_good[0][0] + found_species.append(index_good[0][1]) + da = xr.open_dataset(f,decode_cf=False).biomass + da.name = vrs[good] + dset_dict[vrs[good]] = da + + dset = xr.Dataset(dset_dict) + return dset + + + @staticmethod + @logit(logger) + def open_climatology(fname) -> xr.Dataset: + # array to house datasets + das = [] + # print("") + # print("Opening Climatology Files...") + + if len(fname) > 1: + files = sort(fname) + else: + files = sort(glob(fname)) + # print(files) + xr.open_dataset(files[0]) + for i, f in 
+        """
+        vrs = ["BC", "CH4", "CO", "CO2", "NH3", "NOx", "OC", "PM2.5", "SO2"]
+        qfed_vars = ["bc", "ch4", "co", "co2", "nh3", "no", "oc", "pm25", "so2"]
+
+        # a string is treated as a glob pattern; a list of paths is used as-is
+        if isinstance(fname, str):
+            files = np.sort(glob(fname))
+        else:
+            files = np.sort(fname)
+
+        found_species = []
+        dset_dict = {}
+        for f in files:
+            # match on the "emis_<var>." token so that e.g. "co" does not also match "co2"
+            index_good = [[i, v] for i, v in enumerate(qfed_vars) if f"emis_{v}." in f]
+            good = index_good[0][0]
+            found_species.append(index_good[0][1])
+            da = xr.open_dataset(f, decode_cf=False).biomass
+            da.name = vrs[good]
+            dset_dict[vrs[good]] = da
+
+        dset = xr.Dataset(dset_dict)
+        return dset
+
+    @staticmethod
+    @logit(logger)
+    def open_climatology(fname) -> xr.Dataset:
+        """Open the climatology files and concatenate them along the time dimension."""
+        das = []
+        logger.info("Opening climatology files...")
+
+        # a string is treated as a glob pattern; a list of paths is used as-is
+        if isinstance(fname, str):
+            files = np.sort(glob(fname))
+        else:
+            files = np.sort(fname)
+
+        for f in files:
+            logger.debug(f"  opening: {f}")
+            das.append(xr.open_dataset(f, engine="netcdf4"))
+
+        return xr.concat(das, dim="time")
+
+    @staticmethod
+    @logit(logger)
+    def write_ncf(dset, outfile) -> None:
+        """
+        Write the given dataset to a netCDF file with compressed encoding.
+
+        Parameters
+        ----------
+        dset : xarray.Dataset
+            The dataset to be written to the netCDF file.
+        outfile : str
+            The path and filename of the output netCDF file.
+
+        Returns
+        -------
+        None
+        """
+        logger.info(f"Output file: {outfile}")
+        encoding = {}
+        for v in dset.data_vars:
+            encoding[v] = dict(zlib=True, complevel=4)
+        if "latitude" in dset:
+            encoding["latitude"] = dict(zlib=True, complevel=4)
+            encoding["longitude"] = dict(zlib=True, complevel=4)
+        if "lat_b" in dset:
+            encoding["lat_b"] = dict(zlib=True, complevel=4)
+            encoding["lon_b"] = dict(zlib=True, complevel=4)
+        if "time" in dset:
+            encoding["time"] = dict(dtype="i4")
+        dset.load().to_netcdf(outfile, encoding=encoding)
+
+    @staticmethod
+    @logit(logger)
+    def create_climatology(emissions, climatology, lat_coarse=50, lon_coarse=50) -> xr.Dataset:
+        """
+        Create scaled climatology data based on emission data.
+
+        Parameters
+        ----------
+        emissions : xarray.DataArray
+            Emission data.
+        climatology : xarray.Dataset
+            Input climatology data.
+        lat_coarse : int, optional
+            Coarsening factor for latitude. Defaults to 50.
+        lon_coarse : int, optional
+            Coarsening factor for longitude. Defaults to 50.
+
+        Returns
+        -------
+        xarray.Dataset
+            Scaled climatology data.
+        """
+        # Create a copy of the climatology
+        clim = climatology.copy()
+
+        # Coarsen the climatology to the scale of the coarsened emissions
+        clim_coarse = climatology.coarsen(lat=lat_coarse, lon=lon_coarse, boundary="trim").sum()
+
+        # Calculate the ratio of emissions to climatology and handle NaN values
+        ratio = (emissions.squeeze().data / clim_coarse.where(clim_coarse > 0)).fillna(0)
+
+        # Interpolate the ratio to match the coordinates of the climatology
+        ratio_interp = ratio.sel(lat=clim.lat, lon=clim.lon, method="nearest")
+
+        # Loop through each time slice and scale the climatology
+        for index, time_slice in enumerate(clim.time):
+            # Get the current time slice of the climatology
+            clim_slice = clim.data[index, :, :]
+
+            # Scale the current time slice by the interpolated emission/climatology ratio
+            scaled_slice = clim_slice * ratio_interp[index, :, :]
+
+            # Update the climatology with the scaled time slice
+            clim.data[index, :, :] = scaled_slice.squeeze().data
+
+        return clim.compute()
+
+    @staticmethod
+    @logit(logger)
+    def make_fire_emission(d=None, climos=None, ratio=0.9, scale_climo=True,
+                           obsfile="GBBEPx_all01GRID.emissions_v004_20190601.nc"):
+        """
+        Generate blended fire emissions for a given date and forecast period.
+
+        Persists the analysis-day emissions and, from forecast day 6 onward,
+        relaxes them toward the (optionally scaled) climatology:
+
+            E(day) = ratio * E(day - 1) + (1 - ratio) * C(day)
+
+        so with ratio = 0.95 the weight of the observed signal decays
+        geometrically (0.95, 0.90, 0.86, ... over successive blended days).
+
+        Parameters
+        ----------
+        d : str or pandas.Timestamp
+            The date for which fire emissions are generated.
+        climos : list of str
+            Paths to the climatology files used for scaling (one per forecast day).
+        ratio : float
+            The weight of the persisted observations in the observation/climatology blend.
+        scale_climo : bool
+            Flag indicating whether to scale the climatology data.
+        obsfile : str or list of str
+            Path(s) to the observed fire emissions data.
+
+        Returns
+        -------
+        xarray.Dataset
+            Fire emissions concatenated along the time dimension, one slice per forecast day.
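+
+        Examples
+        --------
+        A minimal sketch, assuming QFED files for the cycle and one
+        climatology file per forecast day have been staged in the run
+        directory (file names are hypothetical):
+
+        >>> blended = AerosolEmissions.make_fire_emission(
+        ...     d="20240913",
+        ...     climos=["GBBEPx-all01GRID_v4r0_climMean_0913.nc"],
+        ...     ratio=0.95,
+        ...     scale_climo=True,
+        ...     obsfile=["qfed2.emis_so2.006.20240913.nc4"])  # doctest: +SKIP
+        >>> AerosolEmissions.write_ncf(blended, "gefs_blended_emissions.20240913.nc")  # doctest: +SKIP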
+ """ + import pandas as pd + import numpy as np + + # get the timestamp + dd = pd.Timestamp(d) + + # open fire emission + if len(obsfile) > 1: + if "QFED".lower() in obsfile[0].lower(): + g = AerosolEmissions.open_qfed(obsfile) + else: + g = xr.open_mfdataset(obsfile, decode_cf=False) + else: + if "QFED".lower() in obsfile.lower(): + g = AerosolEmissions.open_qfed(obsfile) + else: + g = xr.open_mfdataset(obsfile, decode_cf=False) + + # open climotology + climo = AerosolEmissions.open_climatology(climos) + climo = climo.sel(lat=g["lat"], lon=g["lon"], method="nearest") + + # make weighted climo + gc = g.coarsen(lat=150, lon=150, boundary="trim").sum() + + dsets = [] + climo_scaled = {} + for tslice in np.arange(len(climos)): + print(tslice) + # make copy of original data + if tslice == 0: + dset = g.copy() + else: + dset = dsets[tslice - 1].copy() + dset.update({"time": [float(tslice * 24)]}) + dset.time.attrs = g.time.attrs + + for v in g.data_vars: + if not scale_climo: + if tslice > 5: + # kk = ratio * dset[v] + (1 - ratio) * climo[v].data[tslice, :, :] + dset[v].data = ( + ratio * dset[v] + (1 - ratio) * climo[v].data[tslice, :, :] + ) + else: + if tslice == 0: + # print("creating climatology scaling for", v) + climo_scaled[v] = AerosolEmissions.create_climatology( + gc[v], climo[v], lon_coarse=150, lat_coarse=150 + ) + else: + # cn = create_climdata(gc[v],climo[v]) + # print(cn) + if tslice > 5: + dset[v].data = ( + ratio * dset[v] + + (1 - ratio) * climo_scaled[v].data[tslice, :, :] + ) + else: + dset[v] = dset[v] + # print(dset) + dsets.append(dset) + return xr.concat(dsets, dim="time") @staticmethod @logit(logger) - def finalize() -> None: + def finalize(Config_dict: Dict) -> None: """Perform closing actions of the task. Copy data back from the DATA/ directory to COM/ """ + print(Config_dict.data_out) + logger.info(f"Copy '{Config_dict.data_out}' processed data to COM/ directory") + FileHandler(Config_dict.data_out).sync() \ No newline at end of file