Skip to content

Commit

Permalink
#75 adding the ability to prepend a checkpoint + unit test (#76)
Browse files Browse the repository at this point in the history
* #75 adding the ability to prepend a checkpoint inside a ControlMeasure + unit test, readme update
  • Loading branch information
dk1844 authored May 4, 2021
1 parent 1de2131 commit dfe2c1f
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 0 deletions.
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,25 @@ val controlMeasure: ControlMeasure =
println("Generated control measure is: " + controlMeasure.asJson)
```

#### Customizing ControlMeasure
In case you need to change the data the ControlMeasure holds, you can do this in a usual scala way - the model comprises
of case classes, so the built-in copy methods are available, e.g. `cm.copy(metadata = cm.metadata.copy(sourceApplication = "UpdatedAppName"))`

However, to make things slightly easier in the checkpoint department, ControlMeasure's helper method `withPrecedingCheckpoint()`
to prepend a custom checkpoint has been added shifting existing checkpoint behind while also increasing their order:

```scala
import za.co.absa.atum.model._

val cm: ControlMeasure = ... // existing ControlMeasure with 2 checkpoints
val cpToBePrepended: Checkpoint = Checkpoint(..., order = 1, ...)

cm.checkpoints.map(_.order) // List(1, 2)
cpToBePrepended.order // 1
val updatedCm = cm.withPrecedingCheckpoint(cpToBePrepended)
updatedCm.checkpoints.map(_.order) // List(1, 2, 3)
```

#### Writing an _INFO file with the ControlMeasure to HDFS
```scala
import org.apache.hadoop.fs.{FileSystem, Path}
Expand Down
16 changes: 16 additions & 0 deletions model/src/main/scala/za/co/absa/atum/model/ControlMeasure.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,20 @@ case class ControlMeasure
) {
def asJson: String = SerializationUtils.asJson(this)
def asJsonPretty: String = SerializationUtils.asJsonPretty(this)

/**
* A new ControlMeasure will be constructed with the supplied `checkpoint1` as the new first checkpoint (as-is,
* e.g. its order value is neither checked nor adjusted).
* Any existing checkpoints will be shifted behind with their order indices increased by 1.
*
* @param checkpoint1 a new checkpoint preceding all the existing
*/
def withPrecedingCheckpoint(checkpoint1: Checkpoint): ControlMeasure = {
val shiftedCheckpoints = checkpoints.map { cp =>
cp.copy(order = cp.order + 1)
}

this.copy(checkpoints = checkpoint1 :: shiftedCheckpoints)

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.model

import org.scalatest.matchers.should.Matchers

class ControlMeasureSpec extends org.scalatest.flatspec.AnyFlatSpec with Matchers {

private val cp1 = Checkpoint("prependingCp", None, None, "01-01-2020 07:00:00", "01-01-2020 07:00:10", "wf1", 1, List(
Measurement("control1", "someControlType", "column1", "1234")
))

"ControlMeasure" should "get a new Checkpoint with no checkpoints" in {
val cm = getTestingControlMeasure(0)
cm.checkpoints should have length 0

val updatedCm = cm.withPrecedingCheckpoint(cp1)

// cp1 prepended as-is
val expectedCm: ControlMeasure = ControlMeasure(
ControlMeasureMetadata("AtumTest", "CZ", "Snapshot", "example_input.csv", "public", 1, "01-01-2020", Map.empty),
runUniqueId = None,
checkpoints = List(Checkpoint("prependingCp", None, None, "01-01-2020 07:00:00", "01-01-2020 07:00:10", "wf1", 1, List(
Measurement("control1", "someControlType", "column1", "1234"))
)))

updatedCm shouldBe expectedCm
}

"ControlMeasure" should "get a new Checkpoint with existing checkpoints being shifted back with their order, too" in {
val cm = getTestingControlMeasure(2)
cm.checkpoints should have length 2
cm.checkpoints.map(_.order) shouldBe Seq(1,2)

val updatedCm = cm.withPrecedingCheckpoint(cp1)

// cp1 prepended as-is
updatedCm.checkpoints should have length 3
updatedCm.checkpoints.head shouldBe cp1
updatedCm.checkpoints.tail.map(_.order) shouldBe Seq(2,3) // existing order shifted back
}

private def getTestingControlMeasure(cpCount: Int): ControlMeasure = {
require(cpCount >= 0 && cpCount < 10)
val testingCheckpoints = Range(0, cpCount).map(_ + 1) // starting with order: 1
.map { order =>
Checkpoint(s"orig-cp$order", None, None, s"01-01-2020 0$order:00:00", s"01-01-2020 0$order:00:10", "wf1", order, List(
Measurement("control1", "someControlType", "column1", "1234")
))
}

ControlMeasure(
ControlMeasureMetadata("AtumTest", "CZ", "Snapshot", "example_input.csv", "public", 1, "01-01-2020", Map.empty),
runUniqueId = None,
checkpoints = testingCheckpoints.toList
)
}

}

0 comments on commit dfe2c1f

Please sign in to comment.