diff --git a/README.md b/README.md index 30fdf1b..0f0f709 100644 --- a/README.md +++ b/README.md @@ -53,3 +53,12 @@ Currently, supported brokers are: - RabbitMQ (GCP, AWS) - Azure Service Bus +### Migrations + +The correct way to migrate the workload is: +1) Create new task with new workload +2) Maintain 2 tasks until messages for the old one are depleted +3) Delete old task + +Important do not change task input as Json deserializer will break causing the queue to block. +To mitigate the mentioned problem we introduced default behavior to automatically drop tasks on SerialisationError \ No newline at end of file diff --git a/src/main/kotlin/RabbitMQBroker.kt b/src/main/kotlin/RabbitMQBroker.kt index b23c796..f8519e9 100644 --- a/src/main/kotlin/RabbitMQBroker.kt +++ b/src/main/kotlin/RabbitMQBroker.kt @@ -8,8 +8,6 @@ import com.rabbitmq.client.impl.MicrometerMetricsCollector import io.github.oshai.kotlinlogging.KotlinLogging import io.micrometer.core.instrument.logging.LoggingMeterRegistry import kotlinx.coroutines.* -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock import loggingScope import withLogCtx import kotlin.time.Duration.Companion.minutes @@ -26,7 +24,7 @@ class RabbitMQBroker( ) : IMessageBroker { private val createdQueues = mutableSetOf() private val createdDelayQueues = mutableSetOf() - private var queues: RabbitMQQueues + private var queues: QueueDefinitions var connection: Connection var channel: Channel @@ -54,7 +52,7 @@ class RabbitMQBroker( factory.isAutomaticRecoveryEnabled = true connection = factory.newConnection() channel = connection.createChannel() - queues = RabbitMQQueues(channel) + queues = QueueDefinitions(channel) // Create DELAYED exchange channel.exchangeDeclare(delayExchangeName, "headers", true) @@ -197,10 +195,10 @@ class RabbitMQConsumer(val consumer: DefaultConsumer) : IConsumer { } -class RabbitMQQueues( +private class QueueDefinitions( var channel: Channel, ) { - data class RabbitMQQueueDeclaration( + data class QueueDeclaration( val queueName: String, val durable: Boolean, val exclusive: Boolean, @@ -208,7 +206,7 @@ class RabbitMQQueues( val arguments: Map?, ) - val declarations = mutableSetOf() + val declarations = mutableSetOf() fun declare( queueName: String, @@ -218,12 +216,12 @@ class RabbitMQQueues( arguments: Map?, ) { val declaration = - RabbitMQQueueDeclaration(queueName, durable, exclusive, autoDelete, arguments) + QueueDeclaration(queueName, durable, exclusive, autoDelete, arguments) declarations.add(declaration) this.declare(declaration) } - fun declare(d: RabbitMQQueueDeclaration): AMQP.Queue.DeclareOk? { + fun declare(d: QueueDeclaration): AMQP.Queue.DeclareOk? { return channel.queueDeclare(d.queueName, d.durable, d.exclusive, d.autoDelete, d.arguments) } } diff --git a/src/main/kotlin/Task.kt b/src/main/kotlin/Task.kt index b2f57ca..8312725 100644 --- a/src/main/kotlin/Task.kt +++ b/src/main/kotlin/Task.kt @@ -1,5 +1,6 @@ package com.zamna.kotask +import io.github.oshai.kotlinlogging.KLogger import io.github.oshai.kotlinlogging.KotlinLogging import kotlinx.serialization.KSerializer import kotlinx.serialization.Serializable @@ -36,6 +37,7 @@ class Task @PublishedApi internal constructor( private var logger = KotlinLogging.logger { } companion object { + inline fun create( name: String, retry: IRetryPolicy? = null, noinline handler: TaskHandler, ) = Task(serializer(), name, retry, handler) @@ -43,7 +45,6 @@ class Task @PublishedApi internal constructor( inline fun create( name: String, retry: IRetryPolicy? = null, noinline handler: OnlyInputTaskHandler, ) = create(name, retry, handler.toTaskHandler()) - fun create( name: String, retry: IRetryPolicy? = null, handler: NoArgTaskHandler, ) = create(name, retry, handler.toTaskHandler()) @@ -77,6 +78,7 @@ class Task @PublishedApi internal constructor( return TaskCallFactory(this as Task, NoInput, manager) } + fun createTaskCall( input: T, params: CallParams = CallParams(), @@ -86,6 +88,8 @@ class Task @PublishedApi internal constructor( return manager.createTaskCall(this, inputStr, params) } +// } + suspend fun execute(inputStr: String, params: CallParams, manager: TaskManager) { val logCtx = mapOf( "task" to name, @@ -106,29 +110,32 @@ class Task @PublishedApi internal constructor( withLogCtx("action" to TaskEvents.MESSAGE_COMPLETE) { logger.info { "Complete task $name with callId=${params.callId} with $inputStr" } } - } catch (e: RepeatTask) { - withLogCtx("action" to TaskEvents.MESSAGE_SUBMIT_RETRY) { - logger.info { "Received RepeatTask from task $name with callId=${params.callId} with $inputStr" } - manager.enqueueTaskCall(this, inputStr, e.getRetryCallParams(params)) - } - } catch (e: ForceRetry) { - withLogCtx("action" to TaskEvents.MESSAGE_SUBMIT_RETRY) { - logger.info { "Received ForceRetry from task $name with callId=${params.callId} with $inputStr" } - manager.enqueueTaskCall(this, inputStr, e.getRetryCallParams(params)) - } - } catch (e: FailNoRetry) { - withLogCtx("action" to TaskEvents.MESSAGE_FAIL_NO_RETRY) { - logger.info { "Received FailNoRetry from task $name with callId=${params.callId} with $inputStr" } - } - } catch (e: SerializationException) { - withLogCtx("action" to TaskEvents.MESSAGE_FAIL_NO_RETRY) { - logger.error { "Task got bad json" } + } catch (e: RetryControlException) { + when (e) { + is RepeatTask -> withLogCtx("action" to TaskEvents.MESSAGE_SUBMIT_RETRY) { + logger.info { "Received RepeatTask from task $name with callId=${params.callId} with $inputStr" } + manager.enqueueTaskCall(this, inputStr, e.getRetryCallParams(params)) + } + is ForceRetry -> withLogCtx("action" to TaskEvents.MESSAGE_SUBMIT_RETRY) { + logger.info { "Received ForceRetry from task $name with callId=${params.callId} with $inputStr" } + manager.enqueueTaskCall(this, inputStr, e.getRetryCallParams(params)) + } + is FailNoRetry -> withLogCtx("action" to TaskEvents.MESSAGE_FAIL_NO_RETRY) { + logger.info { "Received FailNoRetry from task $name with callId=${params.callId} with $inputStr" } + } } } catch (e: Throwable) { withLogCtx("action" to TaskEvents.MESSAGE_FAIL) { logger.error(e) { "Task $name failed with callId=${params.callId} with $inputStr" } } + for ((exception, handler) in manager.taskErrorHandlers) { + if (e::class.java == exception) { + handler.invoke(logger, inputStr) + return + } + } + if (getRetryPolicy(manager).shouldRetry(params)) { withLogCtx("action" to TaskEvents.MESSAGE_FAIL_RETRY) { logger.info(e) { "Retry task $name with callId=${params.callId} with $inputStr" } @@ -197,3 +204,7 @@ data class CallParams( fun nextAttempt() = copy(attemptNum = attemptNum + 1) } + +fun interface TaskErrorHandler { + fun invoke(logger: KLogger, inputMessage: String) +} diff --git a/src/main/kotlin/TaskManager.kt b/src/main/kotlin/TaskManager.kt index 796e694..bfdd777 100644 --- a/src/main/kotlin/TaskManager.kt +++ b/src/main/kotlin/TaskManager.kt @@ -7,7 +7,9 @@ import kotlinx.datetime.Clock import kotlinx.datetime.Instant import kotlinx.serialization.Serializable import cleanScheduleWorker +import io.github.oshai.kotlinlogging.KLogger import io.github.oshai.kotlinlogging.KotlinLogging +import kotlinx.serialization.SerializationException import loggingScope import withLogCtx import kotlin.time.Duration @@ -16,6 +18,14 @@ import kotlin.time.Duration.Companion.seconds import kotlin.time.DurationUnit import kotlin.time.toDuration + +public val discardTaskOnSerialisationProblem: Pair, TaskErrorHandler> = SerializationException::class.java to + TaskErrorHandler { logger, inputStr -> + withLogCtx("action" to TaskEvents.MESSAGE_FAIL_NO_RETRY) { + logger.error { "Can't deserialize json as task input. Json: $inputStr" } + } + } + // TODO(baitcode): TaskManager is getting huge class TaskManager( private val broker: IMessageBroker, @@ -23,7 +33,11 @@ class TaskManager( private val queueNamePrefix: String = "kotask-", val defaultRetryPolicy: IRetryPolicy = RetryPolicy(4.seconds, 20, expBackoff = true, maxDelay = 1.hours), schedulersScope: CoroutineScope? = null, + val taskErrorHandlers: List, TaskErrorHandler>> = listOf( + discardTaskOnSerialisationProblem, // deafault + ) ): AutoCloseable { + private val knownTasks: MutableMap> = mutableMapOf() // TODO(baitcode): Why use list? When we always have single consumer. Is it for concurrency. internal val tasksConsumers: MutableMap> = mutableMapOf() diff --git a/src/test/kotlin/TaskManagerTest.kt b/src/test/kotlin/TaskManagerTest.kt index 6bd76a2..a8beefd 100644 --- a/src/test/kotlin/TaskManagerTest.kt +++ b/src/test/kotlin/TaskManagerTest.kt @@ -13,6 +13,9 @@ import io.kotest.framework.concurrency.eventually import io.kotest.framework.concurrency.until import io.kotest.matchers.shouldBe import io.kotest.matchers.shouldNotBe +import io.mockk.every +import io.mockk.mockk +import io.mockk.verify import kotlinx.coroutines.launch import kotlinx.datetime.Clock import org.testcontainers.containers.wait.strategy.Wait @@ -212,3 +215,37 @@ fun taskManagerTest(taskManager: TaskManager) = funSpec { } } + + + + +class TaskManagerErrorHandling: FunSpec({ + class UnhadledError : Exception() + + val errorHandler = mockk() + every { errorHandler.invoke(any(), any()) } returns Unit + + val tm = TaskManager( + LocalBroker(), + taskErrorHandlers = listOf( + UnhadledError::class.java to errorHandler + ) + ) + + val testTask1 = + Task.create("failing-task-${randomSuffix()}",) { ctx, input: TaskTrackExecutionWithContextCountInput -> + throw UnhadledError() + } + + test("test error") { + tm.startWorkers(testTask1) + + TaskTrackExecutionWithContextCountInput.new().let { + testTask1.callLater(it) + eventually(1000) { + verify(exactly = 1) { errorHandler.invoke(any(), any()) } + } + + } + } +}) \ No newline at end of file