diff --git a/prime-router/src/main/kotlin/azure/observability/event/ReportStreamEventService.kt b/prime-router/src/main/kotlin/azure/observability/event/ReportStreamEventService.kt index 42509ce0748..36ae8a908f7 100644 --- a/prime-router/src/main/kotlin/azure/observability/event/ReportStreamEventService.kt +++ b/prime-router/src/main/kotlin/azure/observability/event/ReportStreamEventService.kt @@ -1,7 +1,6 @@ package gov.cdc.prime.router.azure.observability.event import gov.cdc.prime.router.Report -import gov.cdc.prime.router.ReportId import gov.cdc.prime.router.Topic import gov.cdc.prime.router.azure.DatabaseAccess import gov.cdc.prime.router.azure.db.enums.TaskAction @@ -22,18 +21,28 @@ import java.util.UUID */ interface IReportStreamEventService { + /** + * Sends any events that have been queued up if the client specified sending them can be deferred. + * + * This is useful in contexts where the events should only be sent after all the business logic has + * executed and the DB transaction has been committed. + */ + fun sendQueuedEvents() + /** * Creates a report event from an [Report] * * @param eventName the business event value from [ReportStreamEventName] * @param childReport the report that is getting emitted from the pipeline step * @param pipelineStepName the pipeline step that is emitting the event + * @param shouldQueue whether to send the event immediately or defer it to be sent later * @param initializer additional data to initialize the creation of the event. See [AbstractReportStreamEventBuilder] */ fun sendReportEvent( eventName: ReportStreamEventName, childReport: Report, pipelineStepName: TaskAction, + shouldQueue: Boolean = false, initializer: ReportStreamReportEventBuilder.() -> Unit, ) @@ -43,12 +52,14 @@ interface IReportStreamEventService { * @param eventName the business event value from [ReportStreamEventName] * @param childReport the report that is getting emitted from the pipeline step * @param pipelineStepName the pipeline step that is emitting the event + * @param shouldQueue whether to send the event immediately or defer it to be sent later * @param initializer additional data to initialize the creation of the event. See [AbstractReportStreamEventBuilder] */ fun sendReportEvent( eventName: ReportStreamEventName, childReport: ReportFile, pipelineStepName: TaskAction, + shouldQueue: Boolean = false, initializer: ReportStreamReportEventBuilder.() -> Unit, ) @@ -59,6 +70,7 @@ interface IReportStreamEventService { * @param childReport the report that is getting emitted from the pipeline step * @param pipelineStepName the pipeline step that is emitting the event * @param error the error description + * @param shouldQueue whether to send the event immediately or defer it to be sent later * @param initializer additional data to initialize the creation of the event. See [AbstractReportStreamEventBuilder] */ fun sendReportProcessingError( @@ -66,6 +78,7 @@ interface IReportStreamEventService { childReport: ReportFile, pipelineStepName: TaskAction, error: String, + shouldQueue: Boolean = false, initializer: ReportStreamReportProcessingErrorEventBuilder.() -> Unit, ) @@ -76,6 +89,7 @@ interface IReportStreamEventService { * @param childReport the report that is getting emitted from the pipeline step * @param pipelineStepName the pipeline step that is emitting the event * @param error the error description + * @param shouldQueue whether to send the event immediately or defer it to be sent later * @param initializer additional data to initialize the creation of the event. See [AbstractReportStreamEventBuilder] */ fun sendReportProcessingError( @@ -83,25 +97,7 @@ interface IReportStreamEventService { childReport: Report, pipelineStepName: TaskAction, error: String, - initializer: ReportStreamReportProcessingErrorEventBuilder.() -> Unit, - ) - - /** - * Creates a general processing error event. This is not associated with a report or item. - * - * @param eventName the business event value from [ReportStreamEventName] - * @param pipelineStepName the pipeline step that is emitting the event - * @param error the error description - * @param submissionId the report id for the incoming report - * @param bodyUrl the blob url for the incoming report - * @param initializer additional data to initialize the creation of the event. See [AbstractReportStreamEventBuilder] - */ - fun sendSubmissionProcessingError( - eventName: ReportStreamEventName, - pipelineStepName: TaskAction, - error: String, - submissionId: ReportId, - bodyUrl: String, + shouldQueue: Boolean = false, initializer: ReportStreamReportProcessingErrorEventBuilder.() -> Unit, ) @@ -111,12 +107,14 @@ interface IReportStreamEventService { * @param eventName the business event value from [ReportStreamEventName] * @param childReport the report that is getting emitted from the pipeline step * @param pipelineStepName the pipeline step that is emitting the event + * @param shouldQueue whether to send the event immediately or defer it to be sent later * @param initializer additional data to initialize the creation of the event. See [AbstractReportStreamEventBuilder] */ fun sendItemEvent( eventName: ReportStreamEventName, childReport: Report, pipelineStepName: TaskAction, + shouldQueue: Boolean = false, initializer: ReportStreamItemEventBuilder.() -> Unit, ) @@ -126,12 +124,14 @@ interface IReportStreamEventService { * @param eventName the business event value from [ReportStreamEventName] * @param childReport the report that is getting emitted from the pipeline step * @param pipelineStepName the pipeline step that is emitting the event + * @param shouldQueue whether to send the event immediately or defer it to be sent later * @param initializer additional data to initialize the creation of the event. See [AbstractReportStreamEventBuilder] */ fun sendItemEvent( eventName: ReportStreamEventName, childReport: ReportFile, pipelineStepName: TaskAction, + shouldQueue: Boolean = false, initializer: ReportStreamItemEventBuilder.() -> Unit, ) @@ -142,6 +142,7 @@ interface IReportStreamEventService { * @param childReport the report that is getting emitted from the pipeline step * @param pipelineStepName the pipeline step that is emitting the event * @param error the error description + * @param shouldQueue whether to send the event immediately or defer it to be sent later * @param initializer additional data to initialize the creation of the event. See [AbstractReportStreamEventBuilder] */ fun sendItemProcessingError( @@ -149,6 +150,7 @@ interface IReportStreamEventService { childReport: ReportFile, pipelineStepName: TaskAction, error: String, + shouldQueue: Boolean = false, initializer: ReportStreamItemProcessingErrorEventBuilder.() -> Unit, ) @@ -159,6 +161,7 @@ interface IReportStreamEventService { * @param childReport the report that is getting emitted from the pipeline step * @param pipelineStepName the pipeline step that is emitting the event * @param error the error description + * @param shouldQueue whether to send the event immediately or defer it to be sent later * @param initializer additional data to initialize the creation of the event. See [AbstractReportStreamEventBuilder] */ fun sendItemProcessingError( @@ -166,6 +169,7 @@ interface IReportStreamEventService { childReport: Report, pipelineStepName: TaskAction, error: String, + shouldQueue: Boolean = false, initializer: ReportStreamItemProcessingErrorEventBuilder.() -> Unit, ) @@ -216,13 +220,23 @@ class ReportStreamEventService( private val reportService: ReportService, ) : IReportStreamEventService { + private val builtEvents = mutableListOf>() + + override fun sendQueuedEvents() { + builtEvents.forEach { + it.send() + } + builtEvents.clear() + } + override fun sendReportEvent( eventName: ReportStreamEventName, childReport: Report, pipelineStepName: TaskAction, + shouldQueue: Boolean, initializer: ReportStreamReportEventBuilder.() -> Unit, ) { - ReportStreamReportEventBuilder( + val builder = ReportStreamReportEventBuilder( this, azureEventService, eventName, @@ -232,16 +246,22 @@ class ReportStreamEventService( pipelineStepName ).apply( initializer - ).send() + ) + if (shouldQueue) { + builtEvents.add(builder) + } else { + builder.send() + } } override fun sendReportEvent( eventName: ReportStreamEventName, childReport: ReportFile, pipelineStepName: TaskAction, + shouldQueue: Boolean, initializer: ReportStreamReportEventBuilder.() -> Unit, ) { - ReportStreamReportEventBuilder( + val builder = ReportStreamReportEventBuilder( this, azureEventService, eventName, @@ -251,7 +271,13 @@ class ReportStreamEventService( pipelineStepName ).apply( initializer - ).send() + ) + + if (shouldQueue) { + builtEvents.add(builder) + } else { + builder.send() + } } override fun sendReportProcessingError( @@ -259,9 +285,10 @@ class ReportStreamEventService( childReport: ReportFile, pipelineStepName: TaskAction, error: String, + shouldQueue: Boolean, initializer: ReportStreamReportProcessingErrorEventBuilder.() -> Unit, ) { - ReportStreamReportProcessingErrorEventBuilder( + val builder = ReportStreamReportProcessingErrorEventBuilder( this, azureEventService, eventName, @@ -272,7 +299,13 @@ class ReportStreamEventService( error ).apply( initializer - ).send() + ) + + if (shouldQueue) { + builtEvents.add(builder) + } else { + builder.send() + } } override fun sendReportProcessingError( @@ -280,9 +313,10 @@ class ReportStreamEventService( childReport: Report, pipelineStepName: TaskAction, error: String, + shouldQueue: Boolean, initializer: ReportStreamReportProcessingErrorEventBuilder.() -> Unit, ) { - ReportStreamReportProcessingErrorEventBuilder( + val builder = ReportStreamReportProcessingErrorEventBuilder( this, azureEventService, eventName, @@ -293,38 +327,23 @@ class ReportStreamEventService( error ).apply( initializer - ).send() - } + ) - override fun sendSubmissionProcessingError( - eventName: ReportStreamEventName, - pipelineStepName: TaskAction, - error: String, - submissionId: ReportId, - bodyUrl: String, - initializer: ReportStreamReportProcessingErrorEventBuilder.() -> Unit, - ) { - ReportStreamReportProcessingErrorEventBuilder( - this, - azureEventService, - eventName, - submissionId, - bodyUrl, - theTopic = null, - pipelineStepName, - error - ).apply( - initializer - ).send() + if (shouldQueue) { + builtEvents.add(builder) + } else { + builder.send() + } } override fun sendItemEvent( eventName: ReportStreamEventName, childReport: Report, pipelineStepName: TaskAction, + shouldQueue: Boolean, initializer: ReportStreamItemEventBuilder.() -> Unit, ) { - ReportStreamItemEventBuilder( + val builder = ReportStreamItemEventBuilder( this, azureEventService, eventName, @@ -332,16 +351,23 @@ class ReportStreamEventService( childReport.bodyURL, childReport.schema.topic, pipelineStepName - ).apply(initializer).send() + ).apply(initializer) + + if (shouldQueue) { + builtEvents.add(builder) + } else { + builder.send() + } } override fun sendItemEvent( eventName: ReportStreamEventName, childReport: ReportFile, pipelineStepName: TaskAction, + shouldQueue: Boolean, initializer: ReportStreamItemEventBuilder.() -> Unit, ) { - ReportStreamItemEventBuilder( + val builder = ReportStreamItemEventBuilder( this, azureEventService, eventName, @@ -349,7 +375,13 @@ class ReportStreamEventService( childReport.bodyUrl, childReport.schemaTopic, pipelineStepName - ).apply(initializer).send() + ).apply(initializer) + + if (shouldQueue) { + builtEvents.add(builder) + } else { + builder.send() + } } override fun sendItemProcessingError( @@ -357,9 +389,10 @@ class ReportStreamEventService( childReport: ReportFile, pipelineStepName: TaskAction, error: String, + shouldQueue: Boolean, initializer: ReportStreamItemProcessingErrorEventBuilder.() -> Unit, ) { - ReportStreamItemProcessingErrorEventBuilder( + val builder = ReportStreamItemProcessingErrorEventBuilder( this, azureEventService, eventName, @@ -368,7 +401,13 @@ class ReportStreamEventService( childReport.schemaTopic, pipelineStepName, error - ).apply(initializer).send() + ).apply(initializer) + + if (shouldQueue) { + builtEvents.add(builder) + } else { + builder.send() + } } override fun sendItemProcessingError( @@ -376,9 +415,10 @@ class ReportStreamEventService( childReport: Report, pipelineStepName: TaskAction, error: String, + shouldQueue: Boolean, initializer: ReportStreamItemProcessingErrorEventBuilder.() -> Unit, ) { - ReportStreamItemProcessingErrorEventBuilder( + val builder = ReportStreamItemProcessingErrorEventBuilder( this, azureEventService, eventName, @@ -387,7 +427,13 @@ class ReportStreamEventService( childReport.schema.topic, pipelineStepName, error - ).apply(initializer).send() + ).apply(initializer) + + if (shouldQueue) { + builtEvents.add(builder) + } else { + builder.send() + } } override fun getReportEventData( @@ -398,13 +444,10 @@ class ReportStreamEventService( topic: Topic?, ): ReportEventData { val submittedReportIds = if (parentReportId != null) { - val rootReports = reportService.getRootReports(parentReportId) - rootReports.ifEmpty { - listOf(dbAccess.fetchReportFile(parentReportId)) - } + reportService.getRootReports(parentReportId) } else { emptyList() - }.map { it.reportId } + }.map { it.reportId }.ifEmpty { if (parentReportId != null) listOf(parentReportId) else emptyList() } return ReportEventData( childReportId, @@ -425,8 +468,10 @@ class ReportStreamEventService( trackingId: String?, ): ItemEventData { val submittedIndex = reportService.getRootItemIndex(parentReportId, parentItemIndex) ?: parentItemIndex + val rootReport = - reportService.getRootReports(parentReportId).firstOrNull() ?: dbAccess.fetchReportFile(parentReportId) + reportService.getRootReports(parentReportId).firstOrNull() ?: dbAccess.fetchReportFile(parentReportId) + return ItemEventData( childItemIndex, parentItemIndex, diff --git a/prime-router/src/main/kotlin/cli/ProcessFhirCommands.kt b/prime-router/src/main/kotlin/cli/ProcessFhirCommands.kt index 07ae7ace0cb..694601cace9 100644 --- a/prime-router/src/main/kotlin/cli/ProcessFhirCommands.kt +++ b/prime-router/src/main/kotlin/cli/ProcessFhirCommands.kt @@ -21,10 +21,22 @@ import gov.cdc.prime.router.Hl7Configuration import gov.cdc.prime.router.Metadata import gov.cdc.prime.router.MimeFormat import gov.cdc.prime.router.Receiver +import gov.cdc.prime.router.Report import gov.cdc.prime.router.ReportStreamFilter +import gov.cdc.prime.router.Topic import gov.cdc.prime.router.azure.BlobAccess import gov.cdc.prime.router.azure.ConditionStamper import gov.cdc.prime.router.azure.LookupTableConditionMapper +import gov.cdc.prime.router.azure.db.enums.TaskAction +import gov.cdc.prime.router.azure.db.tables.pojos.ReportFile +import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService +import gov.cdc.prime.router.azure.observability.event.ItemEventData +import gov.cdc.prime.router.azure.observability.event.ReportEventData +import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName +import gov.cdc.prime.router.azure.observability.event.ReportStreamItemEventBuilder +import gov.cdc.prime.router.azure.observability.event.ReportStreamItemProcessingErrorEventBuilder +import gov.cdc.prime.router.azure.observability.event.ReportStreamReportEventBuilder +import gov.cdc.prime.router.azure.observability.event.ReportStreamReportProcessingErrorEventBuilder import gov.cdc.prime.router.cli.CommandUtilities.Companion.abort import gov.cdc.prime.router.cli.helpers.HL7DiffHelper import gov.cdc.prime.router.common.Environment @@ -374,12 +386,16 @@ class ProcessFhirCommands : CliktCommand( // this is just for logging so it is fine to just make it up UUID.randomUUID().toString() } - val result = FHIRReceiverFilter().evaluateObservationConditionFilters( - receiver, - bundle, - ActionLogger(), - trackingId - ) + // TODO: https://github.com/CDCgov/prime-reportstream/issues/16407 + val result = + FHIRReceiverFilter( + reportStreamEventService = NoopReportStreamEventService() + ).evaluateObservationConditionFilters( + receiver, + bundle, + ActionLogger(), + trackingId + ) if (result is ReceiverFilterEvaluationResult.Success) { return result.bundle } else { @@ -848,4 +864,116 @@ class FhirPathCommand : CliktCommand( stringValue.append("\n}\n") return stringValue.toString() } +} + +// This exists only because ProcessFhirCommands instantiates a FHIRReceiverFilter to access a function that likely could be +// made static +// TODO: https://github.com/CDCgov/prime-reportstream/issues/16407 +class NoopReportStreamEventService : IReportStreamEventService { + override fun sendQueuedEvents() { + throw NotImplementedError() + } + + override fun sendReportEvent( + eventName: ReportStreamEventName, + childReport: Report, + pipelineStepName: TaskAction, + shouldQueue: Boolean, + initializer: ReportStreamReportEventBuilder.() -> Unit, + ) { + throw NotImplementedError() + } + + override fun sendReportEvent( + eventName: ReportStreamEventName, + childReport: ReportFile, + pipelineStepName: TaskAction, + shouldQueue: Boolean, + initializer: ReportStreamReportEventBuilder.() -> Unit, + ) { + throw NotImplementedError() + } + + override fun sendReportProcessingError( + eventName: ReportStreamEventName, + childReport: ReportFile, + pipelineStepName: TaskAction, + error: String, + shouldQueue: Boolean, + initializer: ReportStreamReportProcessingErrorEventBuilder.() -> Unit, + ) { + throw NotImplementedError() + } + + override fun sendReportProcessingError( + eventName: ReportStreamEventName, + childReport: Report, + pipelineStepName: TaskAction, + error: String, + shouldQueue: Boolean, + initializer: ReportStreamReportProcessingErrorEventBuilder.() -> Unit, + ) { + throw NotImplementedError() + } + + override fun sendItemEvent( + eventName: ReportStreamEventName, + childReport: Report, + pipelineStepName: TaskAction, + shouldQueue: Boolean, + initializer: ReportStreamItemEventBuilder.() -> Unit, + ) { + throw NotImplementedError() + } + + override fun sendItemEvent( + eventName: ReportStreamEventName, + childReport: ReportFile, + pipelineStepName: TaskAction, + shouldQueue: Boolean, + initializer: ReportStreamItemEventBuilder.() -> Unit, + ) { + throw NotImplementedError() + } + + override fun sendItemProcessingError( + eventName: ReportStreamEventName, + childReport: ReportFile, + pipelineStepName: TaskAction, + error: String, + shouldQueue: Boolean, + initializer: ReportStreamItemProcessingErrorEventBuilder.() -> Unit, + ) { + throw NotImplementedError() + } + + override fun sendItemProcessingError( + eventName: ReportStreamEventName, + childReport: Report, + pipelineStepName: TaskAction, + error: String, + shouldQueue: Boolean, + initializer: ReportStreamItemProcessingErrorEventBuilder.() -> Unit, + ) { + throw NotImplementedError() + } + + override fun getReportEventData( + childReportId: UUID, + childBodyUrl: String, + parentReportId: UUID?, + pipelineStepName: TaskAction, + topic: Topic?, + ): ReportEventData { + throw NotImplementedError() + } + + override fun getItemEventData( + childItemIndex: Int, + parentReportId: UUID, + parentItemIndex: Int, + trackingId: String?, + ): ItemEventData { + throw NotImplementedError() + } } \ No newline at end of file diff --git a/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt b/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt index 0f7a31b560e..cd3e59e2d3d 100644 --- a/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt +++ b/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt @@ -5,25 +5,32 @@ import com.microsoft.azure.functions.annotation.FunctionName import com.microsoft.azure.functions.annotation.QueueTrigger import com.microsoft.azure.functions.annotation.StorageAccount import gov.cdc.prime.reportstream.shared.QueueMessage +import gov.cdc.prime.reportstream.shared.Submission import gov.cdc.prime.router.ActionLogger import gov.cdc.prime.router.azure.ActionHistory import gov.cdc.prime.router.azure.DataAccessTransaction import gov.cdc.prime.router.azure.DatabaseAccess import gov.cdc.prime.router.azure.QueueAccess +import gov.cdc.prime.router.azure.SubmissionTableService import gov.cdc.prime.router.azure.WorkflowEngine import gov.cdc.prime.router.azure.db.enums.TaskAction +import gov.cdc.prime.router.azure.observability.event.AzureEventService +import gov.cdc.prime.router.azure.observability.event.AzureEventServiceImpl import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName import gov.cdc.prime.router.azure.observability.event.ReportStreamEventProperties +import gov.cdc.prime.router.azure.observability.event.ReportStreamEventService import gov.cdc.prime.router.common.BaseEngine import gov.cdc.prime.router.fhirengine.engine.FHIRConverter import gov.cdc.prime.router.fhirengine.engine.FHIRDestinationFilter import gov.cdc.prime.router.fhirengine.engine.FHIREngine -import gov.cdc.prime.router.fhirengine.engine.FHIRReceiver import gov.cdc.prime.router.fhirengine.engine.FHIRReceiverFilter import gov.cdc.prime.router.fhirengine.engine.FHIRTranslator -import gov.cdc.prime.router.fhirengine.engine.FhirReceiveQueueMessage +import gov.cdc.prime.router.fhirengine.engine.FhirConvertSubmissionQueueMessage import gov.cdc.prime.router.fhirengine.engine.PrimeRouterQueueMessage import gov.cdc.prime.router.fhirengine.engine.ReportPipelineMessage +import gov.cdc.prime.router.fhirengine.engine.SubmissionSenderNotFound +import gov.cdc.prime.router.history.db.ReportGraph +import gov.cdc.prime.router.report.ReportService import org.apache.commons.lang3.StringUtils import org.apache.logging.log4j.kotlin.Logging import org.jooq.exception.DataAccessException @@ -33,23 +40,41 @@ class FHIRFunctions( private val actionLogger: ActionLogger = ActionLogger(), private val databaseAccess: DatabaseAccess = BaseEngine.databaseAccessSingleton, private val queueAccess: QueueAccess = QueueAccess, + private val submissionTableService: SubmissionTableService = SubmissionTableService.getInstance(), + val reportService: ReportService = ReportService(ReportGraph(databaseAccess), databaseAccess), + val azureEventService: AzureEventService = AzureEventServiceImpl(), + val reportStreamEventService: ReportStreamEventService = + ReportStreamEventService(databaseAccess, azureEventService, reportService), ) : Logging { /** * An azure function for ingesting and recording submissions */ - @FunctionName("receive-fhir") + @FunctionName("convert-from-submissions-fhir") @StorageAccount("AzureWebJobsStorage") - fun receive( - @QueueTrigger(name = "message", queueName = QueueMessage.elrReceiveQueueName) + fun convertFromSubmissions( + @QueueTrigger(name = "message", queueName = QueueMessage.elrSubmissionConvertQueueName) message: String, // Number of times this message has been dequeued @BindingName("DequeueCount") dequeueCount: Int = 1, ) { logger.info( - "message consumed from elr-fhir-receive queue" + "message consumed from ${QueueMessage.elrSubmissionConvertQueueName} queue" ) - process(message, dequeueCount, FHIRReceiver(), ActionHistory(TaskAction.receive)) + process( + message, + dequeueCount, + FHIRConverter(reportStreamEventService = reportStreamEventService), + ActionHistory(TaskAction.convert) + ) + val messageContent = readMessage("convert", message, dequeueCount) + val tableEntity = Submission( + messageContent.reportId.toString(), + "Accepted", + messageContent.blobURL, + actionLogger.errors.takeIf { it.isNotEmpty() }?.map { it.detail.message }?.toString() + ) + submissionTableService.insertSubmission(tableEntity) } /** @@ -63,7 +88,12 @@ class FHIRFunctions( // Number of times this message has been dequeued @BindingName("DequeueCount") dequeueCount: Int = 1, ) { - process(message, dequeueCount, FHIRConverter(), ActionHistory(TaskAction.convert)) + process( + message, + dequeueCount, + FHIRConverter(reportStreamEventService = reportStreamEventService), + ActionHistory(TaskAction.convert) + ) } /** @@ -77,7 +107,12 @@ class FHIRFunctions( // Number of times this message has been dequeued @BindingName("DequeueCount") dequeueCount: Int = 1, ) { - process(message, dequeueCount, FHIRDestinationFilter(), ActionHistory(TaskAction.destination_filter)) + process( + message, + dequeueCount, + FHIRDestinationFilter(reportStreamEventService = reportStreamEventService), + ActionHistory(TaskAction.destination_filter) + ) } /** @@ -91,7 +126,12 @@ class FHIRFunctions( // Number of times this message has been dequeued @BindingName("DequeueCount") dequeueCount: Int = 1, ) { - process(message, dequeueCount, FHIRReceiverFilter(), ActionHistory(TaskAction.receiver_filter)) + process( + message, + dequeueCount, + FHIRReceiverFilter(reportStreamEventService = reportStreamEventService), + ActionHistory(TaskAction.receiver_filter) + ) } /** @@ -105,7 +145,12 @@ class FHIRFunctions( // Number of times this message has been dequeued @BindingName("DequeueCount") dequeueCount: Int = 1, ) { - process(message, dequeueCount, FHIRTranslator(), ActionHistory(TaskAction.translate)) + process( + message, + dequeueCount, + FHIRTranslator(reportStreamEventService = reportStreamEventService), + ActionHistory(TaskAction.translate) + ) } /** @@ -149,13 +194,27 @@ class FHIRFunctions( recordResults(message, actionHistory, txn) results } - + reportStreamEventService.sendQueuedEvents() return newMessages } catch (ex: DataAccessException) { // This is the one exception type that we currently will allow for retrying as there are occasional // DB connectivity issues that are resolved without intervention logger.error(ex) throw ex + } catch (ex: SubmissionSenderNotFound) { + // This is a specific error case that can occur while handling a report via the new Submission service + // In a situation that the sender is not found there is not enough information to record a report event + // and we want a poison queue message to be immediately added so that the configuration can be fixed + logger.error(ex) + val tableEntity = Submission( + ex.reportId.toString(), + "Rejected", + ex.blobURL, + actionLogger.errors.takeIf { it.isNotEmpty() }?.map { it.detail.message }?.toString() + ) + submissionTableService.insertSubmission(tableEntity) + queueAccess.sendMessage("${messageContent.messageQueueName}-poison", message) + return emptyList() } catch (ex: Exception) { // We're catching anything else that occurs because the most likely cause is a code or configuration error // that will not be resolved if the message is automatically retried @@ -186,7 +245,7 @@ class FHIRFunctions( return when (val queueMessage = QueueMessage.deserialize(message)) { is QueueMessage.ReceiveQueueMessage -> { - FhirReceiveQueueMessage( + FhirConvertSubmissionQueueMessage( queueMessage.reportId, queueMessage.blobURL, queueMessage.digest, diff --git a/prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt b/prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt index 9527ef4dc19..e5cd8908412 100644 --- a/prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt +++ b/prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt @@ -16,6 +16,7 @@ import gov.cdc.prime.reportstream.shared.QueueMessage import gov.cdc.prime.router.ActionLogDetail import gov.cdc.prime.router.ActionLogScope import gov.cdc.prime.router.ActionLogger +import gov.cdc.prime.router.ClientSource import gov.cdc.prime.router.ErrorCode import gov.cdc.prime.router.InvalidReportMessage import gov.cdc.prime.router.Metadata @@ -23,6 +24,7 @@ import gov.cdc.prime.router.MimeFormat import gov.cdc.prime.router.Options import gov.cdc.prime.router.Report import gov.cdc.prime.router.SettingsProvider +import gov.cdc.prime.router.Topic import gov.cdc.prime.router.UnmappableConditionMessage import gov.cdc.prime.router.azure.ActionHistory import gov.cdc.prime.router.azure.BlobAccess @@ -39,6 +41,7 @@ import gov.cdc.prime.router.azure.observability.context.MDCUtils import gov.cdc.prime.router.azure.observability.context.withLoggingContext import gov.cdc.prime.router.azure.observability.event.AzureEventService import gov.cdc.prime.router.azure.observability.event.AzureEventServiceImpl +import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName import gov.cdc.prime.router.azure.observability.event.ReportStreamEventProperties import gov.cdc.prime.router.fhirengine.translation.HL7toFhirTranslator @@ -57,6 +60,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils import org.hl7.fhir.r4.model.Bundle import org.jooq.Field import java.time.OffsetDateTime +import java.util.UUID import java.util.stream.Collectors import java.util.stream.Stream @@ -75,7 +79,8 @@ class FHIRConverter( blob: BlobAccess = BlobAccess(), azureEventService: AzureEventService = AzureEventServiceImpl(), reportService: ReportService = ReportService(), -) : FHIREngine(metadata, settings, db, blob, azureEventService, reportService) { + reportStreamEventService: IReportStreamEventService, +) : FHIREngine(metadata, settings, db, blob, azureEventService, reportService, reportStreamEventService) { override val finishedField: Field = Tables.TASK.PROCESSED_AT @@ -83,6 +88,112 @@ class FHIRConverter( override val taskAction: TaskAction = TaskAction.convert + /** + * This object serves the purpose of consolidating the information needed to process a report + * through the convert step regardless of whether it comes from a [FhirConvertQueueMessage] + * or [FhirConvertSubmissionQueueMessage] + * + * @param reportId the report ID + * @param topic the topic the sender published to + * @param schemaName the FHIR transform to apply + * @param blobURL the URL for the blob to convert + * @param blobDigest the digest of the blob contents + */ + data class FHIRConvertInput( + val reportId: UUID, + val topic: Topic, + val schemaName: String, + val blobURL: String, + val blobDigest: String, + val blobSubFolderName: String, + ) { + + companion object { + + private val clientIdHeader = "client_id" + + /** + * Converts a [FhirConvertQueueMessage] into the input to the convert processing + * + * @param message the queue message + * @param actionHistory action history for recording details on the input report + */ + fun fromFhirConvertQueueMessage( + message: FhirConvertQueueMessage, + actionHistory: ActionHistory, + ): FHIRConvertInput { + val reportId = message.reportId + val topic = message.topic + val schemaName = message.schemaName + val blobUrl = message.blobURL + val blobDigest = message.digest + val blobSubFolderName = message.blobSubFolderName + actionHistory.trackExistingInputReport(reportId) + return FHIRConvertInput( + reportId, + topic, + schemaName, + blobUrl, + blobDigest, + blobSubFolderName + ) + } + + /** + * Converts a [FhirConvertSubmissionQueueMessage] into the input to the convert processing + * + * @param message the queue message + * @param actionHistory action history for recording details on the input report + * @param settings [SettingsProvider] for looking up the sender + */ + fun fromFHIRConvertSubmissionQueueMessage( + message: FhirConvertSubmissionQueueMessage, + actionHistory: ActionHistory, + settings: SettingsProvider, + ): FHIRConvertInput { + val reportId = message.reportId + val blobUrl = message.blobURL + val blobDigest = message.digest + val blobSubFolderName = message.blobSubFolderName + + val clientId = message.headers[clientIdHeader] + val sender = clientId?.takeIf { it.isNotBlank() }?.let { settings.findSender(it) } + if (sender == null) { + throw SubmissionSenderNotFound(clientId ?: "", reportId, blobUrl) + } + val topic = sender.topic + val schemaName = sender.schemaName + + val format = Report.getFormatFromBlobURL(blobUrl) + val report = Report( + sender.format, + listOf(ClientSource(organization = sender.organizationName, client = sender.name)), + 1, + nextAction = TaskAction.convert, + topic = sender.topic, + id = reportId, + bodyURL = blobUrl + ) + // This tracking is required so that the external report (coming from the submission service) + // is properly recorded in the report file table with the correct sender + actionHistory.trackExternalInputReport( + report, + BlobAccess.BlobInfo(format, blobUrl, blobDigest.toByteArray()) + ) + actionHistory.trackActionSenderInfo(sender.fullName) + + return FHIRConvertInput( + reportId, + topic, + schemaName, + blobUrl, + blobDigest, + blobSubFolderName + ) + } + } + } + /** * Accepts a [message] in either HL7 or FHIR format * HL7 messages will be converted into FHIR. @@ -97,7 +208,21 @@ class FHIRConverter( actionHistory: ActionHistory, ): List = when (message) { is FhirConvertQueueMessage -> { - fhirEngineRunResults(message, message.schemaName, actionLogger, actionHistory) + val input = FHIRConvertInput.fromFhirConvertQueueMessage(message, actionHistory) + + fhirEngineRunResults( + input, + actionLogger, + actionHistory + ) + } + is FhirConvertSubmissionQueueMessage -> { + val input = FHIRConvertInput.fromFHIRConvertSubmissionQueueMessage(message, actionHistory, settings) + fhirEngineRunResults( + input, + actionLogger, + actionHistory + ) } else -> { throw RuntimeException( @@ -107,21 +232,19 @@ class FHIRConverter( } private fun fhirEngineRunResults( - queueMessage: FhirConvertQueueMessage, - schemaName: String, + input: FHIRConvertInput, actionLogger: ActionLogger, actionHistory: ActionHistory, ): List { val contextMap = mapOf( MDCUtils.MDCProperty.ACTION_NAME to actionHistory.action.actionName.name, - MDCUtils.MDCProperty.REPORT_ID to queueMessage.reportId, - MDCUtils.MDCProperty.TOPIC to queueMessage.topic, - MDCUtils.MDCProperty.BLOB_URL to queueMessage.blobURL + MDCUtils.MDCProperty.REPORT_ID to input.reportId, + MDCUtils.MDCProperty.TOPIC to input.topic, + MDCUtils.MDCProperty.BLOB_URL to input.blobURL ) withLoggingContext(contextMap) { - actionLogger.setReportId(queueMessage.reportId) - actionHistory.trackExistingInputReport(queueMessage.reportId) - val format = Report.getFormatFromBlobURL(queueMessage.blobURL) + actionLogger.setReportId(input.reportId) + val format = Report.getFormatFromBlobURL(input.blobURL) logger.info("Starting FHIR Convert step") // This line is a workaround for a defect in the hapi-fhir library @@ -135,7 +258,7 @@ class FHIRConverter( // TODO: https://github.com/CDCgov/prime-reportstream/issues/14287 FhirPathUtils - val processedItems = process(format, queueMessage, actionLogger) + val processedItems = process(format, input.blobURL, input.blobDigest, input.topic, actionLogger) // processedItems can be empty in three scenarios: // - the blob had no contents, i.e. an empty file was submitted @@ -146,7 +269,7 @@ class FHIRConverter( "Applied sender transform and routed" ) { val transformer = getTransformerFromSchema( - schemaName + input.schemaName ) maybeParallelize( @@ -162,10 +285,10 @@ class FHIRConverter( MimeFormat.FHIR, emptyList(), parentItemLineageData = listOf( - Report.ParentItemLineageData(queueMessage.reportId, itemIndex.toInt() + 1) + Report.ParentItemLineageData(input.reportId, itemIndex.toInt() + 1) ), metadata = this.metadata, - topic = queueMessage.topic, + topic = input.topic, nextAction = TaskAction.none ) val noneEvent = ProcessEvent( @@ -182,14 +305,15 @@ class FHIRConverter( report, TaskAction.convert, processedItem.validationError!!.message, + shouldQueue = true ) { - parentReportId(queueMessage.reportId) + parentReportId(input.reportId) parentItemIndex(itemIndex.toInt() + 1) params( mapOf( ReportStreamEventProperties.ITEM_FORMAT to format, ReportStreamEventProperties.VALIDATION_PROFILE - to queueMessage.topic.validator.validatorProfileName + to input.topic.validator.validatorProfileName ) ) } @@ -205,10 +329,10 @@ class FHIRConverter( MimeFormat.FHIR, emptyList(), parentItemLineageData = listOf( - Report.ParentItemLineageData(queueMessage.reportId, itemIndex.toInt() + 1) + Report.ParentItemLineageData(input.reportId, itemIndex.toInt() + 1) ), metadata = this.metadata, - topic = queueMessage.topic, + topic = input.topic, nextAction = TaskAction.destination_filter ) @@ -227,7 +351,7 @@ class FHIRConverter( MimeFormat.FHIR, bodyBytes, report.id.toString(), - queueMessage.blobSubFolderName, + input.blobSubFolderName, routeEvent.eventAction ) report.bodyURL = blobInfo.blobUrl @@ -248,9 +372,10 @@ class FHIRConverter( reportEventService.sendItemEvent( ReportStreamEventName.ITEM_ACCEPTED, report, - TaskAction.convert + TaskAction.convert, + shouldQueue = true ) { - parentReportId(queueMessage.reportId) + parentReportId(input.reportId) parentItemIndex(itemIndex.toInt() + 1) trackingId(bundle) params( @@ -270,8 +395,8 @@ class FHIRConverter( report.id, blobInfo.blobUrl, BlobUtils.digestToString(blobInfo.digest), - queueMessage.blobSubFolderName, - queueMessage.topic + input.blobSubFolderName, + input.topic ) ) } @@ -283,7 +408,7 @@ class FHIRConverter( emptyList(), 0, metadata = this.metadata, - topic = queueMessage.topic, + topic = input.topic, nextAction = TaskAction.none ) actionHistory.trackEmptyReport(report) @@ -293,7 +418,7 @@ class FHIRConverter( TaskAction.convert, "Submitted report was either empty or could not be parsed into HL7" ) { - parentReportId(queueMessage.reportId) + parentReportId(input.reportId) params( mapOf( ReportStreamEventProperties.ITEM_FORMAT to format @@ -325,12 +450,14 @@ class FHIRConverter( */ internal fun process( format: MimeFormat, - queueMessage: FhirConvertQueueMessage, + blobURL: String, + blobDigest: String, + topic: Topic, actionLogger: ActionLogger, routeReportWithInvalidItems: Boolean = true, ): List> { - val validator = queueMessage.topic.validator - val rawReport = BlobAccess.downloadBlob(queueMessage.blobURL, queueMessage.digest) + val validator = topic.validator + val rawReport = BlobAccess.downloadBlob(blobURL, blobDigest) return if (rawReport.isBlank()) { actionLogger.error(InvalidReportMessage("Provided raw data is empty.")) emptyList() @@ -344,7 +471,7 @@ class FHIRConverter( "format" to format.name ) ) { - getBundlesFromRawHL7(rawReport, validator, queueMessage.topic.hl7ParseConfiguration) + getBundlesFromRawHL7(rawReport, validator, topic.hl7ParseConfiguration) } } catch (ex: ParseFailureError) { actionLogger.error( @@ -628,4 +755,15 @@ class FHIRConverter( } else { null } +} + +/** + * Exception generated if the sender ID from a message generated by Submissions service cannot be found + * + * @param senderId the full name of the missing sender, will be empty string if no sender was on the message + * @param reportId the unique identifier for the report which can be located in the azure table + * @param blobURL the blob URL for the report + */ +class SubmissionSenderNotFound(senderId: String, val reportId: UUID, val blobURL: String) : RuntimeException() { + override val message = "No sender was found for: $senderId" } \ No newline at end of file diff --git a/prime-router/src/main/kotlin/fhirengine/engine/FHIRDestinationFilter.kt b/prime-router/src/main/kotlin/fhirengine/engine/FHIRDestinationFilter.kt index 9f92a2b85b4..5be29b4626e 100644 --- a/prime-router/src/main/kotlin/fhirengine/engine/FHIRDestinationFilter.kt +++ b/prime-router/src/main/kotlin/fhirengine/engine/FHIRDestinationFilter.kt @@ -26,6 +26,7 @@ import gov.cdc.prime.router.azure.observability.context.MDCUtils import gov.cdc.prime.router.azure.observability.context.withLoggingContext import gov.cdc.prime.router.azure.observability.event.AzureEventService import gov.cdc.prime.router.azure.observability.event.AzureEventServiceImpl +import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName import gov.cdc.prime.router.azure.observability.event.ReportStreamEventProperties import gov.cdc.prime.router.fhirengine.translation.hl7.utils.CustomContext @@ -50,7 +51,8 @@ class FHIRDestinationFilter( blob: BlobAccess = BlobAccess(), azureEventService: AzureEventService = AzureEventServiceImpl(), reportService: ReportService = ReportService(), -) : FHIREngine(metadata, settings, db, blob, azureEventService, reportService) { + reportStreamEventService: IReportStreamEventService, +) : FHIREngine(metadata, settings, db, blob, azureEventService, reportService, reportStreamEventService) { override val finishedField: Field = Tables.TASK.DESTINATION_FILTERED_AT override val engineType: String = "DestinationFilter" diff --git a/prime-router/src/main/kotlin/fhirengine/engine/FHIREngine.kt b/prime-router/src/main/kotlin/fhirengine/engine/FHIREngine.kt index dd0d051c889..d32ad4269c7 100644 --- a/prime-router/src/main/kotlin/fhirengine/engine/FHIREngine.kt +++ b/prime-router/src/main/kotlin/fhirengine/engine/FHIREngine.kt @@ -11,6 +11,7 @@ import gov.cdc.prime.router.azure.BlobAccess import gov.cdc.prime.router.azure.DataAccessTransaction import gov.cdc.prime.router.azure.DatabaseAccess import gov.cdc.prime.router.azure.Event +import gov.cdc.prime.router.azure.SubmissionTableService import gov.cdc.prime.router.azure.db.enums.TaskAction import gov.cdc.prime.router.azure.observability.event.AzureEventService import gov.cdc.prime.router.azure.observability.event.AzureEventServiceImpl @@ -39,11 +40,7 @@ abstract class FHIREngine( val blob: BlobAccess = BlobAccess(), val azureEventService: AzureEventService = AzureEventServiceImpl(), val reportService: ReportService = ReportService(ReportGraph(db), db), - val reportEventService: IReportStreamEventService = ReportStreamEventService( - db, - azureEventService, - reportService - ), + val reportEventService: IReportStreamEventService, ) : BaseEngine() { /** @@ -65,6 +62,7 @@ abstract class FHIREngine( var azureEventService: AzureEventService? = null, var reportService: ReportService? = null, var reportEventService: IReportStreamEventService? = null, + var submissionTableService: SubmissionTableService? = null, ) { /** * Set the metadata instance. @@ -110,6 +108,10 @@ abstract class FHIREngine( this.reportEventService = reportEventService } + fun submissionTableService(submissionTableService: SubmissionTableService) = apply { + this.submissionTableService = submissionTableService + } + /** * Build the fhir engine instance. * @return the fhir engine instance @@ -123,21 +125,18 @@ abstract class FHIREngine( // create the correct FHIREngine type for the action being taken return when (taskAction) { - TaskAction.receive -> FHIRReceiver( - metadata ?: Metadata.getInstance(), - settingsProvider!!, - databaseAccess ?: databaseAccessSingleton, - blobAccess ?: BlobAccess(), - azureEventService ?: AzureEventServiceImpl(), - reportService ?: ReportService(), - ) TaskAction.process -> FHIRConverter( metadata ?: Metadata.getInstance(), settingsProvider!!, databaseAccess ?: databaseAccessSingleton, blobAccess ?: BlobAccess(), azureEventService ?: AzureEventServiceImpl(), - reportService ?: ReportService() + reportService ?: ReportService(), + ReportStreamEventService( + databaseAccess ?: databaseAccessSingleton, + azureEventService ?: AzureEventServiceImpl(), + reportService ?: ReportService() + ) ) TaskAction.destination_filter -> FHIRDestinationFilter( metadata ?: Metadata.getInstance(), @@ -145,7 +144,12 @@ abstract class FHIREngine( databaseAccess ?: databaseAccessSingleton, blobAccess ?: BlobAccess(), azureEventService ?: AzureEventServiceImpl(), - reportService ?: ReportService() + reportService ?: ReportService(), + ReportStreamEventService( + databaseAccess ?: databaseAccessSingleton, + azureEventService ?: AzureEventServiceImpl(), + reportService ?: ReportService() + ) ) TaskAction.receiver_filter -> FHIRReceiverFilter( metadata ?: Metadata.getInstance(), @@ -154,13 +158,24 @@ abstract class FHIREngine( blobAccess ?: BlobAccess(), azureEventService ?: AzureEventServiceImpl(), reportService ?: ReportService(), + ReportStreamEventService( + databaseAccess ?: databaseAccessSingleton, + azureEventService ?: AzureEventServiceImpl(), + reportService ?: ReportService() + ) ) TaskAction.translate -> FHIRTranslator( metadata ?: Metadata.getInstance(), settingsProvider!!, databaseAccess ?: databaseAccessSingleton, blobAccess ?: BlobAccess(), - azureEventService ?: AzureEventServiceImpl() + azureEventService ?: AzureEventServiceImpl(), + reportService ?: ReportService(), + ReportStreamEventService( + databaseAccess ?: databaseAccessSingleton, + azureEventService ?: AzureEventServiceImpl(), + reportService ?: ReportService() + ) ) else -> throw NotImplementedError("Invalid action type for FHIR engine") } diff --git a/prime-router/src/main/kotlin/fhirengine/engine/FHIRReceiver.kt b/prime-router/src/main/kotlin/fhirengine/engine/FHIRReceiver.kt deleted file mode 100644 index 9bea5e891ae..00000000000 --- a/prime-router/src/main/kotlin/fhirengine/engine/FHIRReceiver.kt +++ /dev/null @@ -1,379 +0,0 @@ -package gov.cdc.prime.router.fhirengine.engine - -import ca.uhn.hl7v2.model.Message -import com.microsoft.azure.functions.HttpStatus -import gov.cdc.prime.reportstream.shared.QueueMessage -import gov.cdc.prime.reportstream.shared.Submission -import gov.cdc.prime.router.ActionLogger -import gov.cdc.prime.router.ClientSource -import gov.cdc.prime.router.CustomerStatus -import gov.cdc.prime.router.InvalidParamMessage -import gov.cdc.prime.router.InvalidReportMessage -import gov.cdc.prime.router.Metadata -import gov.cdc.prime.router.MimeFormat -import gov.cdc.prime.router.Options -import gov.cdc.prime.router.Report -import gov.cdc.prime.router.Sender -import gov.cdc.prime.router.SettingsProvider -import gov.cdc.prime.router.azure.ActionHistory -import gov.cdc.prime.router.azure.BlobAccess -import gov.cdc.prime.router.azure.DatabaseAccess -import gov.cdc.prime.router.azure.Event -import gov.cdc.prime.router.azure.ProcessEvent -import gov.cdc.prime.router.azure.SubmissionTableService -import gov.cdc.prime.router.azure.db.Tables -import gov.cdc.prime.router.azure.db.enums.TaskAction -import gov.cdc.prime.router.azure.observability.context.MDCUtils -import gov.cdc.prime.router.azure.observability.context.withLoggingContext -import gov.cdc.prime.router.azure.observability.event.AzureEventService -import gov.cdc.prime.router.azure.observability.event.AzureEventServiceImpl -import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName -import gov.cdc.prime.router.azure.observability.event.ReportStreamEventProperties -import gov.cdc.prime.router.common.AzureHttpUtils.getSenderIP -import gov.cdc.prime.router.fhirengine.utils.FhirTranscoder -import gov.cdc.prime.router.fhirengine.utils.HL7Reader -import gov.cdc.prime.router.report.ReportService -import org.jooq.Field -import java.time.OffsetDateTime - -/** - * FHIRReceiver is responsible for processing messages from the elr-fhir-receive azure queue - * and storing them for the next step in the pipeline. - * - * @param metadata Mockable metadata instance. - * @param settings Mockable settings provider. - * @param db Mockable database access. - * @param blob Mockable blob storage access. - * @param azureEventService Service for handling Azure events. - * @param reportService Service for handling report-related operations. - * @param submissionTableService Service for inserting to the submission azure storage table. - */ -class FHIRReceiver( - metadata: Metadata = Metadata.getInstance(), - settings: SettingsProvider = this.settingsProviderSingleton, - db: DatabaseAccess = this.databaseAccessSingleton, - blob: BlobAccess = BlobAccess(), - azureEventService: AzureEventService = AzureEventServiceImpl(), - reportService: ReportService = ReportService(), - val submissionTableService: SubmissionTableService = SubmissionTableService.getInstance(), -) : FHIREngine(metadata, settings, db, blob, azureEventService, reportService) { - - override val finishedField: Field = Tables.TASK.PROCESSED_AT - - override val engineType: String = "Receive" - override val taskAction: TaskAction = TaskAction.receive - - private val clientIdHeader = "client_id" - private val contentTypeHeader = "content-type" - - /** - * Processes a message of type [QueueMessage]. This message can be in either HL7 or FHIR format and will be placed - * on a queue for further processing. - * - * @param message The incoming message to be logged and processed. - * @param actionLogger Logger to track actions and errors. - * @param actionHistory Tracks the history of actions performed. - * @return A list of results from the FHIR engine run. - */ - override fun doWork( - message: T, - actionLogger: ActionLogger, - actionHistory: ActionHistory, - ): List = when (message) { - is FhirReceiveQueueMessage -> processFhirReceiveQueueMessage(message, actionLogger, actionHistory) - else -> throw RuntimeException("Message was not a FhirReceive and cannot be processed: $message") - } - - /** - * Processes the FHIR receive queue message. - * - * @param queueMessage The queue message containing details about the report. - * @param actionLogger The logger used to track actions and errors. - * @param actionHistory The action history related to receiving the report. - * @return A list of FHIR engine run results. - */ - private fun processFhirReceiveQueueMessage( - queueMessage: FhirReceiveQueueMessage, - actionLogger: ActionLogger, - actionHistory: ActionHistory, - ): List { - val contextMap = createLoggingContextMap(queueMessage, actionHistory) - // Use the logging context for tracing - withLoggingContext(contextMap) { - actionLogger.setReportId(queueMessage.reportId) - val sender = getSender(queueMessage, actionLogger, actionHistory) ?: return emptyList() - - // Process the message if no errors occurred - return handleSuccessfulProcessing(queueMessage, sender, actionLogger, actionHistory) - } - } - - /** - * Creates the logging context map. - * - * @param queueMessage The queue message containing details about the report. - * @param actionHistory The action history related to receiving the report. - * @return The logging context map. - */ - private fun createLoggingContextMap( - queueMessage: FhirReceiveQueueMessage, - actionHistory: ActionHistory, - ): Map = mapOf( - MDCUtils.MDCProperty.ACTION_NAME to actionHistory.action.actionName.name, - MDCUtils.MDCProperty.REPORT_ID to queueMessage.reportId, - MDCUtils.MDCProperty.BLOB_URL to queueMessage.blobURL, - ) - - /** - * Retrieves the sender based on the queue message and logs any relevant errors. - * - * @param queueMessage The queue message containing details about the report. - * @param actionLogger The logger used to track actions and errors. - * @param actionHistory The action history related to receiving the report. - * @return The sender, or null if the sender was not found or is inactive. - */ - private fun getSender( - queueMessage: FhirReceiveQueueMessage, - actionLogger: ActionLogger, - actionHistory: ActionHistory, - ): Sender? { - val clientId = queueMessage.headers[clientIdHeader] - val sender = clientId?.takeIf { it.isNotBlank() }?.let { settings.findSender(it) } - - actionHistory.trackActionParams(queueMessage.headers.toString()) - - // Handle case where sender is not found - return if (sender == null) { - // Send an error event - reportEventService.sendSubmissionProcessingError( - ReportStreamEventName.REPORT_NOT_RECEIVABLE, - TaskAction.receive, - "Sender is not found in matching client id: ${queueMessage.headers[clientIdHeader]}.", - queueMessage.reportId, - queueMessage.blobURL - ) { - params( - actionLogger.errors.associateBy { ReportStreamEventProperties.PROCESSING_ERROR } - .plus( - mapOf( - ReportStreamEventProperties.REQUEST_PARAMETERS to queueMessage.headers.toString(), - ) - ) - ) - } - - // Insert the rejection into the submission table - val submission = - Submission( - queueMessage.reportId.toString(), "Rejected", - queueMessage.blobURL, - "Sender not found matching client_id: ${queueMessage.headers[clientIdHeader]}" - ) - submissionTableService.insertSubmission(submission) - null - } else { - // Handle case where sender is inactive - if (sender.customerStatus == CustomerStatus.INACTIVE) { - // Track the action result and log the error - actionHistory.trackActionResult(HttpStatus.NOT_ACCEPTABLE) - actionLogger.error( - InvalidParamMessage("Sender has customer status INACTIVE: " + queueMessage.headers[clientIdHeader]) - ) - } - - // Track sender information - actionHistory.trackActionSenderInfo(sender.fullName, queueMessage.headers["payloadname"]) - actionHistory.trackActionResult(HttpStatus.CREATED) - sender - } - } - - /** - * Handles successful processing of the queue message. - * - * @param queueMessage The queue message containing details about the report. - * @param sender The sender information. - * @param actionHistory The action history related to receiving the report. - * @return A list of FHIR engine run results. - */ - private fun handleSuccessfulProcessing( - queueMessage: FhirReceiveQueueMessage, - sender: Sender, - actionLogger: ActionLogger, - actionHistory: ActionHistory, - ): List { - // Get content from blob storage and create report - val report = validateSubmissionMessage(sender, actionLogger, queueMessage) ?: return emptyList() - - // Determine the mime format of the message - val mimeFormat = - MimeFormat.valueOfFromMimeType( - queueMessage.headers[contentTypeHeader]?.substringBefore(';') ?: "" - ) - - val blobInfo = BlobAccess.BlobInfo( - mimeFormat, - queueMessage.blobURL, - queueMessage.digest.toByteArray() - ) - - actionHistory.trackExternalInputReport( - report, - blobInfo - ) - - // Send an event indicating the report was received - reportEventService.sendReportEvent( - eventName = ReportStreamEventName.REPORT_RECEIVED, - childReport = report, - pipelineStepName = TaskAction.receive - ) { - params( - listOfNotNull( - ReportStreamEventProperties.REQUEST_PARAMETERS to queueMessage.headers.toString(), - ReportStreamEventProperties.SENDER_NAME to sender.fullName, - ReportStreamEventProperties.FILE_LENGTH to queueMessage.headers["content-length"].toString(), - getSenderIP(queueMessage.headers)?.let { ReportStreamEventProperties.SENDER_IP to it }, - ReportStreamEventProperties.ITEM_FORMAT to mimeFormat - ).toMap() - ) - } - - // Insert the acceptance into the submissions table - val tableEntity = Submission( - queueMessage.reportId.toString(), - "Accepted", - queueMessage.blobURL, - actionLogger.errors.takeIf { it.isNotEmpty() }?.map { it.detail.message }?.toString() - ) - submissionTableService.insertSubmission(tableEntity) - - return if (actionLogger.errors.isNotEmpty()) { - // Send an event indicating the report was received - reportEventService.sendReportProcessingError( - ReportStreamEventName.REPORT_NOT_PROCESSABLE, - report, - TaskAction.receive, - "Submitted report was either empty or could not be parsed." - ) { - params( - actionLogger.errors.associateBy { ReportStreamEventProperties.PROCESSING_ERROR } - .plus( - mapOf( - ReportStreamEventProperties.REQUEST_PARAMETERS to queueMessage.headers.toString(), - ) - ) - ) - } - emptyList() - } else { - // Create a route event - val routeEvent = ProcessEvent(Event.EventAction.CONVERT, report.id, Options.None, emptyMap(), emptyList()) - - // Return the result of the FHIR engine run - listOf( - FHIREngineRunResult( - routeEvent, - report, - queueMessage.blobURL, - FhirConvertQueueMessage( - report.id, - queueMessage.blobURL, - queueMessage.digest, - queueMessage.blobSubFolderName, - sender.topic, - sender.schemaName - ) - ) - ) - } - } - - private fun validateSubmissionMessage( - sender: Sender, - actionLogger: ActionLogger, - queueMessage: FhirReceiveQueueMessage, - ): Report? { - val rawReport = BlobAccess.downloadBlob(queueMessage.blobURL, queueMessage.digest) - return if (rawReport.isBlank()) { - actionLogger.error(InvalidReportMessage("Provided raw data is empty.")) - null - } else { - val report: Report - val sources = listOf(ClientSource(organization = sender.organizationName, client = sender.name)) - - when (sender.format) { - MimeFormat.HL7 -> { - val messages: List = HL7Reader(actionLogger).getMessages(rawReport) - val isBatch: Boolean = HL7Reader(actionLogger).isBatch(rawReport, messages.size) - // create a Report for this incoming HL7 message to use for tracking in the database - - report = Report( - if (isBatch) MimeFormat.HL7_BATCH else MimeFormat.HL7, - sources, - messages.size, - metadata = metadata, - nextAction = TaskAction.convert, - topic = sender.topic, - id = queueMessage.reportId, - bodyURL = queueMessage.blobURL - ) - - // check for valid message type - messages.forEachIndexed { idx, element -> - MessageType.validateMessageType(element, actionLogger, idx + 1) - } - } - - MimeFormat.FHIR -> { - val bundles = FhirTranscoder.getBundles(rawReport, actionLogger) - report = Report( - MimeFormat.FHIR, - sources, - bundles.size, - metadata = metadata, - nextAction = TaskAction.convert, - topic = sender.topic, - id = queueMessage.reportId, - bodyURL = queueMessage.blobURL - ) - } - - else -> { - actionLogger.error(InvalidReportMessage("Unsupported sender format: ${sender.format}")) - reportEventService.sendSubmissionProcessingError( - ReportStreamEventName.REPORT_NOT_PROCESSABLE, - TaskAction.receive, - "Unsupported sender format ${sender.format}.", - queueMessage.reportId, - queueMessage.blobURL - ) { - params( - actionLogger.errors.associateBy { ReportStreamEventProperties.PROCESSING_ERROR } - .plus( - mapOf( - ReportStreamEventProperties.REQUEST_PARAMETERS to queueMessage.headers.toString(), - ReportStreamEventProperties.SENDER_NAME to sender.fullName, - ReportStreamEventProperties.FILE_LENGTH to queueMessage.headers["content-length"] - .toString(), - ReportStreamEventProperties.SENDER_IP to (getSenderIP(queueMessage.headers) ?: ""), - ReportStreamEventProperties.ITEM_FORMAT to sender.format - ) - ) - ) - } - // Insert the acceptance into the submissions table - val tableEntity = Submission( - queueMessage.reportId.toString(), - "Rejected", - queueMessage.blobURL, - actionLogger.errors.takeIf { it.isNotEmpty() }?.map { it.detail.message }?.toString() - ) - submissionTableService.insertSubmission(tableEntity) - throw IllegalStateException("Unsupported sender format: ${sender.format}") - } - } - report - } - } -} \ No newline at end of file diff --git a/prime-router/src/main/kotlin/fhirengine/engine/FHIRReceiverFilter.kt b/prime-router/src/main/kotlin/fhirengine/engine/FHIRReceiverFilter.kt index aa11ba34f13..4f9a0998923 100644 --- a/prime-router/src/main/kotlin/fhirengine/engine/FHIRReceiverFilter.kt +++ b/prime-router/src/main/kotlin/fhirengine/engine/FHIRReceiverFilter.kt @@ -31,6 +31,7 @@ import gov.cdc.prime.router.azure.observability.context.withLoggingContext import gov.cdc.prime.router.azure.observability.event.AzureEventService import gov.cdc.prime.router.azure.observability.event.AzureEventServiceImpl import gov.cdc.prime.router.azure.observability.event.AzureEventUtils +import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName import gov.cdc.prime.router.azure.observability.event.ReportStreamEventProperties import gov.cdc.prime.router.codes @@ -64,7 +65,9 @@ class FHIRReceiverFilter( blob: BlobAccess = BlobAccess(), azureEventService: AzureEventService = AzureEventServiceImpl(), reportService: ReportService = ReportService(ReportGraph(db), db), -) : FHIREngine(metadata, settings, db, blob, azureEventService, reportService) { + reportStreamEventService: IReportStreamEventService, +) : FHIREngine(metadata, settings, db, blob, azureEventService, reportService, reportStreamEventService) { + override val finishedField: Field = Tables.TASK.RECEIVER_FILTERED_AT override val engineType: String = "ReceiverFilter" diff --git a/prime-router/src/main/kotlin/fhirengine/engine/FHIRTranslator.kt b/prime-router/src/main/kotlin/fhirengine/engine/FHIRTranslator.kt index 5d319720ddd..674ea533330 100644 --- a/prime-router/src/main/kotlin/fhirengine/engine/FHIRTranslator.kt +++ b/prime-router/src/main/kotlin/fhirengine/engine/FHIRTranslator.kt @@ -25,6 +25,7 @@ import gov.cdc.prime.router.azure.observability.context.MDCUtils import gov.cdc.prime.router.azure.observability.context.withLoggingContext import gov.cdc.prime.router.azure.observability.event.AzureEventService import gov.cdc.prime.router.azure.observability.event.AzureEventServiceImpl +import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService import gov.cdc.prime.router.common.Environment import gov.cdc.prime.router.fhirengine.config.HL7TranslationConfig import gov.cdc.prime.router.fhirengine.translation.hl7.FhirToHl7Context @@ -53,7 +54,8 @@ class FHIRTranslator( blob: BlobAccess = BlobAccess(), azureEventService: AzureEventService = AzureEventServiceImpl(), reportService: ReportService = ReportService(), -) : FHIREngine(metadata, settings, db, blob, azureEventService, reportService) { + reportStreamEventService: IReportStreamEventService, +) : FHIREngine(metadata, settings, db, blob, azureEventService, reportService, reportStreamEventService) { /** * Accepts a [FhirTranslateQueueMessage] [message] and, based on its parameters, sends a report to the next pipeline * step containing either the first ancestor's blob or a new blob that has been translated per diff --git a/prime-router/src/main/kotlin/fhirengine/engine/PrimeRouterQueueMessage.kt b/prime-router/src/main/kotlin/fhirengine/engine/PrimeRouterQueueMessage.kt index 6350a676a33..f547b24b975 100644 --- a/prime-router/src/main/kotlin/fhirengine/engine/PrimeRouterQueueMessage.kt +++ b/prime-router/src/main/kotlin/fhirengine/engine/PrimeRouterQueueMessage.kt @@ -37,14 +37,14 @@ abstract class ReportPipelineMessage : PrimeRouterQueueMessage() @JsonTypeName("receive") -data class FhirReceiveQueueMessage( +data class FhirConvertSubmissionQueueMessage( override val reportId: ReportId, override val blobURL: String, override val digest: String, override val blobSubFolderName: String, override val headers: Map = emptyMap(), ) : ReportPipelineMessage(), QueueMessage.ReceiveInformation { - override val messageQueueName = QueueMessage.Companion.elrReceiveQueueName + override val messageQueueName = QueueMessage.Companion.elrSubmissionConvertQueueName } @JsonTypeName("convert") diff --git a/prime-router/src/test/kotlin/azure/ActionHistoryTests.kt b/prime-router/src/test/kotlin/azure/ActionHistoryTests.kt index c6b6c5389a6..b6f26ae29b0 100644 --- a/prime-router/src/test/kotlin/azure/ActionHistoryTests.kt +++ b/prime-router/src/test/kotlin/azure/ActionHistoryTests.kt @@ -376,9 +376,9 @@ class ActionHistoryTests { "" ) every { - mockReportEventService.sendReportEvent(any(), any(), any(), any()) + mockReportEventService.sendReportEvent(any(), any(), any(), any(), any()) } returns Unit - every { mockReportEventService.sendItemEvent(any(), any(), any(), any()) } returns Unit + every { mockReportEventService.sendItemEvent(any(), any(), any(), any(), any()) } returns Unit mockkObject(Report) mockkObject(FhirTranscoder) every { FhirTranscoder.decode(any(), any()) } returns mockk() @@ -433,8 +433,8 @@ class ActionHistoryTests { assertThat(reportFile.itemCount).isEqualTo(15) assertThat(actionHistory1.action.externalName).isEqualTo("filename1") verify(exactly = 1) { - mockReportEventService.sendReportEvent(any(), any(), any(), any()) - mockReportEventService.sendItemEvent(any(), any(), any(), any()) + mockReportEventService.sendReportEvent(any(), any(), any(), any(), any()) + mockReportEventService.sendItemEvent(any(), any(), any(), any(), any()) } // not allowed to track the same report twice. assertFailure { @@ -516,9 +516,9 @@ class ActionHistoryTests { every { anyConstructed().generateDigest(any()) } returns mockk() val header = mockk() every { - mockReportEventService.sendReportEvent(any(), any(), any(), any()) + mockReportEventService.sendReportEvent(any(), any(), any(), any(), any()) } returns Unit - every { mockReportEventService.sendItemEvent(any(), any(), any(), any()) } returns Unit + every { mockReportEventService.sendItemEvent(any(), any(), any(), any(), any()) } returns Unit val inReportFile = mockk() every { header.reportFile } returns inReportFile every { header.content } returns "".toByteArray() @@ -571,8 +571,8 @@ class ActionHistoryTests { assertThat(actionHistory2.reportsOut[uuid]?.schemaName) .isEqualTo("STED/NESTED/STLTs/REALLY_LONG_STATE_NAME/REALLY_LONG_STATE_NAME") verify(exactly = 2) { - mockReportEventService.sendReportEvent(any(), any(), any(), any()) - mockReportEventService.sendItemEvent(any(), any(), any(), any()) + mockReportEventService.sendReportEvent(any(), any(), any(), any(), any()) + mockReportEventService.sendItemEvent(any(), any(), any(), any(), any()) } } @@ -784,9 +784,9 @@ class ActionHistoryTests { every { anyConstructed().generateDigest(any()) } returns mockk() val header = mockk() every { - mockReportEventService.sendReportEvent(any(), any(), any(), any()) + mockReportEventService.sendReportEvent(any(), any(), any(), any(), any()) } returns Unit - every { mockReportEventService.sendItemEvent(any(), any(), any(), any()) } returns Unit + every { mockReportEventService.sendItemEvent(any(), any(), any(), any(), any()) } returns Unit val inReportFile = mockk() every { header.reportFile } returns inReportFile every { header.content } returns "".toByteArray() @@ -837,8 +837,8 @@ class ActionHistoryTests { assertContains(blobUrls[0], org.receivers[0].fullName) assertContains(blobUrls[1], org.receivers[1].fullName) verify(exactly = 2) { - mockReportEventService.sendReportEvent(any(), any(), any(), any()) - mockReportEventService.sendItemEvent(any(), any(), any(), any()) + mockReportEventService.sendReportEvent(any(), any(), any(), any(), any()) + mockReportEventService.sendItemEvent(any(), any(), any(), any(), any()) } } diff --git a/prime-router/src/test/kotlin/azure/FHIRFunctionsTests.kt b/prime-router/src/test/kotlin/azure/FHIRFunctionsTests.kt index 1d3454c1824..6ccd26f0852 100644 --- a/prime-router/src/test/kotlin/azure/FHIRFunctionsTests.kt +++ b/prime-router/src/test/kotlin/azure/FHIRFunctionsTests.kt @@ -66,7 +66,11 @@ class FHIRFunctionsTests { .databaseAccess(accessSpy) .build() every { accessSpy.fetchReportFile(any()) } returns mockk(relaxed = true) - return FHIRFunctions(workflowEngine, databaseAccess = accessSpy) + return FHIRFunctions( + workflowEngine, + databaseAccess = accessSpy, + submissionTableService = mockk() + ) } @Test @@ -76,7 +80,14 @@ class FHIRFunctionsTests { val mockReportEventService = mockk(relaxed = true) val init = slot Unit>() every { - mockReportEventService.sendReportProcessingError(any(), any(), any(), any(), capture(init)) + mockReportEventService.sendReportProcessingError( + any(), + any(), + any(), + any(), + any(), + capture(init) + ) } returns Unit val mockFHIRConverter = mockk(relaxed = true) every { mockFHIRConverter.run(any(), any(), any(), any()) } throws RuntimeException("Error") @@ -94,6 +105,7 @@ class FHIRFunctionsTests { any(), TaskAction.convert, "Error", + any(), init.captured ) } diff --git a/prime-router/src/test/kotlin/azure/observability/event/ReportEventServiceTest.kt b/prime-router/src/test/kotlin/azure/observability/event/ReportEventServiceTest.kt index 2604cffdee4..00360b9600b 100644 --- a/prime-router/src/test/kotlin/azure/observability/event/ReportEventServiceTest.kt +++ b/prime-router/src/test/kotlin/azure/observability/event/ReportEventServiceTest.kt @@ -202,7 +202,7 @@ class ReportEventServiceTest { val data = reportEventService.getItemEventData( 1, - translateNode.node.reportId, + translateNode.node.reportId, 1, "" ) diff --git a/prime-router/src/test/kotlin/common/UniversalPipelineTestUtils.kt b/prime-router/src/test/kotlin/common/UniversalPipelineTestUtils.kt index f158a08993b..45f03b9514e 100644 --- a/prime-router/src/test/kotlin/common/UniversalPipelineTestUtils.kt +++ b/prime-router/src/test/kotlin/common/UniversalPipelineTestUtils.kt @@ -19,6 +19,7 @@ import gov.cdc.prime.router.azure.BlobAccess import gov.cdc.prime.router.azure.DataAccessTransaction import gov.cdc.prime.router.azure.Event import gov.cdc.prime.router.azure.ProcessEvent +import gov.cdc.prime.router.azure.SubmissionTableService import gov.cdc.prime.router.azure.WorkflowEngine import gov.cdc.prime.router.azure.db.Tables import gov.cdc.prime.router.azure.db.enums.TaskAction @@ -30,6 +31,7 @@ import gov.cdc.prime.router.db.ReportStreamTestDatabaseContainer import gov.cdc.prime.router.fhirengine.azure.FHIRFunctions import gov.cdc.prime.router.metadata.LookupTable import gov.cdc.prime.router.unittest.UnitTestUtils +import io.mockk.mockk import org.jooq.impl.DSL import org.testcontainers.containers.GenericContainer import java.io.File @@ -282,6 +284,7 @@ object UniversalPipelineTestUtils { txn: DataAccessTransaction, expectedItems: Int? = null, expectedReports: Int = 1, + parentIsRoot: Boolean = false, ): List { val itemLineages = DSL .using(txn) @@ -297,6 +300,8 @@ object UniversalPipelineTestUtils { // itemCount is on the report created by the test. It will not be null. if (parent.itemCount > 1) { assertThat(itemLineages.map { it.parentIndex }).isEqualTo((1..expectedItems).toList()) + } else if (parentIsRoot) { + assertThat(itemLineages.map { it.parentIndex }).isEqualTo((1..expectedItems).toList()) } else { assertThat(itemLineages.map { it.parentIndex }).isEqualTo(MutableList(expectedItems) { 1 }) } @@ -399,7 +404,11 @@ object UniversalPipelineTestUtils { .settingsProvider(settings) .databaseAccess(ReportStreamTestDatabaseContainer.testDatabaseAccess) .build() - return FHIRFunctions(workflowEngine, databaseAccess = ReportStreamTestDatabaseContainer.testDatabaseAccess) + return FHIRFunctions( + workflowEngine, + databaseAccess = ReportStreamTestDatabaseContainer.testDatabaseAccess, + submissionTableService = mockk() + ) } fun getBlobContainerMetadata(azuriteContainer: GenericContainer<*>): BlobAccess.BlobContainerMetadata { diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FHIRConverterIntegrationTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FHIRConverterIntegrationTests.kt index bd483b02079..a5a553442a5 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FHIRConverterIntegrationTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FHIRConverterIntegrationTests.kt @@ -9,6 +9,8 @@ import assertk.assertions.isEqualToIgnoringGivenProperties import assertk.assertions.matchesPredicate import gov.cdc.prime.reportstream.shared.BlobUtils import gov.cdc.prime.reportstream.shared.QueueMessage +import gov.cdc.prime.router.ClientSource +import gov.cdc.prime.router.CustomerStatus import gov.cdc.prime.router.FileSettings import gov.cdc.prime.router.Metadata import gov.cdc.prime.router.MimeFormat @@ -16,12 +18,14 @@ import gov.cdc.prime.router.Options import gov.cdc.prime.router.Report import gov.cdc.prime.router.Sender import gov.cdc.prime.router.Topic +import gov.cdc.prime.router.UniversalPipelineSender import gov.cdc.prime.router.azure.ActionHistory import gov.cdc.prime.router.azure.BlobAccess import gov.cdc.prime.router.azure.DatabaseLookupTableAccess import gov.cdc.prime.router.azure.Event import gov.cdc.prime.router.azure.ProcessEvent import gov.cdc.prime.router.azure.QueueAccess +import gov.cdc.prime.router.azure.SubmissionTableService import gov.cdc.prime.router.azure.WorkflowEngine import gov.cdc.prime.router.azure.db.Tables import gov.cdc.prime.router.azure.db.enums.ActionLogType @@ -35,6 +39,7 @@ import gov.cdc.prime.router.azure.observability.event.ItemEventData import gov.cdc.prime.router.azure.observability.event.ReportEventData import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName import gov.cdc.prime.router.azure.observability.event.ReportStreamEventProperties +import gov.cdc.prime.router.azure.observability.event.ReportStreamEventService import gov.cdc.prime.router.azure.observability.event.ReportStreamItemEvent import gov.cdc.prime.router.cli.tests.CompareData import gov.cdc.prime.router.common.TestcontainersUtils @@ -67,11 +72,14 @@ import gov.cdc.prime.router.fhirengine.engine.FHIRConverter import gov.cdc.prime.router.fhirengine.engine.FhirDestinationFilterQueueMessage import gov.cdc.prime.router.fhirengine.utils.FhirTranscoder import gov.cdc.prime.router.history.DetailedActionLog +import gov.cdc.prime.router.history.db.ReportGraph import gov.cdc.prime.router.metadata.LookupTable import gov.cdc.prime.router.metadata.ObservationMappingConstants +import gov.cdc.prime.router.report.ReportService import gov.cdc.prime.router.unittest.UnitTestUtils import gov.cdc.prime.router.version.Version import io.mockk.every +import io.mockk.mockk import io.mockk.mockkConstructor import io.mockk.mockkObject import io.mockk.unmockkAll @@ -80,6 +88,7 @@ import org.jooq.impl.DSL import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows import org.junit.jupiter.api.extension.ExtendWith import org.testcontainers.junit.jupiter.Container import org.testcontainers.junit.jupiter.Testcontainers @@ -87,6 +96,7 @@ import tech.tablesaw.api.StringColumn import tech.tablesaw.api.Table import java.nio.charset.Charset import java.time.OffsetDateTime +import java.util.UUID @Testcontainers @ExtendWith(ReportStreamTestDatabaseSetupExtension::class) @@ -101,6 +111,14 @@ class FHIRConverterIntegrationTests { ) val azureEventService = InMemoryAzureEventService() + val mockSubmissionTableService = mockk() + val reportStreamEventService = ReportStreamEventService( + ReportStreamTestDatabaseContainer.testDatabaseAccess, azureEventService, + ReportService( + ReportGraph(ReportStreamTestDatabaseContainer.testDatabaseAccess), + ReportStreamTestDatabaseContainer.testDatabaseAccess + ) + ) private fun createFHIRFunctionsInstance(): FHIRFunctions { val settings = FileSettings().loadOrganizations(universalPipelineOrganization) @@ -114,7 +132,14 @@ class FHIRConverterIntegrationTests { .settingsProvider(settings) .databaseAccess(ReportStreamTestDatabaseContainer.testDatabaseAccess) .build() - return FHIRFunctions(workflowEngine, databaseAccess = ReportStreamTestDatabaseContainer.testDatabaseAccess) + + return FHIRFunctions( + workflowEngine, + databaseAccess = ReportStreamTestDatabaseContainer.testDatabaseAccess, + submissionTableService = mockSubmissionTableService, + azureEventService = azureEventService, + reportStreamEventService = reportStreamEventService + ) } private fun createFHIRConverter(): FHIRConverter { @@ -128,19 +153,15 @@ class FHIRConverterIntegrationTests { settings, ReportStreamTestDatabaseContainer.testDatabaseAccess, azureEventService = azureEventService, + reportStreamEventService = reportStreamEventService ) } - private fun generateQueueMessage( + private fun generateFHIRConvertQueueMessage( report: Report, blobContents: String, sender: Sender, - headers: Map? = null, ): String { - val headersString = headers?.entries?.joinToString(separator = ",\n") { (key, value) -> - """"$key": "$value"""" - } ?: "" - return """ { "type": "convert", @@ -150,7 +171,29 @@ class FHIRConverterIntegrationTests { "blobSubFolderName": "${sender.fullName}", "topic": "${sender.topic.jsonVal}", "schemaName": "${sender.schemaName}" - ${if (headersString.isNotEmpty()) ",\n$headersString" else ""} + } + """.trimIndent() + } + + private fun generateFHIRConvertSubmissionQueueMessage( + report: Report, + blobContents: String, + sender: Sender, + ): String { + // TODO: something is wrong with the Jackson configuration as it should not require the type to parse this + val headers = mapOf("client_id" to sender.fullName) + val headersStringMap = headers.entries.joinToString(separator = ",\n") { (key, value) -> + """"$key": "$value"""" + } + val headersString = "[\"java.util.LinkedHashMap\",{$headersStringMap}]" + return """ + { + "type": "receive-fhir", + "reportId": "${report.id}", + "blobURL": "${report.bodyURL}", + "digest": "${BlobUtils.digestToString(BlobUtils.sha256Digest(blobContents.toByteArray()))}", + "blobSubFolderName": "${sender.fullName}", + "headers":$headersString } """.trimIndent() } @@ -164,6 +207,9 @@ class FHIRConverterIntegrationTests { mockkObject(BlobAccess.BlobContainerMetadata) every { BlobAccess.BlobContainerMetadata.build(any(), any()) } returns getBlobContainerMetadata() mockkConstructor(DatabaseLookupTableAccess::class) + every { mockSubmissionTableService.insertSubmission(any()) } returns Unit + mockkObject(Metadata) + every { Metadata.getInstance() } returns UnitTestUtils.simpleMetadata } @AfterEach @@ -239,6 +285,235 @@ class FHIRConverterIntegrationTests { } } + @Test + fun `should add a message to the poison queue if the sender is not found and not do any work`() { + val receivedReportContents = + listOf(cleanHL7Record, invalidHL7Record, unparseableHL7Record, badEncodingHL7Record) + .joinToString("\n") + val receiveBlobUrl = BlobAccess.uploadBlob( + "receive/happy-path.hl7", + receivedReportContents.toByteArray(), + getBlobContainerMetadata() + ) + + val receiveReport = Report( + hl7SenderWithNoTransform.format, + listOf( + ClientSource( + organization = hl7SenderWithNoTransform.organizationName, + client = hl7SenderWithNoTransform.name + ) + ), + 1, + metadata = UnitTestUtils.simpleMetadata, + nextAction = TaskAction.convert, + topic = hl7SenderWithNoTransform.topic, + id = UUID.randomUUID(), + bodyURL = receiveBlobUrl + ) + val missingSender = UniversalPipelineSender( + "foo", + "phd", + MimeFormat.HL7, + CustomerStatus.ACTIVE, + topic = Topic.FULL_ELR, + ) + val queueMessage = + generateFHIRConvertSubmissionQueueMessage(receiveReport, receivedReportContents, missingSender) + val fhirFunctions = createFHIRFunctionsInstance() + + fhirFunctions.process(queueMessage, 1, createFHIRConverter(), ActionHistory(TaskAction.convert)) + ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> + assertThrows { + ReportStreamTestDatabaseContainer.testDatabaseAccess.fetchReportFile(receiveReport.id, txn = txn) + } + val processedReports = fetchChildReports( + receiveReport, txn, 0, 0, parentIsRoot = true + ) + assertThat(processedReports).hasSize(0) + verify(exactly = 1) { + QueueAccess.sendMessage( + "${QueueMessage.elrSubmissionConvertQueueName}-poison", + queueMessage + + ) + } + } + } + + @Test + fun `should successfully process a FhirConvertSubmissionQueueMessage`() { + val receivedReportContents = + listOf(cleanHL7Record, invalidHL7Record, unparseableHL7Record, badEncodingHL7Record) + .joinToString("\n") + val receiveBlobUrl = BlobAccess.uploadBlob( + "receive/happy-path.hl7", + receivedReportContents.toByteArray(), + getBlobContainerMetadata() + ) + + val receiveReport = Report( + hl7SenderWithNoTransform.format, + listOf( + ClientSource( + organization = hl7SenderWithNoTransform.organizationName, + client = hl7SenderWithNoTransform.name + ) + ), + 1, + metadata = UnitTestUtils.simpleMetadata, + nextAction = TaskAction.convert, + topic = hl7SenderWithNoTransform.topic, + id = UUID.randomUUID(), + bodyURL = receiveBlobUrl + ) + val queueMessage = + generateFHIRConvertSubmissionQueueMessage(receiveReport, receivedReportContents, hl7SenderWithNoTransform) + val fhirFunctions = createFHIRFunctionsInstance() + + fhirFunctions.process(queueMessage, 1, createFHIRConverter(), ActionHistory(TaskAction.convert)) + + ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> + val externalReportRecord = + ReportStreamTestDatabaseContainer.testDatabaseAccess.fetchReportFile(receiveReport.id, txn = txn) + assertThat(externalReportRecord.sendingOrg).isEqualTo(hl7SenderWithNoTransform.organizationName) + assertThat(externalReportRecord.sendingOrgClient).isEqualTo(hl7SenderWithNoTransform.name) + val (routedReports, unroutedReports) = fetchChildReports( + receiveReport, txn, 4, 4, parentIsRoot = true + ).partition { it.nextAction != TaskAction.none } + assertThat(routedReports).hasSize(2) + routedReports.forEach { + assertThat(it.nextAction).isEqualTo(TaskAction.destination_filter) + assertThat(it.receivingOrg).isEqualTo(null) + assertThat(it.receivingOrgSvc).isEqualTo(null) + assertThat(it.schemaName).isEqualTo("None") + assertThat(it.schemaTopic).isEqualTo(Topic.FULL_ELR) + assertThat(it.bodyFormat).isEqualTo("FHIR") + } + assertThat(unroutedReports).hasSize(2) + unroutedReports.forEach { + assertThat(it.nextAction).isEqualTo(TaskAction.none) + assertThat(it.receivingOrg).isEqualTo(null) + assertThat(it.receivingOrgSvc).isEqualTo(null) + assertThat(it.schemaName).isEqualTo("None") + assertThat(it.schemaTopic).isEqualTo(Topic.FULL_ELR) + assertThat(it.bodyFormat).isEqualTo("FHIR") + } + // Verify that the expected FHIR bundles were uploaded + val reportAndBundles = + routedReports.map { + Pair( + it, + BlobAccess.downloadBlobAsByteArray(it.bodyUrl, getBlobContainerMetadata()) + ) + } + + assertThat(reportAndBundles).transform { pairs -> pairs.map { it.second } }.each { + it.matchesPredicate { bytes -> + val invalidHL7Result = CompareData().compare( + cleanHL7RecordConverted.byteInputStream(), + bytes.inputStream(), + MimeFormat.FHIR, + null + ) + invalidHL7Result.passed + + val cleanHL7Result = CompareData().compare( + invalidHL7RecordConverted.byteInputStream(), + bytes.inputStream(), + MimeFormat.FHIR, + null + ) + invalidHL7Result.passed || cleanHL7Result.passed + } + } + + val expectedQueueMessages = reportAndBundles.map { (report, fhirBundle) -> + FhirDestinationFilterQueueMessage( + report.reportId, + report.bodyUrl, + BlobUtils.digestToString(BlobUtils.sha256Digest(fhirBundle)), + hl7SenderWithNoTransform.fullName, + hl7SenderWithNoTransform.topic + ) + }.map { it.serialize() } + + verify(exactly = 2) { + QueueAccess.sendMessage( + QueueMessage.elrDestinationFilterQueueName, + match { expectedQueueMessages.contains(it) } + ) + } + + val actionLogs = DSL.using(txn).select(Tables.ACTION_LOG.asterisk()).from(Tables.ACTION_LOG) + .where(Tables.ACTION_LOG.REPORT_ID.eq(receiveReport.id)) + .and(Tables.ACTION_LOG.TYPE.eq(ActionLogType.error)) + .fetchInto( + DetailedActionLog::class.java + ) + + assertThat(actionLogs).hasSize(2) + @Suppress("ktlint:standard:max-line-length") + assertThat(actionLogs).transform { logs -> logs.map { it.detail.message } } + .containsOnly( + "Item 3 in the report was not parseable. Reason: exception while parsing HL7: Determine encoding for message. The following is the first 50 chars of the message for reference, although this may not be where the issue is: MSH^~\\&|CDC PRIME - Atlanta, Georgia (Dekalb)^2.16", + "Item 4 in the report was not parseable. Reason: exception while parsing HL7: Invalid or incomplete encoding characters - MSH-2 is ^~\\&#!" + ) + assertThat(actionLogs).transform { + it.map { log -> + log.trackingId + } + }.containsOnly( + "", + "" + ) + + assertThat(azureEventService.reportStreamEvents[ReportStreamEventName.ITEM_ACCEPTED]!!).hasSize(2) + val event = + azureEventService + .reportStreamEvents[ReportStreamEventName.ITEM_ACCEPTED]!!.last() as ReportStreamItemEvent + assertThat(event.reportEventData).isEqualToIgnoringGivenProperties( + ReportEventData( + routedReports[1].reportId, + receiveReport.id, + listOf(receiveReport.id), + Topic.FULL_ELR, + routedReports[1].bodyUrl, + TaskAction.convert, + OffsetDateTime.now(), + Version.commitId + ), + ReportEventData::timestamp + ) + assertThat(event.itemEventData).isEqualToIgnoringGivenProperties( + ItemEventData( + 1, + 2, + 2, + "371784", + "phd.hl7-elr-no-transform" + ) + ) + assertThat(event.params).isEqualTo( + mapOf( + ReportStreamEventProperties.ITEM_FORMAT to MimeFormat.HL7, + ReportStreamEventProperties.BUNDLE_DIGEST to BundleDigestLabResult( + observationSummaries = AzureEventUtils + .getObservationSummaries( + FhirTranscoder.decode( + reportAndBundles[1].second.toString(Charset.defaultCharset()) + ) + ), + patientState = listOf("TX"), + orderingFacilityState = listOf("FL"), + performerState = emptyList(), + eventType = "ORU^R01^ORU_R01" + ) + ) + ) + } + } + @Test fun `should successfully convert HL7 messages`() { val receivedReportContents = @@ -251,7 +526,8 @@ class FHIRConverterIntegrationTests { ) val receiveReport = setupConvertStep(MimeFormat.HL7, hl7SenderWithNoTransform, receiveBlobUrl, 4) - val queueMessage = generateQueueMessage(receiveReport, receivedReportContents, hl7SenderWithNoTransform) + val queueMessage = + generateFHIRConvertQueueMessage(receiveReport, receivedReportContents, hl7SenderWithNoTransform) val fhirFunctions = createFHIRFunctionsInstance() fhirFunctions.process(queueMessage, 1, createFHIRConverter(), ActionHistory(TaskAction.convert)) @@ -362,7 +638,7 @@ class FHIRConverterIntegrationTests { OffsetDateTime.now(), Version.commitId ), - ReportEventData::timestamp + ReportEventData::timestamp ) assertThat(event.itemEventData).isEqualToIgnoringGivenProperties( ItemEventData( @@ -432,7 +708,7 @@ class FHIRConverterIntegrationTests { MimeFormat.FHIR, fhirSenderWithNoTransform, receiveBlobUrl, 4 ) - val queueMessage = generateQueueMessage( + val queueMessage = generateFHIRConvertQueueMessage( receiveReport, receivedReportContents, fhirSenderWithNoTransform ) @@ -535,7 +811,7 @@ class FHIRConverterIntegrationTests { OffsetDateTime.now(), Version.commitId ), - ReportEventData::timestamp + ReportEventData::timestamp ) assertThat(event.itemEventData).isEqualToIgnoringGivenProperties( ItemEventData( @@ -578,7 +854,7 @@ class FHIRConverterIntegrationTests { ) val receiveReport = setupConvertStep(MimeFormat.HL7, senderWithValidation, receiveBlobUrl, 2) - val queueMessage = generateQueueMessage(receiveReport, receivedReportContents, senderWithValidation) + val queueMessage = generateFHIRConvertQueueMessage(receiveReport, receivedReportContents, senderWithValidation) val fhirFunctions = createFHIRFunctionsInstance() fhirFunctions.process(queueMessage, 1, createFHIRConverter(), ActionHistory(TaskAction.convert)) @@ -671,7 +947,7 @@ class FHIRConverterIntegrationTests { OffsetDateTime.now(), Version.commitId ), - ReportEventData::timestamp + ReportEventData::timestamp ) assertThat(event.itemEventData).isEqualToIgnoringGivenProperties( ItemEventData( @@ -689,7 +965,7 @@ class FHIRConverterIntegrationTests { ReportStreamEventProperties.VALIDATION_PROFILE to Topic.MARS_OTC_ELR.validator.validatorProfileName, @Suppress("ktlint:standard:max-line-length") ReportStreamEventProperties.PROCESSING_ERROR - to "Item 2 in the report was not valid. Reason: HL7 was not valid at MSH[1]-21[1].3 for validator: RADx MARS" + to "Item 2 in the report was not valid. Reason: HL7 was not valid at MSH[1]-21[1].3 for validator: RADx MARS" ) ) } @@ -706,7 +982,7 @@ class FHIRConverterIntegrationTests { ) val receiveReport = setupConvertStep(MimeFormat.HL7, hl7Sender, receiveBlobUrl, 2) - val queueMessage = generateQueueMessage(receiveReport, receivedReportContents, hl7Sender) + val queueMessage = generateFHIRConvertQueueMessage(receiveReport, receivedReportContents, hl7Sender) val fhirFunctions = createFHIRFunctionsInstance() fhirFunctions.process(queueMessage, 1, createFHIRConverter(), ActionHistory(TaskAction.convert)) @@ -787,7 +1063,7 @@ class FHIRConverterIntegrationTests { ) val receiveReport = setupConvertStep(MimeFormat.HL7, hl7Sender, receiveBlobUrl, 1) - val queueMessage = generateQueueMessage(receiveReport, receivedReportContents, hl7Sender) + val queueMessage = generateFHIRConvertQueueMessage(receiveReport, receivedReportContents, hl7Sender) val fhirFunctions = createFHIRFunctionsInstance() fhirFunctions.process(queueMessage, 1, createFHIRConverter(), ActionHistory(TaskAction.convert)) @@ -816,7 +1092,7 @@ class FHIRConverterIntegrationTests { ) val receiveReport = setupConvertStep(MimeFormat.HL7, hl7Sender, receiveBlobUrl, 1) - val queueMessage = generateQueueMessage(receiveReport, receivedReportContents, hl7Sender) + val queueMessage = generateFHIRConvertQueueMessage(receiveReport, receivedReportContents, hl7Sender) val fhirFunctions = createFHIRFunctionsInstance() fhirFunctions.process(queueMessage, 1, createFHIRConverter(), ActionHistory(TaskAction.convert)) @@ -845,7 +1121,7 @@ class FHIRConverterIntegrationTests { ) val receiveReport = setupConvertStep(MimeFormat.HL7, hl7Sender, receiveBlobUrl, 1) - val queueMessage = generateQueueMessage(receiveReport, receivedReportContents, hl7Sender) + val queueMessage = generateFHIRConvertQueueMessage(receiveReport, receivedReportContents, hl7Sender) val fhirFunctions = createFHIRFunctionsInstance() fhirFunctions.process(queueMessage, 1, createFHIRConverter(), ActionHistory(TaskAction.convert)) diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FHIRDestinationFilterIntegrationTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FHIRDestinationFilterIntegrationTests.kt index 8e2f420d888..2c8f314fbe6 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FHIRDestinationFilterIntegrationTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FHIRDestinationFilterIntegrationTests.kt @@ -33,6 +33,7 @@ import gov.cdc.prime.router.azure.observability.event.ItemEventData import gov.cdc.prime.router.azure.observability.event.ReportEventData import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName import gov.cdc.prime.router.azure.observability.event.ReportStreamEventProperties +import gov.cdc.prime.router.azure.observability.event.ReportStreamEventService import gov.cdc.prime.router.azure.observability.event.ReportStreamItemEvent import gov.cdc.prime.router.common.TestcontainersUtils import gov.cdc.prime.router.common.UniversalPipelineTestUtils @@ -120,7 +121,14 @@ class FHIRDestinationFilterIntegrationTests : Logging { settings, db = ReportStreamTestDatabaseContainer.testDatabaseAccess, reportService = ReportService(ReportGraph(ReportStreamTestDatabaseContainer.testDatabaseAccess)), - azureEventService = azureEventService + azureEventService = azureEventService, + reportStreamEventService = ReportStreamEventService( + ReportStreamTestDatabaseContainer.testDatabaseAccess, azureEventService, + ReportService( + ReportGraph(ReportStreamTestDatabaseContainer.testDatabaseAccess), + ReportStreamTestDatabaseContainer.testDatabaseAccess + ) + ) ) } diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FHIRReceiverFilterIntegrationTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FHIRReceiverFilterIntegrationTests.kt index eae5b63fe2c..4acfecbf826 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FHIRReceiverFilterIntegrationTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FHIRReceiverFilterIntegrationTests.kt @@ -39,6 +39,7 @@ import gov.cdc.prime.router.azure.observability.event.ItemEventData import gov.cdc.prime.router.azure.observability.event.ReportEventData import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName import gov.cdc.prime.router.azure.observability.event.ReportStreamEventProperties +import gov.cdc.prime.router.azure.observability.event.ReportStreamEventService import gov.cdc.prime.router.azure.observability.event.ReportStreamItemEvent import gov.cdc.prime.router.common.TestcontainersUtils import gov.cdc.prime.router.common.UniversalPipelineTestUtils @@ -214,7 +215,14 @@ class FHIRReceiverFilterIntegrationTests : Logging { settings, db = ReportStreamTestDatabaseContainer.testDatabaseAccess, reportService = ReportService(ReportGraph(ReportStreamTestDatabaseContainer.testDatabaseAccess)), - azureEventService = azureEventService + azureEventService = azureEventService, + reportStreamEventService = ReportStreamEventService( + ReportStreamTestDatabaseContainer.testDatabaseAccess, azureEventService, + ReportService( + ReportGraph(ReportStreamTestDatabaseContainer.testDatabaseAccess), + ReportStreamTestDatabaseContainer.testDatabaseAccess + ) + ) ) } diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FHIRReceiverIntegrationTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FHIRReceiverIntegrationTests.kt deleted file mode 100644 index 525e53905d2..00000000000 --- a/prime-router/src/test/kotlin/fhirengine/azure/FHIRReceiverIntegrationTests.kt +++ /dev/null @@ -1,881 +0,0 @@ -package gov.cdc.prime.router.fhirengine.azure - -import assertk.assertThat -import assertk.assertions.hasSize -import assertk.assertions.isEmpty -import assertk.assertions.isEqualTo -import assertk.assertions.isEqualToIgnoringGivenProperties -import assertk.assertions.isNull -import gov.cdc.prime.reportstream.shared.BlobUtils -import gov.cdc.prime.router.FileSettings -import gov.cdc.prime.router.MimeFormat -import gov.cdc.prime.router.Sender -import gov.cdc.prime.router.Topic -import gov.cdc.prime.router.azure.ActionHistory -import gov.cdc.prime.router.azure.BlobAccess -import gov.cdc.prime.router.azure.QueueAccess -import gov.cdc.prime.router.azure.SubmissionTableService -import gov.cdc.prime.router.azure.TableAccess -import gov.cdc.prime.router.azure.WorkflowEngine -import gov.cdc.prime.router.azure.db.Tables -import gov.cdc.prime.router.azure.db.enums.ActionLogType -import gov.cdc.prime.router.azure.db.enums.TaskAction -import gov.cdc.prime.router.azure.db.tables.pojos.ReportFile -import gov.cdc.prime.router.azure.observability.event.InMemoryAzureEventService -import gov.cdc.prime.router.azure.observability.event.ReportEventData -import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName -import gov.cdc.prime.router.azure.observability.event.ReportStreamEventProperties -import gov.cdc.prime.router.azure.observability.event.ReportStreamReportEvent -import gov.cdc.prime.router.common.TestcontainersUtils -import gov.cdc.prime.router.common.UniversalPipelineTestUtils.csvSenderWithNoTransform -import gov.cdc.prime.router.common.UniversalPipelineTestUtils.fhirSenderWithNoTransform -import gov.cdc.prime.router.common.UniversalPipelineTestUtils.fhirSenderWithNoTransformInactive -import gov.cdc.prime.router.common.UniversalPipelineTestUtils.hl7SenderWithNoTransform -import gov.cdc.prime.router.common.UniversalPipelineTestUtils.universalPipelineOrganization -import gov.cdc.prime.router.common.cleanHL7Record -import gov.cdc.prime.router.common.invalidMalformedFHIRRecord -import gov.cdc.prime.router.common.unparseableHL7Record -import gov.cdc.prime.router.common.validFHIRRecord1 -import gov.cdc.prime.router.db.ReportStreamTestDatabaseContainer -import gov.cdc.prime.router.db.ReportStreamTestDatabaseSetupExtension -import gov.cdc.prime.router.fhirengine.engine.FHIRReceiver -import gov.cdc.prime.router.history.DetailedActionLog -import gov.cdc.prime.router.history.DetailedReport -import gov.cdc.prime.router.unittest.UnitTestUtils -import gov.cdc.prime.router.version.Version -import io.mockk.clearAllMocks -import io.mockk.every -import io.mockk.mockkObject -import io.mockk.unmockkAll -import io.mockk.verify -import org.jooq.impl.DSL -import org.junit.jupiter.api.AfterEach -import org.junit.jupiter.api.BeforeEach -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.extension.ExtendWith -import org.testcontainers.junit.jupiter.Container -import org.testcontainers.junit.jupiter.Testcontainers -import java.time.OffsetDateTime -import java.util.UUID -import kotlin.test.assertNotNull - -@Testcontainers -@ExtendWith(ReportStreamTestDatabaseSetupExtension::class) -class FHIRReceiverIntegrationTests { - - @Container - val azuriteContainer = TestcontainersUtils.createAzuriteContainer( - customImageName = "azurite_fhirreceiverintegration", - customEnv = mapOf( - "AZURITE_ACCOUNTS" to "devstoreaccount1:keydevstoreaccount1" - ) - ) - - private val azureEventService = InMemoryAzureEventService() - private lateinit var submissionTableService: SubmissionTableService - - private fun createFHIRFunctionsInstance(): FHIRFunctions { - val settings = FileSettings().loadOrganizations(universalPipelineOrganization) - val metadata = UnitTestUtils.simpleMetadata - val workflowEngine = WorkflowEngine - .Builder() - .metadata(metadata) - .settingsProvider(settings) - .databaseAccess(ReportStreamTestDatabaseContainer.testDatabaseAccess) - .build() - return FHIRFunctions(workflowEngine, databaseAccess = ReportStreamTestDatabaseContainer.testDatabaseAccess) - } - - private fun createFHIRReceiver(): FHIRReceiver { - val settings = FileSettings().loadOrganizations(universalPipelineOrganization) - val metadata = UnitTestUtils.simpleMetadata - return FHIRReceiver( - metadata, - settings, - ReportStreamTestDatabaseContainer.testDatabaseAccess, - azureEventService = azureEventService, - submissionTableService = submissionTableService - ) - } - - private fun generateReceiveQueueMessage( - reportId: String, - blobURL: String, - blobContents: String, - sender: Sender, - headers: Map, - ): String { - val headersStringMap = headers.entries.joinToString(separator = ",\n") { (key, value) -> - """"$key": "$value"""" - } - val headersString = "[\"java.util.LinkedHashMap\",{$headersStringMap}]" - - return """{"type":"receive-fhir","blobURL":"$blobURL", - "digest":"${BlobUtils.digestToString(BlobUtils.sha256Digest(blobContents.toByteArray()))}", - "blobSubFolderName":"${sender.fullName}","reportId":"$reportId","headers":$headersString} - """.trimIndent() - } - - @BeforeEach - fun beforeEach() { - clearAllMocks() - mockkObject(QueueAccess) - every { QueueAccess.sendMessage(any(), any()) } returns "" - mockkObject(BlobAccess) - every { BlobAccess getProperty "defaultBlobMetadata" } returns getBlobContainerMetadata() - mockkObject(BlobAccess.BlobContainerMetadata) - every { BlobAccess.BlobContainerMetadata.build(any(), any()) } returns getBlobContainerMetadata() - - mockkObject(TableAccess) - every { TableAccess.getConnectionString() } returns getConnString() - - submissionTableService = SubmissionTableService.getInstance() - submissionTableService.reset() - } - - @AfterEach - fun afterEach() { - unmockkAll() - } - - private fun getBlobContainerMetadata(): BlobAccess.BlobContainerMetadata = BlobAccess.BlobContainerMetadata( - "container1", - getConnString() - ) - - private fun getConnString(): String { - @Suppress("ktlint:standard:max-line-length") - return """DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=keydevstoreaccount1;BlobEndpoint=http://${azuriteContainer.host}:${azuriteContainer.getMappedPort(10000)}/devstoreaccount1;QueueEndpoint=http://${azuriteContainer.host}:${azuriteContainer.getMappedPort(10001)}/devstoreaccount1;TableEndpoint=http://${azuriteContainer.host}:${azuriteContainer.getMappedPort(10002)}/devstoreaccount1;""" - } - - @Test - fun `should handle inactive sender gracefully`() { - val receivedReportContents = - listOf(validFHIRRecord1) - .joinToString("\n") - val receiveBlobUrl = BlobAccess.uploadBlob( - "receive/happy-path.fhir", - receivedReportContents.toByteArray(), - getBlobContainerMetadata() - ) - - val reportId = UUID.randomUUID() - val headers = mapOf( - "content-type" to "application/fhir+ndjson;test", - "x-azure-clientip" to "0.0.0.0", - "payloadname" to "test_message", - "client_id" to fhirSenderWithNoTransformInactive.fullName, - "content-length" to "100" - ) - - val receiveQueueMessage = generateReceiveQueueMessage( - reportId.toString(), - receiveBlobUrl, - receivedReportContents, - fhirSenderWithNoTransformInactive, - headers = headers - ) - - val fhirFunctions = createFHIRFunctionsInstance() - - fhirFunctions.process( - receiveQueueMessage, - 1, - createFHIRReceiver(), - ActionHistory(TaskAction.receive) - ) - - ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> - - val actionLogs = DSL.using(txn).select(Tables.ACTION_LOG.asterisk()) - .from(Tables.ACTION_LOG) - .where(Tables.ACTION_LOG.REPORT_ID.eq(reportId)) - .and(Tables.ACTION_LOG.TYPE.eq(ActionLogType.error)) - .fetchInto(DetailedActionLog::class.java) - - assertThat(actionLogs.first()).transform { it.detail.message } - .isEqualTo("Sender has customer status INACTIVE: phd.fhir-elr-no-transform-inactive") - - val reportFile = DSL.using(txn).select(Tables.REPORT_FILE.asterisk()) - .from(Tables.REPORT_FILE) - .where(Tables.REPORT_FILE.REPORT_ID.eq(reportId)) - .fetchInto(DetailedReport::class.java) - - assertThat(actionLogs.count()).isEqualTo(1) - assertThat(reportFile.count()).isEqualTo(1) - } - - verify(exactly = 0) { - QueueAccess.sendMessage(any(), any()) - } - - val tableRow = submissionTableService.getSubmission(reportId.toString(), "Accepted") - - assertNotNull(tableRow) - assertThat(tableRow.detail).isEqualTo( - "[Sender has customer status INACTIVE: phd.fhir-elr-no-transform-inactive]" - ) - assertThat(tableRow.bodyURL).isEqualTo(receiveBlobUrl) - - assertThat(azureEventService.reportStreamEvents[ReportStreamEventName.REPORT_RECEIVED]!!).hasSize(1) - val event = - azureEventService - .reportStreamEvents[ReportStreamEventName.REPORT_RECEIVED]!!.last() as ReportStreamReportEvent - assertThat(event.reportEventData).isEqualToIgnoringGivenProperties( - ReportEventData( - reportId, - null, - emptyList(), - Topic.FULL_ELR, - receiveBlobUrl, - TaskAction.receive, - OffsetDateTime.now(), - Version.commitId - ), - ReportEventData::timestamp - ) - assertThat(event.params).isEqualTo( - mapOf( - ReportStreamEventProperties.ITEM_FORMAT to MimeFormat.FHIR, - ReportStreamEventProperties.SENDER_NAME to fhirSenderWithNoTransformInactive.fullName, - ReportStreamEventProperties.FILE_LENGTH to headers["content-length"], - ReportStreamEventProperties.SENDER_IP to headers["x-azure-clientip"], - ReportStreamEventProperties.REQUEST_PARAMETERS to headers.toString() - ) - ) - - assertThat(azureEventService.reportStreamEvents[ReportStreamEventName.REPORT_NOT_PROCESSABLE]!!).hasSize(1) - val notProcessableEvent = - azureEventService - .reportStreamEvents[ReportStreamEventName.REPORT_NOT_PROCESSABLE]!!.last() as ReportStreamReportEvent - assertThat(notProcessableEvent.reportEventData).isEqualToIgnoringGivenProperties( - ReportEventData( - reportId, - null, - emptyList(), - Topic.FULL_ELR, - receiveBlobUrl, - TaskAction.receive, - OffsetDateTime.now(), - Version.commitId - ), - ReportEventData::timestamp - ) - assertThat(notProcessableEvent.params).isEqualTo( - mapOf( - ReportStreamEventProperties.PROCESSING_ERROR to - "Submitted report was either empty or could not be parsed.", - ReportStreamEventProperties.REQUEST_PARAMETERS to headers.toString() - ) - ) - } - - @Test - fun `should handle sender not found gracefully`() { - val submissionMessageContents = validFHIRRecord1 - val submissionBlobUrl = "http://anyblob.com" - - val reportId = UUID.randomUUID() - val headers = mapOf( - "content-type" to "application/fhir+ndjson;test", - "x-azure-clientip" to "0.0.0.0", - "payloadname" to "test_message", - "client_id" to "unknown_sender", - "content-length" to "100" - ) - - val receiveQueueMessage = generateReceiveQueueMessage( - reportId.toString(), - submissionBlobUrl, - submissionMessageContents, - fhirSenderWithNoTransformInactive, - headers = headers - ) - - val fhirFunctions = createFHIRFunctionsInstance() - - fhirFunctions.process( - receiveQueueMessage, - 1, - createFHIRReceiver(), - ActionHistory(TaskAction.receive) - ) - - ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> - val actionLogs = DSL.using(txn).select(Tables.ACTION_LOG.asterisk()) - .from(Tables.ACTION_LOG) - .where(Tables.ACTION_LOG.TYPE.eq(ActionLogType.error)) - .fetchInto(DetailedActionLog::class.java) - - assertThat(actionLogs).isEmpty() - - val reportFile = DSL.using(txn).select(Tables.REPORT_FILE.asterisk()) - .from(Tables.REPORT_FILE) - .where(Tables.REPORT_FILE.REPORT_ID.eq(reportId)) - .fetchInto(DetailedReport::class.java) - - assertThat(reportFile).isEmpty() - } - - verify(exactly = 0) { - QueueAccess.sendMessage(any(), any()) - } - - val tableRow = submissionTableService.getSubmission( - reportId.toString(), - "Rejected" - ) - - assertNotNull(tableRow) - assertThat(tableRow.detail).isEqualTo("Sender not found matching client_id: unknown_sender") - assertThat(tableRow.bodyURL).isEqualTo(submissionBlobUrl) - - assertThat(azureEventService.reportStreamEvents[ReportStreamEventName.REPORT_NOT_RECEIVABLE]!!).hasSize(1) - val event = - azureEventService - .reportStreamEvents[ReportStreamEventName.REPORT_NOT_RECEIVABLE]!!.last() as ReportStreamReportEvent - assertThat(event.reportEventData).isEqualToIgnoringGivenProperties( - ReportEventData( - reportId, - null, - emptyList(), - null, - submissionBlobUrl, - TaskAction.receive, - OffsetDateTime.now(), - Version.commitId - ), - ReportEventData::timestamp - ) - assertThat(event.params).isEqualTo( - mapOf( - ReportStreamEventProperties.PROCESSING_ERROR to - "Sender is not found in matching client id: unknown_sender.", - ReportStreamEventProperties.REQUEST_PARAMETERS to headers.toString() - ) - ) - } - - @Test - fun `should successfully process valid FHIR message`() { - val receivedReportContents = - listOf(validFHIRRecord1) - .joinToString("\n") - val receiveBlobUrl = BlobAccess.uploadBlob( - "receive/happy-path.fhir", - receivedReportContents.toByteArray(), - getBlobContainerMetadata() - ) - - val reportId = UUID.randomUUID() - val headers = mapOf( - "content-type" to "application/fhir+ndjson;test", - "x-azure-clientip" to "0.0.0.0", - "payloadname" to "test_message", - "client_id" to fhirSenderWithNoTransform.fullName, - "content-length" to "100" - ) - - val receiveQueueMessage = generateReceiveQueueMessage( - reportId.toString(), - receiveBlobUrl, - receivedReportContents, - fhirSenderWithNoTransform, - headers = headers - ) - - val fhirFunctions = createFHIRFunctionsInstance() - - fhirFunctions.process( - receiveQueueMessage, - 1, - createFHIRReceiver(), - ActionHistory(TaskAction.receive) - ) - - ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> - val actionLogs = DSL.using(txn).select(Tables.ACTION_LOG.asterisk()) - .from(Tables.ACTION_LOG) - .where(Tables.ACTION_LOG.REPORT_ID.eq(reportId)) - .and(Tables.ACTION_LOG.TYPE.eq(ActionLogType.error)) - .fetchInto(DetailedActionLog::class.java) - - assertThat(actionLogs).isEmpty() - - val reportFile = DSL.using(txn).select(Tables.REPORT_FILE.asterisk()) - .from(Tables.REPORT_FILE) - .where(Tables.REPORT_FILE.REPORT_ID.eq(reportId)) - .fetchInto(ReportFile::class.java) - - assertThat(reportFile).hasSize(1) - reportFile.first().apply { - assertThat(nextAction).isEqualTo(TaskAction.convert) - assertThat(receivingOrg).isEqualTo(null) - assertThat(receivingOrgSvc).isEqualTo(null) - assertThat(schemaName).isEqualTo("None") - assertThat(schemaTopic).isEqualTo(Topic.FULL_ELR) - assertThat(bodyFormat).isEqualTo("FHIR") - assertThat(sendingOrg).isEqualTo("phd") - assertThat(sendingOrgClient).isEqualTo("fhir-elr-no-transform") - } - } - - verify(exactly = 1) { - QueueAccess.sendMessage(any(), any()) - } - - val tableRow = submissionTableService.getSubmission( - reportId.toString(), - "Accepted" - ) - - assertNotNull(tableRow) - assertThat(tableRow.bodyURL).isEqualTo(receiveBlobUrl) - assertThat(tableRow.detail).isNull() - - assertThat(azureEventService.reportStreamEvents[ReportStreamEventName.REPORT_RECEIVED]!!).hasSize(1) - val event = - azureEventService - .reportStreamEvents[ReportStreamEventName.REPORT_RECEIVED]!!.last() as ReportStreamReportEvent - assertThat(event.reportEventData).isEqualToIgnoringGivenProperties( - ReportEventData( - reportId, - null, - emptyList(), - Topic.FULL_ELR, - receiveBlobUrl, - TaskAction.receive, - OffsetDateTime.now(), - Version.commitId - ), - ReportEventData::timestamp - ) - assertThat(event.params).isEqualTo( - mapOf( - ReportStreamEventProperties.ITEM_FORMAT to MimeFormat.FHIR, - ReportStreamEventProperties.SENDER_NAME to fhirSenderWithNoTransform.fullName, - ReportStreamEventProperties.FILE_LENGTH to headers["content-length"], - ReportStreamEventProperties.SENDER_IP to headers["x-azure-clientip"], - ReportStreamEventProperties.REQUEST_PARAMETERS to headers.toString() - ) - ) - } - - @Test - fun `should successfully process valid HL7 message`() { - val receivedReportContents = - listOf(cleanHL7Record) - .joinToString("\n") - val receiveBlobUrl = BlobAccess.uploadBlob( - "receive/happy-path.hl7", - receivedReportContents.toByteArray(), - getBlobContainerMetadata() - ) - - val reportId = UUID.randomUUID() - val headers = mapOf( - "content-type" to "application/hl7-v2;test", - "x-azure-clientip" to "0.0.0.0", - "payloadname" to "test_message", - "client_id" to hl7SenderWithNoTransform.fullName, - "content-length" to "100" - ) - val receiveQueueMessage = generateReceiveQueueMessage( - reportId.toString(), - receiveBlobUrl, - receivedReportContents, - hl7SenderWithNoTransform, - headers - ) - - val fhirFunctions = createFHIRFunctionsInstance() - - fhirFunctions.process( - receiveQueueMessage, - 1, - createFHIRReceiver(), - ActionHistory(TaskAction.receive) - ) - - ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> - val actionLogs = DSL.using(txn).select(Tables.ACTION_LOG.asterisk()) - .from(Tables.ACTION_LOG) - .where(Tables.ACTION_LOG.REPORT_ID.eq(reportId)) - .and(Tables.ACTION_LOG.TYPE.eq(ActionLogType.error)) - .fetchInto(DetailedActionLog::class.java) - - assertThat(actionLogs).isEmpty() - - val reportFile = DSL.using(txn).select(Tables.REPORT_FILE.asterisk()) - .from(Tables.REPORT_FILE) - .where(Tables.REPORT_FILE.REPORT_ID.eq(reportId)) - .fetchInto(ReportFile::class.java) - - assertThat(reportFile).hasSize(1) - reportFile.first().apply { - assertThat(nextAction).isEqualTo(TaskAction.convert) - assertThat(receivingOrg).isEqualTo(null) - assertThat(receivingOrgSvc).isEqualTo(null) - assertThat(schemaName).isEqualTo("None") - assertThat(schemaTopic).isEqualTo(Topic.FULL_ELR) - assertThat(bodyFormat).isEqualTo("HL7") - assertThat(sendingOrg).isEqualTo("phd") - assertThat(sendingOrgClient).isEqualTo("hl7-elr-no-transform") - } - } - - verify(exactly = 1) { - QueueAccess.sendMessage(any(), any()) - } - - val tableRow = submissionTableService.getSubmission( - reportId.toString(), - "Accepted" - ) - - assertNotNull(tableRow) - assertThat(tableRow.bodyURL).isEqualTo(receiveBlobUrl) - assertThat(tableRow.detail).isNull() - - assertThat(azureEventService.reportStreamEvents[ReportStreamEventName.REPORT_RECEIVED]!!).hasSize(1) - val event = - azureEventService - .reportStreamEvents[ReportStreamEventName.REPORT_RECEIVED]!!.last() as ReportStreamReportEvent - assertThat(event.reportEventData).isEqualToIgnoringGivenProperties( - ReportEventData( - reportId, - null, - emptyList(), - Topic.FULL_ELR, - receiveBlobUrl, - TaskAction.receive, - OffsetDateTime.now(), - Version.commitId - ), - ReportEventData::timestamp - ) - assertThat(event.params).isEqualTo( - mapOf( - ReportStreamEventProperties.ITEM_FORMAT to MimeFormat.HL7, - ReportStreamEventProperties.SENDER_NAME to hl7SenderWithNoTransform.fullName, - ReportStreamEventProperties.FILE_LENGTH to headers["content-length"], - ReportStreamEventProperties.SENDER_IP to headers["x-azure-clientip"], - ReportStreamEventProperties.REQUEST_PARAMETERS to headers.toString() - ) - ) - } - - @Test - fun `test process invalid FHIR message`() { - val invalidReceivedReportContents = - listOf(invalidMalformedFHIRRecord) - .joinToString("\n") - val receiveBlobUrl = BlobAccess.uploadBlob( - "receive/fail-path.fhir", - invalidReceivedReportContents.toByteArray(), - getBlobContainerMetadata() - ) - - val reportId = UUID.randomUUID() - val headers = mapOf( - "content-type" to "application/fhir+ndjson;test", - "x-azure-clientip" to "0.0.0.0", - "payloadname" to "test_message", - "client_id" to fhirSenderWithNoTransform.fullName, - "content-length" to "100" - ) - - val receiveQueueMessage = generateReceiveQueueMessage( - reportId.toString(), - receiveBlobUrl, - invalidReceivedReportContents, - fhirSenderWithNoTransform, - headers = headers - ) - - val fhirFunctions = createFHIRFunctionsInstance() - - fhirFunctions.process( - receiveQueueMessage, - 1, - createFHIRReceiver(), - ActionHistory(TaskAction.receive) - ) - - ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> - val actionLogs = DSL.using(txn).select(Tables.ACTION_LOG.asterisk()) - .from(Tables.ACTION_LOG) - .where(Tables.ACTION_LOG.REPORT_ID.eq(reportId)) - .and(Tables.ACTION_LOG.TYPE.eq(ActionLogType.error)) - .fetchInto(DetailedActionLog::class.java) - - assertThat(actionLogs.count()).isEqualTo(1) - assertThat(actionLogs.first().detail.message).isEqualTo("1: Unable to parse FHIR data.") - - val reportFile = DSL.using(txn).select(Tables.REPORT_FILE.asterisk()) - .from(Tables.REPORT_FILE) - .where(Tables.REPORT_FILE.REPORT_ID.eq(reportId)) - .fetchInto(ReportFile::class.java) - - assertThat(reportFile).hasSize(1) - reportFile.first().apply { - assertThat(nextAction).isEqualTo(TaskAction.convert) - assertThat(receivingOrg).isEqualTo(null) - assertThat(receivingOrgSvc).isEqualTo(null) - assertThat(schemaName).isEqualTo("None") - assertThat(schemaTopic).isEqualTo(Topic.FULL_ELR) - assertThat(bodyFormat).isEqualTo("FHIR") - assertThat(sendingOrg).isEqualTo("phd") - assertThat(sendingOrgClient).isEqualTo("fhir-elr-no-transform") - } - } - - verify(exactly = 0) { - QueueAccess.sendMessage(any(), any()) - } - - val tableRow = submissionTableService.getSubmission( - reportId.toString(), - "Accepted" - ) - - assertNotNull(tableRow) - assertThat(tableRow.bodyURL).isEqualTo(receiveBlobUrl) - assertThat(tableRow.detail).isEqualTo("[1: Unable to parse FHIR data.]") - - assertThat(azureEventService.reportStreamEvents[ReportStreamEventName.REPORT_RECEIVED]!!).hasSize(1) - val event = - azureEventService - .reportStreamEvents[ReportStreamEventName.REPORT_RECEIVED]!!.last() as ReportStreamReportEvent - assertThat(event.reportEventData).isEqualToIgnoringGivenProperties( - ReportEventData( - reportId, - null, - emptyList(), - Topic.FULL_ELR, - receiveBlobUrl, - TaskAction.receive, - OffsetDateTime.now(), - Version.commitId - ), - ReportEventData::timestamp - ) - assertThat(event.params).isEqualTo( - mapOf( - ReportStreamEventProperties.ITEM_FORMAT to MimeFormat.FHIR, - ReportStreamEventProperties.SENDER_NAME to fhirSenderWithNoTransform.fullName, - ReportStreamEventProperties.FILE_LENGTH to headers["content-length"], - ReportStreamEventProperties.SENDER_IP to headers["x-azure-clientip"], - ReportStreamEventProperties.REQUEST_PARAMETERS to headers.toString() - ) - ) - } - - @Test - fun `test process invalid HL7 message`() { - val invalidReceivedReportContents = - listOf(unparseableHL7Record) - .joinToString("\n") - val receiveBlobUrl = BlobAccess.uploadBlob( - "receive/fail-path.hl7", - invalidReceivedReportContents.toByteArray(), - getBlobContainerMetadata() - ) - - val reportId = UUID.randomUUID() - val headers = mapOf( - "content-type" to "application/hl7-v2;test", - "x-azure-clientip" to "0.0.0.0", - "payloadname" to "test_message", - "client_id" to hl7SenderWithNoTransform.fullName, - "content-length" to "100" - ) - - val receiveQueueMessage = generateReceiveQueueMessage( - reportId.toString(), - receiveBlobUrl, - invalidReceivedReportContents, - hl7SenderWithNoTransform, - headers = headers - ) - - val fhirFunctions = createFHIRFunctionsInstance() - - fhirFunctions.process( - receiveQueueMessage, - 1, - createFHIRReceiver(), - ActionHistory(TaskAction.receive) - ) - - ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> - val actionLogs = DSL.using(txn).select(Tables.ACTION_LOG.asterisk()) - .from(Tables.ACTION_LOG) - .where(Tables.ACTION_LOG.REPORT_ID.eq(reportId)) - .and(Tables.ACTION_LOG.TYPE.eq(ActionLogType.error)) - .fetchInto(DetailedActionLog::class.java) - - assertThat(actionLogs.count()).isEqualTo(2) - - val reportFile = DSL.using(txn).select(Tables.REPORT_FILE.asterisk()) - .from(Tables.REPORT_FILE) - .where(Tables.REPORT_FILE.REPORT_ID.eq(reportId)) - .fetchInto(ReportFile::class.java) - - assertThat(reportFile).hasSize(1) - reportFile.first().apply { - assertThat(nextAction).isEqualTo(TaskAction.convert) - assertThat(receivingOrg).isEqualTo(null) - assertThat(receivingOrgSvc).isEqualTo(null) - assertThat(schemaName).isEqualTo("None") - assertThat(schemaTopic).isEqualTo(Topic.FULL_ELR) - assertThat(bodyFormat).isEqualTo("HL7") - assertThat(sendingOrg).isEqualTo("phd") - assertThat(sendingOrgClient).isEqualTo("hl7-elr-no-transform") - } - } - - verify(exactly = 0) { - QueueAccess.sendMessage(any(), any()) - } - - val tableRow = submissionTableService.getSubmission( - reportId.toString(), - "Accepted" - ) - - assertNotNull(tableRow) - assertThat(tableRow.bodyURL).isEqualTo(receiveBlobUrl) - assertThat(tableRow.detail).isEqualTo("[Failed to parse message, Failed to parse message]") - - assertThat(azureEventService.reportStreamEvents[ReportStreamEventName.REPORT_RECEIVED]!!).hasSize(1) - val event = - azureEventService - .reportStreamEvents[ReportStreamEventName.REPORT_RECEIVED]!!.last() as ReportStreamReportEvent - assertThat(event.reportEventData).isEqualToIgnoringGivenProperties( - ReportEventData( - reportId, - null, - emptyList(), - Topic.FULL_ELR, - receiveBlobUrl, - TaskAction.receive, - OffsetDateTime.now(), - Version.commitId - ), - ReportEventData::timestamp - ) - assertThat(event.params).isEqualTo( - mapOf( - ReportStreamEventProperties.ITEM_FORMAT to MimeFormat.HL7, - ReportStreamEventProperties.SENDER_NAME to hl7SenderWithNoTransform.fullName, - ReportStreamEventProperties.FILE_LENGTH to headers["content-length"], - ReportStreamEventProperties.SENDER_IP to headers["x-azure-clientip"], - ReportStreamEventProperties.REQUEST_PARAMETERS to headers.toString() - ) - ) - } - - @Test - fun `test process CSV message`() { - val invalidReceivedReportContents = - listOf(unparseableHL7Record) - .joinToString("\n") - val receiveBlobUrl = BlobAccess.uploadBlob( - "receive/fail-path.hl7", - invalidReceivedReportContents.toByteArray(), - getBlobContainerMetadata() - ) - - val reportId = UUID.randomUUID() - val headers = mapOf( - "content-type" to "application/hl7-v2;test", - "x-azure-clientip" to "0.0.0.0", - "payloadname" to "test_message", - "client_id" to csvSenderWithNoTransform.fullName, - "content-length" to "100" - ) - - val receiveQueueMessage = generateReceiveQueueMessage( - reportId.toString(), - receiveBlobUrl, - invalidReceivedReportContents, - csvSenderWithNoTransform, - headers = headers - ) - - val fhirFunctions = createFHIRFunctionsInstance() - - var exception: Exception? = null - try { - fhirFunctions.process( - receiveQueueMessage, - 1, - createFHIRReceiver(), - ActionHistory(TaskAction.receive) - ) - } catch (e: Exception) { - exception = e - } - - assertThat(exception!!.javaClass.name).isEqualTo("java.lang.IllegalStateException") - - ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> - val actionLogs = DSL.using(txn).select(Tables.ACTION_LOG.asterisk()) - .from(Tables.ACTION_LOG) - .where(Tables.ACTION_LOG.REPORT_ID.eq(reportId)) - .and(Tables.ACTION_LOG.TYPE.eq(ActionLogType.error)) - .fetchInto(DetailedActionLog::class.java) - - assertThat(actionLogs.count()).isEqualTo(0) - - val reportFile = DSL.using(txn).select(Tables.REPORT_FILE.asterisk()) - .from(Tables.REPORT_FILE) - .where(Tables.REPORT_FILE.REPORT_ID.eq(reportId)) - .fetchInto(ReportFile::class.java) - - assertThat(reportFile).isEmpty() - } - - verify(exactly = 0) { - QueueAccess.sendMessage(any(), any()) - } - - val tableRow = submissionTableService.getSubmission( - reportId.toString(), - "Rejected" - ) - - assertNotNull(tableRow) - assertThat(tableRow.bodyURL).isEqualTo(receiveBlobUrl) - assertThat(tableRow.detail).isEqualTo("[Unsupported sender format: CSV]") - - assertThat(azureEventService.reportStreamEvents[ReportStreamEventName.REPORT_NOT_PROCESSABLE]!!).hasSize(1) - val event = - azureEventService - .reportStreamEvents[ReportStreamEventName.REPORT_NOT_PROCESSABLE]!!.last() as ReportStreamReportEvent - assertThat(event.reportEventData).isEqualToIgnoringGivenProperties( - ReportEventData( - reportId, - null, - emptyList(), - null, - receiveBlobUrl, - TaskAction.receive, - OffsetDateTime.now(), - Version.commitId - ), - ReportEventData::timestamp - ) - assertThat(event.params).isEqualTo( - mapOf( - ReportStreamEventProperties.ITEM_FORMAT to MimeFormat.CSV, - ReportStreamEventProperties.SENDER_NAME to csvSenderWithNoTransform.fullName, - ReportStreamEventProperties.FILE_LENGTH to headers["content-length"], - ReportStreamEventProperties.SENDER_IP to headers["x-azure-clientip"], - ReportStreamEventProperties.REQUEST_PARAMETERS to headers.toString(), - ReportStreamEventProperties.PROCESSING_ERROR to "Unsupported sender format CSV." - ) - ) - } -} \ No newline at end of file diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FHIRTranslatorIntegrationTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FHIRTranslatorIntegrationTests.kt index c45b9fe3e28..df375e85ed3 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FHIRTranslatorIntegrationTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FHIRTranslatorIntegrationTests.kt @@ -22,6 +22,7 @@ import gov.cdc.prime.router.azure.db.enums.TaskAction import gov.cdc.prime.router.azure.db.tables.Task import gov.cdc.prime.router.azure.observability.event.AzureEventService import gov.cdc.prime.router.azure.observability.event.InMemoryAzureEventService +import gov.cdc.prime.router.azure.observability.event.ReportStreamEventService import gov.cdc.prime.router.cli.tests.CompareData import gov.cdc.prime.router.common.TestcontainersUtils import gov.cdc.prime.router.common.UniversalPipelineTestUtils @@ -102,7 +103,14 @@ class FHIRTranslatorIntegrationTests : Logging { metadata, settings, reportService = ReportService(ReportGraph(ReportStreamTestDatabaseContainer.testDatabaseAccess)), - azureEventService = azureEventService + azureEventService = azureEventService, + reportStreamEventService = ReportStreamEventService( + ReportStreamTestDatabaseContainer.testDatabaseAccess, azureEventService, + ReportService( + ReportGraph(ReportStreamTestDatabaseContainer.testDatabaseAccess), + ReportStreamTestDatabaseContainer.testDatabaseAccess + ) + ) ) } diff --git a/prime-router/src/test/kotlin/fhirengine/engine/FHIRReceiverTests.kt b/prime-router/src/test/kotlin/fhirengine/engine/FHIRReceiverTests.kt deleted file mode 100644 index 8edc912f5f1..00000000000 --- a/prime-router/src/test/kotlin/fhirengine/engine/FHIRReceiverTests.kt +++ /dev/null @@ -1,273 +0,0 @@ -package gov.cdc.prime.router.fhirengine.engine - -import assertk.assertThat -import assertk.assertions.hasSize -import assertk.assertions.isEqualTo -import com.microsoft.azure.functions.HttpStatus -import gov.cdc.prime.reportstream.shared.Submission -import gov.cdc.prime.router.ActionLog -import gov.cdc.prime.router.ActionLogDetail -import gov.cdc.prime.router.ActionLogger -import gov.cdc.prime.router.CovidSender -import gov.cdc.prime.router.CustomerStatus -import gov.cdc.prime.router.DeepOrganization -import gov.cdc.prime.router.FileSettings -import gov.cdc.prime.router.InvalidParamMessage -import gov.cdc.prime.router.Metadata -import gov.cdc.prime.router.MimeFormat -import gov.cdc.prime.router.Organization -import gov.cdc.prime.router.Receiver -import gov.cdc.prime.router.Schema -import gov.cdc.prime.router.SettingsProvider -import gov.cdc.prime.router.Topic -import gov.cdc.prime.router.azure.ActionHistory -import gov.cdc.prime.router.azure.BlobAccess -import gov.cdc.prime.router.azure.DatabaseAccess -import gov.cdc.prime.router.azure.SubmissionTableService -import gov.cdc.prime.router.azure.db.enums.TaskAction -import gov.cdc.prime.router.azure.db.tables.pojos.Action -import gov.cdc.prime.router.common.cleanHL7Record -import gov.cdc.prime.router.report.ReportService -import io.mockk.clearAllMocks -import io.mockk.every -import io.mockk.mockk -import io.mockk.mockkClass -import io.mockk.mockkObject -import io.mockk.spyk -import io.mockk.unmockkAll -import io.mockk.verify -import org.jooq.tools.jdbc.MockConnection -import org.jooq.tools.jdbc.MockDataProvider -import org.jooq.tools.jdbc.MockResult -import org.junit.jupiter.api.AfterEach -import org.junit.jupiter.api.BeforeEach -import java.util.UUID -import kotlin.test.Test - -class FHIRReceiverTest { - - // Common mock objects and setup - val dataProvider = MockDataProvider { emptyArray() } - val connection = MockConnection(dataProvider) - val accessSpy = spyk(DatabaseAccess(connection)) - val blobMock = mockkClass(BlobAccess::class) - val reportService: ReportService = mockk() - private val submissionTableService: SubmissionTableService = mockk() - - val oneOrganization = DeepOrganization( - "co-phd", - "test", - Organization.Jurisdiction.FEDERAL, - receivers = listOf(Receiver("elr", "co-phd", Topic.TEST, CustomerStatus.INACTIVE, "one")) - ) - val settings = FileSettings().loadOrganizations(oneOrganization) - val one = Schema(name = "None", topic = Topic.FULL_ELR, elements = emptyList()) - val metadata = Metadata(schema = one) - - private fun makeFhirReceiver(metadata: Metadata, settings: SettingsProvider): FHIRReceiver { - return FHIRReceiver( - metadata, - settings, - accessSpy, - blobMock, - reportService = reportService, - submissionTableService = submissionTableService - ) - -// FHIREngine.Builder().metadata(metadata).settingsProvider(settings).databaseAccess(accessSpy) -// .reportService(reportService).blobAccess(blobMock).build(taskAction) - } - - @BeforeEach - fun reset() { - clearAllMocks() - } - - @AfterEach - fun tearDown() { - unmockkAll() - } - - data class FHIRTestSetup( - val engine: FHIRReceiver, - val actionLogger: ActionLogger, - val actionHistory: ActionHistory, - val message: FhirReceiveQueueMessage, - ) - - private fun setupMocksForProcessingTest( - clientId: String, - contentType: String, - customerStatus: CustomerStatus, - hasErrors: Boolean, - reportID: UUID = UUID.randomUUID(), - - ): FHIRTestSetup { - mockkObject(BlobAccess) - val actionHistory = mockk() - val actionLogger = mockk() - val sender = CovidSender( - "Test Sender", - "test", - MimeFormat.HL7, - schemaName = "one", - customerStatus = customerStatus - ) - - val engine = spyk(makeFhirReceiver(metadata, settings)) - val message = mockk(relaxed = true) - val action = Action() - action.actionName = TaskAction.receive - - val headers = mapOf( - "x-azure-clientip" to "0.0.0.0", - "payloadname" to "test_message", - "client_id" to clientId, - "content-type" to contentType - ) - - every { message.headers } returns headers - every { message.reportId } returns reportID - every { actionLogger.hasErrors() } returns hasErrors - every { actionLogger.setReportId(any()) } returns actionLogger - every { actionLogger.error(any()) } returns Unit - every { engine.settings.findSender(any()) } returns sender - every { actionHistory.trackActionResult(any()) } returns Unit - every { actionHistory.trackActionParams(any()) } returns Unit - every { actionHistory.trackActionSenderInfo(any(), any()) } returns Unit - every { actionHistory.trackExternalInputReport(any(), any()) } returns Unit - every { actionHistory.trackLogs(any>()) } returns Unit - every { submissionTableService.insertSubmission(any()) } returns Unit - every { actionHistory.action } returns action - every { BlobAccess.downloadBlob(any(), any()) }.returns(cleanHL7Record) - - return FHIRTestSetup(engine, actionLogger, actionHistory, message) - } - - @Test - fun `test handle sender not found`() { - val fhirSetup = - setupMocksForProcessingTest( - "unknown_client_id", - "application/hl7-v2;test", - CustomerStatus.ACTIVE, - true - ) - val engine = fhirSetup.engine - val queueMessage = fhirSetup.message - val actionLogger = ActionLogger() - val actionHistory = fhirSetup.actionHistory - - every { engine.settings.findSender(any()) } returns null - - accessSpy.transact { txn -> - engine.run(queueMessage, actionLogger, actionHistory, txn) - } - - assertThat(actionLogger.errors).hasSize(0) - - val reportId = queueMessage.reportId.toString() - val blobURL = queueMessage.blobURL - verify(exactly = 1) { - Submission( - reportId, - "Rejected", - blobURL, - "Sender not found matching client_id: unknown_client_id" - ) - submissionTableService.insertSubmission(any()) - } - } - - @Test - fun `test handle inactive sender`() { - val fhirSetup = - setupMocksForProcessingTest( - "known_client_id", - "application/hl7-v2;test", - CustomerStatus.INACTIVE, - true - ) - val engine = fhirSetup.engine - val queueMessage = fhirSetup.message - val actionLogger = ActionLogger() - val actionHistory = fhirSetup.actionHistory - - accessSpy.transact { txn -> - engine.run(queueMessage, actionLogger, actionHistory, txn) - } - - assertThat(actionLogger.errors).hasSize(1) - - assertThat( - actionLogger.errors[0].equals( - actionLogger.errors[0].equals( - InvalidParamMessage("Sender has customer status INACTIVE: unknown_client_id") - ) - ) - ) - - verify(exactly = 1) { - submissionTableService.insertSubmission(any()) - actionHistory.trackActionResult(HttpStatus.NOT_ACCEPTABLE) - actionHistory.trackActionSenderInfo("test.Test Sender", "test_message") - } - } - - @Test - fun `test successful processing`() { - val reportID = UUID.randomUUID() - val fhirSetup = - setupMocksForProcessingTest( - "known_client_id", - "application/hl7-v2;test", - CustomerStatus.ACTIVE, - false, - reportID - ) - val engine = fhirSetup.engine - val queueMessage = fhirSetup.message - val actionLogger = fhirSetup.actionLogger - val actionHistory = fhirSetup.actionHistory - every { actionLogger.errors } returns emptyList() - - accessSpy.transact { txn -> - engine.run(queueMessage, actionLogger, actionHistory, txn) - } - - verify(exactly = 1) { - actionHistory.trackActionResult(HttpStatus.CREATED) - actionHistory.trackActionSenderInfo("test.Test Sender", "test_message") - actionHistory.trackExternalInputReport(any(), any()) - submissionTableService.insertSubmission(any()) - } - } - - @Test - fun `test invalid MIME type`() { - val fhirSetup = - setupMocksForProcessingTest( - "known_client_id", - "invalid/mime-type", - CustomerStatus.ACTIVE, - true - ) - val engine = fhirSetup.engine - val queueMessage = fhirSetup.message - val actionLogger = ActionLogger() - val actionHistory = fhirSetup.actionHistory - - var exception: Exception? = null - try { - accessSpy.transact { txn -> - engine.run(queueMessage, actionLogger, actionHistory, txn) - } - } catch (e: Exception) { - exception = e - } - - assertThat(exception!!.javaClass.name).isEqualTo("java.lang.IllegalArgumentException") - assertThat(actionLogger.errors).hasSize(1) - assertThat(actionLogger.errors[0].detail.message).isEqualTo("Unexpected MIME type invalid/mime-type.") - } -} \ No newline at end of file diff --git a/prime-router/src/test/kotlin/fhirengine/engine/FhirConverterTests.kt b/prime-router/src/test/kotlin/fhirengine/engine/FhirConverterTests.kt index 6b48d2e7260..33731c0e5af 100644 --- a/prime-router/src/test/kotlin/fhirengine/engine/FhirConverterTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/engine/FhirConverterTests.kt @@ -28,6 +28,7 @@ import gov.cdc.prime.router.Topic import gov.cdc.prime.router.azure.ActionHistory import gov.cdc.prime.router.azure.BlobAccess import gov.cdc.prime.router.azure.DatabaseAccess +import gov.cdc.prime.router.azure.SubmissionTableService import gov.cdc.prime.router.azure.db.enums.TaskAction import gov.cdc.prime.router.azure.db.tables.pojos.Action import gov.cdc.prime.router.azure.db.tables.pojos.ReportFile @@ -84,6 +85,7 @@ class FhirConverterTests { val connection = MockConnection(dataProvider) val accessSpy = spyk(DatabaseAccess(connection)) val blobMock = mockkClass(BlobAccess::class) + val mockSubmissionTableService = mockk() val reportService: ReportService = mockk() val oneOrganization = DeepOrganization( "co-phd", @@ -137,7 +139,8 @@ class FhirConverterTests { private fun makeFhirEngine(metadata: Metadata, settings: SettingsProvider, taskAction: TaskAction): FHIREngine { return FHIREngine.Builder().metadata(metadata).settingsProvider(settings).databaseAccess(accessSpy) - .reportService(reportService).blobAccess(blobMock).build(taskAction) + .reportService(reportService).blobAccess(blobMock) + .submissionTableService(mockSubmissionTableService).build(taskAction) } @BeforeEach @@ -559,10 +562,10 @@ class FhirConverterTests { mockkObject(BlobAccess) val engine = spyk(makeFhirEngine(metadata, settings, TaskAction.process) as FHIRConverter) val actionLogger = ActionLogger() - val mockMessage = mockk(relaxed = true) - every { mockMessage.topic } returns Topic.FULL_ELR every { BlobAccess.downloadBlob(any(), any()) } returns "" - val bundles = engine.process(MimeFormat.FHIR, mockMessage, actionLogger) + val bundles = engine.process( + MimeFormat.FHIR, "", "", Topic.FULL_ELR, actionLogger + ) assertThat(bundles).isEmpty() assertThat(actionLogger.errors.map { it.detail.message }).contains("Provided raw data is empty.") } @@ -584,7 +587,9 @@ class FhirConverterTests { every { mockMessage.topic } returns Topic.FULL_ELR every { mockMessage.reportId } returns UUID.randomUUID() every { BlobAccess.downloadBlob(any(), any()) } returns simpleHL7 - val bundles = engine.process(MimeFormat.HL7, mockMessage, actionLogger) + val bundles = engine.process( + MimeFormat.HL7, "", "", Topic.FULL_ELR, actionLogger + ) assertThat(bundles).isEmpty() assertThat( actionLogger.errors.map { @@ -598,11 +603,10 @@ class FhirConverterTests { mockkObject(BlobAccess) val engine = spyk(makeFhirEngine(metadata, settings, TaskAction.process) as FHIRConverter) val actionLogger = ActionLogger() - val mockMessage = mockk(relaxed = true) - every { mockMessage.topic } returns Topic.FULL_ELR - every { mockMessage.reportId } returns UUID.randomUUID() every { BlobAccess.downloadBlob(any(), any()) } returns "test,1,2" - val bundles = engine.process(MimeFormat.CSV, mockMessage, actionLogger) + val bundles = engine.process( + MimeFormat.CSV, "", "", Topic.FULL_ELR, actionLogger + ) assertThat(bundles).isEmpty() assertThat(actionLogger.errors.map { it.detail.message }) .contains("Received unsupported report format: CSV") @@ -613,11 +617,8 @@ class FhirConverterTests { mockkObject(BlobAccess) val engine = spyk(makeFhirEngine(metadata, settings, TaskAction.process) as FHIRConverter) val actionLogger = ActionLogger() - val mockMessage = mockk(relaxed = true) - every { mockMessage.topic } returns Topic.FULL_ELR - every { mockMessage.reportId } returns UUID.randomUUID() every { BlobAccess.downloadBlob(any(), any()) } returns "{\"id\":}" - val processedItems = engine.process(MimeFormat.FHIR, mockMessage, actionLogger) + val processedItems = engine.process(MimeFormat.FHIR, "", "", Topic.FULL_ELR, actionLogger) assertThat(processedItems).hasSize(1) assertThat(processedItems.first().bundle).isNull() assertThat(actionLogger.errors.map { it.detail.message }).contains( @@ -647,7 +648,9 @@ class FhirConverterTests { every { mockMessage.topic } returns Topic.FULL_ELR every { mockMessage.reportId } returns UUID.randomUUID() every { BlobAccess.downloadBlob(any(), any()) } returns "{\"id\":\"1\", \"resourceType\":\"Bundle\"}" - val processedItems = engine.process(MimeFormat.FHIR, mockMessage, actionLogger) + val processedItems = engine.process( + MimeFormat.FHIR, "", "", Topic.FULL_ELR, actionLogger + ) assertThat(processedItems).hasSize(1) assertThat(processedItems.first().bundle).isNull() assertThat(actionLogger.errors.map { it.detail.message }).contains( @@ -666,7 +669,7 @@ class FhirConverterTests { every { BlobAccess.downloadBlob(any(), any()) } returns unparseableHL7 - val processedItems = engine.process(MimeFormat.HL7, mockMessage, actionLogger) + val processedItems = engine.process(MimeFormat.HL7, "", "", Topic.FULL_ELR, actionLogger) assertThat(processedItems).hasSize(1) assertThat(processedItems.first().bundle).isNull() assertThat( @@ -701,7 +704,7 @@ class FhirConverterTests { every { BlobAccess.downloadBlob(any(), any()) } returns simpleHL7 - val processedItems = engine.process(MimeFormat.HL7, mockMessage, actionLogger) + val processedItems = engine.process(MimeFormat.HL7, "", "", Topic.FULL_ELR, actionLogger) assertThat(processedItems).hasSize(1) assertThat(processedItems.first().bundle).isNull() @Suppress("ktlint:standard:max-line-length") @@ -730,7 +733,7 @@ class FhirConverterTests { every { BlobAccess.downloadBlob(any(), any()) } returns simpleHL7 - val processedItems = engine.process(MimeFormat.HL7, mockMessage, actionLogger) + val processedItems = engine.process(MimeFormat.HL7, "", "", Topic.FULL_ELR, actionLogger) assertThat(processedItems).hasSize(1) assertThat(processedItems.first().bundle).isNull() assertThat( @@ -756,14 +759,14 @@ class FhirConverterTests { } returns """{\"id\":} {"id":"1", "resourceType":"Bundle"} """.trimMargin() - val processedItems = engine.process(MimeFormat.FHIR, mockMessage, actionLogger) + val processedItems = engine.process(MimeFormat.FHIR, "", "", Topic.FULL_ELR, actionLogger) assertThat(processedItems).hasSize(2) assertThat(actionLogger.errors.map { it.detail.message }).contains( @Suppress("ktlint:standard:max-line-length") "Item 1 in the report was not parseable. Reason: exception while parsing FHIR: HAPI-1861: Failed to parse JSON encoded FHIR content: Unexpected character ('\\' (code 92)): was expecting double-quote to start field name\n at [line: 1, column: 2]" ) - val bundles2 = engine.process(MimeFormat.FHIR, mockMessage, actionLogger, false) + val bundles2 = engine.process(MimeFormat.FHIR, "", "", Topic.FULL_ELR, actionLogger, false) assertThat(bundles2).hasSize(0) assertThat(actionLogger.errors.map { it.detail.message }).contains( @Suppress("ktlint:standard:max-line-length") @@ -783,7 +786,7 @@ class FhirConverterTests { every { BlobAccess.downloadBlob(any(), any()) } returns simpleHL7 - val bundles = engine.process(MimeFormat.HL7, mockMessage, actionLogger) + val bundles = engine.process(MimeFormat.HL7, "", "", Topic.FULL_ELR, actionLogger) assertThat(bundles).hasSize(1) assertThat(actionLogger.errors).isEmpty() } @@ -803,7 +806,7 @@ class FhirConverterTests { every { BlobAccess.downloadBlob(any(), any()) } returns simpleHL7 + "\n" + simpleHL7 + "\n" + simpleHL7 - val bundles = engine.process(MimeFormat.HL7, mockMessage, actionLogger) + val bundles = engine.process(MimeFormat.HL7, "", "", Topic.FULL_ELR, actionLogger) assertThat(bundles).hasSize(3) assertThat(actionLogger.errors).isEmpty() @@ -834,7 +837,7 @@ class FhirConverterTests { every { BlobAccess.downloadBlob(any(), any()) } returns simpleHL7 - val bundles = engine.process(MimeFormat.HL7, mockMessage, actionLogger) + val bundles = engine.process(MimeFormat.HL7, "", "", Topic.FULL_ELR, actionLogger) assertThat(bundles).hasSize(1) assertThat(actionLogger.errors).isEmpty() } diff --git a/prime-router/src/test/kotlin/fhirengine/engine/FhirDestinationFilterTests.kt b/prime-router/src/test/kotlin/fhirengine/engine/FhirDestinationFilterTests.kt index 964308c2ed6..8dd47bbaf17 100644 --- a/prime-router/src/test/kotlin/fhirengine/engine/FhirDestinationFilterTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/engine/FhirDestinationFilterTests.kt @@ -236,13 +236,14 @@ class FhirDestinationFilterTests { ) private fun makeFhirEngine(metadata: Metadata, settings: SettingsProvider): FHIREngine { - val rootReport = mockk() + val rootReport = mockk(relaxed = true) every { rootReport.reportId } returns submittedId every { rootReport.sendingOrg } returns "sendingOrg" every { rootReport.sendingOrgClient } returns "sendingOrgClient" every { reportServiceMock.getRootReport(any()) } returns rootReport every { reportServiceMock.getRootReports(any()) } returns listOf(rootReport) every { reportServiceMock.getRootItemIndex(any(), any()) } returns 1 + every { accessSpy.fetchReportFile(any()) } returns rootReport return FHIREngine.Builder() .metadata(metadata) diff --git a/shared/src/main/kotlin/gov/cdc/prime/reportstream/shared/QueueMessage.kt b/shared/src/main/kotlin/gov/cdc/prime/reportstream/shared/QueueMessage.kt index f10cd20d744..06146f7bf86 100644 --- a/shared/src/main/kotlin/gov/cdc/prime/reportstream/shared/QueueMessage.kt +++ b/shared/src/main/kotlin/gov/cdc/prime/reportstream/shared/QueueMessage.kt @@ -68,7 +68,7 @@ interface QueueMessage { /** * Constant for receive queue on UP */ - const val elrReceiveQueueName = "elr-fhir-receive" + const val elrSubmissionConvertQueueName = "elr-fhir-convert-submission" /** * Constant for convert queue on UP @@ -152,7 +152,7 @@ interface QueueMessage { ) : QueueMessage, ReportInformation, ReceiveInformation { - override val messageQueueName = elrReceiveQueueName + override val messageQueueName = elrSubmissionConvertQueueName } /** diff --git a/submissions/src/main/resources/application.yml b/submissions/src/main/resources/application.yml index c75d070b3fa..b4794b2d1fa 100644 --- a/submissions/src/main/resources/application.yml +++ b/submissions/src/main/resources/application.yml @@ -13,7 +13,7 @@ azure: storage: connection-string: ${AZURE_STORAGE_CONNECTION_STRING:DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://localhost:10000/devstoreaccount1;QueueEndpoint=http://localhost:10001/devstoreaccount1;TableEndpoint=http://127.0.0.1:10002/devstoreaccount1;} container-name: ${AZURE_STORAGE_CONTAINER_NAME:reports} - queue-name: ${AZURE_STORAGE_QUEUE_NAME:elr-fhir-receive} + queue-name: ${AZURE_STORAGE_QUEUE_NAME:elr-fhir-convert-submission} table-name: ${AZURE_STORAGE_TABLE_NAME:submission} allowed: