From 4bf9b3e74944f9122fe2796015250d22bf106f3d Mon Sep 17 00:00:00 2001 From: Nick Robinson Date: Wed, 1 Nov 2023 14:43:24 +0000 Subject: [PATCH] Add functions to count object in use and objects in pool (#29) * Add functions to count object in use and objects in pool * Define `keytype` and `valtype` for `Pool` * fixup! Define `keytype` and `valtype` for `Pool` * fixup! Add functions to count object in use and objects in pool * Bump version * Rename in terms of "use" * Rename `max` -> `limit` * Docs for keytype/valtype --- Project.toml | 2 +- src/pools.jl | 89 +++++++++++++++++++++++++++++++++------------ test/pools.jl | 99 ++++++++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 161 insertions(+), 29 deletions(-) diff --git a/Project.toml b/Project.toml index 698fcaa..316aa30 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "ConcurrentUtilities" uuid = "f0e56b4a-5159-44fe-b623-3e5288b988bb" authors = ["Jacob Quinn "] -version = "2.2.1" +version = "2.3.0" [deps] Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b" diff --git a/src/pools.jl b/src/pools.jl index ee95c39..a4c0af9 100644 --- a/src/pools.jl +++ b/src/pools.jl @@ -4,39 +4,42 @@ export Pool, acquire, release, drain! import Base: acquire, release """ - Pool{T}(max::Int=4096) - Pool{K, T}(max::Int=4096) + Pool{T}(limit::Int=4096) + Pool{K, T}(limit::Int=4096) A threadsafe object for managing a pool of objects of type `T`, optionally keyed by objects -of type `K`. Objects can be requested by calling `acquire(f, pool, [key])`, where `f` is a +of type `K`. + +Objects can be requested by calling `acquire(f, pool, [key])`, where `f` is a function that returns a new object of type `T`. The `key` argument is optional and can be used to lookup objects that match a certain criteria -(a Dict is used internally, so matching is `isequal`). +(a `Dict` is used internally, so matching is `isequal`). -The `max` argument will limit the number of objects -that can be acquired at any given time. If the limit has been reached, `acquire` will -block until an object is returned to the pool via `release`. +The `limit` argument will limit the number of objects that can be in use at any given time. +If the limit has been reached, `acquire` will block until an object is released +via `release`. -By default, `release(pool, obj)` will return the object to the pool for reuse. -`release(pool)` will return the "permit" to the pool while not returning -any object for reuse. +- `release(pool, obj)` will return the object to the pool for reuse. +- `release(pool)` will decrement the number in use but not return any object for reuse. +- `drain!` can be used to remove objects that have been returned to the pool for reuse; + it does *not* release any objects that are in use. -`drain!` can be used to remove any cached objects for reuse, but it does *not* release -any active acquires. +See also `acquire`, `release`, `Pools.limit`, `Pools.in_use`, `Pools.in_pool`, `drain!`. +The key and object types can be inspected with `keytype` and `valtype` respectively. """ mutable struct Pool{K, T} lock::Threads.Condition - max::Int + limit::Int cur::Int keyedvalues::Dict{K, Vector{T}} values::Vector{T} - function Pool{K, T}(max::Int=4096) where {K, T} + function Pool{K, T}(limit::Int=4096) where {K, T} T === Nothing && throw(ArgumentError("Pool type can not be `Nothing`")) - x = new(Threads.Condition(), max, 0) + x = new(Threads.Condition(), limit, 0) if K === Nothing x.values = T[] - safesizehint!(x.values, max) + safesizehint!(x.values, limit) else x.keyedvalues = Dict{K, Vector{T}}() end @@ -44,13 +47,53 @@ mutable struct Pool{K, T} end end -Pool{T}(max::Int=4096) where {T} = Pool{Nothing, T}(max) +Pool{T}(limit::Int=4096) where {T} = Pool{Nothing, T}(limit) safesizehint!(x, n) = sizehint!(x, min(4096, n)) # determines whether we'll look up object caches in .keyedvalues or .values iskeyed(::Pool{K}) where {K} = K !== Nothing +""" + keytype(::Pool) + +Return the type of the keys for the pool. +If the pool is not keyed, this will return `Nothing`. +""" +Base.keytype(::Type{<:Pool{K}}) where {K} = K +Base.keytype(p::Pool) = keytype(typeof(p)) + +""" + valtype(::Pool) + +Return the type of the objects that can be stored in the pool. +""" +Base.valtype(::Type{<:Pool{<:Any, T}}) where {T} = T +Base.valtype(p::Pool) = valtype(typeof(p)) + +""" + Pools.limit(pool::Pool) -> Int + +Return the maximum number of objects permitted to be in use at the same time. +See `Pools.in_use(pool)` for the number of objects currently in use. +""" +limit(pool::Pool) = Base.@lock pool.lock pool.limit + +""" + Pools.in_use(pool::Pool) -> Int + +Return the number of objects currently in use. Less than or equal to `Pools.limit(pool)`. +""" +in_use(pool::Pool) = Base.@lock pool.lock pool.cur + +""" + Pools.in_pool(pool::Pool) -> Int + +Return the number of objects in the pool available for reuse. +""" +in_pool(pool::Pool) = Base.@lock pool.lock mapreduce(length, +, values(pool.keyedvalues); init=0) +in_pool(pool::Pool{Nothing}) = Base.@lock pool.lock length(pool.values) + """ drain!(pool) @@ -72,7 +115,7 @@ end TRUE(x) = true @noinline keyerror(key, K) = throw(ArgumentError("invalid key `$key` provided for pool key type $K")) -@noinline releaseerror() = throw(ArgumentError("cannot release permit when pool is empty")) +@noinline releaseerror() = throw(ArgumentError("cannot release when no objects are in use")) # NOTE: assumes you have the lock! function releasepermit(pool::Pool) @@ -92,19 +135,19 @@ The `forcenew` keyword argument can be used to force the creation of a new objec The `isvalid` keyword argument can be used to specify a function that will be called to determine if an object is still valid for reuse. By default, all objects are considered valid. If there are no objects available for reuse, `f` will be called to create a new object. -If the pool is already at its maximum capacity, `acquire` will block until an object is returned to the pool via `release`. +If the pool is already at its usage limit, `acquire` will block until an object is returned to the pool via `release`. """ function Base.acquire(f, pool::Pool{K, T}, key=nothing; forcenew::Bool=false, isvalid::Function=TRUE) where {K, T} key isa K || keyerror(key, K) Base.@lock pool.lock begin # first get a permit - while pool.cur >= pool.max + while pool.cur >= pool.limit wait(pool.lock) end pool.cur += 1 # now see if we can get an object from the pool for reuse if !forcenew - objs = iskeyed(pool) ? get!(() -> safesizehint!(T[], pool.max), pool.keyedvalues, key) : pool.values + objs = iskeyed(pool) ? get!(() -> safesizehint!(T[], pool.limit), pool.keyedvalues, key) : pool.values while !isempty(objs) obj = pop!(objs) isvalid(obj) && return obj @@ -126,10 +169,10 @@ end release(pool::Pool{K, T}, obj::T) release(pool::Pool{K, T}) -Return an object to a `pool`, optionally keyed by the provided `key`. +Release an object from usage by a `pool`, optionally keyed by the provided `key`. If `obj` is provided, it will be returned to the pool for reuse. Otherwise, if `nothing` is returned, or `release(pool)` is called, -just the "permit" will be returned to the pool. +the usage count will be decremented without an object being returned to the pool for reuse. """ function Base.release(pool::Pool{K, T}, key, obj::Union{T, Nothing}=nothing) where {K, T} key isa K || keyerror(key, K) diff --git a/test/pools.jl b/test/pools.jl index d2ff678..bd98521 100644 --- a/test/pools.jl +++ b/test/pools.jl @@ -1,24 +1,49 @@ -using ConcurrentUtilities, Test +using ConcurrentUtilities.Pools, Test @testset "Pools" begin + pool_size = length∘Pools.values @testset "nonkeyed and pool basics" begin pool = Pool{Int}(3) + @test keytype(pool) === Nothing + @test valtype(pool) === Int + + @test Pools.limit(pool) == 3 + @test Pools.in_use(pool) == 0 + @test Pools.in_pool(pool) == 0 + # acquire an object from the pool x1 = acquire(() -> 1, pool) # no existing objects in the pool, so our function was called to create a new one @test x1 == 1 + @test Pools.limit(pool) == 3 + @test Pools.in_use(pool) == 1 + @test Pools.in_pool(pool) == 0 + # release back to the pool for reuse release(pool, x1) + @test Pools.in_use(pool) == 0 + @test Pools.in_pool(pool) == 1 + # acquire another object from the pool x1 = acquire(() -> 2, pool) # this time, the pool had an existing object, so our function was not called @test x1 == 1 + @test Pools.in_use(pool) == 1 + @test Pools.in_pool(pool) == 0 + # but now there are no objects to reuse again, so the next acquire will call our function x2 = acquire(() -> 2, pool) @test x2 == 2 + @test Pools.in_use(pool) == 2 + @test Pools.in_pool(pool) == 0 + x3 = acquire(() -> 3, pool) @test x3 == 3 - # the pool is now at capacity, so the next acquire will block until an object is released + @test Pools.in_use(pool) == 3 + @test Pools.in_pool(pool) == 0 + + # the pool is now at `Pools.limit`, so the next acquire will block until an object is released + @test Pools.in_use(pool) == Pools.limit(pool) tsk = @async acquire(() -> 4, pool; forcenew=true) yield() @test !istaskdone(tsk) @@ -28,60 +53,110 @@ using ConcurrentUtilities, Test x1 = fetch(tsk) # even though we released 1 for reuse, we passed forcenew, so our function was called to create new @test x1 == 4 + @test Pools.in_use(pool) == 3 + @test Pools.in_pool(pool) == 1 + # error to try and provide a key to a non-keyed pool @test_throws ArgumentError acquire(() -> 1, pool, 1) + # release objects back to the pool release(pool, x1) release(pool, x2) release(pool, x3) + @test Pools.in_use(pool) == 0 + @test Pools.in_pool(pool) == 4 + # acquire an object, but checking isvalid x1 = acquire(() -> 5, pool; isvalid=x -> x == 1) @test x1 == 1 + @test Pools.in_use(pool) == 1 + # no valid objects, so our function was called to create a new one x2 = acquire(() -> 6, pool; isvalid=x -> x == 1) @test x2 == 6 - # we have one slot left in the pool, we now throw while creating new + @test Pools.in_use(pool) == 2 + + # we have one permit left, we now throw while creating a new object # and we want to test that the permit isn't permanently lost for the pool @test_throws ErrorException acquire(() -> error("oops"), pool; forcenew=true) + @test Pools.in_use(pool) == 2 + # we can still acquire a new object x3 = acquire(() -> 7, pool; forcenew=true) @test x3 == 7 + @test Pools.in_use(pool) == 3 + # release objects back to the pool + drain!(pool) release(pool, x1) release(pool, x2) release(pool, x3) + @test Pools.in_use(pool) == 0 + @test Pools.in_pool(pool) == 3 + # try to do an invalid release @test_throws ArgumentError release(pool, 10) + # test that the invalid release didn't push the object to our pool for reuse x1 = acquire(() -> 8, pool) @test x1 == 7 + @test Pools.in_use(pool) == 1 + @test Pools.in_pool(pool) == 2 # calling drain! removes all objects for reuse drain!(pool) + @test Pools.in_use(pool) == 1 + @test Pools.in_pool(pool) == 0 + x2 = acquire(() -> 9, pool) @test x2 == 9 + @test Pools.in_use(pool) == 2 + @test Pools.in_pool(pool) == 0 end @testset "keyed pool" begin # now test a keyed pool pool = Pool{String, Int}(3) + @test keytype(pool) === String + @test valtype(pool) === Int + + @test Pools.limit(pool) == 3 + @test Pools.in_use(pool) == 0 + @test Pools.in_pool(pool) == 0 + # acquire an object from the pool x1 = acquire(() -> 1, pool, "a") # no existing objects in the pool, so our function was called to create a new one @test x1 == 1 + @test Pools.in_use(pool) == 1 + @test Pools.in_pool(pool) == 0 + # release back to the pool for reuse release(pool, "a", x1) + @test Pools.in_use(pool) == 0 + @test Pools.in_pool(pool) == 1 + # test for a different key x2 = acquire(() -> 2, pool, "b") # there's an existing object, but for a different key, so we don't reuse @test x2 == 2 + @test Pools.in_use(pool) == 1 + @test Pools.in_pool(pool) == 1 + # acquire another object from the pool x1 = acquire(() -> 2, pool, "a") # this time, the pool had an existing object, so our function was not called @test x1 == 1 + @test Pools.in_use(pool) == 2 + @test Pools.in_pool(pool) == 0 + x3 = acquire(() -> 3, pool, "a") @test x3 == 3 + @test Pools.in_use(pool) == 3 + @test Pools.in_pool(pool) == 0 + # the pool is now at capacity, so the next acquire will block until an object is released # even though we've acquired using different keys, the capacity is shared across the pool + @test Pools.in_use(pool) == Pools.limit(pool) tsk = @async acquire(() -> 4, pool, "c"; forcenew=true) yield() @test !istaskdone(tsk) @@ -91,13 +166,27 @@ using ConcurrentUtilities, Test x1 = fetch(tsk) # even though we released 1 for reuse, we passed forcenew, so our function was called to create new @test x1 == 4 + @test Pools.in_use(pool) == 3 + @test Pools.in_pool(pool) == 1 + # error to try and provide an invalid key to a keyed pool @test_throws ArgumentError acquire(() -> 1, pool, 1) - # error to release an invalid key back to the pool - @test_throws KeyError release(pool, "z", 1) + @test Pools.in_use(pool) == 3 + @test Pools.in_pool(pool) == 1 + # error to *not* provide a key to a keyed pool @test_throws ArgumentError acquire(() -> 1, pool) + @test Pools.in_use(pool) == 3 + @test Pools.in_pool(pool) == 1 + # error to *not* provide a key when releasing to a keyed pool @test_throws ArgumentError release(pool) + @test Pools.in_use(pool) == 3 + @test Pools.in_pool(pool) == 1 + + # error to release an invalid key back to the pool + @test_throws KeyError release(pool, "z", 1) + @test_broken Pools.in_use(pool) == 3 + @test Pools.in_pool(pool) == 1 end end