-
Notifications
You must be signed in to change notification settings - Fork 8
/
build.sbt
304 lines (274 loc) · 12.5 KB
/
build.sbt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
import Dependencies._
// Project identity and licensing.
name := "treecoverloss"
organization := "org.globalforestwatch"
licenses := Seq(
  ("Apache-2.0", url("http://www.apache.org/licenses/LICENSE-2.0.html"))
)

// The Scala version comes from Version.scala, which is declared outside this
// file. It is applied both build-wide and to this project.
ThisBuild / scalaVersion := Version.scala
scalaVersion := Version.scala
// Compiler flags, assembled from three groups. The resulting sequence keeps
// the flags in the order: diagnostics, language features, -Y options.
scalacOptions ++= {
  // General diagnostics.
  val diagnostics = Seq("-deprecation", "-unchecked", "-feature")

  // Language features enabled for the whole project; each becomes "-language:<feature>".
  val languageFeatures = Seq(
    "implicitConversions",
    "reflectiveCalls",
    "higherKinds",
    "postfixOps",
    "existentials",
    "experimental.macros"
  ).map(feature => s"-language:$feature")

  val advanced = Seq(
    "-Ypartial-unification", // Required by Cats
    "-Ywarn-unused-import",
    "-Yrangepos"
  )

  diagnostics ++ languageFeatures ++ advanced
}
// Publishing: Maven-style output, no test artifacts, and no resolver is
// written into the generated POM (the filter rejects every repository).
publishMavenStyle := true
Test / publishArtifact := false
pomIncludeRepository := (_ => false)
// Compiler plugins.
// kind-projector is resolved against the Scala binary version.
addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.9.4")
// paradise is resolved against the full Scala version.
addCompilerPlugin("org.scalamacros" % "paradise" % "2.1.1" cross CrossVersion.full)
// Additional artifact repositories appended to the build's resolver list.
resolvers ++= Seq(
  "GeoSolutions" at "https://maven.geo-solutions.it/",
  "LT-releases" at "https://repo.locationtech.org/content/groups/releases",
  "LT-snapshots" at "https://repo.eclipse.org/content/groups/snapshots",
  // The OSGeo release repository is registered once; listing the same URL
  // under a second name only repeats the same lookup.
  "OSGeo" at "https://repo.osgeo.org/repository/release/",
  "Apache Software Foundation Snapshots" at "https://repository.apache.org/content/groups/snapshots",
  "Java.net repository" at "https://download.java.net/maven/2",
  // NOTE(review): mvnrepository.com looks like a search site rather than a
  // Maven repository root — confirm this entry (and "Geotools Metadata"
  // below) ever serves artifacts.
  "Artifacts" at "https://mvnrepository.com/artifact",
  // Named after what the URL points at: the snapshots repository.
  "Sonatype Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/",
  "jitpack" at "https://jitpack.io",
  "maven2" at "https://repo1.maven.org/maven2",
  // "Geotools Wrapper" at "https://mvnrepository.com/artifact/org.datasyslab/geotools-wrapper",
  "Geotools Metadata" at "https://mvnrepository.com/artifact/org.geotools/gt-metadata",
  // NOTE(review): Bintray has been shut down as far as I know — confirm
  // whether this resolver can still return anything.
  Resolver.bintrayRepo("azavea", "geotrellis")
)
// Library dependencies. The bare identifiers (sparkCore, geotrellisSpark, ...)
// are not defined in this file; presumably they come from the
// `import Dependencies._` at the top — verify there for exact versions.
// Entries marked `% Test` are only on the test classpath.
libraryDependencies ++= Seq(
sparkCore,
sparkSQL,
sparkHive,
frameless,
hadoopAws,
hadoopCommon,
hadoopMapReduceClientCore,
logging,
decline,
paranamer,
scalatest % Test,
scalactic % Test,
geotrellisSpark,
geotrellisSparkTestKit % Test,
sparkFastTests % Test,
geotrellisS3,
geotrellisGdal,
geotrellisGdalWarp, // May need to be commented out to run with GDAL 3.1.2; the carbonflux package run with model v1.4.0 did not have this line
sedonaCore,
sedonaSQL,
breeze,
breezeNatives,
breezeViz,
sparkDaria,
// Declared inline with explicit coordinates rather than through Dependencies.
"org.datasyslab" % "geotools-wrapper" % "geotools-24.1",
"org.wololo" % "jts2geojson" % "0.14.3",
jts
)
// Pin Guava to 20.0 for the whole dependency graph, overriding whatever
// version transitive dependencies ask for.
dependencyOverrides += "com.google.guava" % "guava" % "20.0"
// Shading for the assembled jar: every class under the listed packages is
// relocated beneath org.globalforestwatch.shaded, in all jars (inAll).
assembly / assemblyShadeRules := Seq("shapeless", "cats.kernel").map { pkg =>
  ShadeRule.rename(s"$pkg.**" -> s"org.globalforestwatch.shaded.$pkg.@1").inAll
}
// auto imports for local SBT console
// can be used with `test:console` command
// The string below is handed to the Scala console verbatim when it starts.
// Lines beginning with `//` inside the string are part of that text (they
// reach the console as comments), so the geotrellis imports and the
// SparkConf/SparkSession setup are currently inactive.
console / initialCommands :=
"""
import java.net._
//import geotrellis.raster._
//import geotrellis.vector._
//import geotrellis.vector.io._
//import geotrellis.spark._
//import geotrellis.spark.tiling._
//import geotrellis.contrib.vlm._
//import geotrellis.contrib.vlm.gdal._
//import geotrellis.contrib.vlm.geotiff._
//import geotrellis.vector.io.wkt.WKT
import org.apache.spark.rdd._
import org.apache.spark.sql._
import org.apache.spark.{SparkConf, SparkContext}
import org.globalforestwatch.summarystats.treecoverloss._
import org.globalforestwatch.util._
//
//val conf = new SparkConf().
//setAppName("Tree Cover Loss Console").
//set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
////.set("spark.kryo.registrator", "geotrellis.spark.io.kryo.KryoRegistrator")
//
//implicit val spark: SparkSession = SparkSession.builder.config(conf).getOrCreate
//implicit val sc: SparkContext = spark.sparkContext
"""
// Settings for local testing.
// `run` and `runMain` are redefined to launch with Compile / fullClasspath.
// NOTE(review): presumably this is so dependencies that are left off the
// default run classpath (e.g. `provided` ones) are available locally —
// confirm against the scopes declared in Dependencies.
//
// Both helpers return an input-task initializer, so each needs `.evaluated`
// to be assigned as the body of the input task. Without it the initializer is
// only built and thrown away, and the task does nothing when invoked.
Compile / run := Defaults.runTask(Compile / fullClasspath, Compile / run / mainClass, Compile / run / runner).evaluated
Compile / runMain := Defaults.runMainTask(Compile / fullClasspath, Compile / runMain / runner).evaluated
// Test execution: tests run one at a time, in a forked JVM.
Test / parallelExecution := false
Test / fork := true
// Heap bounds and the JTS overlay system property for the forked test JVM.
Test / javaOptions ++= List("-Xms1024m", "-Xmx8144m", "-Djts.overlay=ng")
// Environment of the forked test JVM.
Test / envVars := Map(("AWS_REQUEST_PAYER", "requester"))
// "-oD" is forwarded to the test framework as a runner argument.
Test / testOptions += Tests.Argument("-oD")
// Settings for sbt-assembly plugin which builds fat jars for use by spark jobs
// The test task scoped to `assembly` is replaced with an empty body.
assembly / test := {}

