diff --git a/Project.toml b/Project.toml index 565193acb..f39d175d9 100644 --- a/Project.toml +++ b/Project.toml @@ -22,7 +22,7 @@ UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4" [compat] CodecZlib = "0.7" -ConcurrentUtilities = "2.2" +ConcurrentUtilities = "2.3" ExceptionUnwrapping = "0.1" LoggingExtras = "0.4.9,1" MbedTLS = "0.6.8, 0.7, 1" diff --git a/src/Connections.jl b/src/Connections.jl index 60d6eb9b7..ae9635def 100644 --- a/src/Connections.jl +++ b/src/Connections.jl @@ -350,10 +350,10 @@ end const CPool{T} = ConcurrentUtilities.Pool{ConnectionKeyType, Connection{T}} """ - HTTP.Pool(max::Int=HTTP.default_connection_limit[]) + HTTP.Pool(limit::Int=HTTP.default_connection_limit[]) Connection pool for managing the reuse of HTTP connections. -`max` controls the maximum number of concurrent connections allowed +`limit` controls the maximum number of concurrent connections allowed and defaults to the `HTTP.default_connection_limit` value. A pool can be passed to any of the `HTTP.request` methods via the `pool` keyword argument. @@ -364,17 +364,17 @@ struct Pool mbedtls::CPool{MbedTLS.SSLContext} openssl::CPool{OpenSSL.SSLStream} other::IdDict{Type, CPool} - max::Int + limit::Int end -function Pool(max::Union{Int, Nothing}=nothing) - max = something(max, default_connection_limit[]) +function Pool(limit::Union{Int, Nothing}=nothing) + limit = something(limit, default_connection_limit[]) return Pool(ReentrantLock(), - CPool{Sockets.TCPSocket}(max), - CPool{MbedTLS.SSLContext}(max), - CPool{OpenSSL.SSLStream}(max), + CPool{Sockets.TCPSocket}(limit), + CPool{MbedTLS.SSLContext}(limit), + CPool{OpenSSL.SSLStream}(limit), IdDict{Type, CPool}(), - max, + limit, ) end @@ -384,6 +384,64 @@ const MBEDTLS_POOL = Ref{CPool{MbedTLS.SSLContext}}() const OPENSSL_POOL = Ref{CPool{OpenSSL.SSLStream}}() const OTHER_POOL = Lockable(IdDict{Type, CPool}()) +""" + HTTP.Connections.metrics([nothing]) -> IdDict{Type,Metrics} + +Return a dictionary of connection metrics, keyed by the connection type, for the default global pool. +""" +function metrics(pool::Nothing=nothing) + return IdDict{Type,Metrics}( + Sockets.TCPSocket => Metrics(TCP_POOL[]), + MbedTLS.SSLContext => Metrics(MBEDTLS_POOL[]), + OpenSSL.SSLStream => Metrics(OPENSSL_POOL[]), + (Base.@lock OTHER_POOL.lock (k => Metrics(v) for (k, v) in OTHER_POOL[]))..., + ) +end + +""" + HTTP.Connections.metrics(pool::Pool) -> IdDict{Type,Metrics} + +Return a dictionary of connection metrics, keyed by the connection type, for the given `pool`. +""" +function metrics(pool::Pool) + return IdDict{Type,Metrics}( + Sockets.TCPSocket => Metrics(pool.tcp), + MbedTLS.SSLContext => Metrics(pool.mbedtls), + OpenSSL.SSLStream => Metrics(pool.openssl), + (Base.@lock pool.lock (k => Metrics(v) for (k, v) in pool.other))..., + ) +end + +Base.@kwdef struct Metrics + limit::Int + in_use::Int + in_pool::Int +end + +""" + Metrics(cpool::$CPool) + +Metrics for the given connection pool: +- `limit`: the maximum number of connections allowed to be in-use at the same time. +- `in_use`: the number of connections currently in use. +- `in_pool`: the number of connections available for re-use. +""" +function Metrics(cpool::CPool) + return Metrics( + limit=ConcurrentUtilities.Pools.limit(cpool), + in_use=ConcurrentUtilities.Pools.in_use(cpool), + in_pool=ConcurrentUtilities.Pools.in_pool(cpool), + ) +end + +function Base.show(io::IO, m::Metrics) + print(io, "Metrics(") + print(io, "limit=", m.limit) + print(io, ", in_use=", m.in_use) + print(io, ", in_pool=", m.in_pool) + print(io, ")") +end + getpool(::Nothing, ::Type{Sockets.TCPSocket}) = TCP_POOL[] getpool(::Nothing, ::Type{MbedTLS.SSLContext}) = MBEDTLS_POOL[] getpool(::Nothing, ::Type{OpenSSL.SSLStream}) = OPENSSL_POOL[] @@ -399,7 +457,7 @@ function getpool(pool::Pool, ::Type{T})::CPool{T} where {T} elseif T === OpenSSL.SSLStream return pool.openssl else - return Base.@lock pool.lock get!(() -> CPool{T}(pool.max), pool.other, T) + return Base.@lock pool.lock get!(() -> CPool{T}(pool.limit), pool.other, T) end end @@ -451,22 +509,21 @@ function newconnection(::Type{T}, keepalive::Bool=true, kw...) where {T <: IO} connection_limit_warning(connection_limit) - return acquire( - getpool(pool, T), - (host, port, require_ssl_verification, keepalive, true); - forcenew=forcenew, - isvalid=c->connection_isvalid(c, Int(idle_timeout))) do - Connection(host, port, - idle_timeout, require_ssl_verification, keepalive, - connect_timeout > 0 ? - try_with_timeout(_ -> - getconnection(T, host, port; - require_ssl_verification=require_ssl_verification, keepalive=keepalive, kw...), - connect_timeout) : - getconnection(T, host, port; - require_ssl_verification=require_ssl_verification, keepalive=keepalive, kw...) - ) + function connect(timeout) + if timeout > 0 + try_with_timeout(timeout) do _ + getconnection(T, host, port; require_ssl_verification=require_ssl_verification, keepalive=keepalive, kw...) + end + else + getconnection(T, host, port; require_ssl_verification=require_ssl_verification, keepalive=keepalive, kw...) + end end + newconn() = Connection(host, port, idle_timeout, require_ssl_verification, keepalive, connect(connect_timeout)) + key = (host, port, require_ssl_verification, keepalive, true) + return acquire( + newconn, getpool(pool, T), key; + forcenew=forcenew, isvalid=c->connection_isvalid(c, Int(idle_timeout)), + ) end function releaseconnection(c::Connection{T}, reuse; pool::Union{Nothing, Pool}=nothing, kw...) where {T} diff --git a/test/client.jl b/test/client.jl index 868ac8ded..6bceb3740 100644 --- a/test/client.jl +++ b/test/client.jl @@ -14,11 +14,13 @@ using InteractiveUtils: @which using ConcurrentUtilities # test we can adjust default_connection_limit -for x in (10, 12) - HTTP.set_default_connection_limit!(x) - @test HTTP.Connections.TCP_POOL[].max == x - @test HTTP.Connections.MBEDTLS_POOL[].max == x - @test HTTP.Connections.OPENSSL_POOL[].max == x +@testset "set_default_connection_limit!" begin + for x in (10, 12) + HTTP.set_default_connection_limit!(x) + @test HTTP.Connections.TCP_POOL[].limit == x + @test HTTP.Connections.MBEDTLS_POOL[].limit == x + @test HTTP.Connections.OPENSSL_POOL[].limit == x + end end @testset "@client macro" begin @@ -325,11 +327,11 @@ end end @testset "connect_timeout does not include the time needed to acquire a connection from the pool" begin - connection_limit = HTTP.Connections.TCP_POOL[].max + connection_limit = HTTP.Connections.TCP_POOL[].limit try dummy_conn = HTTP.Connection(Sockets.TCPSocket()) HTTP.set_default_connection_limit!(1) - @assert HTTP.Connections.TCP_POOL[].max == 1 + @assert HTTP.Connections.TCP_POOL[].limit == 1 # drain the pool acquire(()->dummy_conn, HTTP.Connections.TCP_POOL[], HTTP.Connections.connectionkey(dummy_conn)) # Put it back in 10 seconds @@ -342,6 +344,65 @@ end end end +@testset "Connections.metrics" begin + # Test that `metrics` function returns the expected values as we use / release connections. + # Initialise a pool, and check that it is empty and has the expected limit + pool = HTTP.Pool(3) + for (T, v) in HTTP.Connections.metrics(pool) + @test v.limit == 3 + @test v.in_pool == 0 + @test v.in_use == 0 + end + + TCP = Sockets.TCPSocket + # After a request, check the connection is put in the pool for reuse + HTTP.get("https://$httpbin/get"; pool=pool, socket_type_tls=TCP) + metrics = HTTP.Connections.metrics(pool) + @test metrics[TCP].in_pool == 1 + @test metrics[TCP].in_use == 0 + + # A second request should use this same connection and put it back again + HTTP.get("https://$httpbin/get"; pool=pool, socket_type_tls=TCP) + metrics = HTTP.Connections.metrics(pool) + @test metrics[TCP].in_pool == 1 + @test metrics[TCP].in_use == 0 + + # Force a new connection -- the one in the pool should remain there. + c1 = HTTP.Connections.newconnection(TCP, httpbin, ""; forcenew=true, connect_timeout=3, pool=pool) + metrics = HTTP.Connections.metrics(pool) + @test metrics[TCP].in_pool == 1 + @test metrics[TCP].in_use == 1 + + # Get another "new connection", since we didn't force new, this should use the one from the pool. + c2 = HTTP.Connections.newconnection(TCP, httpbin, ""; forcenew=false, connect_timeout=3, pool=pool) + metrics = HTTP.Connections.metrics(pool) + @test metrics[TCP].in_pool == 0 + @test metrics[TCP].in_use == 2 + + # Release the first connection back to the pool + HTTP.Connections.releaseconnection(c1, true; pool=pool) + metrics = HTTP.Connections.metrics(pool) + @test metrics[TCP].in_pool == 1 + @test metrics[TCP].in_use == 1 + + # Release the second connection but do not put back in the pool + HTTP.Connections.releaseconnection(c1, false; pool=pool) + metrics = HTTP.Connections.metrics(pool) + @test metrics[TCP].in_pool == 1 + @test metrics[TCP].in_use == 0 + + # Test another connection type + SSL = MbedTLS.SSLContext + @test metrics[SSL].limit == 3 + @test metrics[SSL].in_pool == 0 + @test metrics[SSL].in_use == 0 + HTTP.get("https://$httpbin/get"; pool=pool, socket_type_tls=SSL) + metrics = HTTP.Connections.metrics(pool) + @test metrics[SSL].limit == 3 + @test metrics[SSL].in_pool == 1 + @test metrics[SSL].in_use == 0 +end + @testset "Retry all resolved IP addresses" begin # See issue https://github.com/JuliaWeb/HTTP.jl/issues/672 # Bit tricky to test, but can at least be tested if localhost