Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KTOR-7777 Release proper InetSocketAddress #4482

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ internal class Endpoint(
private val deliveryPoint: Channel<RequestTask> = Channel()
private val maxEndpointIdleTime: Long = 2 * config.endpoint.connectTimeout

private var connectionAddress: InetSocketAddress? = null

private val timeout = launch(coroutineContext + CoroutineName("Endpoint timeout($host:$port)")) {
try {
while (true) {
Expand Down Expand Up @@ -100,7 +98,7 @@ internal class Endpoint(
callContext: CoroutineContext
): HttpResponseData {
try {
val connection = connect(request)
val (address, connection) = connect(request)
val input = connection.input
val originOutput = connection.output

Expand All @@ -118,7 +116,7 @@ internal class Endpoint(
} catch (cause: Throwable) {
LOGGER.debug("An error occurred while closing connection", cause)
} finally {
releaseConnection()
releaseConnection(address)
}
}

Expand Down Expand Up @@ -182,7 +180,7 @@ internal class Endpoint(
}

private suspend fun createPipeline(request: HttpRequestData) {
val connection = connect(request)
val (address, connection) = connect(request)

val pipeline = ConnectionPipeline(
config.endpoint.keepAliveTime,
Expand All @@ -193,11 +191,11 @@ internal class Endpoint(
coroutineContext
)

pipeline.pipelineContext.invokeOnCompletion { releaseConnection() }
pipeline.pipelineContext.invokeOnCompletion { releaseConnection(address) }
}

@Suppress("UNUSED_EXPRESSION")
private suspend fun connect(requestData: HttpRequestData): Connection {
private suspend fun connect(requestData: HttpRequestData): Pair<InetSocketAddress, Connection> {
val connectAttempts = config.endpoint.connectAttempts
val (connectTimeout, socketTimeout) = retrieveTimeouts(requestData)
var timeoutFails = 0
Expand All @@ -211,7 +209,7 @@ internal class Endpoint(
val connect: suspend CoroutineScope.() -> Socket = {
connectionFactory.connect(address) {
this.socketTimeout = socketTimeout
}.also { connectionAddress = address }
}
}

val socket = when (connectTimeout) {
Expand All @@ -227,7 +225,7 @@ internal class Endpoint(
}

val connection = socket.connection()
if (!secure) return@connect connection
if (!secure) return@connect address to connection

try {
if (proxy?.type == ProxyType.HTTP) {
Expand All @@ -241,7 +239,7 @@ internal class Endpoint(
takeFrom(config.https)
serverName = serverName ?: realAddress.hostname
}
return tlsSocket.connection()
return address to tlsSocket.connection()
} catch (cause: Throwable) {
try {
socket.close()
Expand Down Expand Up @@ -288,8 +286,7 @@ internal class Endpoint(
return connectTimeout to socketTimeout
}

private fun releaseConnection() {
val address = connectionAddress ?: return
private fun releaseConnection(address: InetSocketAddress) {
connectionFactory.release(address)
connections.decrementAndGet()
}
Expand Down