From e5d45ff319a70dd0364abddb78350b60fbe4dab3 Mon Sep 17 00:00:00 2001
From: jpolchlo
Date: Fri, 8 Oct 2021 10:42:46 -0400
Subject: [PATCH 1/5] Capture intersection errors

---
 .../util/IntersectGeometry.scala              | 45 +++++++++++--------
 1 file changed, 27 insertions(+), 18 deletions(-)

diff --git a/src/main/scala/org/globalforestwatch/util/IntersectGeometry.scala b/src/main/scala/org/globalforestwatch/util/IntersectGeometry.scala
index 76f07c13..431c7313 100644
--- a/src/main/scala/org/globalforestwatch/util/IntersectGeometry.scala
+++ b/src/main/scala/org/globalforestwatch/util/IntersectGeometry.scala
@@ -5,6 +5,7 @@ import com.vividsolutions.jts.geom.{
   GeometryCollection,
   MultiPolygon,
   Polygon,
+  TopologyException,
 }
 
 import org.globalforestwatch.util.GeoSparkGeometryConstructor.createMultiPolygon
@@ -20,24 +21,32 @@ object IntersectGeometry {
       * */
     val userData = thisGeom.getUserData
 
-    val intersection: Geometry = thisGeom intersection thatGeom
-    intersection match {
-      case poly: Polygon =>
-        val multi = createMultiPolygon(Array(poly))
-        multi.setUserData(userData)
-        List(multi)
-      case multi: MultiPolygon =>
-        multi.setUserData(userData)
-        List(multi)
-      case collection: GeometryCollection =>
-        val maybe_multi = extractPolygons(collection)
-        maybe_multi match {
-          case Some(multi) =>
-            multi.setUserData(userData)
-            List(multi)
-          case _ => List()
-        }
-      case _ => List()
+    try {
+      val intersection: Geometry = thisGeom intersection thatGeom
+      intersection match {
+        case poly: Polygon =>
+          val multi = createMultiPolygon(Array(poly))
+          multi.setUserData(userData)
+          List(multi)
+        case multi: MultiPolygon =>
+          multi.setUserData(userData)
+          List(multi)
+        case collection: GeometryCollection =>
+          val maybe_multi = extractPolygons(collection)
+          maybe_multi match {
+            case Some(multi) =>
+              multi.setUserData(userData)
+              List(multi)
+            case _ => List()
+          }
+        case _ => List()
+      }
+    } catch {
+      case e: TopologyException =>
+        val wkt = new com.vividsolutions.jts.io.WKTWriter
+        new java.io.PrintWriter("/tmp/thisGeom.wkt"){ write(wkt.write(thisGeom)); close }
+        new java.io.PrintWriter("/tmp/thatGeom.wkt"){ write(wkt.write(thatGeom)); close }
+        throw e
     }
   }

From 16d37aa76f32cb02c97bce454b6383cab8c7d76c Mon Sep 17 00:00:00 2001
From: jpolchlo
Date: Tue, 12 Oct 2021 10:50:34 -0400
Subject: [PATCH 2/5] Add in (commented) code pathway for more detailed
 processing of geometry using UDFs; may be useful reference if buffer(0) is
 deemed inappropriate

---
 .../features/SpatialFeatureDF.scala           | 40 ++++++++++++++++---
 1 file changed, 34 insertions(+), 6 deletions(-)

diff --git a/src/main/scala/org/globalforestwatch/features/SpatialFeatureDF.scala b/src/main/scala/org/globalforestwatch/features/SpatialFeatureDF.scala
index 56eba1ae..46c5e1c7 100644
--- a/src/main/scala/org/globalforestwatch/features/SpatialFeatureDF.scala
+++ b/src/main/scala/org/globalforestwatch/features/SpatialFeatureDF.scala
@@ -1,7 +1,11 @@
 package org.globalforestwatch.features
 
 import cats.data.NonEmptyList
+import geotrellis.vector.Geometry
+import com.vividsolutions.{jts => vs}
 import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.functions.{col, struct, udf}
+import org.globalforestwatch.util.GeometryFixer
 
 object SpatialFeatureDF {
   def apply(input: NonEmptyList[String],
@@ -48,13 +52,37 @@ object SpatialFeatureDF {
       FeatureDF(input, featureObj, filters, spark, delimiter)
 
     val emptyPolygonWKB = "0106000020E610000000000000"
+    // featureDF.printSchema
+    // ST_PrecisionReduce may create invalid geometry if it contains a "sliver" that is below the precision threshold
     // ST_Buffer(0) fixes these invalid geometries
-    featureDF
+    val res = featureDF
       .selectExpr(
        s"ST_Buffer(ST_PrecisionReduce(ST_GeomFromWKB(${wkbField}), 11), 0) AS polyshape",
        s"struct(${featureObj.featureIdExpr}) as featureId"
      )
      .where(s"${wkbField} != '${emptyPolygonWKB}'")
+
+    // val readAndReduceUDF = udf{ s: String => readAndReduce(s) }
+    // val res = featureDF
+    //   .where(s"${wkbField} != '${emptyPolygonWKB}'")
+    //   .selectExpr(
+    //     s"${wkbField} AS wkb",
+    //     s"struct(${featureObj.featureIdExpr}) as featureId"
+    //   )
+    //   .select(
+    //     readAndReduceUDF(col("wkb")).as("polyshape"),
+    //     col("featureId")
+    //   )
+
+    // res.printSchema
+
+    res
   }
+
+  def readAndReduce(wkbHexString: String): vs.geom.Geometry = {
+    val geom = geotrellis.vector.io.wkb.WKB.read(javax.xml.bind.DatatypeConverter.parseHexBinary(wkbHexString))
+    val fixed = GeometryFixer(geom).fix()
+    (new vs.io.WKBReader).read((new geotrellis.vector.io.wkb.WKBWriter).write(fixed))
+  }
 }
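A quick way to see why the ST_Buffer(0) step above matters. This is a standalone sketch, not part of the patch; it assumes only the com.vividsolutions JTS classes that GeoSpark already shades. A self-intersecting "bowtie" ring is invalid, and overlay operations on it are liable to throw the TopologyException that PATCH 1 captures, while buffer(0) re-nodes it into a valid geometry:

    import com.vividsolutions.jts.geom.GeometryFactory
    import com.vividsolutions.jts.io.WKTReader

    object BufferZeroSketch extends App {
      val reader = new WKTReader(new GeometryFactory())
      // Bowtie: the ring crosses itself at (1,1), so the polygon is invalid
      val bowtie = reader.read("POLYGON ((0 0, 2 2, 2 0, 0 2, 0 0))")
      println(bowtie.isValid)   // false
      // buffer(0) re-nodes the ring into a valid geometry
      // (the exact output type and lobes kept can vary across JTS versions)
      val repaired = bowtie.buffer(0)
      println(repaired.isValid) // true
    }
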
From 665e38b8b0dc1fba9b6a62a1fe49ee59bff04ac9 Mon Sep 17 00:00:00 2001
From: jpolchlo
Date: Fri, 15 Oct 2021 12:47:13 -0400
Subject: [PATCH 3/5] Supply alternate analytical pathway using validated
 geometries to log errors in the output; this runs, but we have already
 cleaned out bad geoms in the first instance using buffer(0)

Handle bad WKB inputs; these must be discarded because some feature Spark SQL
expressions can reference the geometry
---
 .../features/SpatialFeatureDF.scala           |  68 ++++++----
 .../features/ValidatedFeatureRDD.scala        | 120 ++++++++++++++++++
 .../summarystats/ErrorSummaryRDD.scala        |   6 +-
 .../ForestChangeDiagnosticAnalysis.scala      |  34 +++--
 .../ForestChangeDiagnosticCommand.scala       |   2 +-
 .../util/GeotrellisGeometryValidator.scala    |   6 +-
 .../util/IntersectGeometry.scala              |  49 ++++++-
 7 files changed, 236 insertions(+), 49 deletions(-)
 create mode 100644 src/main/scala/org/globalforestwatch/features/ValidatedFeatureRDD.scala

diff --git a/src/main/scala/org/globalforestwatch/features/SpatialFeatureDF.scala b/src/main/scala/org/globalforestwatch/features/SpatialFeatureDF.scala
index 46c5e1c7..80a03900 100644
--- a/src/main/scala/org/globalforestwatch/features/SpatialFeatureDF.scala
+++ b/src/main/scala/org/globalforestwatch/features/SpatialFeatureDF.scala
@@ -4,8 +4,9 @@ import cats.data.NonEmptyList
 import geotrellis.vector.Geometry
 import com.vividsolutions.{jts => vs}
 import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.functions.{col, struct, udf}
+import org.apache.spark.sql.functions.{col, isnull, struct, udf}
 import org.globalforestwatch.util.GeometryFixer
+import scala.util.Try
 
 object SpatialFeatureDF {
   def apply(input: NonEmptyList[String],
@@ -52,37 +53,60 @@ object SpatialFeatureDF {
       FeatureDF(input, featureObj, filters, spark, delimiter)
 
     val emptyPolygonWKB = "0106000020E610000000000000"
-    // featureDF.printSchema
     // ST_PrecisionReduce may create invalid geometry if it contains a "sliver" that is below the precision threshold
     // ST_Buffer(0) fixes these invalid geometries
-    val res = featureDF
+    featureDF
       .selectExpr(
        s"ST_Buffer(ST_PrecisionReduce(ST_GeomFromWKB(${wkbField}), 11), 0) AS polyshape",
        s"struct(${featureObj.featureIdExpr}) as featureId"
      )
      .where(s"${wkbField} != '${emptyPolygonWKB}'")
-
-    // val readAndReduceUDF = udf{ s: String => readAndReduce(s) }
-    // val res = featureDF
-    //   .where(s"${wkbField} != '${emptyPolygonWKB}'")
-    //   .selectExpr(
-    //     s"${wkbField} AS wkb",
-    //     s"struct(${featureObj.featureIdExpr}) as featureId"
-    //   )
-    //   .select(
-    //     readAndReduceUDF(col("wkb")).as("polyshape"),
-    //     col("featureId")
-    //   )
-
-    // res.printSchema
-
-    res
   }
 
-  def readAndReduce(wkbHexString: String): vs.geom.Geometry = {
-    val geom = geotrellis.vector.io.wkb.WKB.read(javax.xml.bind.DatatypeConverter.parseHexBinary(wkbHexString))
-    val fixed = GeometryFixer(geom).fix()
-    (new vs.io.WKBReader).read((new geotrellis.vector.io.wkb.WKBWriter).write(fixed))
-  }
+  /*
+   * Use GeoSpark to directly generate a DataFrame with a geometry column
+   * Any geometry that fails to be parsed as WKB will be dropped here
+   */
+  def applyValidated(
+    input: NonEmptyList[String],
+    featureObj: Feature,
+    filters: FeatureFilter,
+    wkbField: String,
+    spark: SparkSession,
+    delimiter: String = "\t"
+  ): DataFrame = {
+    import spark.implicits._
+
+    val featureDF: DataFrame = FeatureDF(input, featureObj, filters, spark, delimiter)
+
+    val emptyPolygonWKB = "0106000020E610000000000000"
+
+    val readOptionWkbUDF = udf{ s: String => readOption(s) }
+    featureDF
+      .where(s"${wkbField} != '${emptyPolygonWKB}'")
+      .selectExpr(
+        s"${wkbField} AS wkb",
+        s"struct(${featureObj.featureIdExpr}) as featureId"
+      )
+      .select(
+        readOptionWkbUDF(col("wkb")).as("polyshape"),
+        col("featureId")
+      )
+      .where(!isnull('polyshape))
+  }
+
+  private val threadLocalWkbReader = new ThreadLocal[vs.io.WKBReader]
+
+  def readOption(wkbHexString: String): Option[vs.geom.Geometry] = {
+    if (threadLocalWkbReader.get() == null) {
+      val precisionModel = new vs.geom.PrecisionModel(1e11)
+      val factory = new vs.geom.GeometryFactory(precisionModel)
+      val wkbReader = new vs.io.WKBReader(factory)
+      threadLocalWkbReader.set(wkbReader)
+    }
+    val wkbReader = threadLocalWkbReader.get()
+
+    Try{
+      val binWkb = javax.xml.bind.DatatypeConverter.parseHexBinary(wkbHexString)
+      wkbReader.read(binWkb)
+    }.toOption
+  }
 }
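To make the contract of readOption concrete, here is a standalone sketch (not part of the patch; the hex strings are illustrative): anything that fails either hex-decoding or WKB parsing comes back as None, and the !isnull('polyshape) filter above then silently drops that row.

    import org.globalforestwatch.features.SpatialFeatureDF

    // WKB hex for POINT (1 1): byte order 01 (little endian), type 1, then two doubles
    val goodHex = "0101000000000000000000F03F000000000000F03F"
    val badHex  = "NOTVALIDWKB" // not even valid hex; parseHexBinary throws, so Try yields None

    assert(SpatialFeatureDF.readOption(goodHex).isDefined)
    assert(SpatialFeatureDF.readOption(badHex).isEmpty)
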
diff --git a/src/main/scala/org/globalforestwatch/features/ValidatedFeatureRDD.scala b/src/main/scala/org/globalforestwatch/features/ValidatedFeatureRDD.scala
new file mode 100644
index 00000000..66d853a9
--- /dev/null
+++ b/src/main/scala/org/globalforestwatch/features/ValidatedFeatureRDD.scala
@@ -0,0 +1,120 @@
+package org.globalforestwatch.features
+
+import cats.data.{NonEmptyList, Validated}
+import com.vividsolutions.jts.geom.{
+  Envelope => GeoSparkEnvelope, Geometry => GeoSparkGeometry, Point => GeoSparkPoint, Polygon => GeoSparkPolygon,
+  MultiPolygon => GeoSparkMultiPolygon, Polygonal => GeoSparkPolygonal, GeometryCollection => GeoSparkGeometryCollection
+}
+import org.apache.log4j.Logger
+import geotrellis.store.index.zcurve.Z2
+import geotrellis.vector
+import geotrellis.vector.{Geometry, MultiPolygon}
+import org.apache.spark.HashPartitioner
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.datasyslab.geospark.spatialRDD.SpatialRDD
+import org.datasyslab.geosparksql.utils.Adapter
+import org.globalforestwatch.summarystats.{GeometryError, ValidatedRow}
+import org.globalforestwatch.util.{GridRDD, SpatialJoinRDD}
+import org.globalforestwatch.util.IntersectGeometry.{extractPolygons, validatedIntersection}
+import org.globalforestwatch.util.Util.getAnyMapValue
+import org.globalforestwatch.util.ImplicitGeometryConverter._
+
+object ValidatedFeatureRDD {
+  val logger = Logger.getLogger("FeatureRDD")
+
+  /**
+    * Reads features from source and optionally splits them by 1x1 degree grid.
+    * - If the feature WKB is invalid, the feature will be dropped
+    * - If there is a problem with intersection logic, the erroring feature id will propagate to output
+    */
+  def apply(
+    input: NonEmptyList[String],
+    featureType: String,
+    filters: FeatureFilter,
+    splitFeatures: Boolean,
+    spark: SparkSession
+  ): RDD[(FeatureId, ValidatedRow[geotrellis.vector.Feature[Geometry, FeatureId]])] = {
+
+    if (splitFeatures) {
+      val featureObj: Feature = Feature(featureType)
+      val featureDF: DataFrame =
+        SpatialFeatureDF.applyValidated(input, featureObj, filters, "geom", spark)
+      splitGeometries(featureType, featureDF, spark)
+    } else {
+      val featureObj: Feature = Feature(featureType)
+      val featuresDF: DataFrame = FeatureDF(input, featureObj, filters, spark)
+
+      featuresDF.rdd
+        .mapPartitions(
+          { iter: Iterator[Row] =>
+            for {
+              i <- iter
+              if featureObj.isNonEmptyGeom(i)
+            } yield {
+              val feat = featureObj.get(i)
+              (feat.data, Validated.Valid(feat))
+            }
+          },
+          preservesPartitioning = true
+        )
+    }
+  }
+
+  private def splitGeometries(
+    featureType: String,
+    featureDF: DataFrame,
+    spark: SparkSession
+  ): RDD[(FeatureId, ValidatedRow[geotrellis.vector.Feature[Geometry, FeatureId]])] = {
+
+    val spatialFeatureRDD: SpatialRDD[GeoSparkGeometry] = Adapter.toSpatialRdd(featureDF, "polyshape")
+    spatialFeatureRDD.analyze()
+
+    spatialFeatureRDD.rawSpatialRDD = spatialFeatureRDD.rawSpatialRDD.rdd.map { geom: GeoSparkGeometry =>
+      val featureId = FeatureId.fromUserData(featureType, geom.getUserData.asInstanceOf[String], delimiter = ",")
+      geom.setUserData(featureId)
+      geom
+    }
+
+    val envelope: GeoSparkEnvelope = spatialFeatureRDD.boundaryEnvelope
+    val spatialGridRDD = GridRDD(envelope, spark, clip = true)
+    val flatJoin: JavaPairRDD[GeoSparkPolygon, GeoSparkGeometry] =
+      SpatialJoinRDD.flatSpatialJoin(
+        spatialFeatureRDD,
+        spatialGridRDD,
+        considerBoundaryIntersection = true
+      )
+
+    /*
+      Partitions will come back very skewed, and we will need to even them out for any downstream analysis.
+      For the summary analysis we will eventually use a range partitioner.
+      However, the range partitioner uses sampling to come up with the break points for the different partitions.
+      If the input RDD is already heavily skewed, sampling will be off and the range partitioner won't do a good job.
+    */
+    val hashPartitioner = new HashPartitioner(flatJoin.getNumPartitions)
+
+    flatJoin.rdd
+      .keyBy({ pair: (GeoSparkPolygon, GeoSparkGeometry) =>
+        Z2(
+          (pair._1.getCentroid.getX * 100).toInt,
+          (pair._1.getCentroid.getY * 100).toInt
+        ).z
+      })
+      .partitionBy(hashPartitioner)
+      .flatMap { case (_, (gridCell, geom)) =>
+        val (fid, geometries) = validatedIntersection(geom, gridCell)
+        geometries.traverse { geoms => geoms }.map { vg => (fid.asInstanceOf[FeatureId], vg) }
+      }
+      .flatMap {
+        case (_, Validated.Valid(mp)) if mp.isEmpty =>
+          // This is a Valid result of intersection, but with no area == not an intersection
+          None // flatMap will drop the None
+        case (fid, validated) =>
+          val validatedFeature = validated.map { intersection =>
+            val geotrellisGeom: MultiPolygon = intersection
+            geotrellis.vector.Feature(geotrellisGeom, fid)
+          }
+          Some(fid -> validatedFeature)
+      }
+  }
+}
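The identity traverse in splitGeometries (geometries.traverse { geoms => geoms }) is the subtle step: it distributes one Validated over the list it carries. A minimal sketch of that behavior, assuming only cats (not part of the patch):

    import cats.data.Validated
    import cats.implicits._

    val ok: Validated[String, List[Int]] = Validated.valid(List(1, 2, 3))
    val ko: Validated[String, List[Int]] = Validated.invalid("bad geometry")

    // One Valid list fans out into one Valid entry per element...
    ok.traverse { xs => xs } // List(Valid(1), Valid(2), Valid(3))
    // ...while a single error stays a single Invalid entry, paired with its feature id upstream
    ko.traverse { xs => xs } // List(Invalid("bad geometry"))
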
diff --git a/src/main/scala/org/globalforestwatch/summarystats/ErrorSummaryRDD.scala b/src/main/scala/org/globalforestwatch/summarystats/ErrorSummaryRDD.scala
index e0929f17..c04dd383 100644
--- a/src/main/scala/org/globalforestwatch/summarystats/ErrorSummaryRDD.scala
+++ b/src/main/scala/org/globalforestwatch/summarystats/ErrorSummaryRDD.scala
@@ -14,7 +14,7 @@ import org.globalforestwatch.features.FeatureId
 import org.globalforestwatch.grids.GridSources
 import scala.reflect.ClassTag
 import cats.kernel.Semigroup
-import cats.data.Validated.{Valid, Invalid}
+import cats.data.Validated.{Valid, Invalid, valid, invalid}
 import org.globalforestwatch.summarystats.forest_change_diagnostic.ForestChangeDiagnosticSummary
 
@@ -35,7 +35,7 @@ trait ErrorSummaryRDD extends LazyLogging with java.io.Serializable {
     windowLayout: LayoutDefinition,
     kwargs: Map[String, Any],
     partition: Boolean = true
-  )(implicit kt: ClassTag[SUMMARY], vt: ClassTag[FEATUREID], ord: Ordering[SUMMARY] = null): RDD[(FEATUREID, ValidatedRow[SUMMARY])] = {
+  )(implicit kt: ClassTag[SUMMARY], vt: ClassTag[FEATUREID]): RDD[(FEATUREID, ValidatedRow[SUMMARY])] = {
 
     /* Intersect features with each tile from windowLayout grid and generate a record for each intersection.
      * Each feature will intersect one or more windows, possibly creating duplicate records.
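Because each feature can produce one record per window, the per-window rows are later merged per feature id, and with ValidatedRow in the signature that merge must also combine errors. A sketch of how cats combines Validated values under a Semigroup (illustrative types, not part of the patch; note it uses the same lowercase valid/invalid helpers the hunk above imports):

    import cats.data.Validated.{invalid, valid}
    import cats.implicits._

    val windowA = valid[String, Int](2)        // partial summary from one window
    val windowB = valid[String, Int](3)        // partial summary from another window
    val failed  = invalid[String, Int]("error")

    windowA |+| windowB // Valid(5): both partials combine
    windowA |+| failed  // Invalid("error"): one bad window marks the whole feature's row
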
@@ -180,4 +180,4 @@
 object ErrorSummaryRDD {
   val rasterizeOptions: Rasterizer.Options =
     Rasterizer.Options(includePartial = false, sampleType = PixelIsPoint)
-}
\ No newline at end of file
+}
diff --git a/src/main/scala/org/globalforestwatch/summarystats/forest_change_diagnostic/ForestChangeDiagnosticAnalysis.scala b/src/main/scala/org/globalforestwatch/summarystats/forest_change_diagnostic/ForestChangeDiagnosticAnalysis.scala
index a31a76ee..58897236 100644
--- a/src/main/scala/org/globalforestwatch/summarystats/forest_change_diagnostic/ForestChangeDiagnosticAnalysis.scala
+++ b/src/main/scala/org/globalforestwatch/summarystats/forest_change_diagnostic/ForestChangeDiagnosticAnalysis.scala
@@ -1,7 +1,7 @@
 package org.globalforestwatch.summarystats.forest_change_diagnostic
 
 import cats.data.NonEmptyList
-import cats.data.Validated.{Invalid, Valid}
+import cats.data.Validated.{Invalid, Valid, invalid, valid}
 import cats.syntax._
 
 import java.util
@@ -25,7 +25,7 @@
   val name = "forest_change_diagnostic"
 
   def apply(
-    mainRDD: RDD[Feature[Geometry, FeatureId]],
+    validatedRDD: RDD[(FeatureId, ValidatedRow[Feature[Geometry, FeatureId]])],
     featureType: String,
     intermediateListSource: Option[NonEmptyList[String]],
     fireAlertRDD: SpatialRDD[GeoSparkGeometry],
@@ -34,24 +34,27 @@
 
     val runOutputUrl: String = getOutputUrl(kwargs)
 
-    mainRDD.persist(StorageLevel.MEMORY_AND_DISK_2)
+    // These locations can't be processed because of an error in handling their geometry
+    val erroredLocationsRDD = validatedRDD.flatMap{ case (fid, feat) => feat.toEither.left.toOption.map { err => (fid, err) } }
+    val mainRDD = validatedRDD.flatMap{ case (_, feat) => feat.toEither.right.toOption }
+    mainRDD.persist(StorageLevel.MEMORY_AND_DISK_2)
 
     // For standard GFW Pro Feature IDs we create a Grid Filter
     // This will allow us to only process those parts of the dissolved list geometry which were updated
     // When using other Feature IDs such as WDPA or GADM,
     // there will be no intermediate results and this part will be ignored
     val gridFilter: List[String] =
-    mainRDD
-      .filter { feature: Feature[Geometry, FeatureId] =>
-        feature.data match {
-          case gfwproId: GfwProFeatureId => gfwproId.locationId == -2
-          case _ => false
+      mainRDD
+        .filter { feature: Feature[Geometry, FeatureId] =>
+          feature.data match {
+            case gfwproId: GfwProFeatureId => gfwproId.locationId == -2
+            case _ => false
+          }
         }
-      }
-      .map(f => pointGridId(f.geom.getCentroid, 1))
-      .collect
-      .toList
+        .map(f => pointGridId(f.geom.getCentroid, 1))
+        .collect
+        .toList
 
     val featureRDD: RDD[Feature[Geometry, FeatureId]] =
       toFeatureRdd(mainRDD, gridFilter, intermediateListSource.isDefined)
@@ -95,7 +98,10 @@
         kwargs)
     else
       dataRDD
 
-    val summaryDF = ForestChangeDiagnosticDF.getFeatureDataFrame(finalRDD, spark)
+    val rejoinedRDD = finalRDD
+      .union(erroredLocationsRDD.map{ case (fid, err) => (fid, invalid[JobError, ForestChangeDiagnosticData](err)) })
+
+    val summaryDF = ForestChangeDiagnosticDF.getFeatureDataFrame(rejoinedRDD, spark)
 
     ForestChangeDiagnosticExport.export(
       featureType,
@@ -299,4 +305,4 @@
       )
     )
   }
-}
\ No newline at end of file
+}
diff --git a/src/main/scala/org/globalforestwatch/summarystats/forest_change_diagnostic/ForestChangeDiagnosticCommand.scala b/src/main/scala/org/globalforestwatch/summarystats/forest_change_diagnostic/ForestChangeDiagnosticCommand.scala
index 7a76784f..2b025eae 100644
--- a/src/main/scala/org/globalforestwatch/summarystats/forest_change_diagnostic/ForestChangeDiagnosticCommand.scala
+++ b/src/main/scala/org/globalforestwatch/summarystats/forest_change_diagnostic/ForestChangeDiagnosticCommand.scala
@@ -40,7 +40,7 @@ object ForestChangeDiagnosticCommand extends SummaryCommand with LazyLogging {
       val featureFilter = FeatureFilter.fromOptions(default.featureType, filterOptions)
 
       runAnalysis { implicit spark =>
-        val featureRDD = FeatureRDD(default.featureUris, default.featureType, featureFilter, splitFeatures = true, spark)
+        val featureRDD = ValidatedFeatureRDD(default.featureUris, default.featureType, featureFilter, splitFeatures = true, spark)
         val fireAlertRDD = FireAlertRDD(spark, fireAlert.alertType, fireAlert.alertSource, FeatureFilter.empty)
         ForestChangeDiagnosticAnalysis(featureRDD, default.featureType, intermediateListSource, fireAlertRDD, kwargs)
       }
diff --git a/src/main/scala/org/globalforestwatch/util/GeotrellisGeometryValidator.scala b/src/main/scala/org/globalforestwatch/util/GeotrellisGeometryValidator.scala
index 72c82618..19e87840 100644
--- a/src/main/scala/org/globalforestwatch/util/GeotrellisGeometryValidator.scala
+++ b/src/main/scala/org/globalforestwatch/util/GeotrellisGeometryValidator.scala
@@ -2,6 +2,7 @@ package org.globalforestwatch.util
 import org.apache.log4j.Logger
 import geotrellis.vector.{
   Geometry,
+  GeomFactory,
   LineString,
   MultiPoint,
   Point,
@@ -11,6 +12,7 @@ import geotrellis.vector.{
 }
 import geotrellis.vector.io.wkb.WKB
 import org.globalforestwatch.util.GeotrellisGeometryReducer.{gpr, reduce}
+import scala.util.Try
 
 object GeotrellisGeometryValidator extends java.io.Serializable {
   val logger = Logger.getLogger("Geotrellis Geometry Validator")
@@ -59,8 +61,8 @@ object GeotrellisGeometryValidator extends java.io.Serializable {
   }
 
   def makeValidGeom(wkb: String): Geometry = {
-    val geom: Geometry = WKB.read(wkb)
-    makeValidGeom(geom)
+    val geom: Option[Geometry] = Try(WKB.read(wkb)).toOption
+    geom.map(makeValidGeom(_)).getOrElse(GeomFactory.factory.createPoint())
   }
 
   def makeMultiGeom(geom: Geometry): Geometry = {
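A sketch of the new failure mode in makeValidGeom (not part of the patch): unreadable WKB now degrades to an empty point instead of throwing, so the existing isNonEmptyGeom-style filters can quietly drop it.

    import org.globalforestwatch.util.GeotrellisGeometryValidator

    val geom = GeotrellisGeometryValidator.makeValidGeom("zz-not-wkb") // unparseable input
    assert(geom.isEmpty) // an empty Point, filtered out downstream rather than crashing the job
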
diff --git a/src/main/scala/org/globalforestwatch/util/IntersectGeometry.scala b/src/main/scala/org/globalforestwatch/util/IntersectGeometry.scala
index 431c7313..8f7742b7 100644
--- a/src/main/scala/org/globalforestwatch/util/IntersectGeometry.scala
+++ b/src/main/scala/org/globalforestwatch/util/IntersectGeometry.scala
@@ -1,5 +1,7 @@
 package org.globalforestwatch.util
 
+import cats.data.Validated
+import cats.data.Validated.{Valid, Invalid}
 import com.vividsolutions.jts.geom.{
   Geometry,
   GeometryCollection,
@@ -7,8 +9,10 @@ import com.vividsolutions.jts.geom.{
   Polygon,
   TopologyException,
 }
+import scala.util.{Try, Success, Failure}
 
 import org.globalforestwatch.util.GeoSparkGeometryConstructor.createMultiPolygon
+import org.globalforestwatch.summarystats.{GeometryError, ValidatedRow}
 
 object IntersectGeometry {
 
@@ -21,7 +25,39 @@ object IntersectGeometry {
      * */
     val userData = thisGeom.getUserData
 
-    try {
+    val intersection: Geometry = thisGeom intersection thatGeom
+    intersection match {
+      case poly: Polygon =>
+        val multi = createMultiPolygon(Array(poly))
+        multi.setUserData(userData)
+        List(multi)
+      case multi: MultiPolygon =>
+        multi.setUserData(userData)
+        List(multi)
+      case collection: GeometryCollection =>
+        val maybe_multi = extractPolygons(collection)
+        maybe_multi match {
+          case Some(multi) =>
+            multi.setUserData(userData)
+            List(multi)
+          case _ => List()
+        }
+      case _ => List()
+    }
+  }
+
+  def validatedIntersection(
+    thisGeom: Geometry,
+    thatGeom: Geometry
+  ): (Object, ValidatedRow[List[MultiPolygon]]) = {
+
+    /**
+      * Intersection can return GeometryCollections
+      * Here we filter resulting geometries and only return those of the same type as thisGeom
+      * */
+    val userData = thisGeom.getUserData
+
+    val attempt = Try {
       val intersection: Geometry = thisGeom intersection thatGeom
       intersection match {
         case poly: Polygon =>
           val multi = createMultiPolygon(Array(poly))
           multi.setUserData(userData)
           List(multi)
         case multi: MultiPolygon =>
           multi.setUserData(userData)
           List(multi)
         case collection: GeometryCollection =>
           val maybe_multi = extractPolygons(collection)
           maybe_multi match {
             case Some(multi) =>
               multi.setUserData(userData)
               List(multi)
             case _ => List()
           }
         case _ => List()
       }
-    } catch {
-      case e: TopologyException =>
-        val wkt = new com.vividsolutions.jts.io.WKTWriter
-        new java.io.PrintWriter("/tmp/thisGeom.wkt"){ write(wkt.write(thisGeom)); close }
-        new java.io.PrintWriter("/tmp/thatGeom.wkt"){ write(wkt.write(thatGeom)); close }
-        throw e
     }
+
+    (userData, attempt match {
+      case Success(v) => Valid(v)
+      case Failure(e) => Invalid(GeometryError(s"Failed intersection with ${thatGeom}"))
+    })
   }
 
   def extractPolygons(multiGeometry: Geometry): Option[MultiPolygon] = {
From 942445781d0717ef098b62db6593b2958eb074bc Mon Sep 17 00:00:00 2001
From: Eugene Cheipesh
Date: Thu, 28 Oct 2021 13:59:25 -0400
Subject: [PATCH 4/5] 1.7.9

---
 version.sbt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/version.sbt b/version.sbt
index c26185d1..65907bd7 100644
--- a/version.sbt
+++ b/version.sbt
@@ -1 +1 @@
-version in ThisBuild := "1.7.8"
+version in ThisBuild := "1.7.9"
From a525240903ff9ed419c7b94452a7a3eeb23b37b0 Mon Sep 17 00:00:00 2001
From: Eugene Cheipesh
Date: Thu, 28 Oct 2021 16:38:37 -0400
Subject: [PATCH 5/5] cache the validated RDD so it can be joined

---
 .../ForestChangeDiagnosticAnalysis.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/main/scala/org/globalforestwatch/summarystats/forest_change_diagnostic/ForestChangeDiagnosticAnalysis.scala b/src/main/scala/org/globalforestwatch/summarystats/forest_change_diagnostic/ForestChangeDiagnosticAnalysis.scala
index 58897236..f9f5bfc3 100644
--- a/src/main/scala/org/globalforestwatch/summarystats/forest_change_diagnostic/ForestChangeDiagnosticAnalysis.scala
+++ b/src/main/scala/org/globalforestwatch/summarystats/forest_change_diagnostic/ForestChangeDiagnosticAnalysis.scala
@@ -38,7 +38,7 @@
     val erroredLocationsRDD = validatedRDD.flatMap{ case (fid, feat) => feat.toEither.left.toOption.map { err => (fid, err) } }
     val mainRDD = validatedRDD.flatMap{ case (_, feat) => feat.toEither.right.toOption }
-    mainRDD.persist(StorageLevel.MEMORY_AND_DISK_2)
+    validatedRDD.persist(StorageLevel.MEMORY_AND_DISK_2)
 
     // For standard GFW Pro Feature IDs we create a Grid Filter
     // This will allow us to only process those parts of the dissolved list geometry which were updated
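The rationale for this final patch, as a generic sketch (the expensiveStep function is hypothetical, not from the repo): erroredLocationsRDD and mainRDD are both derived from validatedRDD, so persisting the shared parent lets both branches be materialized without recomputing the expensive split-and-intersect work upstream.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.storage.StorageLevel

    val spark = SparkSession.builder.master("local[*]").appName("persist-sketch").getOrCreate()
    def expensiveStep(i: Int): Int = i * 2 // stand-in for the split/intersection work

    val parent = spark.sparkContext.parallelize(1 to 1000000).map(expensiveStep)
    parent.persist(StorageLevel.MEMORY_AND_DISK_2)

    val evens = parent.filter(_ % 2 == 0) // analogue of mainRDD
    val odds  = parent.filter(_ % 2 != 0) // analogue of erroredLocationsRDD
    (evens.count, odds.count) // without the persist, expensiveStep would rerun for each count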