// Decides what happens when the same path occurs in more than one input jar.
assembly / assemblyMergeStrategy := {
  // Configuration files with these names are concatenated.
  case "reference.conf" | "application.conf" => MergeStrategy.concat
  // both GeoSpark and Geotrellis bring in this library, need to use GeoSpark version
  case PathList("org", "geotools", _*) => MergeStrategy.first
  case PathList("META-INF", metaInf @ _*) =>
    metaInf match {
      case List("MANIFEST.MF") => MergeStrategy.discard
      // Concatenate everything in the services directory to keep GeoTools happy.
      case List("services", _) => MergeStrategy.concat
      // Concatenate these to keep JAI happy.
      case List("javax.media.jai.registryFile.jai" | "registryFile.jai" | "registryFile.jaiext") =>
        MergeStrategy.concat
      // Must exclude META-INF/*.([RD]SA|SF) to avoid "Invalid signature file digest for Manifest main attributes" exception.
      case List(fileName) if Seq(".RSA", ".DSA", ".SF").exists(ext => fileName.endsWith(ext)) =>
        MergeStrategy.discard
      // Everything else under META-INF: keep the first copy encountered.
      case _ => MergeStrategy.first
    }
  case _ => MergeStrategy.first
}
// Settings from sbt-lighter plugin that will automate creating and submitting this job to EMR
import sbtlighter._
import com.amazonaws.services.elasticmapreduce.model.Tag
// EMR cluster definition.
//
// Always check Spot prices for the instance type and select the subnet with
// the best price. GFW subnets by availability zone:
//   us-east-1a: subnet-00335589f5f424283
//   us-east-1b: subnet-8c2b5ea1
//   us-east-1c: subnet-08458452c1d05713b
//   us-east-1d: subnet-116d9a4a
//   us-east-1e: subnet-037b97cff4493e3a1
//   us-east-1f: subnet-0360516ee122586ff

