Log intersection errors #122

Merged: 5 commits, Nov 1, 2021
@@ -1,7 +1,12 @@
package org.globalforestwatch.features

import cats.data.NonEmptyList
import geotrellis.vector.Geometry
import com.vividsolutions.{jts => vs}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, isnull, struct, udf}
import org.globalforestwatch.util.GeometryFixer
import scala.util.Try

object SpatialFeatureDF {
def apply(input: NonEmptyList[String],
@@ -51,10 +56,57 @@ object SpatialFeatureDF {
// ST_PrecisionReduce may create invalid geometry if it contains a "sliver" that is below the precision threshold
// ST_Buffer(0) fixes these invalid geometries
featureDF
.selectExpr(
s"ST_Buffer(ST_PrecisionReduce(ST_GeomFromWKB(${wkbField}), 11), 0) AS polyshape",
s"struct(${featureObj.featureIdExpr}) as featureId"
)
.where(s"${wkbField} != '${emptyPolygonWKB}'")
}
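
// --- Illustration, not from this PR: why ST_Buffer(0) follows ST_PrecisionReduce.
// Snapping vertices to a coarser grid can collapse a thin spike into a
// self-touching ring that JTS reports as invalid; buffer(0) rebuilds a valid
// polygon. REPL sketch against the GeoSpark-relocated JTS, using a made-up WKT
// polygon and an exaggerated precision (scale 1, i.e. snap to integers):
import com.vividsolutions.jts.geom.{GeometryFactory, PrecisionModel}
import com.vividsolutions.jts.io.WKTReader
import com.vividsolutions.jts.precision.GeometryPrecisionReducer

val poly = new WKTReader(new GeometryFactory()).read(
  "POLYGON ((0 0, 10 0, 10 10, 5.2 10, 5 20, 4.8 10, 0 10, 0 0))")
val snapped = GeometryPrecisionReducer.reducePointwise(poly, new PrecisionModel(1))
snapped.isValid           // false: the spike collapses to a zero-width sliver
snapped.buffer(0).isValid // true: the buffer(0) repair
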

/*
* Use GeoSpark to directly generate a DataFrame with a geometry column.
* Any geometry that fails to parse as WKB is dropped here.
*/
def applyValidated(
input: NonEmptyList[String],
featureObj: Feature,
filters: FeatureFilter,
wkbField: String,
spark: SparkSession,
delimiter: String = "\t"
): DataFrame = {
import spark.implicits._

val featureDF: DataFrame = FeatureDF(input, featureObj, filters, spark, delimiter)
val emptyPolygonWKB = "0106000020E610000000000000"
val readOptionWkbUDF = udf { s: String => readOption(s) }
featureDF
.where(s"${wkbField} != '${emptyPolygonWKB}'")
.selectExpr(
s"ST_Buffer(ST_PrecisionReduce(ST_GeomFromWKB(${wkbField}), 11), 0) AS polyshape",
s"${wkbField} AS wkb",
s"struct(${featureObj.featureIdExpr}) as featureId"
)
.select(
readOptionWkbUDF(col("wkb")).as("polyshape"),
col("featureId")
)
.where(!isnull('polyshape))
}

private val threadLocalWkbReader = new ThreadLocal[vs.io.WKBReader]

def readOption(wkbHexString: String): Option[vs.geom.Geometry] = {
if (threadLocalWkbReader.get() == null) {
val precisionModel = new vs.geom.PrecisionModel(1e11)
val factory = new vs.geom.GeometryFactory(precisionModel)
val wkbReader = new vs.io.WKBReader(factory)
threadLocalWkbReader.set(wkbReader)
}
val wkbReader = threadLocalWkbReader.get()

Try {
val binWkb = javax.xml.bind.DatatypeConverter.parseHexBinary(wkbHexString)
wkbReader.read(binWkb)
}.toOption
}
}
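
// --- Usage sketch, not from this PR: readOption returns Some for well-formed
// WKB hex and None for anything unreadable; such rows are then dropped by the
// !isnull('polyshape) filter in applyValidated above.
import com.vividsolutions.jts.geom.{Coordinate, GeometryFactory}
import com.vividsolutions.jts.io.WKBWriter

val hex = WKBWriter.toHex(
  new WKBWriter().write(new GeometryFactory().createPoint(new Coordinate(10, 50))))
SpatialFeatureDF.readOption(hex)       // Some(POINT (10 50))
SpatialFeatureDF.readOption("garbage") // None: the hex parse throws and Try swallows it
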
@@ -0,0 +1,120 @@
package org.globalforestwatch.features

import cats.data.{NonEmptyList, Validated}
import com.vividsolutions.jts.geom.{
Envelope => GeoSparkEnvelope, Geometry => GeoSparkGeometry, Point => GeoSparkPoint, Polygon => GeoSparkPolygon,
MultiPolygon => GeoSparkMultiPolygon, Polygonal => GeoSparkPolygonal, GeometryCollection => GeoSparkGeometryCollection
}
import org.apache.log4j.Logger
import geotrellis.store.index.zcurve.Z2
import geotrellis.vector
import geotrellis.vector.{Geometry, MultiPolygon}
import org.apache.spark.HashPartitioner
import org.apache.spark.api.java.JavaPairRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.datasyslab.geospark.spatialRDD.SpatialRDD
import org.datasyslab.geosparksql.utils.Adapter
import org.globalforestwatch.summarystats.{GeometryError, ValidatedRow}
import org.globalforestwatch.util.{GridRDD, SpatialJoinRDD}
import org.globalforestwatch.util.IntersectGeometry.{extractPolygons, validatedIntersection}
import org.globalforestwatch.util.Util.getAnyMapValue
import org.globalforestwatch.util.ImplicitGeometryConverter._

object ValidatedFeatureRDD {
val logger = Logger.getLogger("FeatureRDD")

/**
* Reads features from the source and optionally splits them by the 1x1-degree grid.
* - If a feature's WKB is invalid, the feature is dropped
* - If the intersection logic errors out, the erroring feature id propagates to the output
*/
def apply(
input: NonEmptyList[String],
featureType: String,
filters: FeatureFilter,
splitFeatures: Boolean,
spark: SparkSession
): RDD[(FeatureId, ValidatedRow[geotrellis.vector.Feature[Geometry, FeatureId]])] = {

if (splitFeatures) {
val featureObj: Feature = Feature(featureType)
val featureDF: DataFrame = SpatialFeatureDF.applyValidated(input, featureObj, filters, "geom", spark)
splitGeometries(featureType, featureDF, spark)
} else {
val featureObj: Feature = Feature(featureType)
val featuresDF: DataFrame = FeatureDF(input, featureObj, filters, spark)

featuresDF.rdd
.mapPartitions(
{ iter: Iterator[Row] =>
for {
i <- iter
if featureObj.isNonEmptyGeom(i)
} yield {
val feat = featureObj.get(i)
(feat.data, Validated.Valid(feat))
}
},
preservesPartitioning = true
)
}
}
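
// --- Sketch, not from this PR (the helper name is hypothetical): how a caller
// can split the validated output; errors travel with their FeatureId instead
// of failing the job. This mirrors the split done in
// ForestChangeDiagnosticAnalysis further down in this diff.
def splitValidated(
  rdd: RDD[(FeatureId, ValidatedRow[geotrellis.vector.Feature[Geometry, FeatureId]])]
) = {
  val errored = rdd.flatMap { case (fid, v) => v.toEither.left.toOption.map(fid -> _) }
  val valid = rdd.flatMap { case (_, v) => v.toOption }
  (errored, valid)
}
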

private def splitGeometries(
featureType: String,
featureDF: DataFrame,
spark: SparkSession
): RDD[(FeatureId, ValidatedRow[geotrellis.vector.Feature[Geometry, FeatureId]])] = {

val spatialFeatureRDD: SpatialRDD[GeoSparkGeometry] = Adapter.toSpatialRdd(featureDF, "polyshape")
spatialFeatureRDD.analyze()

spatialFeatureRDD.rawSpatialRDD = spatialFeatureRDD.rawSpatialRDD.rdd.map { geom: GeoSparkGeometry =>
val featureId = FeatureId.fromUserData(featureType, geom.getUserData.asInstanceOf[String], delimiter = ",")
geom.setUserData(featureId)
geom
}

val envelope: GeoSparkEnvelope = spatialFeatureRDD.boundaryEnvelope
val spatialGridRDD = GridRDD(envelope, spark, clip = true)
val flatJoin: JavaPairRDD[GeoSparkPolygon, GeoSparkGeometry] =
SpatialJoinRDD.flatSpatialJoin(
spatialFeatureRDD,
spatialGridRDD,
considerBoundaryIntersection = true
)

/*
Partitions will come back very skewed, and we will need to even them out for any downstream analysis.
For the summary analysis we will eventually use a range partitioner.
However, the range partitioner uses sampling to come up with the break points for the different partitions.
If the input RDD is already heavily skewed, the sample will be off and the range partitioner won't do a good job.
*/
val hashPartitioner = new HashPartitioner(flatJoin.getNumPartitions)

flatJoin.rdd
.keyBy({ pair: (GeoSparkPolygon, GeoSparkGeometry) =>
Z2(
(pair._1.getCentroid.getX * 100).toInt,
(pair._1.getCentroid.getY * 100).toInt
).z
})
.partitionBy(hashPartitioner)
.flatMap { case (_, (gridCell, geom)) =>
val (fid, geometries) = validatedIntersection(geom, gridCell)
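// traverse with identity flips Validated[_, List[MultiPolygon]] into
// List[Validated[_, MultiPolygon]]; an Invalid becomes a one-element list,
// so the error survives the flatMap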
geometries.traverse { geoms => geoms }.map { vg => (fid.asInstanceOf[FeatureId], vg) }
}
.flatMap {
case (_, Validated.Valid(mp)) if mp.isEmpty =>
// A Valid intersection result with no area means there was no real intersection
None // flatMap drops the None
case (fid, validated) =>
val validatedFeature = validated.map { intersection =>
val geotrellisGeom: MultiPolygon = intersection
geotrellis.vector.Feature(geotrellisGeom, fid)
}
Some(fid -> validatedFeature)
}
}
}
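
// --- Note, not from this PR: Z2 interleaves the bits of the two scaled
// centroid coordinates, so every grid cell gets a stable Long key, and
// HashPartitioner can then spread clustered cells evenly across partitions.
// REPL sketch:
import geotrellis.store.index.zcurve.Z2

Z2((16.5 * 100).toInt, (48.2 * 100).toInt).z // key for a cell centered near lon 16.5, lat 48.2
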
@@ -14,7 +14,7 @@ import org.globalforestwatch.features.FeatureId
import org.globalforestwatch.grids.GridSources
import scala.reflect.ClassTag
import cats.kernel.Semigroup
-import cats.data.Validated.{Valid, Invalid}
+import cats.data.Validated.{Valid, Invalid, valid, invalid}
import org.globalforestwatch.summarystats.forest_change_diagnostic.ForestChangeDiagnosticSummary


@@ -35,7 +35,7 @@ trait ErrorSummaryRDD extends LazyLogging with java.io.Serializable {
windowLayout: LayoutDefinition,
kwargs: Map[String, Any],
partition: Boolean = true
-)(implicit kt: ClassTag[SUMMARY], vt: ClassTag[FEATUREID], ord: Ordering[SUMMARY] = null): RDD[(FEATUREID, ValidatedRow[SUMMARY])] = {
+)(implicit kt: ClassTag[SUMMARY], vt: ClassTag[FEATUREID]): RDD[(FEATUREID, ValidatedRow[SUMMARY])] = {

/* Intersect features with each tile from windowLayout grid and generate a record for each intersection.
* Each feature will intersect one or more windows, possibly creating duplicate records.
@@ -180,4 +180,4 @@
object ErrorSummaryRDD {
val rasterizeOptions: Rasterizer.Options =
Rasterizer.Options(includePartial = false, sampleType = PixelIsPoint)
}
@@ -1,7 +1,7 @@
package org.globalforestwatch.summarystats.forest_change_diagnostic

import cats.data.NonEmptyList
-import cats.data.Validated.{Invalid, Valid}
+import cats.data.Validated.{Invalid, Valid, invalid, valid}
import cats.syntax._

import java.util
@@ -25,7 +25,7 @@
val name = "forest_change_diagnostic"

def apply(
-mainRDD: RDD[Feature[Geometry, FeatureId]],
+validatedRDD: RDD[(FeatureId, ValidatedRow[Feature[Geometry, FeatureId]])],
featureType: String,
intermediateListSource: Option[NonEmptyList[String]],
fireAlertRDD: SpatialRDD[GeoSparkGeometry],
@@ -34,24 +34,27 @@

val runOutputUrl: String = getOutputUrl(kwargs)

-mainRDD.persist(StorageLevel.MEMORY_AND_DISK_2)
+// These locations can't be processed because of an error in handling their geometry
+val erroredLocationsRDD = validatedRDD.flatMap { case (fid, feat) => feat.toEither.left.toOption.map { err => (fid, err) } }
+val mainRDD = validatedRDD.flatMap { case (_, feat) => feat.toEither.right.toOption }
+
+validatedRDD.persist(StorageLevel.MEMORY_AND_DISK_2)

// For standard GFW Pro feature IDs we create a grid filter.
// This allows us to only process those parts of the dissolved list geometry that were updated.
// When using other feature IDs, such as WDPA or GADM, there will be no
// intermediate results, and this part will be ignored.
val gridFilter: List[String] =
  mainRDD
    .filter { feature: Feature[Geometry, FeatureId] =>
      feature.data match {
        case gfwproId: GfwProFeatureId => gfwproId.locationId == -2
        case _ => false
      }
    }
    .map(f => pointGridId(f.geom.getCentroid, 1))
    .collect
    .toList

val featureRDD: RDD[Feature[Geometry, FeatureId]] =
toFeatureRdd(mainRDD, gridFilter, intermediateListSource.isDefined)
@@ -95,7 +98,10 @@
kwargs)
else dataRDD

-val summaryDF = ForestChangeDiagnosticDF.getFeatureDataFrame(finalRDD, spark)
+val rejoinedRDD = finalRDD
+.union(erroredLocationsRDD.map { case (fid, err) => (fid, invalid[JobError, ForestChangeDiagnosticData](err)) })
+
+val summaryDF = ForestChangeDiagnosticDF.getFeatureDataFrame(rejoinedRDD, spark)

ForestChangeDiagnosticExport.export(
featureType,
@@ -299,4 +305,4 @@
)
)
}
}
@@ -40,7 +40,7 @@ object ForestChangeDiagnosticCommand extends SummaryCommand with LazyLogging {
val featureFilter = FeatureFilter.fromOptions(default.featureType, filterOptions)

runAnalysis { implicit spark =>
-val featureRDD = FeatureRDD(default.featureUris, default.featureType, featureFilter, splitFeatures = true, spark)
+val featureRDD = ValidatedFeatureRDD(default.featureUris, default.featureType, featureFilter, splitFeatures = true, spark)
val fireAlertRDD = FireAlertRDD(spark, fireAlert.alertType, fireAlert.alertSource, FeatureFilter.empty)
ForestChangeDiagnosticAnalysis(featureRDD, default.featureType, intermediateListSource, fireAlertRDD, kwargs)
}
@@ -2,6 +2,7 @@ package org.globalforestwatch.util
import org.apache.log4j.Logger
import geotrellis.vector.{
Geometry,
GeomFactory,
LineString,
MultiPoint,
Point,
@@ -11,6 +12,7 @@
}
import geotrellis.vector.io.wkb.WKB
import org.globalforestwatch.util.GeotrellisGeometryReducer.{gpr, reduce}
import scala.util.Try

object GeotrellisGeometryValidator extends java.io.Serializable {
val logger = Logger.getLogger("Geotrellis Geometry Validator")
@@ -59,8 +61,8 @@ object GeotrellisGeometryValidator extends java.io.Serializable {
}

def makeValidGeom(wkb: String): Geometry = {
-val geom: Geometry = WKB.read(wkb)
-makeValidGeom(geom)
+val geom: Option[Geometry] = Try(WKB.read(wkb)).toOption
+geom.map(makeValidGeom(_)).getOrElse(GeomFactory.factory.createPoint())
}
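
// --- Usage sketch, not from this PR: unreadable WKB now degrades to an empty
// Point instead of throwing on the executor.
makeValidGeom("0101000000000000000000F03F000000000000F03F") // valid hex for POINT (1 1)
makeValidGeom("deadbeef")                                    // unreadable, returns the empty-Point fallback
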

def makeMultiGeom(geom: Geometry): Geometry = {
44 changes: 44 additions & 0 deletions src/main/scala/org/globalforestwatch/util/IntersectGeometry.scala
@@ -1,13 +1,18 @@
package org.globalforestwatch.util

import cats.data.Validated
import cats.data.Validated.{Valid, Invalid}
import com.vividsolutions.jts.geom.{
Geometry,
GeometryCollection,
MultiPolygon,
Polygon,
TopologyException,
}
import scala.util.{Try, Success, Failure}

import org.globalforestwatch.util.GeoSparkGeometryConstructor.createMultiPolygon
import org.globalforestwatch.summarystats.{GeometryError, ValidatedRow}

object IntersectGeometry {

@@ -39,6 +44,45 @@ object IntersectGeometry {
}
case _ => List()
}
}

def validatedIntersection(
thisGeom: Geometry,
thatGeom: Geometry
): (Object, ValidatedRow[List[MultiPolygon]]) = {

/**
* Intersection can return GeometryCollections.
* Here we filter the resulting geometries, keeping only the polygonal parts
* and returning them as MultiPolygons.
*/
val userData = thisGeom.getUserData

val attempt = Try {
val intersection: Geometry = thisGeom intersection thatGeom
intersection match {
case poly: Polygon =>
val multi = createMultiPolygon(Array(poly))
multi.setUserData(userData)
List(multi)
case multi: MultiPolygon =>
multi.setUserData(userData)
List(multi)
case collection: GeometryCollection =>
val maybe_multi = extractPolygons(collection)
maybe_multi match {
case Some(multi) =>
multi.setUserData(userData)
List(multi)
case _ => List()
}
case _ => List()
}
}

(userData, attempt match {
case Success(v) => Valid(v)
case Failure(e) => Invalid(GeometryError(s"Failed intersection with ${thatGeom}"))
})

}
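
// --- Usage sketch, not from this PR (the geometries and user-data payload are
// made up): a TopologyException during intersection no longer aborts the job;
// it surfaces as Invalid(GeometryError(...)) tagged with the feature's user data.
val wktReader = new com.vividsolutions.jts.io.WKTReader()
val featureGeom = wktReader.read("POLYGON ((0 0, 2 0, 2 2, 0 2, 0 0))")
val gridCellGeom = wktReader.read("POLYGON ((1 1, 3 1, 3 3, 1 3, 1 1))")
featureGeom.setUserData("list_1,location_42")

val (userData, result) = validatedIntersection(featureGeom, gridCellGeom)
result match {
  case Valid(multis) => multis.map(_.getArea).sum // 1.0, the overlapping unit square
  case Invalid(error) => println(s"$userData: $error")
}
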

2 changes: 1 addition & 1 deletion version.sbt
@@ -1 +1 @@
version in ThisBuild := "1.7.8"
version in ThisBuild := "1.7.9"