Skip to content

Commit

Permalink
#99 eventually is back for compile scope - to demonstrate sample measurements _INFO output
Browse files Browse the repository at this point in the history
  • Loading branch information
dk1844 committed Aug 2, 2021
1 parent f6ea32e commit a4ba9f7
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 27 deletions.
2 changes: 1 addition & 1 deletion examples/atum-examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<version>${scalatest.version}</version>
<scope>test</scope>
<scope>compile</scope>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ import java.nio.file.{Files, Paths}
import org.apache.hadoop.fs.FileSystem
import org.apache.log4j.LogManager
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.scalatest.concurrent.Eventually
import za.co.absa.atum.AtumImplicits._

object SampleMeasurements1 {
import scala.concurrent.duration.DurationInt

object SampleMeasurements1 extends Eventually {

private val log = LogManager.getLogger(this.getClass)

Expand Down Expand Up @@ -55,11 +58,10 @@ object SampleMeasurements1 {
spark.disableControlMeasuresTracking()
spark.close()

if (!Files.exists(Paths.get("data/output/stage1_job_results/_INFO"))) {
throw new Exception("_INFO file not found at data/output/stage1_job_results")
} else {
log.info("File data/output/stage1_job_results/_INFO found.")
eventually(timeout(scaled(10.seconds)), interval(scaled(500.millis))) {
if (!Files.exists(Paths.get("data/output/stage1_job_results/_INFO"))) {
throw new Exception("_INFO file not found at data/output/stage1_job_results")
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@ import java.nio.file.{Files, Paths}
import org.apache.hadoop.fs.FileSystem
import org.apache.log4j.LogManager
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.scalatest.concurrent.Eventually
import za.co.absa.atum.AtumImplicits._
import za.co.absa.atum.examples.SampleMeasurements1.{eventually, interval, scaled, timeout}

object SampleMeasurements2 {
import scala.concurrent.duration.DurationInt

object SampleMeasurements2 extends Eventually {

private val log = LogManager.getLogger(this.getClass)

Expand Down Expand Up @@ -63,10 +67,10 @@ object SampleMeasurements2 {
spark.disableControlMeasuresTracking()
spark.close()

if (!Files.exists(Paths.get("data/output/stage2_job_results/_INFO"))) {
throw new Exception("_INFO file not found at data/output/stage2_job_results")
} else {
log.info("File data/output/stage2_job_results/_INFO found.")
eventually(timeout(scaled(10.seconds)), interval(scaled(500.millis))) {
if (!Files.exists(Paths.get("data/output/stage2_job_results/_INFO"))) {
throw new Exception("_INFO file not found at data/output/stage2_job_results")
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ import java.nio.file.{Files, Paths}
import org.apache.hadoop.fs.FileSystem
import org.apache.log4j.LogManager
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.scalatest.concurrent.Eventually
import za.co.absa.atum.AtumImplicits._
import za.co.absa.atum.examples.SampleMeasurements2.{eventually, interval, scaled, timeout}
import za.co.absa.atum.model.ControlMeasure
import za.co.absa.atum.utils.{BuildProperties, FileUtils, SerializationUtils}

import scala.concurrent.duration.DurationInt

object SampleMeasurements3 {
object SampleMeasurements3 extends Eventually {
case class MyBuildProperties(projectName: String, buildVersion: String) extends BuildProperties

private val log = LogManager.getLogger(this.getClass)
Expand Down Expand Up @@ -55,25 +58,24 @@ object SampleMeasurements3 {
.write.mode(SaveMode.Overwrite)
.parquet("data/output/stage3_job_results")


spark.disableControlMeasuresTracking()
spark.close()

if (!Files.exists(Paths.get("data/output/stage3_job_results/_INFO"))) {
throw new Exception("_INFO file not found at data/output/stage3_job_results")
} else {
log.info("File data/output/stage3_job_results/_INFO found. Checking its content...")
}
eventually(timeout(scaled(10.seconds)), interval(scaled(500.millis))) {
if (!Files.exists(Paths.get("data/output/stage3_job_results/_INFO"))) {
throw new Exception("_INFO file not found at data/output/stage3_job_results")
}

val jsonInfoFile = FileUtils.readFileToString("data/output/stage3_job_results/_INFO")
val measureObject1: ControlMeasure = SerializationUtils.fromJson[ControlMeasure](jsonInfoFile)
val checkpoint = measureObject1.checkpoints.filter(_.name == "checkpoint1").head
val jsonInfoFile = FileUtils.readFileToString("data/output/stage3_job_results/_INFO")
val measureObject1: ControlMeasure = SerializationUtils.fromJson[ControlMeasure](jsonInfoFile)
val checkpoint = measureObject1.checkpoints.filter(_.name == "checkpoint1").head

if (!checkpoint.software.contains("MySoftware") || !checkpoint.version.contains("v007")) {
throw new Exception(s"Software or Version was not set properly. Got name ${checkpoint.software} and version ${checkpoint.version}")
} else {
log.info("_INFO file correctly contained custom SW Name and version.")
}
if (!checkpoint.software.contains("MySoftware") || !checkpoint.version.contains("v007")) {
throw new Exception(s"Software or Version was not set properly. Got name ${checkpoint.software} and version ${checkpoint.version}")
} else {
log.info("_INFO file correctly contained custom SW Name and version.")
}

}
}
}

0 comments on commit a4ba9f7

Please sign in to comment.