-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
#188: new Flow service in API v2 #195
#188: new Flow service in API v2 #195
Conversation
…support and new DTO
…e/188-server-part-of-get-flow-checkpoints # Conflicts: # server/src/test/scala/za/co/absa/atum/server/api/TestData.scala
JaCoCo server module code coverage report - scala 2.13.11Build Failed |
…a model and tests
… (will need refactoring)
…e/188-server-part-of-get-flow-checkpoints
} yield MeasureResultDTO(mainValue, supportValues) | ||
|
||
measureResultOrErr match { | ||
case Left(err) => throw err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@salamonpavel I was thinking what to do here, perhaps this is not 'best-zio-practice' - would you have a better idea how to handle this? Also, check FlowServiceImpl
where it is used please
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can and should return Either instead of throwing exception which in this version of code in not handled properly. I would go with this code (only small change of yours).
def toCheckpointDTO(
partitioning: PartitioningDTO,
checkpointQueryResult: CheckpointFromDB
): Either[DecodingFailure, CheckpointDTO] = {
val measureResultOrErr = checkpointQueryResult.measurementValue.as[MeasureResultDTO]
measureResultOrErr match {
case Left(err) => Left(err)
case Right(measureResult) =>
Right(
CheckpointDTO(
id = checkpointQueryResult.idCheckpoint,
name = checkpointQueryResult.checkpointName,
author = checkpointQueryResult.author,
measuredByAtumAgent = checkpointQueryResult.measuredByAtumAgent,
partitioning = partitioning,
processStartTime = checkpointQueryResult.checkpointStartTime,
processEndTime = checkpointQueryResult.checkpointEndTime,
measurements = Set(
MeasurementDTO(
measure = MeasureDTO(
measureName = checkpointQueryResult.measureName,
measuredColumns = checkpointQueryResult.measuredColumns
),
result = measureResult
)
)
)
)
}
}
|
||
object CheckpointFromDB { | ||
|
||
private def extractMainValue(json: Json): Either[Error, MeasureResultDTO.TypedValue] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is not needed. You can directly deserialize into MeasureResultDTO
in toCheckpointDTO
method.
checkpointQueryResult.measurementValue.as[MeasureResultDTO]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm, that's true now. I had a few iterations of this code - thanks! Will change it
json.as[MeasureResultDTO].map(_.mainValue) | ||
} | ||
|
||
private def extractSupportValues(json: Json): Either[Error, Map[String, MeasureResultDTO.TypedValue]] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is not needed. You can directly deserialize into MeasureResultDTO
in toCheckpointDTO
method.
checkpointQueryResult.measurementValue.as[MeasureResultDTO]
class FlowServiceImpl(flowRepository: FlowRepository) | ||
extends FlowService with BaseService { | ||
|
||
override def getFlowCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[ServiceError, Seq[CheckpointDTO]] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this version the exception coming from the deserialization is not handled. In some other comment I am mentioning you should return rather either in the serde code. Then you could create zios from those eithers and collect them. See bellow two version of the same, one with sequential processing (the one with foreach), another one with parallel processing (collectAll). I would personally choose the foreach variant.
def getFlowCheckpointsCollectAll(checkpointQueryDTO: CheckpointQueryDTO): IO[ServiceError, Seq[CheckpointDTO]] = {
for {
checkpointsFromDB <- repositoryCall(flowRepository.getFlowCheckpoints(checkpointQueryDTO), "getFlowCheckpoints")
checkpointDTOs <- ZIO.collectAll {
checkpointsFromDB.map { checkpointFromDB =>
ZIO.fromEither(CheckpointFromDB.toCheckpointDTO(checkpointQueryDTO.partitioning, checkpointFromDB))
.mapError(error => ServiceError(error.getMessage))
}
}
} yield checkpointDTOs
}
def getFlowCheckpointsForeach(checkpointQueryDTO: CheckpointQueryDTO): IO[ServiceError, Seq[CheckpointDTO]] = {
for {
checkpointsFromDB <- repositoryCall(flowRepository.getFlowCheckpoints(checkpointQueryDTO), "getFlowCheckpoints")
checkpointDTOs <- ZIO.foreach(checkpointsFromDB) { checkpointFromDB =>
ZIO.fromEither(CheckpointFromDB.toCheckpointDTO(checkpointQueryDTO.partitioning, checkpointFromDB))
.mapError(error => ServiceError(error.getMessage))
}
} yield checkpointDTOs
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually now reading the documentation I can see that you could also use foreachPar for parallel processing. So the difference between foreach(Par) and collectAll is mainly in the fact that collectAll takes sequence of effects on the input whereas foreach takes a normal collection and function to convert the elements into zio. The return value is the same, and both return failed effect if any of the zios fail.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, it's quite educative, I'll give it some reading.
I think I'll stick with the sequential processing - the only 'parallelism' would be what happens in toCheckpointDTO
and that's not that 'slow' (i.e. I like performance / multiprocessing optimizations where it significantly impacts performance, on the other hand if it doesn't, parallelism can introduce additional overhead and, god forbids, debugging of problems is a bit more difficult)
|
||
implicit val encodeResultValueType: Encoder[MeasureResultDTO.ResultValueType] = Encoder.encodeString.contramap { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the motivation to place json related encoders/decoders alongside doobie implicits?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I understand the question, those JSON related SerDe code was already there & I needed those MeasureResult DTOs to be serialized/deserialized as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DoobieImplicits object is there for defining Put/Get/Read/Write instances for Doobie. Then we have PlayJsonImplicits for Reads/Writes/Format type classes for Play Json. And what you have defined is actually related to Circe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aaah. Yes, I'm sorry I understand now. I'll move them to CirceImplicits.scala
I know that I could move them directly to CheckpointFromDB.scala
, but I anticipate that @TebaleloS will create a bunch of them later as well, so it might be a good idea for them to be centralized
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's customary to place them in companion objects.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved into companion object of a given DTO, thanks for the recommendation
|
||
override def sql(values: CheckpointQueryDTO)(implicit read: Read[CheckpointFromDB]): Fragment = { | ||
val partitioning = PartitioningForDB.fromSeqPartitionDTO(values.partitioning) | ||
val partitioningNormalized = Json.toJson(partitioning).toString |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we could already serialize it into Json from Circe instead of using String derived by play json.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't want to do that in this PR, it's been open for far too long and we have a ticket for this already. Let's make it later & in one bunch, not in pieces in these feature PRs I think
@@ -54,6 +54,15 @@ trait Endpoints extends BaseEndpoints { | |||
.out(jsonBody[AdditionalDataSubmitDTO]) | |||
} | |||
|
|||
protected val getFlowCheckpointsEndpoint | |||
: PublicEndpoint[CheckpointQueryDTO, ErrorResponse, Seq[CheckpointDTO], Any] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's merge #199 before this PR so you can incorporate the envelope.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, happy to do that - I approved #199 just now
import zio.test.Assertion.failsWithA | ||
import zio.test._ | ||
|
||
object FlowControllerIntegrationTests extends ZIOSpecDefault with TestData { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a unit test and should be executed as such. Please rename to FlowControllerUnitTests
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure
import zio.test._ | ||
import zio.test.junit.ZTestJUnitRunner | ||
|
||
@RunWith(classOf[ZTestJUnitRunner]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove the annotation and make it an object. Also as above, it's a unit test and should be executed as such. Please rename the object to FlowRepositoryUnitTests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, I'll actually make these changes in the whole repo. I was not paying particular attention to it, but it's time to change it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lsulak
Maybe you could also rename all other test files where the suffix 'IntegrationTests' was incorrectly used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
naturally, I also did it already :)
import zio.test._ | ||
import zio.test.junit.ZTestJUnitRunner | ||
|
||
@RunWith(classOf[ZTestJUnitRunner]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This PR showcase high-quality code that adheres to best practices and coding standards. However, it would be beneficial to add automated tests for newly created endpoint. While manual testing via tools like Postman is useful for exploratory testing and very specific scenarios, it cannot replace automated tests.
Thanks, appreciate it! Re API tests: actually it's a good idea. Ticket here: #210 for the future as it's quite difficult to implement all the good ideas immediately :D |
…e/188-server-part-of-get-flow-checkpoints # Conflicts: # database/src/main/postgres/flows/V1.9.1__get_flow_checkpoints.sql # project/Dependencies.scala # server/src/main/scala/za/co/absa/atum/server/Constants.scala # server/src/main/scala/za/co/absa/atum/server/api/database/DoobieImplicits.scala # server/src/main/scala/za/co/absa/atum/server/api/http/BaseEndpoints.scala # server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala # server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala # server/src/main/scala/za/co/absa/atum/server/api/repository/BaseRepository.scala # server/src/main/scala/za/co/absa/atum/server/api/service/BaseService.scala # server/src/main/scala/za/co/absa/atum/server/model/CheckpointFromDB.scala # server/src/main/scala/za/co/absa/atum/server/model/PlayJsonImplicits.scala # server/src/test/scala/za/co/absa/atum/server/api/TestData.scala # server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerUnitTests.scala # server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala
…-merge conflict resolution
Release notes
|
There are also these unit tests for endpoints that could be implemented also for the new endpoint. |
…ject of a given DTO
I personally don't like covering everything with all types of tests - but in this case I'll add them, perhaps it's a good enough balance to have at least 1 test for each endpoint type / service, and since here I introduced 'flow' service / functionality, it might be nice to have it. |
Flow
Closes #188
Release notes: