Skip to content

Commit

Permalink
Merge branch 'kotlin-vertx-benchmarks-revamp-coroutine-router-support…
Browse files Browse the repository at this point in the history
…' into kotlin-vertx-benchmarks-revamp
  • Loading branch information
ShreckYe committed Oct 30, 2024
2 parents 11a2473 + bfdcf67 commit f607f4e
Showing 1 changed file with 20 additions and 25 deletions.
45 changes: 20 additions & 25 deletions frameworks/Kotlin/vertx-web-kotlinx/src/main/kotlin/MainVerticle.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import io.vertx.ext.web.Route
import io.vertx.ext.web.Router
import io.vertx.ext.web.RoutingContext
import io.vertx.kotlin.core.http.httpServerOptionsOf
import io.vertx.kotlin.coroutines.CoroutineRouterSupport
import io.vertx.kotlin.coroutines.CoroutineVerticle
import io.vertx.kotlin.coroutines.coAwait
import io.vertx.kotlin.pgclient.pgConnectOptionsOf
Expand All @@ -17,7 +18,6 @@ import io.vertx.sqlclient.Row
import io.vertx.sqlclient.RowSet
import io.vertx.sqlclient.Tuple
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.html.*
import kotlinx.html.stream.appendHTML
import kotlinx.io.buffered
Expand All @@ -29,21 +29,7 @@ import kotlinx.serialization.json.io.encodeToSink
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter

class MainVerticle(val hasDb: Boolean) : CoroutineVerticle() {
inline fun Route.checkedCoroutineHandlerUnconfined(crossinline requestHandler: suspend (RoutingContext) -> Unit): Route =
handler { ctx ->
/* Some conclusions from the Plaintext test results with trailing `await()`s:
1. `launch { /*...*/ }` < `launch(start = CoroutineStart.UNDISPATCHED) { /*...*/ }` < `launch(Dispatchers.Unconfined) { /*...*/ }`.
1. `launch { /*...*/ }` without `context` or `start` lead to `io.netty.channel.StacklessClosedChannelException` and `io.netty.channel.unix.Errors$NativeIoException: sendAddress(..) failed: Connection reset by peer`. */
launch(Dispatchers.Unconfined) {
try {
requestHandler(ctx)
} catch (t: Throwable) {
ctx.fail(t)
}
}
}

class MainVerticle(val hasDb: Boolean) : CoroutineVerticle(), CoroutineRouterSupport {
// `PgConnection`s as used in the "vertx" portion offers better performance than `PgPool`s.
lateinit var pgConnection: PgConnection
lateinit var date: String
Expand Down Expand Up @@ -118,10 +104,18 @@ class MainVerticle(val hasDb: Boolean) : CoroutineVerticle() {
putHeader(HttpHeaders.CONTENT_TYPE, "application/json")
}

inline fun <reified T : Any> Route.jsonResponseHandler(
serializer: SerializationStrategy<T>, crossinline requestHandler: suspend (RoutingContext) -> @Serializable T

fun Route.coHandlerUnconfined(requestHandler: suspend (RoutingContext) -> Unit): Route =
/* Some conclusions from the Plaintext test results with trailing `await()`s:
1. `launch { /*...*/ }` < `launch(start = CoroutineStart.UNDISPATCHED) { /*...*/ }` < `launch(Dispatchers.Unconfined) { /*...*/ }`.
1. `launch { /*...*/ }` without `context` or `start` lead to `io.netty.channel.StacklessClosedChannelException` and `io.netty.channel.unix.Errors$NativeIoException: sendAddress(..) failed: Connection reset by peer`. */
coHandler(Dispatchers.Unconfined, requestHandler)

inline fun <reified T : Any> Route.jsonResponseCoHandler(
serializer: SerializationStrategy<T>,
crossinline requestHandler: suspend (RoutingContext) -> @Serializable T
) =
checkedCoroutineHandlerUnconfined {
coHandlerUnconfined {
it.response().run {
putJsonResponseHeader()

Expand Down Expand Up @@ -149,6 +143,7 @@ class MainVerticle(val hasDb: Boolean) : CoroutineVerticle() {
}
}


suspend fun selectRandomWorlds(queries: Int): List<World> {
val rowSets = List(queries) {
selectWorldQuery.execute(Tuple.of(randomIntBetween1And10000()))
Expand All @@ -157,21 +152,21 @@ class MainVerticle(val hasDb: Boolean) : CoroutineVerticle() {
}

fun Router.routes() {
get("/json").jsonResponseHandler(Serializers.message) {
get("/json").jsonResponseCoHandler(Serializers.message) {
jsonSerializationMessage
}

get("/db").jsonResponseHandler(Serializers.world) {
get("/db").jsonResponseCoHandler(Serializers.world) {
val rowSet = selectWorldQuery.execute(Tuple.of(randomIntBetween1And10000())).coAwait()
rowSet.single().toWorld()
}

get("/queries").jsonResponseHandler(Serializers.worlds) {
get("/queries").jsonResponseCoHandler(Serializers.worlds) {
val queries = it.request().getQueries()
selectRandomWorlds(queries)
}

get("/fortunes").checkedCoroutineHandlerUnconfined {
get("/fortunes").coHandlerUnconfined {
val fortunes = mutableListOf<Fortune>()
selectFortuneQuery.execute().coAwait()
.mapTo(fortunes) { it.toFortune() }
Expand Down Expand Up @@ -208,7 +203,7 @@ class MainVerticle(val hasDb: Boolean) : CoroutineVerticle() {
}
}

get("/updates").jsonResponseHandler(Serializers.worlds) {
get("/updates").jsonResponseCoHandler(Serializers.worlds) {
val queries = it.request().getQueries()
val worlds = selectRandomWorlds(queries)
val updatedWorlds = worlds.map { it.copy(randomNumber = randomIntBetween1And10000()) }
Expand All @@ -228,7 +223,7 @@ class MainVerticle(val hasDb: Boolean) : CoroutineVerticle() {
updatedWorlds
}

get("/plaintext").checkedCoroutineHandlerUnconfined {
get("/plaintext").coHandlerUnconfined {
it.response().run {
putCommonHeaders()
putHeader(HttpHeaders.CONTENT_TYPE, "text/plain")
Expand Down

0 comments on commit f607f4e

Please sign in to comment.