From 45e7578c58f20c0b98d8a52714af8fae3be32c6a Mon Sep 17 00:00:00 2001
From: ABLL526
Date: Tue, 12 Nov 2024 15:48:50 +0200
Subject: [PATCH 1/2] Feature 273 First Commit

Feature 273 First Commit by Liam Leibrandt
---
 .github/CODEOWNERS | 2 +-
 database/README.md | 8 +
 .../postgres/runs/V0.3.0.1__get_ancestors.sql | 141 +++++
 .../runs/GetAncestorsIntegrationTests.scala | 495 ++++++++++++++++++
 ...PartitioningMainFlowIntegrationTests.scala | 1 +
 publish.sbt | 6 +
 .../scala/za/co/absa/atum/server/Main.scala | 1 +
 .../controller/PartitioningController.scala | 7 +
 .../PartitioningControllerImpl.scala | 14 +
 .../runs/functions/GetAncestors.scala | 86 +++
 .../absa/atum/server/api/http/Endpoints.scala | 13 +
 .../co/absa/atum/server/api/http/Routes.scala | 10 +
 .../repository/PartitioningRepository.scala | 6 +
 .../PartitioningRepositoryImpl.scala | 27 +
 .../api/service/PartitioningService.scala | 6 +
 .../api/service/PartitioningServiceImpl.scala | 11 +
 .../za/co/absa/atum/server/api/TestData.scala | 22 +
 .../PartitioningControllerUnitTests.scala | 26 +
 .../GetAncestorsIntegrationTests.scala | 47 ++
 .../GetAncestorsEndpointV2UnitTests.scala | 118 +++++
 .../PartitioningRepositoryUnitTests.scala | 32 ++
 .../PartitioningServiceUnitTests.scala | 31 ++
 22 files changed, 1109 insertions(+), 1 deletion(-)
 create mode 100644 database/src/main/postgres/runs/V0.3.0.1__get_ancestors.sql
 create mode 100644 database/src/test/scala/za/co/absa/atum/database/runs/GetAncestorsIntegrationTests.scala
 create mode 100644 server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetAncestors.scala
 create mode 100644 server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetAncestorsIntegrationTests.scala
 create mode 100644 server/src/test/scala/za/co/absa/atum/server/api/http/GetAncestorsEndpointV2UnitTests.scala

diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
index cec71281c..b39c52df7 100644
--- a/.github/CODEOWNERS
+++ b/.github/CODEOWNERS
@@ -1 +1 @@
-* @benedeki @lsulak @TebaleloS @Zejnilovic @dk1844 @salamonpavel
+* @benedeki @lsulak @TebaleloS @Zejnilovic @dk1844 @salamonpavel @ABLL526
diff --git a/database/README.md b/database/README.md
index ebdbe0801..963a16cdd 100644
--- a/database/README.md
+++ b/database/README.md
@@ -32,3 +32,11 @@ to remove them
 or
 sbt flywayBaseline
 ```
 to set the current state as the baseline.
+
+In case of problems, the local dockerized database can be recreated from scratch and re-migrated:
+```zsh
+docker kill atum_db
+docker rm atum_db
+docker run --name=atum_db -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=atum_db -p 5432:5432 -d postgres:16
+sbt flywayMigrate
+```
\ No newline at end of file
diff --git a/database/src/main/postgres/runs/V0.3.0.1__get_ancestors.sql b/database/src/main/postgres/runs/V0.3.0.1__get_ancestors.sql
new file mode 100644
index 000000000..ab63e40e5
--- /dev/null
+++ b/database/src/main/postgres/runs/V0.3.0.1__get_ancestors.sql
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2021 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE OR REPLACE FUNCTION runs.get_ancestors(
+    IN i_partitioning_id   BIGINT,
+    IN i_limit             INT DEFAULT 5,
+    IN i_offset            BIGINT DEFAULT 0,
+    OUT status             INTEGER,
+    OUT status_text        TEXT,
+    OUT ancestor_id        BIGINT,
+    OUT partitioning       JSONB,
+    OUT author             TEXT,
+    OUT has_more           BOOLEAN
+) RETURNS SETOF record AS
+$$
+-------------------------------------------------------------------------------
+--
+-- Function: runs.get_ancestors(3)
+--      Returns the ancestor partitionings of the given partitioning id
+--
+-- Parameters:
+--      i_partitioning_id  - id of the partitioning whose ancestors are requested
+--      i_limit            - (optional) maximum number of partitionings to return, default is 5
+--      i_offset           - (optional) offset to use for pagination, default is 0
+--
+-- Returns:
+--      status             - Status code
+--      status_text        - Status message
+--      ancestor_id        - id of an ancestor partitioning
+--      partitioning       - partitioning data of the ancestor
+--      author             - author of the ancestor partitioning
+--      has_more           - flag indicating whether more partitionings are available
+--
+-- Status codes:
+--      11 - OK
+--      41 - Partitioning not found (the input partitioning, its flow, or an ancestor)
+--
+-------------------------------------------------------------------------------
+DECLARE
+    flow_id            BIGINT[];
+    var                BIGINT;
+    partitionCreatedAt TIMESTAMP;
+    _has_more          BOOLEAN;
+
+BEGIN
+
+    PERFORM 1 FROM runs.partitionings WHERE id_partitioning = i_partitioning_id;
+    IF NOT FOUND THEN
+        status := 41;
+        status_text := 'Child Partitioning not found';
+        has_more := FALSE;
+        RETURN NEXT;
+        RETURN;
+    END IF;
+
+    SELECT created_at FROM runs.partitionings WHERE id_partitioning = i_partitioning_id INTO partitionCreatedAt;
+
+    flow_id := array(
+        SELECT fk_flow AS flow_id
+        FROM flows.partitioning_to_flow
+        WHERE fk_partitioning = i_partitioning_id
+    );
+
+    IF cardinality(flow_id) = 0
+    THEN
+        status := 41;
+        status_text := 'Flow not found';
+        has_more := FALSE;
+        RETURN NEXT;
+        RETURN;
+    END IF;
+
+    FOREACH var IN ARRAY flow_id LOOP
+        IF i_limit IS NOT NULL THEN
+            -- count(*) yields a single row, so LIMIT/OFFSET must not be applied to it;
+            -- more data exists when the flow has rows beyond the requested window
+            SELECT count(*) > i_limit + i_offset
+            FROM flows.partitioning_to_flow PTF
+            WHERE PTF.fk_flow = var
+            INTO _has_more;
+        ELSE
+            _has_more := false;
+        END IF;
+    END LOOP;
+
+    RETURN QUERY
+    SELECT
+        11 AS status,
+        'OK' AS status_text,
+        P.id_partitioning AS ancestor_id,
+        P.partitioning AS partitioning,
+        P.created_by AS author,
+        _has_more AS has_more
+    FROM
+        runs.partitionings P
+    INNER JOIN
+        flows.partitioning_to_flow PF
+        ON PF.fk_partitioning = P.id_partitioning
+    INNER JOIN
+        (SELECT * FROM flows.partitioning_to_flow OLD_PF WHERE OLD_PF.fk_partitioning = i_partitioning_id) PF2
+        ON PF2.fk_flow = PF.fk_flow
+    WHERE
+        P.id_partitioning < i_partitioning_id
+    AND
+        P.created_at <= partitionCreatedAt
+    GROUP BY P.id_partitioning
+    ORDER BY
+        P.id_partitioning,
+        P.created_at DESC
+    LIMIT i_limit
+    OFFSET i_offset;
+
+    -- emit a dedicated error row only when the query above returned no rows;
+    -- successful rows already carry status 11 / 'OK'
+    IF NOT FOUND THEN
+        status := 41;
+        status_text := 'Ancestor Partitioning not found';
+        has_more := FALSE;
+        RETURN NEXT;
+    END IF;
+    RETURN;
+
+END;
+$$
+    LANGUAGE plpgsql VOLATILE SECURITY DEFINER;
+
+ALTER FUNCTION runs.get_ancestors(BIGINT, INT, BIGINT) OWNER TO atum_owner;
+GRANT EXECUTE ON FUNCTION runs.get_ancestors(BIGINT, INT, BIGINT) TO atum_user;
diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/GetAncestorsIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/runs/GetAncestorsIntegrationTests.scala
new file mode 100644
index 000000000..ad1848163
--- /dev/null
+++ 
b/database/src/test/scala/za/co/absa/atum/database/runs/GetAncestorsIntegrationTests.scala @@ -0,0 +1,495 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.database.runs + +import io.circe.parser.parse +import za.co.absa.balta.DBTestSuite +import za.co.absa.balta.classes.JsonBString + +class GetAncestorsIntegrationTests extends DBTestSuite { + + private val getAncestorsFn = "runs.get_ancestors" + private val partitioningsTable = "runs.partitionings" + + private val createFlowFn = "flows._create_flow" + private val addToParentFlowsFn = "flows._add_to_parent_flows" + + private val partitioning1 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyA", "keyB", "keyC"], + | "keysToValuesMap": { + | "keyA": "valueA", + | "keyB": "valueB", + | "keyC": "valueC" + | } + |} + |""".stripMargin + ) + + private val expectedPartitioning1 = parse(partitioning1.value).getOrElse(throw new Exception("Failed to parse JSON")) + + private val partitioning2 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyD", "keyE", "keyF"], + | "keysToValuesMap": { + | "keyD": "valueD", + | "keyE": "valueE", + | "keyF": "valueF" + | } + |} + |""".stripMargin + ) + + private val expectedPartitioning2 = parse(partitioning2.value).getOrElse(throw new Exception("Failed to parse JSON")) + + private val partitioning3 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyG", "keyH", "keyI"], + | "keysToValuesMap": { + | "keyG": "valueG", + | "keyH": "valueH", + | "keyI": "valueI" + | } + |} + |""".stripMargin + ) + + private val expectedPartitioning3 = parse(partitioning3.value).getOrElse(throw new Exception("Failed to parse JSON")) + + private val partitioning4 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyJ", "keyK", "keyL"], + | "keysToValuesMap": { + | "keyJ": "valueJ", + | "keyK": "valueK", + | "keyL": "valueL" + | } + |} + |""".stripMargin + ) + + private val expectedPartitioning4 = parse(partitioning4.value).getOrElse(throw new Exception("Failed to parse JSON")) + + private val partitioning5 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyM", "keyN", "keyO"], + | "keysToValuesMap": { + | "keyM": "valueM", + | "keyN": "valueN", + | "keyO": "valueO" + | } + |} + |""".stripMargin + ) + + private val expectedPartitioning5 = parse(partitioning5.value).getOrElse(throw new Exception("Failed to parse JSON")) + + private val partitioning6 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyP", "keyQ", "keyR"], + | "keysToValuesMap": { + | "keyP": "valueP", + | "keyQ": "valueQ", + | "keyR": "valueR" + | } + |} + |""".stripMargin + ) + + private val expectedPartitioning6 = parse(partitioning6.value).getOrElse(throw new Exception("Failed to parse JSON")) + + private val partitioning7 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyS", "keyT", "keyU"], + | "keysToValuesMap": { + | "keyS": "valueS", + | "keyT": "valueT", + | "keyU": "valueU" + | } + |} + |""".stripMargin + ) + + 
private val expectedPartitioning7 = parse(partitioning7.value).getOrElse(throw new Exception("Failed to parse JSON")) + + private val partitioning8 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyV", "keyW", "keyX"], + | "keysToValuesMap": { + | "keyV": "valueV", + | "keyW": "valueW", + | "keyX": "valueX" + | } + |} + |""".stripMargin + ) + + var flowIdOfPartitioning1: Long = _ + var flowIdOfPartitioning2: Long = _ + var flowIdOfPartitioning3: Long = _ + + test("Returns Ancestors for a given Partition ID") { + + table(partitioningsTable).insert(add("partitioning", partitioning1).add("created_by", "Grandpa")) + table(partitioningsTable).insert(add("partitioning", partitioning2).add("created_by", "Father")) + table(partitioningsTable).insert(add("partitioning", partitioning3).add("created_by", "Son")) + table(partitioningsTable).insert(add("partitioning", partitioning4).add("created_by", "Grandson")) + table(partitioningsTable).insert(add("partitioning", partitioning5).add("created_by", "Grandma")) + table(partitioningsTable).insert(add("partitioning", partitioning6).add("created_by", "Mother")) + table(partitioningsTable).insert(add("partitioning", partitioning7).add("created_by", "Daughter")) + table(partitioningsTable).insert(add("partitioning", partitioning8).add("created_by", "Granddaughter")) + + val partId1: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning1, "id_partitioning").get.get + + val partId2: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning2, "id_partitioning").get.get + + val partId3: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning3, "id_partitioning").get.get + + val partId4: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning4, "id_partitioning").get.get + + val partId5: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning5, "id_partitioning").get.get + + val partId6: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning6, "id_partitioning").get.get + + val partId7: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning7, "id_partitioning").get.get + + val partId8: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning8, "id_partitioning").get.get + + function(createFlowFn) + .setParam("i_fk_partitioning", partId1) + .setParam("i_by_user", "Grandpa") + .execute { queryResult => + flowIdOfPartitioning1 = queryResult.next().getLong("id_flow").get + } + + function(createFlowFn) + .setParam("i_fk_partitioning", partId2) + .setParam("i_by_user", "Father") + .execute { queryResult => + flowIdOfPartitioning2 = queryResult.next().getLong("id_flow").get + } + + function(createFlowFn) + .setParam("i_fk_partitioning", partId6) + .setParam("i_by_user", "Daughter") + .execute { queryResult => + flowIdOfPartitioning3 = queryResult.next().getLong("id_flow").get + } + + function(addToParentFlowsFn) + .setParam("i_fk_parent_partitioning", partId1) + .setParam("i_fk_partitioning", partId3) + .setParam("i_by_user", "Son") + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "Partitioning added to flows") + } + + function(addToParentFlowsFn) + .setParam("i_fk_parent_partitioning", partId2) + .setParam("i_fk_partitioning", partId4) + .setParam("i_by_user", "Grandson") + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + 
assert(result1.getString("status_text").get == "Partitioning added to flows") + } + + function(addToParentFlowsFn) + .setParam("i_fk_parent_partitioning", partId6) + .setParam("i_fk_partitioning", partId7) + .setParam("i_by_user", "GrandDaughter") + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "Partitioning added to flows") + } + + function(addToParentFlowsFn) + .setParam("i_fk_parent_partitioning", partId3) + .setParam("i_fk_partitioning", partId5) + .setParam("i_by_user", "GrandMa") + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "Partitioning added to flows") + } + + function(addToParentFlowsFn) + .setParam("i_fk_parent_partitioning", partId4) + .setParam("i_fk_partitioning", partId5) + .setParam("i_by_user", "GrandMa") + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "Partitioning added to flows") + } + + function(addToParentFlowsFn) + .setParam("i_fk_parent_partitioning", partId5) + .setParam("i_fk_partitioning", partId8) + .setParam("i_by_user", "Mother") + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "Partitioning added to flows") + } + + function(addToParentFlowsFn) + .setParam("i_fk_parent_partitioning", partId7) + .setParam("i_fk_partitioning", partId8) + .setParam("i_by_user", "Mother") + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "Partitioning added to flows") + } + + //TEST 1 Ancestors Partition + function(getAncestorsFn) + .setParam("i_partitioning_id", partId3) + .execute { queryResult => + val row = queryResult.next() + val returnedPartitioning = row.getJsonB("partitioning").get + val returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partId1)) + assert(returnedPartitioningParsed == expectedPartitioning1) + assert(row.getString("author").contains("Grandpa")) + } + + //TEST multiple Ancestors Partitions + function(getAncestorsFn) + .setParam("i_partitioning_id", partId5) + .execute { queryResult => + var row = queryResult.next() + var returnedPartitioning = row.getJsonB("partitioning").get + var returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partId1)) + assert(returnedPartitioningParsed == expectedPartitioning1) + assert(row.getString("author").contains("Grandpa")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partId2)) + assert(returnedPartitioningParsed == expectedPartitioning2) + 
assert(row.getString("author").contains("Father")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partId3)) + assert(returnedPartitioningParsed == expectedPartitioning3) + assert(row.getString("author").contains("Son")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partId4)) + assert(returnedPartitioningParsed == expectedPartitioning4) + assert(row.getString("author").contains("Grandson")) + } + + //TEST Separate flow for Ancestors Partitions + function(getAncestorsFn) + .setParam("i_partitioning_id", partId7) + .execute { queryResult => + val row = queryResult.next() + val returnedPartitioning = row.getJsonB("partitioning").get + val returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partId6)) + assert(returnedPartitioningParsed == expectedPartitioning6) + assert(row.getString("author").contains("Mother")) + } + + //TEST ALL flows for Ancestors Partitions + function(getAncestorsFn) + .setParam("i_partitioning_id", partId8) + .setParam("i_limit", 10) + .execute { queryResult => + var row = queryResult.next() + var returnedPartitioning = row.getJsonB("partitioning").get + var returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partId1)) + assert(returnedPartitioningParsed == expectedPartitioning1) + assert(row.getString("author").contains("Grandpa")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partId2)) + assert(returnedPartitioningParsed == expectedPartitioning2) + assert(row.getString("author").contains("Father")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partId3)) + assert(returnedPartitioningParsed == expectedPartitioning3) + assert(row.getString("author").contains("Son")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + 
assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partId4)) + assert(returnedPartitioningParsed == expectedPartitioning4) + assert(row.getString("author").contains("Grandson")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partId5)) + assert(returnedPartitioningParsed == expectedPartitioning5) + assert(row.getString("author").contains("Grandma")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partId6)) + assert(returnedPartitioningParsed == expectedPartitioning6) + assert(row.getString("author").contains("Mother")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partId7)) + assert(returnedPartitioningParsed == expectedPartitioning7) + assert(row.getString("author").contains("Daughter")) + } + } + + test("Child Partitioning not found") { + val nonExistentID = 9999L + + function(getAncestorsFn) + .setParam("i_partitioning_id", nonExistentID) + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(41)) + assert(row.getString("status_text").contains("Child Partitioning not found")) + assert(row.getJsonB("ancestor_id").isEmpty) + assert(row.getJsonB("partitioning").isEmpty) + assert(row.getString("author").isEmpty) + assert(!queryResult.hasNext) + } + } + + test("Flow not found") { + + table(partitioningsTable).insert(add("partitioning", partitioning5).add("created_by", "NO_FLOW")) + + val partId5: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning5, "id_partitioning").get.get + + function(getAncestorsFn) + .setParam("i_partitioning_id", partId5) + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(41)) + assert(row.getString("status_text").contains("Flow not found")) + assert(row.getJsonB("ancestor_id").isEmpty) + assert(row.getJsonB("partitioning").isEmpty) + assert(row.getString("author").isEmpty) + assert(!queryResult.hasNext) + } + } + + test("Ancestor Partitioning not found") { + + table(partitioningsTable).insert(add("partitioning", partitioning5).add("created_by", "NO_Ancestor")) + + val partId5: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning5, "id_partitioning").get.get + + function(createFlowFn) + .setParam("i_fk_partitioning", partId5) + .setParam("i_by_user", "Grandpa") + .execute { queryResult => + flowIdOfPartitioning1 = queryResult.next().getLong("id_flow").get + } + + function(getAncestorsFn) + .setParam("i_partitioning_id", partId5) + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + 
assert(row.getInt("status").contains(41))
+        assert(row.getString("status_text").contains("Ancestor Partitioning not found"))
+        assert(row.getJsonB("ancestor_id").isEmpty)
+        assert(row.getJsonB("partitioning").isEmpty)
+        assert(row.getString("author").isEmpty)
+        assert(!queryResult.hasNext)
+      }
+  }
+}
diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMainFlowIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMainFlowIntegrationTests.scala
index 2042cf146..3df4f733d 100644
--- a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMainFlowIntegrationTests.scala
+++ b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMainFlowIntegrationTests.scala
@@ -127,6 +127,7 @@ class GetPartitioningMainFlowIntegrationTests extends DBTestSuite {
       assert(row.getLong("id_flow").contains(1L))
       assert(row.getString("flow_name").contains("Flow1"))
       assert(row.getString("created_by").contains("Sirius"))
+      assert(!queryResult.hasNext)
     }
diff --git a/publish.sbt b/publish.sbt
index 486192d52..9bd2dadf2 100644
--- a/publish.sbt
+++ b/publish.sbt
@@ -58,6 +58,12 @@ ThisBuild / developers := List(
     email = "tebalelo.sekhula@absa.africa",
     url = url("https://github.com/TebaleloS")
   ),
+  Developer(
+    id = "ABLL526",
+    name = "Liam Leibrandt",
+    email = "liam.leibrandt@absa.africa",
+    url = url("https://github.com/ABLL526")
+  ),
   Developer(
     id = "salamonpavel",
     name = "Pavel Salamon",
diff --git a/server/src/main/scala/za/co/absa/atum/server/Main.scala b/server/src/main/scala/za/co/absa/atum/server/Main.scala
index c4ac42381..548bbad26 100644
--- a/server/src/main/scala/za/co/absa/atum/server/Main.scala
+++ b/server/src/main/scala/za/co/absa/atum/server/Main.scala
@@ -66,6 +66,7 @@ object Main extends ZIOAppDefault with Server {
       GetPartitioning.layer,
       GetFlowPartitionings.layer,
       GetPartitioningMainFlow.layer,
+      GetAncestors.layer,
       PostgresDatabaseProvider.layer,
       TransactorProvider.layer,
       AwsSecretsProviderImpl.layer,
diff --git 
a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala index 059b52bec..e82e52151 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala @@ -60,4 +60,11 @@ trait PartitioningController { def getPartitioningMainFlow( partitioningId: Long ): IO[ErrorResponse, SingleSuccessResponse[FlowDTO]] + + def getAncestors( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long] + ): IO[ErrorResponse, PaginatedResponse[PartitioningWithIdDTO]] + } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala index 33c842710..7cc4e0151 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala @@ -145,6 +145,20 @@ class PartitioningControllerImpl(partitioningService: PartitioningService) ) } + override def getAncestors( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long] + ): IO[ErrorResponse, PaginatedResponse[PartitioningWithIdDTO]] = { + mapToPaginatedResponse( + limit.get, + offset.get, + serviceCall[PaginatedResult[PartitioningWithIdDTO], PaginatedResult[PartitioningWithIdDTO]]( + partitioningService.getAncestors(partitioningId, limit, offset) + ) + ) + } + } object PartitioningControllerImpl { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetAncestors.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetAncestors.scala new file mode 100644 index 000000000..d0790c3b1 --- /dev/null +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetAncestors.scala @@ -0,0 +1,86 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.atum.server.api.database.runs.functions + +import doobie.implicits.toSqlInterpolator +import io.circe.{DecodingFailure, Json} +import za.co.absa.atum.model.dto.{PartitionDTO, PartitioningWithIdDTO} +import za.co.absa.atum.server.api.database.PostgresDatabaseProvider +import za.co.absa.atum.server.api.database.runs.Runs +import za.co.absa.atum.server.api.database.runs.functions.GetAncestors._ +import za.co.absa.atum.server.model.PartitioningForDB +import za.co.absa.db.fadb.DBSchema +import za.co.absa.db.fadb.doobie.DoobieEngine +import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithAggStatus +import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator +import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling +import zio.{Task, URLayer, ZIO, ZLayer} + +import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbGet + +import scala.annotation.tailrec + +class GetAncestors(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) + extends DoobieMultipleResultFunctionWithAggStatus[GetAncestorsArgs, Option[ + GetAncestorsResult + ], Task](args => + Seq( + fr"${args.partitioningId}", + fr"${args.limit}", + fr"${args.offset}" + ) + ) + with StandardStatusHandling + with ByFirstErrorStatusAggregator { + + override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("ancestor_id", "partitioning", "author", "has_more") +} + +object GetAncestors { + case class GetAncestorsArgs(partitioningId: Long, limit: Option[Int], offset: Option[Long]) + case class GetAncestorsResult(ancestor_id: Long, partitioningJson: Json, author: String, hasMore: Boolean) + + object GetAncestorsResult { + + @tailrec def resultsToPartitioningWithIdDTOs( + results: Seq[GetAncestorsResult], + acc: Seq[PartitioningWithIdDTO] + ): Either[DecodingFailure, Seq[PartitioningWithIdDTO]] = { + if (results.isEmpty) Right(acc) + else { + val head = results.head + val tail = results.tail + val decodingResult = head.partitioningJson.as[PartitioningForDB] + decodingResult match { + case Left(decodingFailure) => Left(decodingFailure) + case Right(partitioningForDB) => + val partitioningDTO = partitioningForDB.keys.map { key => + PartitionDTO(key, partitioningForDB.keysToValuesMap(key)) + } + resultsToPartitioningWithIdDTOs(tail, acc :+ PartitioningWithIdDTO(head.ancestor_id, partitioningDTO, head.author)) + } + } + } + + } + + val layer: URLayer[PostgresDatabaseProvider, GetAncestors] = ZLayer { + for { + dbProvider <- ZIO.service[PostgresDatabaseProvider] + } yield new GetAncestors()(Runs, dbProvider.dbEngine) + } +} diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala index 66a4efad9..0b4c726f6 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala @@ -173,6 +173,19 @@ trait Endpoints extends BaseEndpoints { .errorOutVariantPrepend(errorInDataOneOfVariant) } + protected val getAncestorsEndpointV2 + : PublicEndpoint[(Long, Option[Int], Option[Long]), ErrorResponse, PaginatedResponse[ + PartitioningWithIdDTO + ], Any] = { + apiV2.get + .in(V2Paths.Partitionings / path[Long]("partitioningId") / V2Paths.Partitionings) + .in(query[Option[Int]]("limit").default(Some(10)).validateOption(Validator.inRange(1, 1000))) + .in(query[Option[Long]]("offset").default(Some(0L)).validateOption(Validator.min(0L))) + 
.out(statusCode(StatusCode.Ok)) + .out(jsonBody[PaginatedResponse[PartitioningWithIdDTO]]) + .errorOutVariantPrepend(notFoundErrorOneOfVariant) + } + protected val zioMetricsEndpoint: PublicEndpoint[Unit, Unit, String, Any] = { endpoint.get.in(ZioMetrics).out(stringBody) } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala index 825e3ce1f..0069a7b0e 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala @@ -103,6 +103,16 @@ trait Routes extends Endpoints with ServerOptions { PartitioningController.getFlowPartitionings(flowId, limit, offset) } ), + createServerEndpoint[ + (Long, Option[Int], Option[Long]), + ErrorResponse, + PaginatedResponse[PartitioningWithIdDTO] + ]( + getAncestorsEndpointV2, + { case (partitioningId: Long, limit: Option[Int], offset: Option[Long]) => + PartitioningController.getAncestors(partitioningId, limit, offset) + } + ), createServerEndpoint(healthEndpoint, (_: Unit) => ZIO.unit) ) ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(metricsInterceptorOption)).from(endpoints).toRoutes diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala index 45023a4e7..ecd109dab 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala @@ -57,5 +57,11 @@ trait PartitioningRepository { offset: Option[Long] ): IO[DatabaseError, PaginatedResult[PartitioningWithIdDTO]] + def getAncestors( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long] + ): IO[DatabaseError, PaginatedResult[PartitioningWithIdDTO]] + def getPartitioningMainFlow(partitioningId: Long): IO[DatabaseError, FlowDTO] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala index 1f2505375..26d9f8507 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala @@ -18,6 +18,7 @@ package za.co.absa.atum.server.api.repository import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings._ +import za.co.absa.atum.server.api.database.runs.functions.GetAncestors._ import za.co.absa.atum.server.api.database.runs.functions.CreateOrUpdateAdditionalData.CreateOrUpdateAdditionalDataArgs import za.co.absa.atum.server.api.database.runs.functions._ import za.co.absa.atum.server.api.database.flows.functions._ @@ -39,6 +40,7 @@ class PartitioningRepositoryImpl ( getPartitioningMeasuresByIdFn: GetPartitioningMeasuresById, getPartitioningFn: GetPartitioning, getFlowPartitioningsFn: GetFlowPartitionings, + getAncestorsFn: GetAncestors, getPartitioningMainFlowFn: GetPartitioningMainFlow ) extends PartitioningRepository with BaseRepository { @@ -158,6 +160,28 @@ class PartitioningRepositoryImpl ( } } + override def getAncestors( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long] + ): IO[DatabaseError, PaginatedResult[PartitioningWithIdDTO]] = { + dbMultipleResultCallWithAggregatedStatus( + 
getAncestorsFn(GetAncestorsArgs(partitioningId, limit, offset)), + "getAncestors" + ).map(_.flatten) + .flatMap { partitioningResults => + ZIO + .fromEither(GetAncestorsResult.resultsToPartitioningWithIdDTOs(partitioningResults, Seq.empty)) + .mapBoth( + error => GeneralDatabaseError(error.getMessage), + partitionings => { + if (partitioningResults.nonEmpty && partitioningResults.head.hasMore) ResultHasMore(partitionings) + else ResultNoMore(partitionings) + } + ) + } + } + override def getPartitioningMainFlow(partitioningId: Long): IO[DatabaseError, FlowDTO] = { dbSingleResultCallWithStatus( getPartitioningMainFlowFn(partitioningId), @@ -182,6 +206,7 @@ object PartitioningRepositoryImpl { with GetPartitioningMeasuresById with GetPartitioning with GetFlowPartitionings + with GetAncestors with GetPartitioningMainFlow, PartitioningRepository ] = ZLayer { @@ -197,6 +222,7 @@ object PartitioningRepositoryImpl { getPartitioning <- ZIO.service[GetPartitioning] getFlowPartitionings <- ZIO.service[GetFlowPartitionings] getPartitioningMainFlow <- ZIO.service[GetPartitioningMainFlow] + getAncestors <- ZIO.service[GetAncestors] } yield new PartitioningRepositoryImpl( createPartitioningIfNotExists, createPartitioning, @@ -208,6 +234,7 @@ object PartitioningRepositoryImpl { getPartitioningMeasuresById, getPartitioning, getFlowPartitionings, + getAncestors, getPartitioningMainFlow ) } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala index 78aeb4ad2..f583e1e83 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala @@ -56,4 +56,10 @@ trait PartitioningService { ): IO[ServiceError, PaginatedResult[PartitioningWithIdDTO]] def getPartitioningMainFlow(partitioningId: Long): IO[ServiceError, FlowDTO] + + def getAncestors( + flowId: Long, + limit: Option[Int], + offset: Option[Long] + ): IO[ServiceError, PaginatedResult[PartitioningWithIdDTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala index 3d9a46c26..5e16d4e6b 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala @@ -113,6 +113,17 @@ class PartitioningServiceImpl(partitioningRepository: PartitioningRepository) ) } + override def getAncestors( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long] + ): IO[ServiceError, PaginatedResult[PartitioningWithIdDTO]] = { + repositoryCall( + partitioningRepository.getAncestors(partitioningId, limit, offset), + "getAncestors" + ) + } + } object PartitioningServiceImpl { diff --git a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala index 7ac7ed79a..436ccd5fe 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala @@ -22,6 +22,7 @@ import za.co.absa.atum.model.dto.MeasureResultDTO.TypedValue import za.co.absa.atum.model.dto._ import za.co.absa.atum.model.{ResultValueType, dto} import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings.GetFlowPartitioningsResult +import 
za.co.absa.atum.server.api.database.runs.functions.GetAncestors.GetAncestorsResult import za.co.absa.atum.server.model.{CheckpointFromDB, CheckpointItemFromDB, MeasureFromDB, PartitioningFromDB} import java.time.ZonedDateTime @@ -85,6 +86,27 @@ trait TestData { hasMore = true ) + protected val getAncestorsResult1: GetAncestorsResult = GetAncestorsResult( + ancestor_id = 1111L, + partitioningJson = partitioningAsJson, + author = "author", + hasMore = false + ) + + protected val getAncestorsResult2: GetAncestorsResult = GetAncestorsResult( + ancestor_id = 1111L, + partitioningJson = partitioningAsJson, + author = "author", + hasMore = true + ) + + protected val getAncestorsResult3: GetAncestorsResult = GetAncestorsResult( + ancestor_id = 3333L, + partitioningJson = partitioningAsJson, + author = "author", + hasMore = false + ) + // Partitioning with ID DTO protected val partitioningWithIdDTO1: PartitioningWithIdDTO = PartitioningWithIdDTO( id = partitioningFromDB1.id, diff --git a/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala index de977bf87..8f33408e0 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala @@ -92,6 +92,13 @@ object PartitioningControllerUnitTests extends ZIOSpecDefault with TestData { when(partitioningServiceMock.getPartitioningMainFlow(999L)) .thenReturn(ZIO.fail(GeneralServiceError("boom!"))) + when(partitioningServiceMock.getAncestors(1111L, Some(1), Some(0))) + .thenReturn(ZIO.succeed(ResultHasMore(Seq(partitioningWithIdDTO1)))) + when(partitioningServiceMock.getAncestors(8888L, Some(1), Some(0))) + .thenReturn(ZIO.fail(GeneralServiceError("boom!"))) + when(partitioningServiceMock.getAncestors(9999L, Some(1), Some(0))) + .thenReturn(ZIO.fail(NotFoundServiceError("Child Partitioning not found"))) + private val partitioningServiceMockLayer = ZLayer.succeed(partitioningServiceMock) override def spec: Spec[TestEnvironment with Scope, Any] = { @@ -231,6 +238,25 @@ object PartitioningControllerUnitTests extends ZIOSpecDefault with TestData { ) } ), + suite("GetAncestorsSuite")( + test("Returns expected PaginatedResponse[PartitioningWithIdDTO] with more data available") { + for { + result <- PartitioningController.getAncestors(1111L, Some(1), Some(0)) + expected = PaginatedResponse(Seq(partitioningWithIdDTO1), Pagination(1, 0L, hasMore = true), uuid1) + actual = result.copy(requestId = uuid1) + } yield assertTrue(actual == expected) + }, + test("Returns expected InternalServerErrorResponse when service call fails with GeneralServiceError") { + assertZIO(PartitioningController.getAncestors(8888L, Some(1), Some(0)).exit)( + failsWithA[InternalServerErrorResponse] + ) + }, + test("Returns expected NotFoundErrorResponse when service call fails with NotFoundServiceError") { + assertZIO(PartitioningController.getAncestors(9999L, Some(1), Some(0)).exit)( + failsWithA[NotFoundErrorResponse] + ) + } + ), suite("GetPartitioningMainFlowSuite")( test("Returns expected FlowDTO") { for { diff --git a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetAncestorsIntegrationTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetAncestorsIntegrationTests.scala new file mode 100644 index 000000000..631fe095b --- /dev/null +++ 
b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetAncestorsIntegrationTests.scala @@ -0,0 +1,47 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.database.runs.functions + +import za.co.absa.atum.server.ConfigProviderTest +import za.co.absa.atum.server.api.TestTransactorProvider +import za.co.absa.atum.server.api.database.PostgresDatabaseProvider +import za.co.absa.atum.server.api.database.runs.functions.GetAncestors.GetAncestorsArgs +import za.co.absa.db.fadb.exceptions.DataNotFoundException +import za.co.absa.db.fadb.status.FunctionStatus +import zio.{Scope, ZIO} +import zio.test.{Spec, TestEnvironment, assertTrue} +import zio.interop.catz.asyncInstance + +object GetAncestorsIntegrationTests extends ConfigProviderTest { + + override def spec: Spec[Unit with TestEnvironment with Scope, Any] = { + suite("GetAncestorsIntegrationTests")( + test("Retrieve Ancestors' partitions for a given id") { + val partitioningID: Long = 1111L + for { + getAncestors <- ZIO.service[GetAncestors] + result <- getAncestors(GetAncestorsArgs(partitioningID, None, None)) + + } yield assertTrue(result == Left(DataNotFoundException(FunctionStatus(41, "Child Partitioning not found")))) + } + ) + }.provide( + GetAncestors.layer, + PostgresDatabaseProvider.layer, + TestTransactorProvider.layerWithRollback + ) +} diff --git a/server/src/test/scala/za/co/absa/atum/server/api/http/GetAncestorsEndpointV2UnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/http/GetAncestorsEndpointV2UnitTests.scala new file mode 100644 index 000000000..714ac4ac6 --- /dev/null +++ b/server/src/test/scala/za/co/absa/atum/server/api/http/GetAncestorsEndpointV2UnitTests.scala @@ -0,0 +1,118 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.atum.server.api.http + +import org.mockito.Mockito.{mock, when} +import sttp.client3.circe.asJson +import sttp.client3.testing.SttpBackendStub +import sttp.client3.{UriContext, basicRequest} +import sttp.model.StatusCode +import sttp.tapir.server.stub.TapirStubInterpreter +import sttp.tapir.ztapir.{RIOMonadError, RichZEndpoint} +import za.co.absa.atum.model.dto.PartitioningWithIdDTO +import za.co.absa.atum.server.api.TestData +import za.co.absa.atum.server.api.controller.PartitioningController +import za.co.absa.atum.server.model.{InternalServerErrorResponse, NotFoundErrorResponse, Pagination} +import za.co.absa.atum.server.model.SuccessResponse.PaginatedResponse +import zio.test.Assertion.equalTo +import zio.test.{Spec, TestEnvironment, ZIOSpecDefault, assertZIO} +import zio.{Scope, ZIO, ZLayer} + +object GetAncestorsEndpointV2UnitTests extends ZIOSpecDefault with Endpoints with TestData { + + private val partitioningControllerMock = mock(classOf[PartitioningController]) + + when(partitioningControllerMock.getAncestors(1111L, Some(1), Some(0))) + .thenReturn( + ZIO.succeed( + PaginatedResponse(Seq.empty, Pagination(1, 0, hasMore = true), uuid1) + ) + ) + when(partitioningControllerMock.getAncestors(8888L, Some(1), Some(0))) + .thenReturn( + ZIO.fail( + NotFoundErrorResponse("Child Partitioning not found") + ) + ) + when(partitioningControllerMock.getAncestors(9999L, Some(1), Some(0))) + .thenReturn( + ZIO.fail( + InternalServerErrorResponse("internal server error") + ) + ) + + private val partitioningControllerMockLayer = ZLayer.succeed(partitioningControllerMock) + + private val getAncestorsServerEndpoint = + getAncestorsEndpointV2.zServerLogic({ case (partitioningId: Long, limit: Option[Int], offset: Option[Long]) => + PartitioningController.getAncestors(partitioningId, limit: Option[Int], offset: Option[Long]) + }) + + override def spec: Spec[TestEnvironment with Scope, Any] = { + val backendStub = TapirStubInterpreter(SttpBackendStub.apply(new RIOMonadError[PartitioningController])) + .whenServerEndpoint(getAncestorsServerEndpoint) + .thenRunLogic() + .backend() + + suite("GetAncestorsEndpointSuite")( + test("Returns an expected PaginatedResponse") { + val request = basicRequest + .get(uri"https://test.com/api/v2/partitionings/1111/partitionings?limit=1&offset=0") + .response(asJson[PaginatedResponse[PartitioningWithIdDTO]]) + + val response = request + .send(backendStub) + + val body = response.map(_.body) + val statusCode = response.map(_.code) + + assertZIO(body <&> statusCode)( + equalTo( + Right(PaginatedResponse(Seq.empty[PartitioningWithIdDTO], Pagination(1, 0, hasMore = true), uuid1)), + StatusCode.Ok + ) + ) + }, + test("Returns a NotFoundErrorResponse") { + val request = basicRequest + .get(uri"https://test.com/api/v2/partitionings/8888/partitionings?limit=1&offset=0") + .response(asJson[NotFoundErrorResponse]) + + val response = request + .send(backendStub) + + val statusCode = response.map(_.code) + + assertZIO(statusCode)(equalTo(StatusCode.NotFound)) + }, + test("Returns an InternalServerErrorResponse") { + val request = basicRequest + .get(uri"https://test.com/api/v2/partitionings/9999/partitionings?limit=1&offset=0") + .response(asJson[InternalServerErrorResponse]) + + val response = request + .send(backendStub) + + val statusCode = response.map(_.code) + + assertZIO(statusCode)(equalTo(StatusCode.InternalServerError)) + } + ) + + }.provide(partitioningControllerMockLayer) + +} diff --git 
a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala index 9ee0e0c21..b3a0b43e9 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala @@ -21,6 +21,8 @@ import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataItemDTO, Part import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings.GetFlowPartitioningsArgs +import za.co.absa.atum.server.api.database.runs.functions.GetAncestors +import za.co.absa.atum.server.api.database.runs.functions.GetAncestors.{GetAncestorsArgs, GetAncestorsResult} import za.co.absa.atum.server.api.database.runs.functions.CreateOrUpdateAdditionalData.CreateOrUpdateAdditionalDataArgs import za.co.absa.atum.server.api.database.runs.functions._ import za.co.absa.atum.server.api.exception.DatabaseError @@ -115,6 +117,18 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { private val getPartitioningByIdByIdMockLayer = ZLayer.succeed(getPartitioningByIdByIdMock) + // Get Ancestors By Id Mocks + private val getAncestorsMock = mock(classOf[GetAncestors]) + + when(getAncestorsMock.apply(GetAncestorsArgs(2222L, Some(1), Some(1L))) + ).thenReturn(ZIO.right(Seq(Row(FunctionStatus(11, "OK"), Some(getAncestorsResult1))))) + when(getAncestorsMock.apply(GetAncestorsArgs(9999L, Some(1), Some(1L)))) + .thenReturn(ZIO.left(DataNotFoundException(FunctionStatus(41, "Child Partitioning not found")))) + when(getAncestorsMock.apply(GetAncestorsArgs(8888L, Some(1), Some(1L))) + ).thenReturn(ZIO.fail(new Exception("boom!"))) + + private val getAncestorsMockLayer = ZLayer.succeed(getAncestorsMock) + // GetPartitioningAdditionalDataV2 private val getPartitioningAdditionalDataV2Mock = mock(classOf[GetPartitioningAdditionalDataV2]) @@ -315,6 +329,23 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { ) } ), + suite("GetAncestorsSuite")( + test("Returns expected ResultNoMore[PartitioningWithIdDTO]") { + for { + result <- PartitioningRepository.getAncestors(2222L, Some(1), Some(1L)) + } yield assertTrue(result == ResultNoMore(Seq(partitioningWithIdDTO1))) + }, + test("Returns expected NotFoundDatabaseError") { + assertZIO(PartitioningRepository.getAncestors(9999L, Some(1), Some(1L)).exit)( + failsWithA[NotFoundDatabaseError] + ) + }, + test("Returns expected GeneralDatabaseError") { + assertZIO(PartitioningRepository.getAncestors(8888L, Some(1), Some(1L)).exit)( + failsWithA[GeneralDatabaseError] + ) + } + ), suite("GetPartitioningMeasuresByIdSuite")( test("Returns expected Seq") { for { @@ -407,6 +438,7 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { getPartitioningAdditionalDataMockLayer, createOrUpdateAdditionalDataMockLayer, getPartitioningByIdByIdMockLayer, + getAncestorsMockLayer, getPartitioningAdditionalDataV2MockLayer, getPartitioningMockLayer, getPartitioningMeasuresV2MockLayer, diff --git a/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala index fb2e02559..965be3fa1 100644 --- 
a/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala @@ -73,6 +73,15 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { when(partitioningRepositoryMock.getPartitioningById(8888L)) .thenReturn(ZIO.fail(NotFoundDatabaseError("Partitioning not found"))) + when(partitioningRepositoryMock.getAncestors(1111L, Some(1), Some(1L))) + .thenReturn(ZIO.succeed(ResultHasMore(Seq(partitioningWithIdDTO1)))) + when(partitioningRepositoryMock.getAncestors(2222L, Some(1), Some(1L))) + .thenReturn(ZIO.succeed(ResultNoMore(Seq(partitioningWithIdDTO1)))) + when(partitioningRepositoryMock.getAncestors(8888L, Some(1), Some(1L))) + .thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) + when(partitioningRepositoryMock.getAncestors(9999L, Some(1), Some(1L))) + .thenReturn(ZIO.fail(NotFoundDatabaseError("Child Partitioning not found"))) + when(partitioningRepositoryMock.getPartitioningMeasuresById(1L)) .thenReturn(ZIO.succeed(Seq(measureDTO1, measureDTO2))) when(partitioningRepositoryMock.getPartitioningMeasuresById(2L)) @@ -226,6 +235,28 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { ) } ), + suite("GetAncestorsSuite")( + test("Returns expected Right with ResultHasMore[PartitioningWithIdDTO]") { + for { + result <- PartitioningService.getAncestors(1111L, Some(1), Some(1L)) + } yield assertTrue(result == ResultHasMore(Seq(partitioningWithIdDTO1))) + }, + test("Returns expected Right with ResultNoMore[PartitioningWithIdDTO]") { + for { + result <- PartitioningService.getAncestors(2222L, Some(1), Some(1L)) + } yield assertTrue(result == ResultNoMore(Seq(partitioningWithIdDTO1))) + }, + test("Returns expected GeneralServiceError when database error occurs") { + assertZIO(PartitioningService.getAncestors(8888L, Some(1), Some(1L)).exit)( + failsWithA[GeneralServiceError] + ) + }, + test("Returns expected NotFoundServiceError when child partition does not exist") { + assertZIO(PartitioningService.getAncestors(9999L, Some(1), Some(1L)).exit)( + failsWithA[NotFoundServiceError] + ) + } + ), suite("GetPartitioningMeasuresByIdSuite")( test("Returns expected Right with Seq[MeasureDTO]") { for { From 97c45811f28484b8c53782f6dbdfcc7882be7d22 Mon Sep 17 00:00:00 2001 From: ABLL526 Date: Wed, 13 Nov 2024 15:49:38 +0200 Subject: [PATCH 2/2] Updated Commit Updated commit, updated tests. 
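For reviewers, a rough sketch of how a client could page through the new
endpoint once deployed. The host, port, and partitioning id are made up; the
URL shape mirrors GetAncestorsEndpointV2UnitTests, and the envelope types come
from the za.co.absa.atum.model.envelopes package this commit switches the tests
to. Treat it as an illustrative usage example, not shipped code.

```scala
import sttp.client3._
import sttp.client3.circe.asJson
import za.co.absa.atum.model.dto.PartitioningWithIdDTO
import za.co.absa.atum.model.envelopes.SuccessResponse.PaginatedResponse

object GetAncestorsClientSketch extends App {
  // Plain synchronous backend, good enough for a smoke test from the sbt console.
  val backend = HttpURLConnectionBackend()

  // First page of ancestors of partitioning 1111 (hypothetical id).
  val response = basicRequest
    .get(uri"http://localhost:8080/api/v2/partitionings/1111/partitionings?limit=10&offset=0")
    .response(asJson[PaginatedResponse[PartitioningWithIdDTO]])
    .send(backend)

  // Each item carries the ancestor id, partitioning data, and author;
  // the pagination envelope's hasMore flag signals whether to request
  // another page with a higher offset.
  println(response.body)
}
```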
--- .../co/absa/atum/server/api/http/Routes.scala | 36 +++-- .../za/co/absa/atum/server/api/TestData.scala | 9 +- .../GetAncestorsEndpointV2UnitTests.scala | 4 +- .../PartitioningRepositoryUnitTests.scala | 123 ++++++++---------- 4 files changed, 80 insertions(+), 92 deletions(-) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala index 0069a7b0e..d1ceba6a0 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala @@ -25,10 +25,10 @@ import sttp.tapir.server.interceptor.metrics.MetricsRequestInterceptor import sttp.tapir.swagger.bundle.SwaggerInterpreter import sttp.tapir.ztapir._ import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataPatchDTO, CheckpointV2DTO, PartitioningWithIdDTO} +import za.co.absa.atum.model.envelopes.{ErrorResponse, StatusResponse} import za.co.absa.atum.server.api.controller.{CheckpointController, FlowController, PartitioningController} import za.co.absa.atum.server.config.{HttpMonitoringConfig, JvmMonitoringConfig} -import za.co.absa.atum.server.model.ErrorResponse -import za.co.absa.atum.server.model.SuccessResponse._ +import za.co.absa.atum.model.envelopes.SuccessResponse._ import zio._ import zio.interop.catz._ import zio.metrics.connectors.prometheus.PrometheusPublisher @@ -57,7 +57,7 @@ trait Routes extends Endpoints with ServerOptions { createServerEndpoint(postPartitioningEndpointV2, PartitioningController.postPartitioning), createServerEndpoint( getPartitioningAdditionalDataEndpointV2, - PartitioningController.getPartitioningAdditionalDataV2 + PartitioningController.getPartitioningAdditionalData ), createServerEndpoint[ (Long, AdditionalDataPatchDTO), @@ -90,6 +90,14 @@ trait Routes extends Endpoints with ServerOptions { CheckpointController.getPartitioningCheckpoints(partitioningId, limit, offset, checkpointName) } ), + createServerEndpoint[ + (Long, Option[Int], Option[Long], Option[String]), + ErrorResponse, + PaginatedResponse[CheckpointV2DTO] + ](getFlowCheckpointsEndpointV2, { + case (flowId: Long, limit: Option[Int], offset: Option[Long], checkpointName: Option[String]) => + FlowController.getFlowCheckpoints(flowId, limit, offset, checkpointName) + }), createServerEndpoint(getPartitioningByIdEndpointV2, PartitioningController.getPartitioningByIdV2), createServerEndpoint(getPartitioningMeasuresEndpointV2, PartitioningController.getPartitioningMeasuresV2), createServerEndpoint(getPartitioningMainFlowEndpointV2, PartitioningController.getPartitioningMainFlow), @@ -113,7 +121,7 @@ trait Routes extends Endpoints with ServerOptions { PartitioningController.getAncestors(partitioningId, limit, offset) } ), - createServerEndpoint(healthEndpoint, (_: Unit) => ZIO.unit) + createServerEndpoint(healthEndpoint, (_: Unit) => ZIO.succeed(StatusResponse.up)) ) ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(metricsInterceptorOption)).from(endpoints).toRoutes } @@ -127,14 +135,16 @@ trait Routes extends Endpoints with ServerOptions { // postCheckpointEndpointV2, createPartitioningEndpointV1, // postPartitioningEndpointV2, - // patchPartitioningAdditionalDataEndpointV2, + patchPartitioningAdditionalDataEndpointV2, // getPartitioningCheckpointsEndpointV2, // getPartitioningCheckpointEndpointV2, // getPartitioningMeasuresEndpointV2, - // getPartitioningEndpointV2, + getPartitioningEndpointV2, // getPartitioningMeasuresEndpointV2, // 
getFlowPartitioningsEndpointV2, - // getPartitioningMainFlowEndpointV2 + // getPartitioningMainFlowEndpointV2, + // getFlowCheckpointsEndpointV2, + healthEndpoint ) ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(None)) .from(SwaggerInterpreter().fromEndpoints[HttpEnv.F](endpoints, "Atum API", "1.0")) @@ -149,16 +159,16 @@ trait Routes extends Endpoints with ServerOptions { } private def createServerEndpoint[I, E, O]( - endpoint: PublicEndpoint[I, E, O, Any], - logic: I => ZIO[HttpEnv.Env, E, O] - ): ZServerEndpoint[HttpEnv.Env, Any] = { + endpoint: PublicEndpoint[I, E, O, Any], + logic: I => ZIO[HttpEnv.Env, E, O] + ): ZServerEndpoint[HttpEnv.Env, Any] = { endpoint.zServerLogic(logic).widen[HttpEnv.Env] } protected def allRoutes( - httpMonitoringConfig: HttpMonitoringConfig, - jvmMonitoringConfig: JvmMonitoringConfig - ): HttpRoutes[HttpEnv.F] = { + httpMonitoringConfig: HttpMonitoringConfig, + jvmMonitoringConfig: JvmMonitoringConfig + ): HttpRoutes[HttpEnv.F] = { createAllServerRoutes(httpMonitoringConfig) <+> createSwaggerRoutes <+> (if (httpMonitoringConfig.enabled) http4sMetricsRoutes else HttpRoutes.empty[HttpEnv.F]) <+> diff --git a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala index 6a7050e98..33f88ce55 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala @@ -92,7 +92,7 @@ trait TestData { ) protected val getAncestorsResult1: GetAncestorsResult = GetAncestorsResult( - ancestor_id = 1111L, + ancestor_id = 1L, partitioningJson = partitioningAsJson, author = "author", hasMore = false @@ -105,13 +105,6 @@ trait TestData { hasMore = true ) - protected val getAncestorsResult3: GetAncestorsResult = GetAncestorsResult( - ancestor_id = 3333L, - partitioningJson = partitioningAsJson, - author = "author", - hasMore = false - ) - // Partitioning with ID DTO protected val partitioningWithIdDTO1: PartitioningWithIdDTO = PartitioningWithIdDTO( id = partitioningFromDB1.id, diff --git a/server/src/test/scala/za/co/absa/atum/server/api/http/GetAncestorsEndpointV2UnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/http/GetAncestorsEndpointV2UnitTests.scala index 714ac4ac6..d467befd6 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/http/GetAncestorsEndpointV2UnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/http/GetAncestorsEndpointV2UnitTests.scala @@ -24,10 +24,10 @@ import sttp.model.StatusCode import sttp.tapir.server.stub.TapirStubInterpreter import sttp.tapir.ztapir.{RIOMonadError, RichZEndpoint} import za.co.absa.atum.model.dto.PartitioningWithIdDTO +import za.co.absa.atum.model.envelopes.{InternalServerErrorResponse, NotFoundErrorResponse, Pagination} import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.controller.PartitioningController -import za.co.absa.atum.server.model.{InternalServerErrorResponse, NotFoundErrorResponse, Pagination} -import za.co.absa.atum.server.model.SuccessResponse.PaginatedResponse +import za.co.absa.atum.model.envelopes.SuccessResponse.PaginatedResponse import zio.test.Assertion.equalTo import zio.test.{Spec, TestEnvironment, ZIOSpecDefault, assertZIO} import zio.{Scope, ZIO, ZLayer} diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala index 
b3a0b43e9..104fdc460 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala @@ -22,7 +22,7 @@ import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings.GetFlowPartitioningsArgs import za.co.absa.atum.server.api.database.runs.functions.GetAncestors -import za.co.absa.atum.server.api.database.runs.functions.GetAncestors.{GetAncestorsArgs, GetAncestorsResult} +import za.co.absa.atum.server.api.database.runs.functions.GetAncestors.GetAncestorsArgs import za.co.absa.atum.server.api.database.runs.functions.CreateOrUpdateAdditionalData.CreateOrUpdateAdditionalDataArgs import za.co.absa.atum.server.api.database.runs.functions._ import za.co.absa.atum.server.api.exception.DatabaseError @@ -34,7 +34,7 @@ import zio._ import zio.interop.catz.asyncInstance import zio.test.Assertion.failsWithA import zio.test._ -import za.co.absa.atum.server.model.{AdditionalDataFromDB, AdditionalDataItemFromDB, PartitioningForDB} +import za.co.absa.atum.server.model.{AdditionalDataItemFromDB, PartitioningForDB} object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { @@ -96,19 +96,10 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { private val getPartitioningMeasuresMockLayer = ZLayer.succeed(getPartitioningMeasuresMock) - // Get Partitioning Additional Data Mocks - private val getPartitioningAdditionalDataMock = mock(classOf[GetPartitioningAdditionalData]) - - when(getPartitioningAdditionalDataMock.apply(partitioningDTO1)) - .thenReturn(ZIO.right(Seq(Row(FunctionStatus(0, "success"), AdditionalDataFromDB(Some("key"), Some("value")))))) - when(getPartitioningAdditionalDataMock.apply(partitioningDTO2)).thenReturn(ZIO.fail(new Exception("boom!"))) - - private val getPartitioningAdditionalDataMockLayer = ZLayer.succeed(getPartitioningAdditionalDataMock) - // Get Partitioning By Id Mocks private val getPartitioningByIdByIdMock = mock(classOf[GetPartitioningById]) - when(getPartitioningByIdByIdMock.apply(1111L)) + when(getPartitioningByIdByIdMock.apply(1L)) .thenReturn(ZIO.right(Row(FunctionStatus(11, "OK"), Some(partitioningFromDB1)))) when(getPartitioningByIdByIdMock.apply(9999L)) .thenReturn(ZIO.fail(DataNotFoundException(FunctionStatus(41, "Partitioning not found")))) @@ -117,30 +108,18 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { private val getPartitioningByIdByIdMockLayer = ZLayer.succeed(getPartitioningByIdByIdMock) - // Get Ancestors By Id Mocks - private val getAncestorsMock = mock(classOf[GetAncestors]) - - when(getAncestorsMock.apply(GetAncestorsArgs(2222L, Some(1), Some(1L))) - ).thenReturn(ZIO.right(Seq(Row(FunctionStatus(11, "OK"), Some(getAncestorsResult1))))) - when(getAncestorsMock.apply(GetAncestorsArgs(9999L, Some(1), Some(1L)))) - .thenReturn(ZIO.left(DataNotFoundException(FunctionStatus(41, "Child Partitioning not found")))) - when(getAncestorsMock.apply(GetAncestorsArgs(8888L, Some(1), Some(1L))) - ).thenReturn(ZIO.fail(new Exception("boom!"))) - - private val getAncestorsMockLayer = ZLayer.succeed(getAncestorsMock) - - // GetPartitioningAdditionalDataV2 - private val getPartitioningAdditionalDataV2Mock = mock(classOf[GetPartitioningAdditionalDataV2]) + // GetPartitioningAdditionalData + private val 
getPartitioningAdditionalDataMock = mock(classOf[GetPartitioningAdditionalData]) - when(getPartitioningAdditionalDataV2Mock.apply(1L)).thenReturn( + when(getPartitioningAdditionalDataMock.apply(1L)).thenReturn( ZIO.right(Seq(Row(FunctionStatus(0, "success"), Some(AdditionalDataItemFromDB("key", Some("value"), "author"))))) ) - when(getPartitioningAdditionalDataV2Mock.apply(2L)).thenReturn(ZIO.fail(new Exception("boom!"))) - when(getPartitioningAdditionalDataV2Mock.apply(3L)).thenReturn( + when(getPartitioningAdditionalDataMock.apply(2L)).thenReturn(ZIO.fail(new Exception("boom!"))) + when(getPartitioningAdditionalDataMock.apply(3L)).thenReturn( ZIO.left(DataNotFoundException(FunctionStatus(41, "not found"))) ) - private val getPartitioningAdditionalDataV2MockLayer = ZLayer.succeed(getPartitioningAdditionalDataV2Mock) + private val getPartitioningAdditionalDataMockLayer = ZLayer.succeed(getPartitioningAdditionalDataMock) // GetPartitioningMeasuresById private val getPartitioningMeasuresV2Mock = mock(classOf[GetPartitioningMeasuresById]) @@ -170,16 +149,30 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { private val getFlowPartitioningsMock = mock(classOf[GetFlowPartitionings]) when(getFlowPartitioningsMock.apply(GetFlowPartitioningsArgs(1L, Some(10), Some(0))) - ).thenReturn(ZIO.right(Seq(Row(FunctionStatus(11, "OK"), Some(getFlowPartitioningsResult1))))) + ).thenReturn(ZIO.right(Seq(Row(FunctionStatus(11, "OK"), Some(getFlowPartitioningsResult1))))) when(getFlowPartitioningsMock.apply(GetFlowPartitioningsArgs(2L, Some(10), Some(0))) ).thenReturn(ZIO.right(Seq(Row(FunctionStatus(11, "OK"), Some(getFlowPartitioningsResult2))))) when(getFlowPartitioningsMock.apply(GetFlowPartitioningsArgs(0L, None, None))) .thenReturn(ZIO.left(DataNotFoundException(FunctionStatus(41, "Flow not found")))) when(getFlowPartitioningsMock.apply(GetFlowPartitioningsArgs(3L, Some(10), Some(0))) - ).thenReturn(ZIO.fail(new Exception("boom!"))) + ).thenReturn(ZIO.fail(new Exception("boom!"))) private val getFlowPartitioningsMockLayer = ZLayer.succeed(getFlowPartitioningsMock) + // Get Ancestors By Id Mocks + private val getAncestorsMock = mock(classOf[GetAncestors]) + + when(getAncestorsMock.apply(GetAncestorsArgs(1L, Some(10), Some(0))) + ).thenReturn(ZIO.right(Seq(Row(FunctionStatus(11, "OK"), Some(getAncestorsResult1))))) + when(getAncestorsMock.apply(GetAncestorsArgs(1111L, Some(10), Some(0))) + ).thenReturn(ZIO.right(Seq(Row(FunctionStatus(11, "OK"), Some(getAncestorsResult2))))) + when(getAncestorsMock.apply(GetAncestorsArgs(9999L, Some(10), Some(0))) + ).thenReturn(ZIO.left(DataNotFoundException(FunctionStatus(41, "Child Partitioning not found")))) + when(getAncestorsMock.apply(GetAncestorsArgs(8888L, Some(10), Some(0))) + ).thenReturn(ZIO.fail(new Exception("boom!"))) + + private val getAncestorsMockLayer = ZLayer.succeed(getAncestorsMock) + // Create Partitioning Mocks private val getPartitioningMainFlowMock = mock(classOf[GetPartitioningMainFlow]) @@ -278,32 +271,20 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { } ), suite("GetPartitioningAdditionalDataSuite")( - test("Returns expected Right with Map") { - for { - result <- PartitioningRepository.getPartitioningAdditionalData(partitioningDTO1) - } yield assertTrue(result.get("key").contains(Some("value")) && result.size == 1) - }, - test("Returns expected Left with DatabaseError") { - assertZIO(PartitioningRepository.getPartitioningAdditionalData(partitioningDTO2).exit)( - 
failsWithA[DatabaseError] - ) - } - ), - suite("GetPartitioningAdditionalDataV2Suite")( test("Returns expected AdditionalDataDTO instance") { for { - result <- PartitioningRepository.getPartitioningAdditionalDataV2(1L) + result <- PartitioningRepository.getPartitioningAdditionalData(1L) } yield assertTrue( result == AdditionalDataDTO(Map.from(Seq("key" -> Some(AdditionalDataItemDTO(Some("value"), "author"))))) ) }, test("Returns expected DatabaseError") { - assertZIO(PartitioningRepository.getPartitioningAdditionalDataV2(2L).exit)( + assertZIO(PartitioningRepository.getPartitioningAdditionalData(2L).exit)( failsWithA[GeneralDatabaseError] ) }, test("Returns expected NotFoundDatabaseError") { - assertZIO(PartitioningRepository.getPartitioningAdditionalDataV2(3L).exit)( + assertZIO(PartitioningRepository.getPartitioningAdditionalData(3L).exit)( failsWithA[NotFoundDatabaseError] ) } @@ -311,7 +292,7 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { suite("GetPartitioningByIdSuite")( test("GetPartitioningById - Returns expected PartitioningWithIdDTO") { for { - result <- PartitioningRepository.getPartitioningById(1111L) + result <- PartitioningRepository.getPartitioningById(1L) } yield assertTrue(result == partitioningWithIdDTO1) }, test("GetPartitioningById - Returns expected DataNotFoundException") { @@ -329,23 +310,6 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { ) } ), - suite("GetAncestorsSuite")( - test("Returns expected ResultNoMore[PartitioningWithIdDTO]") { - for { - result <- PartitioningRepository.getAncestors(2222L, Some(1), Some(1L)) - } yield assertTrue(result == ResultNoMore(Seq(partitioningWithIdDTO1))) - }, - test("Returns expected NotFoundDatabaseError") { - assertZIO(PartitioningRepository.getAncestors(9999L, Some(1), Some(1L)).exit)( - failsWithA[NotFoundDatabaseError] - ) - }, - test("Returns expected GeneralDatabaseError") { - assertZIO(PartitioningRepository.getAncestors(8888L, Some(1), Some(1L)).exit)( - failsWithA[GeneralDatabaseError] - ) - } - ), suite("GetPartitioningMeasuresByIdSuite")( test("Returns expected Seq") { for { @@ -399,7 +363,7 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { test("Returns expected ResultHasMore[PartitioningWithIdDTO]") { for { result <- PartitioningRepository.getFlowPartitionings(2L, Some(10), Some(0)) - } yield assertTrue(result == ResultHasMore(Seq(partitioningWithIdDTO1))) + } yield assertTrue(result == ResultHasMore(Seq(partitioningWithIdDTO2))) }, test("Returns expected NotFoundDatabaseError") { assertZIO(PartitioningRepository.getFlowPartitionings(0L, None, None).exit)( @@ -412,6 +376,28 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { ) } ), + suite("GetAncestorsSuite")( + test("Returns expected ResultNoMore[PartitioningWithIdDTO]") { + for { + result <- PartitioningRepository.getAncestors(1L, Some(10), Some(0)) + } yield assertTrue(result == ResultNoMore(Seq(partitioningWithIdDTO1))) + }, + test("Returns expected ResultHasMore[PartitioningWithIdDTO]") { + for { + result <- PartitioningRepository.getAncestors(1111L, Some(10), Some(0)) + } yield assertTrue(result == ResultHasMore(Seq(partitioningWithIdDTO2))) + }, + test("Returns expected NotFoundDatabaseError") { + assertZIO(PartitioningRepository.getAncestors(9999L, Some(10), Some(0)).exit)( + failsWithA[NotFoundDatabaseError] + ) + }, + test("Returns expected GeneralDatabaseError") { + assertZIO(PartitioningRepository.getAncestors(8888L, Some(10), 
Some(0)).exit)( + failsWithA[GeneralDatabaseError] + ) + } + ), suite("GetPartitioningSuite")( test("GetPartitioning - Returns expected PartitioningWithIdDTO") { for { @@ -435,14 +421,13 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { createPartitioningIfNotExistsMockLayer, createPartitioningMockLayer, getPartitioningMeasuresMockLayer, - getPartitioningAdditionalDataMockLayer, createOrUpdateAdditionalDataMockLayer, getPartitioningByIdByIdMockLayer, - getAncestorsMockLayer, - getPartitioningAdditionalDataV2MockLayer, + getPartitioningAdditionalDataMockLayer, getPartitioningMockLayer, getPartitioningMeasuresV2MockLayer, getFlowPartitioningsMockLayer, + getAncestorsMockLayer, getPartitioningMainFlowMockLayer )
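Usage sketch: clients are expected to page through the ancestors listing with
limit/offset, continuing while the response signals more data, as the repository
tests above exercise with Some(10)/Some(0). A hedged sketch of that loop, where
Page and fetchPage are hypothetical stand-ins for the client-side response shape
and the HTTP call into the new endpoint:

  // Hypothetical page shape: the data plus the has_more flag surfaced by the endpoint.
  final case class Page[T](data: Seq[T], hasMore: Boolean)

  object AncestorPager {
    // Request pages, advancing the offset by `limit`, until has_more comes back false.
    @annotation.tailrec
    final def fetchAll[T](fetchPage: (Int, Long) => Page[T],
                          limit: Int = 10,
                          offset: Long = 0L,
                          acc: Seq[T] = Seq.empty): Seq[T] = {
      val page = fetchPage(limit, offset)
      val collected = acc ++ page.data
      if (page.hasMore) fetchAll(fetchPage, limit, offset + limit, collected)
      else collected
    }
  }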