// Release, region and applications.
sparkEmrRelease := "emr-6.1.0"
sparkAwsRegion := "us-east-1"
sparkEmrApplications := Seq("Spark", "Zeppelin", "Ganglia")

// S3 locations for the job jar and the cluster logs.
sparkS3JarFolder := "s3://wri-users/dgibbs/geotrellis/jars"
sparkS3LogUri := Option("s3://wri-users/dgibbs/geotrellis/logs")

// Networking.
sparkSubnetId := Option("subnet-8c2b5ea1")
sparkSecurityGroupIds := Seq("sg-00ca15563a40c5687", "sg-6c6a5911")

// Cluster size and instance types.
sparkInstanceCount := 201 // 201 for carbonflux and carbon_sensitivity
//sparkInstanceCount := 10 // for running test areas in EMR
sparkMasterType := "r4.2xlarge"
sparkCoreType := "r4.2xlarge"
sparkMasterEbsSize := Option(10)
sparkCoreEbsSize := Option(10)
//sparkMasterPrice := Some(3.0320)
sparkCorePrice := Option(0.532)

// Cluster name and IAM roles.
sparkClusterName := "geotrellis-treecoverloss"
sparkEmrServiceRole := "EMR_DefaultRole"
sparkInstanceRole := "EMR_EC2_DefaultRole"

