Skip to content

Commit

Permalink
GTC-2889 Remove code in FCD related to geometry diffs and intermediat…
Browse files Browse the repository at this point in the history
…e results

We've been ignoring geometry diffs (location id == -2) for a long time,
and we never pass in any intermediate results.

By removing this code, we avoid computing the centroid of each location
geometry, inserting it into the feature id, then removing it from the
feature id without having used it at all.

Move the only remaining code from combineGridResults() into the main
function. Also, combineGridResults() contained a duplicate call to
data.withUpdatedCommodityRisk(), so that call was removed (the main
analysis code above already calls data.withUpdatedCommodityRisk()).

The test change is only a reordering of categories within a single
result; the results themselves are unchanged.
  • Loading branch information
danscales committed Jul 11, 2024
1 parent afde12e commit f9c27da
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ import org.locationtech.jts.geom.Geometry
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.sedona.core.spatialRDD.SpatialRDD
import org.globalforestwatch.features.{CombinedFeatureId, FeatureId, GfwProFeatureId, GridId}
import org.globalforestwatch.grids.GridId.pointGridId
import org.globalforestwatch.features.FeatureId
import org.globalforestwatch.summarystats.{Location, NoIntersectionError, SummaryAnalysis, ValidatedLocation}
import org.globalforestwatch.util.SpatialJoinRDD
import org.apache.spark.storage.StorageLevel
Expand All @@ -33,40 +32,20 @@ object ForestChangeDiagnosticAnalysis extends SummaryAnalysis {
*/
def apply(
features: RDD[ValidatedLocation[Geometry]],
intermediateResultsRDD: Option[RDD[ValidatedLocation[ForestChangeDiagnosticData]]],
fireAlerts: SpatialRDD[Geometry],
saveIntermediateResults: RDD[ValidatedLocation[ForestChangeDiagnosticData]] => Unit,
kwargs: Map[String, Any]
)(implicit spark: SparkSession): RDD[ValidatedLocation[ForestChangeDiagnosticData]] = {
features.persist(StorageLevel.MEMORY_AND_DISK)

try {
val diffGridIds: List[GridId] =
if (intermediateResultsRDD.nonEmpty) collectDiffGridIds(features)
else List.empty

// These records are not covered by diff geometry, they're still valid and can be re-used
val cachedIntermediateResultsRDD = intermediateResultsRDD.map { rdd =>
rdd.filter {
case Valid(Location(CombinedFeatureId(fid1, fid2), _)) =>
!diffGridIds.contains(fid2)
case Invalid(Location(CombinedFeatureId(fid1, fid2), _)) =>
!diffGridIds.contains(fid2)
case _ =>
false
}
}

val partialResult: RDD[ValidatedLocation[ForestChangeDiagnosticData]] = {
ValidatedWorkflow(features)
.flatMap { locationGeometries =>
val diffLocations = filterDiffGridCells(locationGeometries, diffGridIds)

val fireCount: RDD[Location[ForestChangeDiagnosticDataLossYearly]] =
fireStats(diffLocations, fireAlerts, spark)
fireStats(locationGeometries, fireAlerts, spark)

val locationSummaries: RDD[ValidatedLocation[ForestChangeDiagnosticSummary]] = {
val tmp = diffLocations.map { case Location(id, geom) => Feature(geom, id) }
val tmp = locationGeometries.map { case Location(id, geom) => Feature(geom, id) }

// This is where the main analysis happens, in ErrorSummaryRDD.apply(),
// which eventually calls into ForestChangeDiagnosticSummary via
Expand Down Expand Up @@ -111,13 +90,10 @@ object ForestChangeDiagnosticAnalysis extends SummaryAnalysis {
.persist(StorageLevel.MEMORY_AND_DISK)
}

cachedIntermediateResultsRDD match {
case Some(cachedResults) =>
val mergedResults = partialResult.union(cachedResults)
saveIntermediateResults(mergedResults)
combineGridResults(mergedResults)
case None =>
combineGridResults(partialResult)
partialResult.map {
case Valid(Location(fid, data)) if data.equals(ForestChangeDiagnosticData.empty) =>
Invalid(Location(fid, NoIntersectionError))
case data => data
}
} catch {
case e: StackOverflowError =>
Expand All @@ -126,65 +102,6 @@ object ForestChangeDiagnosticAnalysis extends SummaryAnalysis {
}
}

/** Restrict locations to the grid cells listed in `gridFilter` and tag each survivor with its cell.
  *
  * `gridFilter` is meant to list the grid cells where location geometries have changed. A location
  * is kept when its id is a `GfwProFeatureId` with location id >= -1 and either `gridFilter` is
  * empty (nothing is filtered by cell) or the grid cell of the geometry's centroid is in
  * `gridFilter`. Locations with any other kind of feature id are dropped.
  *
  * @param rdd        locations to filter
  * @param gridFilter grid cells to keep; an empty list keeps every cell
  * @return the kept locations, each re-keyed by a `CombinedFeatureId` made of the original feature
  *         id and the `GridId` of the geometry's centroid cell
  */
def filterDiffGridCells(
  rdd: RDD[Location[Geometry]],
  gridFilter: List[GridId]
): RDD[Location[Geometry]] = {
  // Grid cell that contains the centroid of the geometry.
  def centroidCell(geom: Geometry): GridId =
    GridId(pointGridId(geom.getCentroid, 1))

  // Location ids below -1 are always rejected; the centroid is only inspected
  // when there is a non-empty filter to check it against.
  def isSelected(locationId: Int, geom: Geometry): Boolean =
    if (locationId < -1) false
    else if (gridFilter.isEmpty) true
    else gridFilter.contains(centroidCell(geom))

  rdd.collect {
    case Location(featureId @ GfwProFeatureId(_, locationId), geom) if isSelected(locationId, geom) =>
      Location(CombinedFeatureId(featureId, centroidCell(geom)), geom)
  }
}

/** Gather the grid cells of the locations that mark changed geometry.
  *
  * For every valid location keyed by a `GfwProFeatureId` whose location id is -1, the `GridId` of
  * the geometry's centroid cell is computed; the ids are then collected from the RDD into a local
  * list. No de-duplication is performed.
  *
  * NOTE(review): the earlier comment on this method said diff geometry is marked by id=-2, but the
  * match below selects location id -1 -- confirm which one is intended.
  *
  * NOTE(review): an earlier inline comment described a planned "new logic" (pair new and old
  * geometries, join on grid id, keep ids whose grid geometry differs). The code below does not
  * perform any such join.
  *
  * @param rdd validated locations to scan
  * @return one `GridId` per matching location
  */
def collectDiffGridIds(rdd: RDD[ValidatedLocation[Geometry]]): List[GridId] = {
  val diffCells: RDD[GridId] = rdd.collect {
    case Valid(Location(GfwProFeatureId(_, -1), geom)) =>
      GridId(pointGridId(geom.getCentroid, 1))
  }

  diffCells.collect().toList
}

/** Roll per-grid results up to per-location results.
  *
  * Input records may be keyed by a `CombinedFeatureId` (per-grid results) or by a plain feature id.
  * Every record is re-keyed by its feature id (the first component of a `CombinedFeatureId`), and
  * all records sharing a key are merged with `combine`, so per-grid errors are aggregated up to
  * the location level.
  *
  * After merging:
  *   - an invalid result stays invalid for that feature id;
  *   - a valid result equal to `ForestChangeDiagnosticData.empty` becomes a `NoIntersectionError`;
  *   - any other valid result is returned with `withUpdatedCommodityRisk()` applied.
  *
  * @param rdd   per-grid and/or per-location validated results
  * @param spark implicit session; not referenced in the body
  * @return one validated result per feature id
  */
def combineGridResults(
  rdd: RDD[ValidatedLocation[ForestChangeDiagnosticData]]
)(implicit spark: SparkSession): RDD[ValidatedLocation[ForestChangeDiagnosticData]] = {
  // Strip the grid component (if any) so results can be grouped per feature id.
  val perFeature = rdd.map {
    case Valid(Location(CombinedFeatureId(fid, _), data)) =>
      (fid, Valid(data))
    case Invalid(Location(CombinedFeatureId(fid, _), err)) =>
      (fid, Invalid(err))
    case Valid(Location(fid, data)) =>
      (fid, Valid(data))
    case Invalid(Location(fid, err)) =>
      (fid, Invalid(err))
  }

  val merged = perFeature.reduceByKey((left, right) => left combine right)

  merged.map {
    case (fid, Invalid(err)) =>
      Invalid(Location(fid, err))
    case (fid, Valid(data)) =>
      if (data.equals(ForestChangeDiagnosticData.empty))
        Invalid(Location(fid, NoIntersectionError))
      else
        Valid(Location(fid, data.withUpdatedCommodityRisk()))
  }
}

def fireStats(
featureRDD: RDD[Location[Geometry]],
fireAlertRDD: SpatialRDD[Geometry],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import cats.implicits._
import com.monovore.decline.Opts
import org.globalforestwatch.features._
import com.typesafe.scalalogging.LazyLogging
import org.globalforestwatch.summarystats.ValidatedLocation
import org.apache.spark.rdd.RDD
import org.globalforestwatch.config.GfwConfig
import org.globalforestwatch.util.Config

Expand All @@ -17,23 +15,15 @@ object ForestChangeDiagnosticCommand extends SummaryCommand with LazyLogging {
val TreeCoverLossYearStart: Int = 2001
val TreeCoverLossYearEnd: Int = 2023

/** Optional, repeatable `--intermediate_list_source` option: URIs of intermediate list results in
  * TSV format. Yields `None` when the option is not given.
  */
val intermediateListSourceOpt: Opts[Option[NonEmptyList[String]]] =
  Opts
    .options[String](
      long = "intermediate_list_source",
      help = "URI of intermediate list results in TSV format"
    )
    .orNone

val forestChangeDiagnosticCommand: Opts[Unit] = Opts.subcommand(
name = ForestChangeDiagnosticAnalysis.name,
help = "Compute summary statistics for GFW Pro Forest Change Diagnostic."
) {
(
defaultOptions,
intermediateListSourceOpt,
requiredFireAlertOptions,
featureFilterOptions
).mapN { (default, intermediateListSource, fireAlert, filterOptions) =>
).mapN { (default, fireAlert, filterOptions) =>
val kwargs = Map(
"outputUrl" -> default.outputUrl,
"noOutputPathSuffix" -> default.noOutputPathSuffix,
Expand All @@ -49,19 +39,9 @@ object ForestChangeDiagnosticCommand extends SummaryCommand with LazyLogging {
val featureRDD = ValidatedFeatureRDD(default.featureUris, default.featureType, featureFilter, splitFeatures = true)
val fireAlertRDD = FireAlertRDD(spark, fireAlert.alertType, fireAlert.alertSource, FeatureFilter.empty)

val intermediateResultsRDD = intermediateListSource.map { sources =>
ForestChangeDiagnosticDF.readIntermidateRDD(sources, spark)
}
val saveIntermidateResults: RDD[ValidatedLocation[ForestChangeDiagnosticData]] => Unit = { rdd =>
val df = ForestChangeDiagnosticDF.getGridFeatureDataFrame(rdd, spark)
ForestChangeDiagnosticExport.export("intermediate", df, default.outputUrl, kwargs)
}

val fcdRDD = ForestChangeDiagnosticAnalysis(
featureRDD,
intermediateResultsRDD,
fireAlertRDD,
saveIntermidateResults,
kwargs
)

Expand Down
Loading

0 comments on commit f9c27da

Please sign in to comment.