From bceaf585e49b48fe0476f575ad024655c51fe327 Mon Sep 17 00:00:00 2001 From: David Hoese Date: Mon, 22 Mar 2021 14:07:10 -0500 Subject: [PATCH 1/6] Cleanup test_geometry.py to use smarter skipifs --- pyresample/test/test_geometry.py | 83 +++++++++++++++++--------------- 1 file changed, 43 insertions(+), 40 deletions(-) diff --git a/pyresample/test/test_geometry.py b/pyresample/test/test_geometry.py index c807773b5..1776980fc 100644 --- a/pyresample/test/test_geometry.py +++ b/pyresample/test/test_geometry.py @@ -33,10 +33,17 @@ import unittest import pyproj +from pyproj import CRS + +try: + import dask.array as da +except ImportError: + da = None + try: - from pyproj import CRS + import xarray as xr except ImportError: - CRS = None + xr = None class Test(unittest.TestCase): @@ -237,7 +244,6 @@ def test_dump(self): mock_open.assert_called_once_with('area_file.yml', 'a') mock_open.return_value.__enter__().write.assert_called_once_with(yaml_string) - def test_parse_area_file(self): """Test parsing the are file.""" from pyresample import utils @@ -423,44 +429,37 @@ def test_swath_hash(self): self.assertIsInstance(hash(swath_def), int) - try: - import dask.array as da - except ImportError: - print("Not testing with dask arrays") - else: - dalons = da.from_array(lons, chunks=1000) - dalats = da.from_array(lats, chunks=1000) - swath_def = geometry.SwathDefinition(dalons, dalats) - - self.assertIsInstance(hash(swath_def), int) - - try: - import xarray as xr - except ImportError: - print("Not testing with xarray") - else: - xrlons = xr.DataArray(lons) - xrlats = xr.DataArray(lats) - swath_def = geometry.SwathDefinition(xrlons, xrlats) - - self.assertIsInstance(hash(swath_def), int) - - try: - import xarray as xr - import dask.array as da - except ImportError: - print("Not testing with xarrays and dask arrays") - else: - xrlons = xr.DataArray(da.from_array(lons, chunks=1000)) - xrlats = xr.DataArray(da.from_array(lats, chunks=1000)) - swath_def = geometry.SwathDefinition(xrlons, xrlats) - - self.assertIsInstance(hash(swath_def), int) + @pytest.mark.skipif(da is None, reason="dask is not installed") + def test_swath_hash_dask(self): + """Test hashing SwathDefinitions with dask arrays underneath.""" + lons = np.array([1.2, 1.3, 1.4, 1.5]) + lats = np.array([65.9, 65.86, 65.82, 65.78]) + dalons = da.from_array(lons, chunks=1000) + dalats = da.from_array(lats, chunks=1000) + swath_def = geometry.SwathDefinition(dalons, dalats) + self.assertIsInstance(hash(swath_def), int) - lons = np.ma.array([1.2, 1.3, 1.4, 1.5]) - lats = np.ma.array([65.9, 65.86, 65.82, 65.78]) - swath_def = geometry.SwathDefinition(lons, lats) + @pytest.mark.skipif(xr is None, reason="xarray is not installed") + def test_swath_hash_xarray(self): + """Test hashing SwathDefinitions with DataArrays underneath.""" + lons = np.array([1.2, 1.3, 1.4, 1.5]) + lats = np.array([65.9, 65.86, 65.82, 65.78]) + xrlons = xr.DataArray(lons) + xrlats = xr.DataArray(lats) + swath_def = geometry.SwathDefinition(xrlons, xrlats) + self.assertIsInstance(hash(swath_def), int) + @pytest.mark.skipif(da is None, reason="dask is not installed") + @pytest.mark.skipif(xr is None, reason="xarray is not installed") + def test_swath_hash_xarray_with_dask(self): + """Test hashing SwathDefinitions with DataArrays:dask underneath.""" + lons = np.array([1.2, 1.3, 1.4, 1.5]) + lats = np.array([65.9, 65.86, 65.82, 65.78]) + dalons = da.from_array(lons, chunks=1000) + dalats = da.from_array(lats, chunks=1000) + xrlons = xr.DataArray(dalons) + xrlats = xr.DataArray(dalats) + swath_def = geometry.SwathDefinition(xrlons, xrlats) self.assertIsInstance(hash(swath_def), int) def test_area_equal(self): @@ -2337,12 +2336,16 @@ def test_freeze(self): (np.linspace(-75, -90.0, 10),), ], ) - def test_freeze_longlat_antimeridian(self, lats): + @pytest.mark.parametrize('use_dask', [False, True]) + def test_freeze_longlat_antimeridian(self, lats, use_dask): """Test geographic areas over the antimeridian.""" area = geometry.DynamicAreaDefinition('test_area', 'A test area', 'EPSG:4326') lons = np.linspace(175, 185, 10) lons[lons > 180] -= 360 + if use_dask: + lons = da.from_array(lons) + lats = da.from_array(lats) result = area.freeze((lons, lats), resolution=0.0056) From f07922c98c70e4f0fddc8bdb665db2fa23be1cb9 Mon Sep 17 00:00:00 2001 From: David Hoese Date: Mon, 22 Mar 2021 17:32:32 -0500 Subject: [PATCH 2/6] Improve dask support in DynamicAreaDefinition's freeze method --- pyresample/geometry.py | 34 +++++++++++++++++++++++++++++++- pyresample/test/test_geometry.py | 15 +++++++++++--- pyresample/utils/proj4.py | 31 +++++++++++++++++++++++++++++ 3 files changed, 76 insertions(+), 4 deletions(-) diff --git a/pyresample/geometry.py b/pyresample/geometry.py index 9eb0555d1..9011cbb4b 100644 --- a/pyresample/geometry.py +++ b/pyresample/geometry.py @@ -974,9 +974,14 @@ def freeze(self, lonslats=None, resolution=None, shape=None, proj_info=None): area_extent, self.rotation) def _compute_bound_centers(self, proj_dict, lonslats): - proj4 = Proj(proj_dict) lons, lats = self._extract_lons_lats(lonslats) + if hasattr(lons, 'compute'): + return self._compute_bound_centers_dask(proj_dict, lons, lats) + return self._compute_bound_centers_numpy(proj_dict, lons, lats) + + def _compute_bound_centers_numpy(self, proj_dict, lons, lats): # TODO: Do more dask-friendly things here + proj4 = Proj(proj_dict) xarr, yarr = proj4(np.asarray(lons), np.asarray(lats)) xarr[xarr > 9e29] = np.nan yarr[yarr > 9e29] = np.nan @@ -993,6 +998,33 @@ def _compute_bound_centers(self, proj_dict, lonslats): xmax = np.nanmax(xarr[xarr < 0]) + 360 return xmin, ymin, xmax, ymax + def _compute_bound_centers_dask(self, proj_dict, lons,lats): + from pyresample.utils.proj4 import transform_dask + import dask.array as da + crs = CRS(proj_dict) + xarr, yarr = transform_dask(crs, lons, lats) + xarr = da.where(xarr > 9e29, np.nan, xarr) + yarr = da.where(yarr > 9e29, np.nan, yarr) + _xmin = np.nanmin(xarr) + _xmax = np.nanmax(xarr) + _ymin = np.nanmin(yarr) + _ymax = np.nanmax(yarr) + xmin, xmax, ymin, ymax = da.compute( + _xmin, + _xmax, + _ymin, + _ymax) + + x_passes_antimeridian = (xmax - xmin) > 355 + epsilon = 0.1 + y_is_pole = (ymax >= 90 - epsilon) or (ymin <= -90 + epsilon) + if crs.is_geographic and x_passes_antimeridian and not y_is_pole: + # cross anti-meridian of projection + xmin = np.nanmin(xarr[xarr >= 0]) + xmax = np.nanmax(xarr[xarr < 0]) + 360 + xmin, xmax = da.compute(xmin, xmax) + return xmin, ymin, xmax, ymax + def _extract_lons_lats(self, lonslats): try: lons, lats = lonslats diff --git a/pyresample/test/test_geometry.py b/pyresample/test/test_geometry.py index 1776980fc..1787955b7 100644 --- a/pyresample/test/test_geometry.py +++ b/pyresample/test/test_geometry.py @@ -2339,17 +2339,26 @@ def test_freeze(self): @pytest.mark.parametrize('use_dask', [False, True]) def test_freeze_longlat_antimeridian(self, lats, use_dask): """Test geographic areas over the antimeridian.""" + import dask + from pyresample.test.utils import CustomScheduler area = geometry.DynamicAreaDefinition('test_area', 'A test area', 'EPSG:4326') lons = np.linspace(175, 185, 10) lons[lons > 180] -= 360 + is_pole = (np.abs(lats) > 88).any() if use_dask: + # if we aren't at a pole then we adjust the coordinates + # that takes a total of 2 computations + num_computes = 1 if is_pole else 2 lons = da.from_array(lons) lats = da.from_array(lats) - result = area.freeze((lons, lats), - resolution=0.0056) + with dask.config.set(scheduler=CustomScheduler(num_computes)): + result = area.freeze((lons, lats), + resolution=0.0056) + else: + result = area.freeze((lons, lats), + resolution=0.0056) - is_pole = (np.abs(lats) > 88).any() extent = result.area_extent if is_pole: assert extent[0] < -178 diff --git a/pyresample/utils/proj4.py b/pyresample/utils/proj4.py index df226ea20..7be1d2802 100644 --- a/pyresample/utils/proj4.py +++ b/pyresample/utils/proj4.py @@ -19,6 +19,7 @@ import math from collections import OrderedDict +import numpy as np from pyproj import CRS @@ -91,3 +92,33 @@ def get_geostationary_height(geos_area_crs): params = geos_area_crs.coordinate_operation.params h_param = [p for p in params if 'satellite height' in p.name.lower()][0] return h_param.value + + +def _transform_dask_chunk(x, y, crs_from, crs_to): + from pyproj import Transformer + crs_from = CRS(crs_from) + crs_to = CRS(crs_to) + transformer = Transformer.from_crs(crs_from, crs_to, + always_xy=True) + return np.stack(transformer.transform(x, y), axis=-1) + + +def transform_dask(crs, x, y, inverse=False): + """Transform coordinates using pyproj in a dask-friendly way.""" + import dask.array as da + crs = CRS.from_user_input(crs) + if not inverse: + crs_to = crs + crs_from = CRS(4326) + else: + crs_to = CRS(4326) + crs_from = crs + # CRS objects aren't thread-safe until pyproj 3.1+ + # convert to WKT strings to be safe + result = da.map_blocks(_transform_dask_chunk, x, y, + crs_from.to_wkt(), crs_to.to_wkt(), + dtype=x.dtype, chunks=x.chunks + ((2,),), + new_axis=x.ndim) + x = result[..., 0] + y = result[..., 1] + return x, y From 612630ffb8fe098a24bd475d39c9d414d4ee5ca3 Mon Sep 17 00:00:00 2001 From: David Hoese Date: Mon, 22 Mar 2021 17:33:12 -0500 Subject: [PATCH 3/6] Fix missing space style issue --- pyresample/geometry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyresample/geometry.py b/pyresample/geometry.py index 9011cbb4b..66b811547 100644 --- a/pyresample/geometry.py +++ b/pyresample/geometry.py @@ -998,7 +998,7 @@ def _compute_bound_centers_numpy(self, proj_dict, lons, lats): xmax = np.nanmax(xarr[xarr < 0]) + 360 return xmin, ymin, xmax, ymax - def _compute_bound_centers_dask(self, proj_dict, lons,lats): + def _compute_bound_centers_dask(self, proj_dict, lons, lats): from pyresample.utils.proj4 import transform_dask import dask.array as da crs = CRS(proj_dict) From 4cd208bedbc12bcfafc4613622480c48f3d4a92e Mon Sep 17 00:00:00 2001 From: David Hoese Date: Tue, 13 Apr 2021 15:42:09 -0500 Subject: [PATCH 4/6] Add DaskFriendlyTransformer class to wrap pyproj Transformer with dask --- pyresample/geometry.py | 6 ++-- pyresample/utils/proj4.py | 65 ++++++++++++++++++++++++--------------- 2 files changed, 45 insertions(+), 26 deletions(-) diff --git a/pyresample/geometry.py b/pyresample/geometry.py index 66b811547..b2ac930bb 100644 --- a/pyresample/geometry.py +++ b/pyresample/geometry.py @@ -999,10 +999,12 @@ def _compute_bound_centers_numpy(self, proj_dict, lons, lats): return xmin, ymin, xmax, ymax def _compute_bound_centers_dask(self, proj_dict, lons, lats): - from pyresample.utils.proj4 import transform_dask + from pyresample.utils.proj4 import DaskFriendlyTransformer import dask.array as da crs = CRS(proj_dict) - xarr, yarr = transform_dask(crs, lons, lats) + transformer = DaskFriendlyTransformer.from_crs(CRS(4326), crs, + always_xy=True) + xarr, yarr = transformer.transform(lons, lats) xarr = da.where(xarr > 9e29, np.nan, xarr) yarr = da.where(yarr > 9e29, np.nan, yarr) _xmin = np.nanmin(xarr) diff --git a/pyresample/utils/proj4.py b/pyresample/utils/proj4.py index 7be1d2802..166ca6abb 100644 --- a/pyresample/utils/proj4.py +++ b/pyresample/utils/proj4.py @@ -20,7 +20,7 @@ from collections import OrderedDict import numpy as np -from pyproj import CRS +from pyproj import CRS, Transformer as PROJTransformer def convert_proj_floats(proj_pairs): @@ -94,31 +94,48 @@ def get_geostationary_height(geos_area_crs): return h_param.value -def _transform_dask_chunk(x, y, crs_from, crs_to): - from pyproj import Transformer +def _transform_dask_chunk(x, y, crs_from, crs_to, kwargs): crs_from = CRS(crs_from) crs_to = CRS(crs_to) - transformer = Transformer.from_crs(crs_from, crs_to, - always_xy=True) + transformer = PROJTransformer.from_crs(crs_from, crs_to, **kwargs) return np.stack(transformer.transform(x, y), axis=-1) -def transform_dask(crs, x, y, inverse=False): - """Transform coordinates using pyproj in a dask-friendly way.""" - import dask.array as da - crs = CRS.from_user_input(crs) - if not inverse: - crs_to = crs - crs_from = CRS(4326) - else: - crs_to = CRS(4326) - crs_from = crs - # CRS objects aren't thread-safe until pyproj 3.1+ - # convert to WKT strings to be safe - result = da.map_blocks(_transform_dask_chunk, x, y, - crs_from.to_wkt(), crs_to.to_wkt(), - dtype=x.dtype, chunks=x.chunks + ((2,),), - new_axis=x.ndim) - x = result[..., 0] - y = result[..., 1] - return x, y +class DaskFriendlyTransformer: + """Wrapper around the pyproj Transformer class that uses dask.""" + + def __init__(self, src_crs, dst_crs, **kwargs): + """Initialize the transformer with CRS objects. + + This method should not be used directly, just like pyproj.Transformer + should not be created directly. + + """ + self.src_crs = src_crs + self.dst_crs = dst_crs + self.kwargs = kwargs + + @classmethod + def from_crs(cls, crs_from, crs_to, **kwargs): + """Create transformer object from two CRS objects.""" + return cls(crs_from, crs_to, **kwargs) + + def transform(self, x, y, inverse=False): + """Transform coordinates.""" + import dask.array as da + if not inverse: + crs_to = self.src_crs + crs_from = self.dst_crs + else: + crs_to = self.dst_crs + crs_from = self.src_crs + # CRS objects aren't thread-safe until pyproj 3.1+ + # convert to WKT strings to be safe + result = da.map_blocks(_transform_dask_chunk, x, y, + crs_from.to_wkt(), crs_to.to_wkt(), + dtype=x.dtype, chunks=x.chunks + ((2,),), + kwargs=self.kwargs, + new_axis=x.ndim) + x = result[..., 0] + y = result[..., 1] + return x, y From bca8e1155608cce326334b94c8343bf9f9781c24 Mon Sep 17 00:00:00 2001 From: David Hoese Date: Tue, 13 Apr 2021 15:46:06 -0500 Subject: [PATCH 5/6] Remove unnecessary try/except on pyresample test imports --- pyresample/test/test_geometry.py | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/pyresample/test/test_geometry.py b/pyresample/test/test_geometry.py index 1787955b7..e7c526135 100644 --- a/pyresample/test/test_geometry.py +++ b/pyresample/test/test_geometry.py @@ -35,15 +35,8 @@ from pyproj import CRS -try: - import dask.array as da -except ImportError: - da = None - -try: - import xarray as xr -except ImportError: - xr = None +import dask.array as da +import xarray as xr class Test(unittest.TestCase): @@ -429,7 +422,6 @@ def test_swath_hash(self): self.assertIsInstance(hash(swath_def), int) - @pytest.mark.skipif(da is None, reason="dask is not installed") def test_swath_hash_dask(self): """Test hashing SwathDefinitions with dask arrays underneath.""" lons = np.array([1.2, 1.3, 1.4, 1.5]) @@ -439,7 +431,6 @@ def test_swath_hash_dask(self): swath_def = geometry.SwathDefinition(dalons, dalats) self.assertIsInstance(hash(swath_def), int) - @pytest.mark.skipif(xr is None, reason="xarray is not installed") def test_swath_hash_xarray(self): """Test hashing SwathDefinitions with DataArrays underneath.""" lons = np.array([1.2, 1.3, 1.4, 1.5]) @@ -449,8 +440,6 @@ def test_swath_hash_xarray(self): swath_def = geometry.SwathDefinition(xrlons, xrlats) self.assertIsInstance(hash(swath_def), int) - @pytest.mark.skipif(da is None, reason="dask is not installed") - @pytest.mark.skipif(xr is None, reason="xarray is not installed") def test_swath_hash_xarray_with_dask(self): """Test hashing SwathDefinitions with DataArrays:dask underneath.""" lons = np.array([1.2, 1.3, 1.4, 1.5]) From 81bc10bc53b080928fca87f664ec8b5db855083e Mon Sep 17 00:00:00 2001 From: David Hoese Date: Wed, 14 Apr 2021 10:47:38 -0500 Subject: [PATCH 6/6] Allow for transform_kwargs to be passed to dask friendly transformer --- pyresample/utils/proj4.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/pyresample/utils/proj4.py b/pyresample/utils/proj4.py index 166ca6abb..0ed91b689 100644 --- a/pyresample/utils/proj4.py +++ b/pyresample/utils/proj4.py @@ -94,11 +94,11 @@ def get_geostationary_height(geos_area_crs): return h_param.value -def _transform_dask_chunk(x, y, crs_from, crs_to, kwargs): +def _transform_dask_chunk(x, y, crs_from, crs_to, kwargs, transform_kwargs): crs_from = CRS(crs_from) crs_to = CRS(crs_to) transformer = PROJTransformer.from_crs(crs_from, crs_to, **kwargs) - return np.stack(transformer.transform(x, y), axis=-1) + return np.stack(transformer.transform(x, y, **transform_kwargs), axis=-1) class DaskFriendlyTransformer: @@ -120,21 +120,18 @@ def from_crs(cls, crs_from, crs_to, **kwargs): """Create transformer object from two CRS objects.""" return cls(crs_from, crs_to, **kwargs) - def transform(self, x, y, inverse=False): + def transform(self, x, y, **kwargs): """Transform coordinates.""" import dask.array as da - if not inverse: - crs_to = self.src_crs - crs_from = self.dst_crs - else: - crs_to = self.dst_crs - crs_from = self.src_crs + crs_from = self.src_crs + crs_to = self.dst_crs # CRS objects aren't thread-safe until pyproj 3.1+ # convert to WKT strings to be safe result = da.map_blocks(_transform_dask_chunk, x, y, crs_from.to_wkt(), crs_to.to_wkt(), dtype=x.dtype, chunks=x.chunks + ((2,),), kwargs=self.kwargs, + transform_kwargs=kwargs, new_axis=x.ndim) x = result[..., 0] y = result[..., 1]