// EC2 key pair attached to the cluster instances.
sparkJobFlowInstancesConfig := sparkJobFlowInstancesConfig.value.withEc2KeyName("dgibbs_wri")
// Bootstrap action for carbonflux runs. The GDAL version appears both in the
// action name and as the argument passed to the bootstrap script.
sparkEmrBootstrap := {
  val gdalVersion = "3.1.2"
  List(
    BootstrapAction(
      s"Install GDAL $gdalVersion dependencies",
      "s3://gfw-pipelines/geotrellis/bootstrap/gdal.sh",
      gdalVersion
    )
  )
}
//// For other runs
//sparkEmrBootstrap := List(
//  BootstrapAction(
//    "Install GDAL 3.8.3 dependencies",
//    "s3://gfw-pipelines/geotrellis/bootstrap/gdal-3.8.3.sh",
//    "3.8.3"
//  )
//)
// Tags attached to the EMR job-flow request. Each pair is applied with its own
// withTags call, in the order listed.
sparkRunJobFlowRequest := {
  val baseRequest = sparkRunJobFlowRequest.value
  val tagPairs = Seq(
    "Project" -> "Global Forest Watch",
    "Job" -> "Carbon Flux Analysis Geotrellis",
    "Project Lead" -> "David Gibbs",
    "Name" -> "geotrellis-treecoverloss"
  )
  tagPairs.foldLeft(baseRequest) { case (request, (key, value)) =>
    request.withTags(new Tag(key, value))
  }
}
// EMR configuration classifications applied to the cluster: EMRFS
// requester-pays, Spark defaults (resources, compression, GC, GDAL
// environment) and YARN memory checks.
sparkEmrConfigs := {
  // Native library search path shared by the application master and the executors.
  val nativeLibraryPath = "/usr/local/miniconda/lib/:/usr/local/lib"

  // Best practice 4: Always set up a garbage collector when handling large volume of data through Spark.
  // Use these GC strategy to avoid java.lang.OutOfMemoryError: GC overhead limit exceeded
  // The same options are used for the driver and the executors.
  val g1GcJavaOptions =
    "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'"

  List(
    // reference to example by geotrellis: https://github.com/geotrellis/geotrellis-spark-job.g8/blob/master/src/main/g8/build.sbt#L70-L91
    // EmrConfig("spark").withProperties("maximizeResourceAllocation" -> "true"),
    EmrConfig("emrfs-site").withProperties("fs.s3.useRequesterPaysHeader" -> "true"),
    EmrConfig("spark-defaults").withProperties(
      // https://aws.amazon.com/blogs/big-data/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/
      // Best practice 1: Choose the right type of instance for each of the node types in an Amazon EMR cluster.
      // Doing this is one key to success in running any Spark application on Amazon EMR.
      //
      // Best practice 2: Set spark.dynamicAllocation.enabled to true only if the numbers are properly determined
      // for spark.dynamicAllocation.initialExecutors/minExecutors/maxExecutors parameters. Otherwise,
      // set spark.dynamicAllocation.enabled to false and control the driver memory, executor memory,
      // and CPU parameters yourself. To do this, calculate and set these properties manually for each
      // application (see the example following).
      // spark.executor.cores -> always 5 vCPU
      // Number of executors = (total number of virtual cores per instance - 1) / spark.executor.cores //9
      // Total executor memory = total RAM per instance / number of executors per instance //42
      // spark.executor.memory -> total executor memory * 0.90
      // spark.yarn.executor.memoryOverhead -> total executor memory * 0.10
      // spark.driver.memory = spark.executor.memory
      // spark.driver.cores = spark.executor.cores
      // spark.executor.instances = (number of executors per instance * number of core instances) minus 1 for the driver
      // spark.default.parallelism = spark.executor.instances * spark.executor.cores * 2
      // spark.sql.shuffle.partitions = spark.default.parallelism

      // Each property is listed exactly once. A property set keeps a single
      // value per key, so two entries for the same key cannot both take effect.
      // NOTE(review): this assumes the later of two duplicate pairs wins
      // (hence "true"), and that dynamic allocation is intended together with
      // the fixed spark.executor.instances below — confirm both.
      "spark.dynamicAllocation.enabled" -> "true",
      "spark.executor.cores" -> "1", // 5
      "spark.executor.memory" -> "5652m", // 37G
      "spark.executor.memoryOverhead" -> "2g", // 5G
      "spark.driver.cores" -> "1",
      "spark.driver.memory" -> "6652m",
      "spark.executor.instances" -> "1599", // 1599 for carbonflux and carbon_sensitivity
      "spark.default.parallelism" -> "15990", // 15990 for carbonflux and carbon_sensitivity
      "spark.sql.shuffle.partitions" -> "15990", // 15990 for carbonflux and carbon_sensitivity
      "spark.shuffle.spill.compress" -> "true",
      "spark.driver.maxResultSize" -> "3G",
      "spark.shuffle.compress" -> "true",
      "spark.yarn.appMasterEnv.LD_LIBRARY_PATH" -> nativeLibraryPath,
      "spark.rdd.compress" -> "true",
      "spark.shuffle.service.enabled" -> "true",
      "spark.executorEnv.LD_LIBRARY_PATH" -> nativeLibraryPath,
      // // Use these GC strategy as default
      // "spark.driver.defaultJavaOptions" -> "-XX:+UseParallelGC -XX:+UseParallelOldGC -XX:OnOutOfMemoryError='kill -9 %p'",
      // "spark.executor.defaultJavaOptions" -> "-XX:+UseParallelGC -XX:+UseParallelOldGC -XX:OnOutOfMemoryError='kill -9 %p'",
      // "spark.kryoserializer.buffer.max" -> "2047m",
      "spark.executor.defaultJavaOptions" -> g1GcJavaOptions,
      "spark.driver.defaultJavaOptions" -> g1GcJavaOptions,
      // set this environment variable for GDAL to use request payer method for S3 files.
      // Added some of these to resolve 503 errors using 200 worker instances.
      "spark.executorEnv.AWS_REQUEST_PAYER" -> "requester",
      "spark.yarn.appMasterEnv.AWS_REQUEST_PAYER" -> "requester",
      // NOTE(review): executors retry 10 times but the application master
      // only 3 — confirm the difference is intentional.
      "spark.executorEnv.GDAL_HTTP_MAX_RETRY" -> "10",
      "spark.yarn.appMasterEnv.GDAL_HTTP_MAX_RETRY" -> "3",
      "spark.executorEnv.GDAL_DISABLE_READDIR_ON_OPEN" -> "EMPTY_DIR",
      "spark.yarn.appMasterEnv.GDAL_DISABLE_READDIR_ON_OPEN" -> "EMPTY_DIR",
      "spark.executorEnv.GDAL_HTTP_RETRY_DELAY" -> "10",
      "spark.yarn.appMasterEnv.GDAL_HTTP_RETRY_DELAY" -> "10"
    ),
    // EmrConfig("spark-env").withProperties(
    //   "LD_LIBRARY_PATH" -> "/usr/local/lib"
    // ),
    EmrConfig("yarn-site").withProperties(
      // Best practice 5: Always set the virtual and physical memory check flag to false.
      "yarn.resourcemanager.am.max-attempts" -> "1",
      "yarn.nodemanager.vmem-check-enabled" -> "false",
      "yarn.nodemanager.pmem-check-enabled" -> "false"
    )
  )
}