Skip to content

Commit

Permalink
Added Interval Expression to manage time generation in PostgreSQL
Browse files Browse the repository at this point in the history
  • Loading branch information
baitcode committed Apr 9, 2024
1 parent 830d38c commit b198488
Showing 1 changed file with 29 additions and 15 deletions.
44 changes: 29 additions & 15 deletions src/main/kotlin/brokers/PgBroker.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@ package brokers

import MDCContext
import com.zamna.kotask.*
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.*
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import kotlinx.serialization.SerializationException
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
import org.jetbrains.exposed.sql.javatime.CurrentTimestamp
import org.jetbrains.exposed.sql.javatime.timestamp
import org.jetbrains.exposed.sql.javatime.*
import org.jetbrains.exposed.sql.transactions.transaction
import org.jetbrains.exposed.sql.vendors.ForUpdateOption
import org.jetbrains.exposed.sql.vendors.PostgreSQLDialect
import org.jetbrains.exposed.sql.vendors.currentDialect
import plugins.scheduler.pg.DbWrapper
import java.time.Instant
import java.util.UUID
Expand Down Expand Up @@ -42,6 +45,17 @@ private fun Message.getId() = UUID.fromString(this.headers[PgBroker.ID_HEADER])
private fun Message.getCallId() = UUID.fromString(this.headers["call-id"])
private fun Message.getAttemptNum() = Integer.parseInt(this.headers.getOrDefault("attempt-num", "0"))

class Interval(
val duration: Duration,
) : Expression<Instant>() {
override fun toQueryBuilder(queryBuilder: QueryBuilder) = queryBuilder {
+when {
(currentDialect as? PostgreSQLDialect) != null -> "INTERVAL '1 second' * ${duration.inWholeSeconds}"
else -> TODO("not implemented")
}
}
}

class PgBroker(
val dbWrapper: DbWrapper,
val scope: CoroutineScope = GlobalScope,
Expand All @@ -54,6 +68,8 @@ class PgBroker(
const val ID_HEADER = "${HEADERS_PREFIX}-id"
}

val logger = KotlinLogging.logger { }

init {
transaction(dbWrapper.connection) {
SchemaUtils.createMissingTablesAndColumns(KotaskMessages)
Expand All @@ -71,7 +87,7 @@ class PgBroker(
it[this.body] = message.body
it[this.attemptNum] = message.getAttemptNum()
it[this.delayMs] = message.delayMs
it[this.scheduledAt] = Instant.now().plusMillis(message.delayMs)
it[this.scheduledAt] = PlusOp(CurrentTimestamp(), Interval(message.delayMs.milliseconds), JavaInstantColumnType())
it[this.headers] = Json.encodeToString(
buildMap {
putAll(message.headers)
Expand All @@ -83,22 +99,20 @@ class PgBroker(
}

override fun startConsumer(queueName: QueueName, handler: ConsumerHandler): IConsumer {
var isStopped = false
val job = scope.launch(MDCContext()) {
while (!isStopped) {
while (isActive) {
val messages = transaction(dbWrapper.connection) {
val messages = KotaskMessages
.selectAll()
.andWhere {
.select {
(KotaskMessages.queueName eq queueName) and
(KotaskMessages.scheduledAt less Instant.now()) and
(KotaskMessages.scheduledAt less CurrentTimestamp()) and
(
KotaskMessages.startedAt less Instant.now().minusSeconds(messageReservationTimeoutMs.inWholeSeconds) or // settings
KotaskMessages.startedAt less MinusOp(CurrentTimestamp(), Interval(messageReservationTimeoutMs), JavaInstantColumnType()) or
KotaskMessages.startedAt.isNull()
)
}
.limit(100) // settings
.forUpdate()
.forUpdate(ForUpdateOption.PostgreSQL.ForUpdate(ForUpdateOption.PostgreSQL.MODE.SKIP_LOCKED))
.map { message ->
try {
val headers =
Expand All @@ -110,12 +124,13 @@ class PgBroker(
delayMs = message[KotaskMessages.delayMs],
)
} catch (e: SerializationException) {
logger.error(e) { "Message dropped due to SerializationException" }
KotaskMessages.deleteWhere { callId eq message[callId]}
null
}
}
}.filterNotNull()

val callIds = messages.filterNotNull().map { it.getCallId() }
val callIds = messages.map { it.getCallId() }

KotaskMessages.update({
KotaskMessages.callId inList callIds
Expand All @@ -131,7 +146,7 @@ class PgBroker(
continue
}

messages.filterNotNull().forEach { message ->
messages.forEach { message ->
scope.launch {
handler(message) {
transaction {
Expand All @@ -147,10 +162,9 @@ class PgBroker(

return object: IConsumer {
override fun stop() {
isStopped = true
runBlocking {
// Timeout?
job.join()
job.cancelAndJoin()
}
}
}
Expand Down

0 comments on commit b198488

Please sign in to comment.