Skip to content

Commit

Permalink
16148: send reports from the submissions directly to FHIR Convert (#1…
Browse files Browse the repository at this point in the history
…6339)

* 16148: send reports from the submissions directly to FHIR Convert

* fixup! 16148: send reports from the submissions directly to FHIR Convert

* 16148: delete old FHIR receiver code

* fixup! 16148: send reports from the submissions directly to FHIR Convert

* fixup! 16148: send reports from the submissions directly to FHIR Convert

* fixup! 16148: send reports from the submissions directly to FHIR Convert

* fixup! 16148: send reports from the submissions directly to FHIR Convert

* fixup! 16148: send reports from the submissions directly to FHIR Convert

* fixup! 16148: send reports from the submissions directly to FHIR Convert

* fixup! 16148: send reports from the submissions directly to FHIR Convert
  • Loading branch information
mkalish authored Nov 1, 2024
1 parent 4ccf2a8 commit d7551d3
Show file tree
Hide file tree
Showing 24 changed files with 913 additions and 1,729 deletions.

Large diffs are not rendered by default.

140 changes: 134 additions & 6 deletions prime-router/src/main/kotlin/cli/ProcessFhirCommands.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,22 @@ import gov.cdc.prime.router.Hl7Configuration
import gov.cdc.prime.router.Metadata
import gov.cdc.prime.router.MimeFormat
import gov.cdc.prime.router.Receiver
import gov.cdc.prime.router.Report
import gov.cdc.prime.router.ReportStreamFilter
import gov.cdc.prime.router.Topic
import gov.cdc.prime.router.azure.BlobAccess
import gov.cdc.prime.router.azure.ConditionStamper
import gov.cdc.prime.router.azure.LookupTableConditionMapper
import gov.cdc.prime.router.azure.db.enums.TaskAction
import gov.cdc.prime.router.azure.db.tables.pojos.ReportFile
import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
import gov.cdc.prime.router.azure.observability.event.ItemEventData
import gov.cdc.prime.router.azure.observability.event.ReportEventData
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName
import gov.cdc.prime.router.azure.observability.event.ReportStreamItemEventBuilder
import gov.cdc.prime.router.azure.observability.event.ReportStreamItemProcessingErrorEventBuilder
import gov.cdc.prime.router.azure.observability.event.ReportStreamReportEventBuilder
import gov.cdc.prime.router.azure.observability.event.ReportStreamReportProcessingErrorEventBuilder
import gov.cdc.prime.router.cli.CommandUtilities.Companion.abort
import gov.cdc.prime.router.cli.helpers.HL7DiffHelper
import gov.cdc.prime.router.common.Environment
Expand Down Expand Up @@ -374,12 +386,16 @@ class ProcessFhirCommands : CliktCommand(
// this is just for logging so it is fine to just make it up
UUID.randomUUID().toString()
}
val result = FHIRReceiverFilter().evaluateObservationConditionFilters(
receiver,
bundle,
ActionLogger(),
trackingId
)
// TODO: https://github.com/CDCgov/prime-reportstream/issues/16407
val result =
FHIRReceiverFilter(
reportStreamEventService = NoopReportStreamEventService()
).evaluateObservationConditionFilters(
receiver,
bundle,
ActionLogger(),
trackingId
)
if (result is ReceiverFilterEvaluationResult.Success) {
return result.bundle
} else {
Expand Down Expand Up @@ -848,4 +864,116 @@ class FhirPathCommand : CliktCommand(
stringValue.append("\n}\n")
return stringValue.toString()
}
}

