Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add function to report metrics for a HTTP.Connection.Pool #1116

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"

[compat]
CodecZlib = "0.7"
ConcurrentUtilities = "2.2"
ConcurrentUtilities = "2.3"
ExceptionUnwrapping = "0.1"
LoggingExtras = "0.4.9,1"
MbedTLS = "0.6.8, 0.7, 1"
Expand Down
107 changes: 82 additions & 25 deletions src/Connections.jl
Original file line number Diff line number Diff line change
Expand Up @@ -350,10 +350,10 @@ end
const CPool{T} = ConcurrentUtilities.Pool{ConnectionKeyType, Connection{T}}

"""
HTTP.Pool(max::Int=HTTP.default_connection_limit[])
HTTP.Pool(limit::Int=HTTP.default_connection_limit[])

Connection pool for managing the reuse of HTTP connections.
`max` controls the maximum number of concurrent connections allowed
`limit` controls the maximum number of concurrent connections allowed
and defaults to the `HTTP.default_connection_limit` value.

A pool can be passed to any of the `HTTP.request` methods via the `pool` keyword argument.
Expand All @@ -364,17 +364,17 @@ struct Pool
mbedtls::CPool{MbedTLS.SSLContext}
openssl::CPool{OpenSSL.SSLStream}
other::IdDict{Type, CPool}
max::Int
limit::Int
end

function Pool(max::Union{Int, Nothing}=nothing)
max = something(max, default_connection_limit[])
function Pool(limit::Union{Int, Nothing}=nothing)
limit = something(limit, default_connection_limit[])
return Pool(ReentrantLock(),
CPool{Sockets.TCPSocket}(max),
CPool{MbedTLS.SSLContext}(max),
CPool{OpenSSL.SSLStream}(max),
CPool{Sockets.TCPSocket}(limit),
CPool{MbedTLS.SSLContext}(limit),
CPool{OpenSSL.SSLStream}(limit),
IdDict{Type, CPool}(),
max,
limit,
)
end

Expand All @@ -384,6 +384,64 @@ const MBEDTLS_POOL = Ref{CPool{MbedTLS.SSLContext}}()
const OPENSSL_POOL = Ref{CPool{OpenSSL.SSLStream}}()
const OTHER_POOL = Lockable(IdDict{Type, CPool}())

"""
HTTP.Connections.metrics([nothing]) -> IdDict{Type,Metrics}

Return a dictionary of connection metrics, keyed by the connection type, for the default global pool.
"""
function metrics(pool::Nothing=nothing)
return IdDict{Type,Metrics}(
Sockets.TCPSocket => Metrics(TCP_POOL[]),
MbedTLS.SSLContext => Metrics(MBEDTLS_POOL[]),
OpenSSL.SSLStream => Metrics(OPENSSL_POOL[]),
(Base.@lock OTHER_POOL.lock (k => Metrics(v) for (k, v) in OTHER_POOL[]))...,
)
end

"""
HTTP.Connections.metrics(pool::Pool) -> IdDict{Type,Metrics}

Return a dictionary of connection metrics, keyed by the connection type, for the given `pool`.
"""
function metrics(pool::Pool)
return IdDict{Type,Metrics}(
Sockets.TCPSocket => Metrics(pool.tcp),
MbedTLS.SSLContext => Metrics(pool.mbedtls),
OpenSSL.SSLStream => Metrics(pool.openssl),
(Base.@lock pool.lock (k => Metrics(v) for (k, v) in pool.other))...,
)
end

Base.@kwdef struct Metrics
limit::Int
in_use::Int
in_pool::Int
end

"""
Metrics(cpool::$CPool)

