From cbcc05e8ac78870ec2921ecbaf11508775f97eb8 Mon Sep 17 00:00:00 2001
From: Dan Scales
Date: Mon, 16 Dec 2024 13:54:56 -0800
Subject: [PATCH] Add backup yield dataset as a simple array of rows

We don't want to call the data API from tasks, and the backup yield
dataset is only 25 megabytes, so we broadcast it to all executors and
then look it up when needed (infrequently) as an array (or maybe a
hash table) of rows.
---
 .../summarystats/ghg/GHGCommand.scala         | 24 ++++++++++++++++---
 .../summarystats/ghg/GHGSummary.scala         | 11 +++++++++
 2 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGCommand.scala b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGCommand.scala
index 209f618a..3eedfa34 100644
--- a/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGCommand.scala
+++ b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGCommand.scala
@@ -15,14 +15,21 @@ object GHGCommand extends SummaryCommand with LazyLogging {
   val GHGYearStart: Int = 2020
   val GHGYearEnd: Int = 2023
 
+  val backupYieldOpt: Opts[NonEmptyList[String]] = Opts
+    .options[String](
+      "backup_yield_url",
+      help = "URI of backup yield file in CSV format"
+    )
+
   val ghgCommand: Opts[Unit] = Opts.subcommand(
     name = GHGAnalysis.name,
     help = "Compute greenhouse gas emissions factors for GFW Pro locations."
   ) {
     (
       defaultOptions,
-      featureFilterOptions
-    ).mapN { (default, filterOptions) =>
+      featureFilterOptions,
+      backupYieldOpt
+    ).mapN { (default, filterOptions, backupYieldUrl) =>
       val kwargs = Map(
         "outputUrl" -> default.outputUrl,
         "noOutputPathSuffix" -> default.noOutputPathSuffix,
@@ -35,11 +42,22 @@ object GHGCommand extends SummaryCommand with LazyLogging {
       val featureFilter = FeatureFilter.fromOptions(default.featureType, filterOptions)
 
       runAnalysis { implicit spark =>
+        println("Starting read")
+        // Read in the backup yield file, then arrange to broadcast a copy to each
+        // node once, rather than copying it into each task. The broadcast makes
+        // sense (as opposed to rasterization or spatial partitioning) because the
+        // file is currently only 26 megabytes.
+        val backupDF = spark.read
+          .options(Map("header" -> "true", "delimiter" -> ",", "escape" -> "\""))
+          .csv(backupYieldUrl.toList: _*)
+        backupDF.printSchema()
+        val broadcastArray = spark.sparkContext.broadcast(backupDF.collect())
+        println("Done read")
         val featureRDD = ValidatedFeatureRDD(default.featureUris, default.featureType, featureFilter, splitFeatures = true)
 
         val fcdRDD = GHGAnalysis(
           featureRDD,
-          kwargs
+          kwargs + ("backupYield" -> broadcastArray)
         )
 
         val fcdDF = GHGDF.getFeatureDataFrame(fcdRDD, spark)
diff --git a/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGSummary.scala b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGSummary.scala
index 3e49c6f2..fba1ff97 100644
--- a/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGSummary.scala
+++ b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGSummary.scala
@@ -5,6 +5,8 @@ import geotrellis.raster._
 import geotrellis.raster.summary.GridVisitor
 import org.globalforestwatch.summarystats.Summary
 import org.globalforestwatch.util.Geodesy
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.Row
 
 /** GHGRawData broken down by GHGRawDataGroup, which includes the loss year and crop yield */
 case class GHGSummary(
@@ -84,6 +86,15 @@ object GHGSummary {
         var cropYield = raster.tile.cocoYield.getData(col, row)
         if (cropYield == 0) {
           println("Empty cocoa yield")
+          val backupArray = kwargs("backupYield").asInstanceOf[Broadcast[Array[Row]]].value
+          for (r <- backupArray) {
+            if (r.getAs[String]("FIPS2") == "ZI10007" && r.getAs[String]("commodity") == "BANA") {
+              println(s"Found row $r")
+            }
+          }
+          // val r = backupDF.filter(col("FIPS2") === "AC01001" && col("commodity") === "BANA")
+          // r.show()
+          println("OK")
         }
 
         // Compute gross emissions Co2-equivalent due to tree loss at this pixel.
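
The commit message leaves open whether the broadcast value should stay an
Array[Row] or become a hash table. A minimal sketch of the hash-table variant,
assuming (FIPS2, commodity) uniquely keys the backup yield rows (an assumption;
the patch never states the file's schema beyond those two columns):

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

// Sketch: index the backup yield rows by (FIPS2, commodity) on the driver and
// broadcast the map, so task-side lookups are O(1) instead of a linear scan
// over Array[Row]. Assumes (FIPS2, commodity) is unique per row.
def broadcastBackupYield(
  spark: SparkSession,
  backupDF: DataFrame
): Broadcast[Map[(String, String), Row]] = {
  val byKey = backupDF
    .collect()
    .map(r => (r.getAs[String]("FIPS2"), r.getAs[String]("commodity")) -> r)
    .toMap
  spark.sparkContext.broadcast(byKey)
}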
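
On the task side, the hardcoded debug scan in GHGSummary.scala would then
collapse to a direct lookup. A sketch, assuming the broadcast map above is
passed through kwargs under the same "backupYield" key; lookupBackupYield,
fips2, and commodity are hypothetical names, since the patch only probes the
hardcoded pair "ZI10007"/"BANA":

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.Row

// Sketch: O(1) lookup against the broadcast map. The caller supplies the
// pixel's FIPS2 code and the location's commodity; both are hypothetical here.
def lookupBackupYield(
  kwargs: Map[String, Any],
  fips2: String,
  commodity: String
): Option[Row] =
  kwargs("backupYield")
    .asInstanceOf[Broadcast[Map[(String, String), Row]]]
    .value
    .get((fips2, commodity))

For example, lookupBackupYield(kwargs, "ZI10007", "BANA") reproduces the probe
in the debug loop above without iterating over every row.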