// This exists only because ProcessFhirCommands instantiates a FHIRReceiverFilter to access a function that likely could be
// made static
// TODO: https://github.com/CDCgov/prime-reportstream/issues/16407
class NoopReportStreamEventService : IReportStreamEventService {
override fun sendQueuedEvents() {
throw NotImplementedError()
}

override fun sendReportEvent(
eventName: ReportStreamEventName,
childReport: Report,
pipelineStepName: TaskAction,
shouldQueue: Boolean,
initializer: ReportStreamReportEventBuilder.() -> Unit,
) {
throw NotImplementedError()
}

override fun sendReportEvent(
eventName: ReportStreamEventName,
childReport: ReportFile,
pipelineStepName: TaskAction,
shouldQueue: Boolean,
initializer: ReportStreamReportEventBuilder.() -> Unit,
) {
throw NotImplementedError()
}

override fun sendReportProcessingError(
eventName: ReportStreamEventName,
childReport: ReportFile,
pipelineStepName: TaskAction,
error: String,
shouldQueue: Boolean,
initializer: ReportStreamReportProcessingErrorEventBuilder.() -> Unit,
) {
throw NotImplementedError()
}

override fun sendReportProcessingError(
eventName: ReportStreamEventName,
childReport: Report,
pipelineStepName: TaskAction,
error: String,
shouldQueue: Boolean,
initializer: ReportStreamReportProcessingErrorEventBuilder.() -> Unit,
) {
throw NotImplementedError()
}

override fun sendItemEvent(
eventName: ReportStreamEventName,
childReport: Report,
pipelineStepName: TaskAction,
shouldQueue: Boolean,
initializer: ReportStreamItemEventBuilder.() -> Unit,
) {
throw NotImplementedError()
}

override fun sendItemEvent(
eventName: ReportStreamEventName,
childReport: ReportFile,
pipelineStepName: TaskAction,
shouldQueue: Boolean,
initializer: ReportStreamItemEventBuilder.() -> Unit,
) {
throw NotImplementedError()
}

override fun sendItemProcessingError(
eventName: ReportStreamEventName,
childReport: ReportFile,
pipelineStepName: TaskAction,
error: String,
shouldQueue: Boolean,
initializer: ReportStreamItemProcessingErrorEventBuilder.() -> Unit,
) {
throw NotImplementedError()
}

override fun sendItemProcessingError(
eventName: ReportStreamEventName,
childReport: Report,
pipelineStepName: TaskAction,
error: String,
shouldQueue: Boolean,
initializer: ReportStreamItemProcessingErrorEventBuilder.() -> Unit,
) {
throw NotImplementedError()
}

override fun getReportEventData(
childReportId: UUID,
childBodyUrl: String,
parentReportId: UUID?,
pipelineStepName: TaskAction,
topic: Topic?,
): ReportEventData {
throw NotImplementedError()
}

override fun getItemEventData(
childItemIndex: Int,
parentReportId: UUID,
parentItemIndex: Int,
trackingId: String?,
): ItemEventData {
throw NotImplementedError()
}
}
85 changes: 72 additions & 13 deletions prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,32 @@ import com.microsoft.azure.functions.annotation.FunctionName
import com.microsoft.azure.functions.annotation.QueueTrigger
import com.microsoft.azure.functions.annotation.StorageAccount
import gov.cdc.prime.reportstream.shared.QueueMessage
import gov.cdc.prime.reportstream.shared.Submission
import gov.cdc.prime.router.ActionLogger
import gov.cdc.prime.router.azure.ActionHistory
import gov.cdc.prime.router.azure.DataAccessTransaction
import gov.cdc.prime.router.azure.DatabaseAccess
import gov.cdc.prime.router.azure.QueueAccess
import gov.cdc.prime.router.azure.SubmissionTableService
import gov.cdc.prime.router.azure.WorkflowEngine
import gov.cdc.prime.router.azure.db.enums.TaskAction
import gov.cdc.prime.router.azure.observability.event.AzureEventService
import gov.cdc.prime.router.azure.observability.event.AzureEventServiceImpl
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventProperties
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventService
import gov.cdc.prime.router.common.BaseEngine
import gov.cdc.prime.router.fhirengine.engine.FHIRConverter
import gov.cdc.prime.router.fhirengine.engine.FHIRDestinationFilter
import gov.cdc.prime.router.fhirengine.engine.FHIREngine
import gov.cdc.prime.router.fhirengine.engine.FHIRReceiver
import gov.cdc.prime.router.fhirengine.engine.FHIRReceiverFilter
import gov.cdc.prime.router.fhirengine.engine.FHIRTranslator
import gov.cdc.prime.router.fhirengine.engine.FhirReceiveQueueMessage
import gov.cdc.prime.router.fhirengine.engine.FhirConvertSubmissionQueueMessage
import gov.cdc.prime.router.fhirengine.engine.PrimeRouterQueueMessage
import gov.cdc.prime.router.fhirengine.engine.ReportPipelineMessage
import gov.cdc.prime.router.fhirengine.engine.SubmissionSenderNotFound
import gov.cdc.prime.router.history.db.ReportGraph
import gov.cdc.prime.router.report.ReportService
import org.apache.commons.lang3.StringUtils
import org.apache.logging.log4j.kotlin.Logging
import org.jooq.exception.DataAccessException
Expand All @@ -33,23 +40,41 @@ class FHIRFunctions(
private val actionLogger: ActionLogger = ActionLogger(),
private val databaseAccess: DatabaseAccess = BaseEngine.databaseAccessSingleton,
private val queueAccess: QueueAccess = QueueAccess,
private val submissionTableService: SubmissionTableService = SubmissionTableService.getInstance(),
val reportService: ReportService = ReportService(ReportGraph(databaseAccess), databaseAccess),
val azureEventService: AzureEventService = AzureEventServiceImpl(),
val reportStreamEventService: ReportStreamEventService =
ReportStreamEventService(databaseAccess, azureEventService, reportService),
) : Logging {

/**
* An azure function for ingesting and recording submissions
*/
@FunctionName("receive-fhir")
@FunctionName("convert-from-submissions-fhir")
@StorageAccount("AzureWebJobsStorage")
fun receive(
@QueueTrigger(name = "message", queueName = QueueMessage.elrReceiveQueueName)
fun convertFromSubmissions(
@QueueTrigger(name = "message", queueName = QueueMessage.elrSubmissionConvertQueueName)
message: String,
// Number of times this message has been dequeued
@BindingName("DequeueCount") dequeueCount: Int = 1,
) {
logger.info(
"message consumed from elr-fhir-receive queue"
"message consumed from ${QueueMessage.elrSubmissionConvertQueueName} queue"
)
process(message, dequeueCount, FHIRReceiver(), ActionHistory(TaskAction.receive))
process(
message,
dequeueCount,
FHIRConverter(reportStreamEventService = reportStreamEventService),
ActionHistory(TaskAction.convert)
)
val messageContent = readMessage("convert", message, dequeueCount)
val tableEntity = Submission(
messageContent.reportId.toString(),
"Accepted",
messageContent.blobURL,
actionLogger.errors.takeIf { it.isNotEmpty() }?.map { it.detail.message }?.toString()
)
submissionTableService.insertSubmission(tableEntity)
}

/**
Expand All @@ -63,7 +88,12 @@ class FHIRFunctions(
// Number of times this message has been dequeued
@BindingName("DequeueCount") dequeueCount: Int = 1,
) {
process(message, dequeueCount, FHIRConverter(), ActionHistory(TaskAction.convert))
process(
message,
dequeueCount,
FHIRConverter(reportStreamEventService = reportStreamEventService),
ActionHistory(TaskAction.convert)
)
}

/**
Expand All @@ -77,7 +107,12 @@ class FHIRFunctions(
// Number of times this message has been dequeued
@BindingName("DequeueCount") dequeueCount: Int = 1,
) {
process(message, dequeueCount, FHIRDestinationFilter(), ActionHistory(TaskAction.destination_filter))
process(
message,
dequeueCount,
FHIRDestinationFilter(reportStreamEventService = reportStreamEventService),
ActionHistory(TaskAction.destination_filter)
)
}

/**
Expand All @@ -91,7 +126,12 @@ class FHIRFunctions(
// Number of times this message has been dequeued
@BindingName("DequeueCount") dequeueCount: Int = 1,
) {
process(message, dequeueCount, FHIRReceiverFilter(), ActionHistory(TaskAction.receiver_filter))
process(
message,
dequeueCount,
FHIRReceiverFilter(reportStreamEventService = reportStreamEventService),
ActionHistory(TaskAction.receiver_filter)
)
}

/**
Expand All @@ -105,7 +145,12 @@ class FHIRFunctions(
// Number of times this message has been dequeued
@BindingName("DequeueCount") dequeueCount: Int = 1,
) {
process(message, dequeueCount, FHIRTranslator(), ActionHistory(TaskAction.translate))
process(
message,
dequeueCount,
FHIRTranslator(reportStreamEventService = reportStreamEventService),
ActionHistory(TaskAction.translate)
)
}

/**
Expand Down Expand Up @@ -149,13 +194,27 @@ class FHIRFunctions(
recordResults(message, actionHistory, txn)
results
}

reportStreamEventService.sendQueuedEvents()
return newMessages
} catch (ex: DataAccessException) {
// This is the one exception type that we currently will allow for retrying as there are occasional
// DB connectivity issues that are resolved without intervention
logger.error(ex)
throw ex
} catch (ex: SubmissionSenderNotFound) {
// This is a specific error case that can occur while handling a report via the new Submission service
// In a situation that the sender is not found there is not enough information to record a report event
// and we want a poison queue message to be immediately added so that the configuration can be fixed
logger.error(ex)
val tableEntity = Submission(
ex.reportId.toString(),
"Rejected",
ex.blobURL,
actionLogger.errors.takeIf { it.isNotEmpty() }?.map { it.detail.message }?.toString()
)
submissionTableService.insertSubmission(tableEntity)
queueAccess.sendMessage("${messageContent.messageQueueName}-poison", message)
return emptyList()
} catch (ex: Exception) {
// We're catching anything else that occurs because the most likely cause is a code or configuration error
// that will not be resolved if the message is automatically retried
Expand Down Expand Up @@ -186,7 +245,7 @@ class FHIRFunctions(

return when (val queueMessage = QueueMessage.deserialize(message)) {
is QueueMessage.ReceiveQueueMessage -> {
FhirReceiveQueueMessage(
FhirConvertSubmissionQueueMessage(
queueMessage.reportId,
queueMessage.blobURL,
queueMessage.digest,
Expand Down
Loading

0 comments on commit d7551d3

Please sign in to comment.