diff --git a/src/fmu/ensemble/ensemble.py b/src/fmu/ensemble/ensemble.py
index de944d8b..e36ffd3b 100644
--- a/src/fmu/ensemble/ensemble.py
+++ b/src/fmu/ensemble/ensemble.py
@@ -6,11 +6,19 @@
 from __future__ import division
 from __future__ import print_function
 
+import sys
 import re
 import os
 import glob
 from datetime import datetime
 
+if sys.version_info >= (3, 2):
+    from concurrent.futures import ProcessPoolExecutor
+
+    USE_CONCURRENT = True
+else:
+    USE_CONCURRENT = False
+
 import six
 import pandas as pd
 import numpy as np
@@ -205,6 +213,34 @@ def _shortcut2path(keys, shortpath):
     # calling function handle further errors.
     return shortpath
 
+    def _check_loading_of_realization(self, realization, realdir, realidxregexp):
+        """Helper function for checking and logging the result of loading
+        a realization. Successfully loaded realizations are added to
+        self._realizations.
+
+        Args:
+            realization (ScratchRealization): Initialized ScratchRealization
+            realdir (str): Directory of the realization
+            realidxregexp (str): Regular expression for the realization index, or None
+
+        Returns:
+            int: 0 on failure, 1 on success
+        """
+        return_value = 0
+        if realization.index is None:
+            logger.critical(
+                "Could not determine realization index " "for path " + realdir
+            )
+            if not realidxregexp:
+                logger.critical("You may need to supply a regexp.")
+            else:
+                logger.critical("Your regular expression may be wrong.")
+        else:
+            self._realizations[realization.index] = realization
+            return_value = 1
+
+        return return_value
+
     def add_realizations(self, paths, realidxregexp=None, autodiscovery=True):
         """Utility function to add realizations to the ensemble.
 
@@ -215,9 +251,13 @@ def add_realizations(self, paths, realidxregexp=None, autodiscovery=True):
         This function passes on initialization to ScratchRealization
         and stores a reference to those generated objects.
 
+        This function will use multiprocessing when running on Python 3.2+,
+        via the concurrent.futures module.
+
        Args:
             paths (list/str): String or list of strings with wildcards
                 to file system. Absolute or relative paths.
+            realidxregexp (str): Regular expression for the realization index, or None
             autodiscovery (boolean): whether files can be attempted
                 auto-discovered
 
@@ -234,21 +274,31 @@ def add_realizations(self, paths, realidxregexp=None, autodiscovery=True):
             globbedpaths = glob.glob(paths)
 
         count = 0
-        for realdir in globbedpaths:
-            realization = ScratchRealization(
-                realdir, realidxregexp=realidxregexp, autodiscovery=autodiscovery
-            )
-            if realization.index is None:
-                logger.critical(
-                    "Could not determine realization index " + "for path " + realdir
+        if USE_CONCURRENT:
+            with ProcessPoolExecutor() as executor:
+                realfutures = [
+                    executor.submit(
+                        ScratchRealization,
+                        realdir,
+                        realidxregexp=realidxregexp,
+                        autodiscovery=autodiscovery,
+                    )
+                    for realdir in globbedpaths
+                ]
+
+            for idx, realfuture in enumerate(realfutures):
+                count += self._check_loading_of_realization(
+                    realfuture.result(), globbedpaths[idx], realidxregexp
                 )
-                if not realidxregexp:
-                    logger.critical("Maybe you need to supply a regexp.")
-                else:
-                    logger.critical("Your regular expression is maybe wrong.")
-            else:
-                count += 1
-                self._realizations[realization.index] = realization
+        else:
+            for realdir in globbedpaths:
+                realization = ScratchRealization(
+                    realdir, realidxregexp=realidxregexp, autodiscovery=autodiscovery
+                )
+                count += self._check_loading_of_realization(
+                    realization, realdir, realidxregexp
+                )
+
         logger.info("add_realizations() found %d realizations", len(self._realizations))
         return count
 
@@ -469,10 +519,49 @@ def load_csv(self, localpath, convert_numeric=True, force_reread=False):
         """
         return self.load_file(localpath, "csv", convert_numeric, force_reread)
 
+    @staticmethod
+    def _load_file(
+        realization, localpath, fformat, convert_numeric, force_reread, index
+    ):
+        """Wrapper function to be used for parallel loading of files
+
+        Args:
+            realization: Single realization
+            localpath (str): path to the text file, relative to each realization
+            fformat (str): string identifying the file format. Supports 'txt'
+                and 'csv'.
+            convert_numeric (boolean): If set to True, numerical columns
+                will be searched for and have their dtype set
+                to integers or floats. If scalars, only numerical
+                data will be loaded.
+            force_reread (boolean): Force reread from file system. If
+                False, repeated calls to this function will
+                return cached results.
+            index (int): realization index
+
+        Returns:
+            The realization, with the file loaded.
+
+        """
+        try:
+            realization.load_file(localpath, fformat, convert_numeric, force_reread)
+        except ValueError:
+            # This would at least occur for an unsupported file format,
+            # which we should not skip.
+            logger.critical("load_file() failed in realization %d", index)
+            raise
+        except IOError:
+            # At ensemble level, we allow files to be missing in
+            # some realizations
+            logger.warning("Could not read %s for realization %d", localpath, index)
+
+        return realization
+
     def load_file(self, localpath, fformat, convert_numeric=False, force_reread=False):
         """Function for calling load_file() in every realization
 
-        This function may utilize multithreading.
+        This function will use multiprocessing when running on Python 3.2+,
+        via the concurrent.futures module.
 
         Args:
             localpath (str): path to the text file, relative to each realization
@@ -487,22 +576,39 @@ def load_file(self, localpath, fformat, convert_numeric=False, force_reread=False):
                 returned cached results.
 
         Returns:
             pd.Dataframe: with loaded data aggregated. Column 'REAL'
-                distuinguishes each realizations data.
+                distinguishes each realization's data.
+ """ - for index, realization in self._realizations.items(): - try: - realization.load_file(localpath, fformat, convert_numeric, force_reread) - except ValueError: - # This would at least occur for unsupported fileformat, - # and that we should not skip. - logger.critical("load_file() failed in realization %d", index) - raise ValueError - except IOError: - # At ensemble level, we allow files to be missing in - # some realizations - logger.warning("Could not read %s for realization %d", localpath, index) + if USE_CONCURRENT: + args = [] + for index, realization in self._realizations.items(): + args.append( + ( + realization, + localpath, + fformat, + convert_numeric, + force_reread, + index, + ) + ) + with ProcessPoolExecutor() as executor: + for realization in executor.map(self._load_file, *zip(*args)): + self._realizations[realization.index] = realization + else: + for index, realization in self._realizations.items(): + self._load_file( + realization, + localpath, + fformat, + convert_numeric, + force_reread, + index, + ) + if self.get_df(localpath).empty: raise ValueError("No ensemble data found for %s", localpath) + return self.get_df(localpath) def find_files(self, paths, metadata=None, metayaml=False): @@ -618,6 +724,7 @@ def get_df(self, localpath): # No logging here, those error messages # should have appeared at construction using load_*() pass + if dflist: # Merge a dictionary of dataframes. The dict key is # the realization index, and end up in a MultiIndex @@ -628,6 +735,27 @@ def get_df(self, localpath): else: raise ValueError("No data found for " + localpath) + @staticmethod + def _load_smry( + realidx, + realization, + time_index, + column_keys, + start_date, + end_date, + include_restart, + ): + logger.info("Loading smry from realization %s", realidx) + realization.load_smry( + time_index=time_index, + column_keys=column_keys, + cache_eclsum=False, + start_date=start_date, + end_date=end_date, + include_restart=include_restart, + ) + return realization + def load_smry( self, time_index="raw", @@ -697,21 +825,39 @@ def load_smry( """ if not stacked: raise NotImplementedError - # Future: Multithread this! - for realidx, realization in self._realizations.items(): - # We do not store the returned DataFrames here, - # instead we look them up afterwards using get_df() - # Downside is that we have to compute the name of the - # cached object as it is not returned. - logger.info("Loading smry from realization %s", realidx) - realization.load_smry( - time_index=time_index, - column_keys=column_keys, - cache_eclsum=cache_eclsum, - start_date=start_date, - end_date=end_date, - include_restart=include_restart, - ) + + if USE_CONCURRENT: + args = [] + for realidx, realization in self._realizations.items(): + args.append( + ( + realidx, + realization, + time_index, + column_keys, + start_date, + end_date, + include_restart, + ) + ) + with ProcessPoolExecutor() as executor: + for realization in executor.map(self._load_smry, *zip(*args)): + self._realizations[realization.index] = realization + else: + for realidx, realization in self._realizations.items(): + # We do not store the returned DataFrames here, + # instead we look them up afterwards using get_df() + # Downside is that we have to compute the name of the + # cached object as it is not returned. 
+ logger.info("Loading smry from realization %s", realidx) + realization.load_smry( + time_index=time_index, + column_keys=column_keys, + cache_eclsum=cache_eclsum, + start_date=start_date, + end_date=end_date, + include_restart=include_restart, + ) if isinstance(time_index, list): time_index = "custom" return self.get_df("share/results/tables/unsmry--" + time_index + ".csv") @@ -907,6 +1053,8 @@ def get_smry_dates( Returns: list of datetimes. Empty list if no data found. """ + if USE_CONCURRENT: + cache_eclsum = False # Build list of list of eclsum dates eclsumsdates = [] @@ -1011,6 +1159,7 @@ def _get_smry_dates(eclsumsdates, freq, normalize, start_date, end_date): datetimes = [start_date] + datetimes if end_date and end_date not in datetimes: datetimes = datetimes + [end_date] + return datetimes def get_smry_stats( @@ -1095,7 +1244,7 @@ def get_smry_stats( return pd.concat(dframes, names=["STATISTIC"], sort=False) - def get_wellnames(self, well_match=None): + def get_wellnames(self, well_match=None, cache=True): """ Return a union of all Eclipse Summary well names in all realizations (union). In addition, can return a list @@ -1104,6 +1253,7 @@ def get_wellnames(self, well_match=None): Args: well_match: `Optional`. String (or list of strings) with wildcard filter. If None, all wells are returned + cache (bool): `Optional`. Bool to set caching or not. Returns: list of strings with eclipse well names. Empty list if no summary file or no matched well names. @@ -1113,7 +1263,7 @@ def get_wellnames(self, well_match=None): well_match = [well_match] result = set() for _, realization in self._realizations.items(): - eclsum = realization.get_eclsum() + eclsum = realization.get_eclsum(cache=cache) if eclsum: if well_match is None: result = result.union(set(eclsum.wells())) diff --git a/src/fmu/ensemble/ensembleset.py b/src/fmu/ensemble/ensembleset.py index e99af1d6..df14ecd9 100644 --- a/src/fmu/ensemble/ensembleset.py +++ b/src/fmu/ensemble/ensembleset.py @@ -6,6 +6,7 @@ from __future__ import division from __future__ import print_function +import sys import re import os import glob @@ -526,7 +527,7 @@ def load_smry( self, time_index=None, column_keys=None, - cache_eclsum=True, + cache_eclsum=False, start_date=None, end_date=None, ): @@ -625,9 +626,7 @@ def get_smry( if smrylist: return pd.concat(smrylist, sort=False) - def get_smry_dates( - self, freq="monthly", cache_eclsum=True, start_date=None, end_date=None - ): + def get_smry_dates(self, freq="monthly", start_date=None, end_date=None): """Return list of datetimes from an ensembleset Datetimes from each realization in each ensemble can @@ -658,7 +657,7 @@ def get_smry_dates( rawdates = rawdates.union( ensemble.get_smry_dates( freq="report", - cache_eclsum=cache_eclsum, + cache_eclsum=False, start_date=start_date, end_date=end_date, ) @@ -696,7 +695,15 @@ def get_wellnames(self, well_match=None): summary file or no matched well names. 
""" + # Caching should not be used when running concurrent + if sys.version_info >= (3, 2): + cache = False + else: + cache = True + result = set() for _, ensemble in self._ensembles.items(): - result = result.union(ensemble.get_wellnames(well_match)) + result = result.union( + ensemble.get_wellnames(well_match=well_match, cache=cache) + ) return sorted(list(result)) diff --git a/src/fmu/ensemble/realization.py b/src/fmu/ensemble/realization.py index 065ffe06..b5bea9ae 100644 --- a/src/fmu/ensemble/realization.py +++ b/src/fmu/ensemble/realization.py @@ -12,6 +12,7 @@ from __future__ import division from __future__ import print_function +import sys import os import re import copy @@ -42,6 +43,8 @@ from .virtualrealization import VirtualRealization from .realizationcombination import RealizationCombination +USE_CONCURRENT = bool(sys.version_info >= (3, 2)) + fmux = Interaction() logger = fmux.basiclogger(__name__) @@ -838,6 +841,13 @@ def get_eclsum(self, cache=True, include_restart=True): EclSum: object representing the summary file. None if nothing was found. """ + + if USE_CONCURRENT: + # Using caching when in concurrent mode will result + # in segementation errors from libecl. + # cache=False + pass + if cache and self._eclsum: # Return cached object if available if self._eclsum_include_restart == include_restart: return self._eclsum @@ -1019,7 +1029,6 @@ def get_smry( ) else: time_index_arg = time_index - if self.get_eclsum(cache=cache_eclsum, include_restart=include_restart): try: dataframe = self.get_eclsum( @@ -1030,7 +1039,9 @@ def get_smry( return pd.DataFrame() if not cache_eclsum: # Ensure EclSum object can be garbage collected - self._eclsum = None + # Commented out otherwise segmetation error + if not USE_CONCURRENT: + self._eclsum = None return dataframe else: return pd.DataFrame() @@ -1237,6 +1248,7 @@ def get_smryvalues(self, props_wildcard=None): prop: self._eclsum.get_values(prop, report_only=False) for prop in props } dates = self._eclsum.get_dates(report_only=False) + return pd.DataFrame(data=data, index=dates) def get_smry_dates( diff --git a/tests/test_ensemble.py b/tests/test_ensemble.py index f3df423e..c1c25de3 100644 --- a/tests/test_ensemble.py +++ b/tests/test_ensemble.py @@ -5,6 +5,7 @@ from __future__ import division from __future__ import print_function +import sys import os import shutil @@ -750,52 +751,54 @@ def test_nonexisting(): def test_eclsumcaching(): """Test caching of eclsum""" + if sys.version_info < (3, 2): + # pylint: disable=W0212 + if "__file__" in globals(): + # Easen up copying test code into interactive sessions + testdir = os.path.dirname(os.path.abspath(__file__)) + else: + testdir = os.path.abspath(".") - if "__file__" in globals(): - # Easen up copying test code into interactive sessions - testdir = os.path.dirname(os.path.abspath(__file__)) - else: - testdir = os.path.abspath(".") - - dirs = testdir + "/data/testensemble-reek001/" + "realization-*/iter-0" - ens = ScratchEnsemble("reektest", dirs) + dirs = testdir + "/data/testensemble-reek001/" + "realization-*/iter-0" + ens = ScratchEnsemble("reektest", dirs) - # The problem here is if you load in a lot of UNSMRY files - # and the Python process keeps them in memory. Not sure - # how to check in code that an object has been garbage collected - # but for garbage collection to work, at least the realization - # _eclsum variable must be None. + # The problem here is if you load in a lot of UNSMRY files + # and the Python process keeps them in memory. 
-    # how to check in code that an object has been garbage collected
-    # but for garbage collection to work, at least the realization
-    # _eclsum variable must be None.
+        ens.load_smry()
 
-    ens.load_smry()
-    # Default is to do caching, so these will not be None:
-    assert all([x._eclsum for (idx, x) in ens._realizations.items()])
+        # When not running concurrently (older Python versions), caching
+        # is the default, so these will not be None:
+        assert all([x._eclsum for (idx, x) in ens._realizations.items()])
 
-    # If we redo this operation, the same objects should all
-    # be None afterwards:
-    ens.load_smry(cache_eclsum=None)
-    assert not any([x._eclsum for (idx, x) in ens._realizations.items()])
+        # If we redo this operation, the same objects should all
+        # be None afterwards:
+        ens.load_smry(cache_eclsum=None)
+        assert not any([x._eclsum for (idx, x) in ens._realizations.items()])
 
-    ens.get_smry()
-    assert all([x._eclsum for (idx, x) in ens._realizations.items()])
+        ens.get_smry()
+        assert all([x._eclsum for (idx, x) in ens._realizations.items()])
 
-    ens.get_smry(cache_eclsum=False)
-    assert not any([x._eclsum for (idx, x) in ens._realizations.items()])
+        ens.get_smry(cache_eclsum=False)
+        assert not any([x._eclsum for (idx, x) in ens._realizations.items()])
 
-    ens.get_smry_stats()
-    assert all([x._eclsum for (idx, x) in ens._realizations.items()])
+        ens.get_smry_stats()
+        assert all([x._eclsum for (idx, x) in ens._realizations.items()])
 
-    ens.get_smry_stats(cache_eclsum=False)
-    assert not any([x._eclsum for (idx, x) in ens._realizations.items()])
+        ens.get_smry_stats(cache_eclsum=False)
+        assert not any([x._eclsum for (idx, x) in ens._realizations.items()])
 
-    ens.get_smry_dates()
-    assert all([x._eclsum for (idx, x) in ens._realizations.items()])
+        ens.get_smry_dates()
+        assert all([x._eclsum for (idx, x) in ens._realizations.items()])
 
-    # Clear the cached objects because the statement above has cached it..
-    for _, realization in ens._realizations.items():
-        realization._eclsum = None
+        # Clear the cached objects because the statement above has cached them:
+        for _, realization in ens._realizations.items():
+            realization._eclsum = None
 
-    ens.get_smry_dates(cache_eclsum=False)
-    assert not any([x._eclsum for (idx, x) in ens._realizations.items()])
+        ens.get_smry_dates(cache_eclsum=False)
+        assert not any([x._eclsum for (idx, x) in ens._realizations.items()])
 
 
 def test_filedescriptors():
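For readers less familiar with concurrent.futures, the two idioms this patch leans on can be illustrated in isolation. First, the submit/collect fan-out used in add_realizations(): one task is submitted per globbed directory, and the futures are collected in the same order so failures can be reported against the right path. This is a minimal sketch assuming Python 3.2+; `build_realization` is a hypothetical stand-in for the `ScratchRealization` constructor, not the library's API.

```python
# Sketch of the submit/collect pattern in add_realizations() above.
# build_realization is a hypothetical stand-in for ScratchRealization.
from concurrent.futures import ProcessPoolExecutor


def build_realization(realdir, realidxregexp=None):
    """Pretend to do the expensive per-directory loading work."""
    return {"dir": realdir, "index": int(realdir.rsplit("-", 1)[-1])}


def add_realizations(globbedpaths):
    realizations = {}
    with ProcessPoolExecutor() as executor:
        # One future per realization directory. submit() forwards keyword
        # arguments, which is why the patch uses it here.
        futures = [
            executor.submit(build_realization, path, realidxregexp=None)
            for path in globbedpaths
        ]
    # Leaving the with-block waits for all workers. result() re-raises
    # any exception that occurred in the child process.
    for path, future in zip(globbedpaths, futures):
        real = future.result()
        realizations[real["index"]] = real
    return realizations


if __name__ == "__main__":
    print(add_realizations(["realization-0", "realization-1"]))
```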
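Second, the `executor.map(self._load_file, *zip(*args))` calls in load_file() and load_smry() rely on a transpose idiom: a list of per-realization argument tuples is unpacked into one iterable per parameter, because map() (unlike submit()) only accepts positional iterables. A sketch of that idiom, with a toy `load_one` worker standing in for the library's `_load_file()`:

```python
# Sketch of the zip(*args)/executor.map() transpose idiom used above.
# load_one is a toy worker, not the library's actual _load_file().
from concurrent.futures import ProcessPoolExecutor


def load_one(realization, localpath, index):
    """Toy per-realization worker; just echoes its arguments."""
    return "realization %d loaded %s from %s" % (index, localpath, realization)


if __name__ == "__main__":
    # One tuple of positional arguments per realization:
    args = [("real-a", "data.csv", 0), ("real-b", "data.csv", 1)]
    with ProcessPoolExecutor() as executor:
        # zip(*args) transposes the list of tuples into one iterable per
        # parameter -- exactly the shape executor.map() expects.
        for result in executor.map(load_one, *zip(*args)):
            print(result)
```

Workers receive pickled copies of each realization and return modified copies, which is why the patch reassigns `self._realizations[realization.index]` from the map results rather than relying on in-place mutation.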