Metrics for the given connection pool:
- `limit`: the maximum number of connections allowed to be in-use at the same time.
- `in_use`: the number of connections currently in use.
- `in_pool`: the number of connections available for re-use.
"""
function Metrics(cpool::CPool)
return Metrics(
limit=ConcurrentUtilities.Pools.limit(cpool),
in_use=ConcurrentUtilities.Pools.in_use(cpool),
in_pool=ConcurrentUtilities.Pools.in_pool(cpool),
)
end

function Base.show(io::IO, m::Metrics)
print(io, "Metrics(")
print(io, "limit=", m.limit)
print(io, ", in_use=", m.in_use)
print(io, ", in_pool=", m.in_pool)
print(io, ")")
end

getpool(::Nothing, ::Type{Sockets.TCPSocket}) = TCP_POOL[]
getpool(::Nothing, ::Type{MbedTLS.SSLContext}) = MBEDTLS_POOL[]
getpool(::Nothing, ::Type{OpenSSL.SSLStream}) = OPENSSL_POOL[]
Expand All @@ -399,7 +457,7 @@ function getpool(pool::Pool, ::Type{T})::CPool{T} where {T}
elseif T === OpenSSL.SSLStream
return pool.openssl
else
return Base.@lock pool.lock get!(() -> CPool{T}(pool.max), pool.other, T)
return Base.@lock pool.lock get!(() -> CPool{T}(pool.limit), pool.other, T)
end
end

Expand Down Expand Up @@ -451,22 +509,21 @@ function newconnection(::Type{T},
keepalive::Bool=true,
kw...) where {T <: IO}
connection_limit_warning(connection_limit)
return acquire(
getpool(pool, T),
(host, port, require_ssl_verification, keepalive, true);
forcenew=forcenew,
isvalid=c->connection_isvalid(c, Int(idle_timeout))) do
Connection(host, port,
idle_timeout, require_ssl_verification, keepalive,
connect_timeout > 0 ?
try_with_timeout(_ ->
getconnection(T, host, port;
require_ssl_verification=require_ssl_verification, keepalive=keepalive, kw...),
connect_timeout) :
getconnection(T, host, port;
require_ssl_verification=require_ssl_verification, keepalive=keepalive, kw...)
)
function connect(timeout)
if timeout > 0
try_with_timeout(timeout) do _
getconnection(T, host, port; require_ssl_verification=require_ssl_verification, keepalive=keepalive, kw...)
end
else
getconnection(T, host, port; require_ssl_verification=require_ssl_verification, keepalive=keepalive, kw...)
end
end
newconn() = Connection(host, port, idle_timeout, require_ssl_verification, keepalive, connect(connect_timeout))
key = (host, port, require_ssl_verification, keepalive, true)
return acquire(
newconn, getpool(pool, T), key;
forcenew=forcenew, isvalid=c->connection_isvalid(c, Int(idle_timeout)),
)
end

function releaseconnection(c::Connection{T}, reuse; pool::Union{Nothing, Pool}=nothing, kw...) where {T}
Expand Down
75 changes: 68 additions & 7 deletions test/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ using InteractiveUtils: @which
using ConcurrentUtilities

# test we can adjust default_connection_limit
for x in (10, 12)
HTTP.set_default_connection_limit!(x)
@test HTTP.Connections.TCP_POOL[].max == x
@test HTTP.Connections.MBEDTLS_POOL[].max == x
@test HTTP.Connections.OPENSSL_POOL[].max == x
@testset "set_default_connection_limit!" begin
for x in (10, 12)
HTTP.set_default_connection_limit!(x)
@test HTTP.Connections.TCP_POOL[].limit == x
@test HTTP.Connections.MBEDTLS_POOL[].limit == x
@test HTTP.Connections.OPENSSL_POOL[].limit == x
end
end

@testset "@client macro" begin
Expand Down Expand Up @@ -325,11 +327,11 @@ end
end

@testset "connect_timeout does not include the time needed to acquire a connection from the pool" begin
connection_limit = HTTP.Connections.TCP_POOL[].max
connection_limit = HTTP.Connections.TCP_POOL[].limit
try
dummy_conn = HTTP.Connection(Sockets.TCPSocket())
HTTP.set_default_connection_limit!(1)
@assert HTTP.Connections.TCP_POOL[].max == 1
@assert HTTP.Connections.TCP_POOL[].limit == 1
# drain the pool
acquire(()->dummy_conn, HTTP.Connections.TCP_POOL[], HTTP.Connections.connectionkey(dummy_conn))
# Put it back in 10 seconds
Expand All @@ -342,6 +344,65 @@ end
end
end

@testset "Connections.metrics" begin
# Test that `metrics` function returns the expected values as we use / release connections.
# Initialise a pool, and check that it is empty and has the expected limit
pool = HTTP.Pool(3)
for (T, v) in HTTP.Connections.metrics(pool)
@test v.limit == 3
@test v.in_pool == 0
@test v.in_use == 0
end

TCP = Sockets.TCPSocket
# After a request, check the connection is put in the pool for reuse
HTTP.get("https://$httpbin/get"; pool=pool, socket_type_tls=TCP)
metrics = HTTP.Connections.metrics(pool)
@test metrics[TCP].in_pool == 1
@test metrics[TCP].in_use == 0

# A second request should use this same connection and put it back again
HTTP.get("https://$httpbin/get"; pool=pool, socket_type_tls=TCP)
metrics = HTTP.Connections.metrics(pool)
@test metrics[TCP].in_pool == 1
@test metrics[TCP].in_use == 0

# Force a new connection -- the one in the pool should remain there.
c1 = HTTP.Connections.newconnection(TCP, httpbin, ""; forcenew=true, connect_timeout=3, pool=pool)
metrics = HTTP.Connections.metrics(pool)
@test metrics[TCP].in_pool == 1
@test metrics[TCP].in_use == 1

# Get another "new connection", since we didn't force new, this should use the one from the pool.
c2 = HTTP.Connections.newconnection(TCP, httpbin, ""; forcenew=false, connect_timeout=3, pool=pool)
metrics = HTTP.Connections.metrics(pool)
@test metrics[TCP].in_pool == 0
@test metrics[TCP].in_use == 2

# Release the first connection back to the pool
HTTP.Connections.releaseconnection(c1, true; pool=pool)
metrics = HTTP.Connections.metrics(pool)
@test metrics[TCP].in_pool == 1
@test metrics[TCP].in_use == 1

# Release the second connection but do not put back in the pool
HTTP.Connections.releaseconnection(c1, false; pool=pool)
metrics = HTTP.Connections.metrics(pool)
@test metrics[TCP].in_pool == 1
@test metrics[TCP].in_use == 0

# Test another connection type
SSL = MbedTLS.SSLContext
@test metrics[SSL].limit == 3
@test metrics[SSL].in_pool == 0
@test metrics[SSL].in_use == 0
HTTP.get("https://$httpbin/get"; pool=pool, socket_type_tls=SSL)
metrics = HTTP.Connections.metrics(pool)
@test metrics[SSL].limit == 3
@test metrics[SSL].in_pool == 1
@test metrics[SSL].in_use == 0
end

@testset "Retry all resolved IP addresses" begin
# See issue https://github.com/JuliaWeb/HTTP.jl/issues/672
# Bit tricky to test, but can at least be tested if localhost
Expand Down
Loading